EpochCoordinator RPC Endpoint
EpochCoordinator is a ThreadSafeRpcEndpoint that tracks offsets and epochs (coordinates epochs) by handling <
EpochCoordinator is <ContinuousExecution is requested to <
[[messages]]
[[EpochCoordinatorMessage]]
.EpochCoordinator RPC Endpoint's Messages
[cols="30m,70",options="header",width="100%"]
|===
| Message
| Description
a| CommitPartitionEpoch
- [[CommitPartitionEpoch-partitionId]] Partition ID
- [[CommitPartitionEpoch-epoch]] Epoch
- [[CommitPartitionEpoch-message]] DataSource API V2's WriterCommitMessage
| [[CommitPartitionEpoch]] Sent out (in one-way asynchronous mode) exclusively when ContinuousWriteRDD is requested to <
| GetCurrentEpoch | [[GetCurrentEpoch]] Sent out (in request-response synchronous mode) exclusively when EpochMarkerGenerator thread is requested to <
| IncrementAndGetEpoch | [[IncrementAndGetEpoch]] Sent out (in request-response synchronous mode) exclusively when ContinuousExecution is requested to <
a| ReportPartitionOffset
- [[ReportPartitionOffset-partitionId]] Partition ID
- [[ReportPartitionOffset-epoch]] Epoch
- [[ReportPartitionOffset-offset]] PartitionOffset
| [[ReportPartitionOffset]] Sent out (in one-way asynchronous mode) exclusively when ContinuousQueuedDataReader is requested for the <
a| SetReaderPartitions
- [[SetReaderPartitions-numPartitions]] Number of partitions
| [[SetReaderPartitions]] Sent out (in request-response synchronous mode) exclusively when DataSourceV2ScanExec leaf physical operator is requested for the input RDDs (for a ContinuousReader and is about to create a <
The <InputPartitions from the ContinuousReader.
a| SetWriterPartitions
- [[SetWriterPartitions-numPartitions]] Number of partitions
| [[SetWriterPartitions]] Sent out (in request-response synchronous mode) exclusively when WriteToContinuousDataSourceExec leaf physical operator is requested to <
a| StopContinuousExecutionWrites | [[StopContinuousExecutionWrites]] Sent out (in request-response synchronous mode) exclusively when ContinuousExecution is requested to <
|===
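The message table above can be summarized as a small algebraic data type. The following is a minimal sketch, not Spark's source: `WriterCommitMessage` and `PartitionOffset` are empty stand-ins for the real Spark types, the `Int` type of the partition ID and partition count fields is an assumption, and `MessageModes.isOneWay` is a hypothetical helper that only encodes the delivery mode listed in the table.

```scala
// Empty stand-ins for Spark's types (assumptions, for illustration only)
trait WriterCommitMessage extends Serializable
trait PartitionOffset extends Serializable

sealed trait EpochCoordinatorMessage extends Serializable

// Messages sent in request-response (synchronous) mode
case object GetCurrentEpoch extends EpochCoordinatorMessage
case object IncrementAndGetEpoch extends EpochCoordinatorMessage
case object StopContinuousExecutionWrites extends EpochCoordinatorMessage
final case class SetReaderPartitions(numPartitions: Int) extends EpochCoordinatorMessage
final case class SetWriterPartitions(numPartitions: Int) extends EpochCoordinatorMessage

// Messages sent in one-way (asynchronous) mode
final case class CommitPartitionEpoch(
    partitionId: Int, // Int is an assumption
    epoch: Long,
    message: WriterCommitMessage) extends EpochCoordinatorMessage
final case class ReportPartitionOffset(
    partitionId: Int, // Int is an assumption
    epoch: Long,
    offset: PartitionOffset) extends EpochCoordinatorMessage

// Hypothetical helper: classifies a message by the delivery mode from the table
object MessageModes {
  def isOneWay(msg: EpochCoordinatorMessage): Boolean = msg match {
    case _: CommitPartitionEpoch | _: ReportPartitionOffset => true
    case _ => false
  }
}
```

Modeling the messages as a sealed hierarchy lets the compiler warn when a handler misses a message type.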
[[logging]]
[TIP]
====
Enable ALL logging level for org.apache.spark.sql.execution.streaming.continuous.EpochCoordinatorRef* logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.continuous.EpochCoordinatorRef*=ALL

Refer to <>.
====
=== [[receive]] Receiving Messages (Fire-And-Forget One-Way Mode) -- receive Method
[source, scala]
receive: PartialFunction[Any, Unit]
NOTE: receive is part of the RpcEndpoint Contract in Apache Spark to receive messages in fire-and-forget one-way mode.
receive handles the following messages:
- <
> - <
>
With the <receive simply swallows messages and does nothing.
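The swallowing behavior described above can be sketched with a guarded partial function. This is a simplified model under stated assumptions, not Spark's implementation: the message payload types, the `Coordinator` class, and the `handled` buffer are hypothetical, and only the `queryWritesStopped` flag and the two one-way message names come from this page.

```scala
import scala.collection.mutable.ArrayBuffer

object ReceiveSketch {
  // Hypothetical, simplified versions of the two one-way messages
  final case class CommitPartitionEpoch(partitionId: Int, epoch: Long, message: String)
  final case class ReportPartitionOffset(partitionId: Int, epoch: Long, offset: Long)

  final class Coordinator {
    // Mirrors the queryWritesStopped flag (false by default)
    var queryWritesStopped: Boolean = false

    // Hypothetical record of the messages that were actually processed
    val handled: ArrayBuffer[Any] = ArrayBuffer.empty[Any]

    private def record(msg: Any): Unit = handled += msg

    val receive: PartialFunction[Any, Unit] = {
      // The guard comes first: once writes are stopped, every message is dropped
      case _ if queryWritesStopped => ()
      case m: CommitPartitionEpoch => record(m)
      case m: ReportPartitionOffset => record(m)
    }
  }
}
```

Placing the guard as the first case keeps the drop logic in one place instead of repeating the check in every handler.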
=== [[receiveAndReply]] Receiving Messages (Request-Response Two-Way Mode) -- receiveAndReply Method
[source, scala]
receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]
NOTE: receiveAndReply is part of the RpcEndpoint Contract in Apache Spark to receive and reply to messages in request-response two-way mode.
receiveAndReply handles the following messages:
- <
> - <
> - <
> - <
> - <
>
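A request-response handler of this shape might look like the sketch below. It is an illustration only: `ReplyContext` is a hypothetical stand-in for `RpcCallContext` that models just the reply side, and the per-message behavior (returning or incrementing an epoch counter, storing partition counts, turning on `queryWritesStopped`) is an assumption inferred from the message names and descriptions on this page rather than taken from Spark's source.

```scala
object ReceiveAndReplySketch {
  // Simplified versions of the five request-response messages
  case object GetCurrentEpoch
  case object IncrementAndGetEpoch
  case object StopContinuousExecutionWrites
  final case class SetReaderPartitions(numPartitions: Int)
  final case class SetWriterPartitions(numPartitions: Int)

  // Hypothetical stand-in for RpcCallContext: only replying is modeled
  trait ReplyContext {
    def reply(response: Any): Unit
  }

  final class Coordinator(startEpoch: Long) {
    private var currentEpoch: Long = startEpoch
    var numReaderPartitions: Int = 0
    var numWriterPartitions: Int = 0
    var queryWritesStopped: Boolean = false

    def receiveAndReply(context: ReplyContext): PartialFunction[Any, Unit] = {
      case GetCurrentEpoch =>
        context.reply(currentEpoch)
      case IncrementAndGetEpoch =>
        currentEpoch += 1
        context.reply(currentEpoch)
      case SetReaderPartitions(n) =>
        numReaderPartitions = n
        context.reply(()) // acknowledge with Unit so the caller unblocks
      case SetWriterPartitions(n) =>
        numWriterPartitions = n
        context.reply(())
      case StopContinuousExecutionWrites =>
        queryWritesStopped = true
        context.reply(())
    }
  }
}
```

Every branch replies, even when there is nothing to return, because the sender of a synchronous message waits for a response.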
==== [[resolveCommitsAtEpoch]] resolveCommitsAtEpoch Internal Method
[source, scala]
resolveCommitsAtEpoch(epoch: Long): Unit
resolveCommitsAtEpoch...FIXME
NOTE: resolveCommitsAtEpoch is used exclusively when EpochCoordinator is requested to handle <
==== [[commitEpoch]] commitEpoch Internal Method
[source, scala]
commitEpoch(
  epoch: Long,
  messages: Iterable[WriterCommitMessage]): Unit
commitEpoch...FIXME
NOTE: commitEpoch is used exclusively when EpochCoordinator is requested to <
Creating Instance
EpochCoordinator takes the following to be created:
- [[reader]] ContinuousReader
- [[query]] ContinuousExecution
- [[startEpoch]] Start epoch
- [[session]] SparkSession
- [[rpcEnv]] RpcEnv
=== [[create]] Registering EpochCoordinator RPC Endpoint -- create Factory Method
[source, scala]
create(
  writer: StreamWriter,
  reader: ContinuousReader,
  query: ContinuousExecution,
  epochCoordinatorId: String,
  startEpoch: Long,
  session: SparkSession,
  env: SparkEnv): RpcEndpointRef
create simply <RpcEnv to register an RPC endpoint as EpochCoordinator-[id] (where id is the given epochCoordinatorId).
create prints out the following INFO message to the logs:
Registered EpochCoordinator endpoint
NOTE: create is used exclusively when ContinuousExecution is requested to <
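The naming and registration step of create can be illustrated with a toy registry. This is a sketch under stated assumptions: `ToyRpcEnv` is a hypothetical in-memory stand-in for an RpcEnv, its `setupEndpoint` returns the endpoint name in place of an `RpcEndpointRef`, and `println` stands in for the INFO log message. Only the `EpochCoordinator-[id]` naming scheme and the log text come from this page.

```scala
import scala.collection.mutable

object CreateSketch {
  // Hypothetical in-memory stand-in for an RpcEnv's endpoint registry
  final class ToyRpcEnv {
    private val endpoints = mutable.Map.empty[String, AnyRef]

    // Returns the name as a stand-in for an RpcEndpointRef
    def setupEndpoint(name: String, endpoint: AnyRef): String = {
      require(!endpoints.contains(name), s"Endpoint $name is already registered")
      endpoints(name) = endpoint
      name
    }

    def isRegistered(name: String): Boolean = endpoints.contains(name)
  }

  // Builds the endpoint name from the given epochCoordinatorId
  def endpointName(id: String): String = s"EpochCoordinator-$id"

  def create(epochCoordinatorId: String, env: ToyRpcEnv): String = {
    // A plain Object stands in for the EpochCoordinator instance
    val ref = env.setupEndpoint(endpointName(epochCoordinatorId), new Object)
    println("Registered EpochCoordinator endpoint") // stand-in for the INFO log
    ref
  }
}
```

Deriving the endpoint name from the coordinator ID lets other components look the endpoint up by name using only that ID.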
=== [[internal-properties]] Internal Properties
[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description
| queryWritesStopped | [[queryWritesStopped]] Flag that indicates whether to drop messages (true) or not (false) when requested to <
Default: false
Turned on (true) when requested to <