== EpochCoordinator RPC Endpoint
EpochCoordinator is a ThreadSafeRpcEndpoint that tracks offsets and epochs (coordinates epochs) by handling <

EpochCoordinator is <ContinuousExecution is requested to <
[[messages]]
[[EpochCoordinatorMessage]]
.EpochCoordinator RPC Endpoint's Messages
[cols="30m,70",options="header",width="100%"]
|===
| Message
| Description

a| CommitPartitionEpoch

- [[CommitPartitionEpoch-partitionId]] Partition ID
- [[CommitPartitionEpoch-epoch]] Epoch
- [[CommitPartitionEpoch-message]] DataSource API V2's WriterCommitMessage

| [[CommitPartitionEpoch]] Sent out (in one-way asynchronous mode) exclusively when ContinuousWriteRDD is requested to <

| GetCurrentEpoch
| [[GetCurrentEpoch]] Sent out (in request-response synchronous mode) exclusively when EpochMarkerGenerator thread is requested to <

| IncrementAndGetEpoch
| [[IncrementAndGetEpoch]] Sent out (in request-response synchronous mode) exclusively when ContinuousExecution is requested to <

a| ReportPartitionOffset

- [[ReportPartitionOffset-partitionId]] Partition ID
- [[ReportPartitionOffset-epoch]] Epoch
- [[ReportPartitionOffset-offset]] PartitionOffset

| [[ReportPartitionOffset]] Sent out (in one-way asynchronous mode) exclusively when ContinuousQueuedDataReader is requested for the <

a| SetReaderPartitions

- [[SetReaderPartitions-numPartitions]] Number of partitions

| [[SetReaderPartitions]] Sent out (in request-response synchronous mode) exclusively when DataSourceV2ScanExec leaf physical operator is requested for the input RDDs (for a ContinuousReader and is about to create a <

The <InputPartitions from the ContinuousReader.

a| SetWriterPartitions

- [[SetWriterPartitions-numPartitions]] Number of partitions

| [[SetWriterPartitions]] Sent out (in request-response synchronous mode) exclusively when WriteToContinuousDataSourceExec unary physical operator is requested to <

a| StopContinuousExecutionWrites
| [[StopContinuousExecutionWrites]] Sent out (in request-response synchronous mode) exclusively when ContinuousExecution is requested to <
|===
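The message table above can be read as a small sealed hierarchy. The following is a minimal, self-contained sketch rather than Spark's own definitions: the enclosing `EpochMessages` object, the `MessageModes` helper, the field types (`Int`, `Long`) and the two placeholder traits standing in for the DataSource API V2 types are all assumptions made for illustration.

```scala
object EpochMessages {
  // Placeholders for the DataSource API V2 types named in the table
  // (assumed stand-ins, not the actual Spark traits).
  trait WriterCommitMessage extends Serializable
  trait PartitionOffset extends Serializable

  // One message type per table row.
  sealed trait EpochCoordinatorMessage extends Serializable

  // Messages the table describes as one-way asynchronous
  final case class CommitPartitionEpoch(
      partitionId: Int,
      epoch: Long,
      message: WriterCommitMessage) extends EpochCoordinatorMessage
  final case class ReportPartitionOffset(
      partitionId: Int,
      epoch: Long,
      offset: PartitionOffset) extends EpochCoordinatorMessage

  // Messages the table describes as request-response synchronous
  case object GetCurrentEpoch extends EpochCoordinatorMessage
  case object IncrementAndGetEpoch extends EpochCoordinatorMessage
  final case class SetReaderPartitions(numPartitions: Int) extends EpochCoordinatorMessage
  final case class SetWriterPartitions(numPartitions: Int) extends EpochCoordinatorMessage
  case object StopContinuousExecutionWrites extends EpochCoordinatorMessage

  object MessageModes {
    // True for the two one-way messages, false for the request-response ones.
    def isOneWay(msg: EpochCoordinatorMessage): Boolean = msg match {
      case _: CommitPartitionEpoch | _: ReportPartitionOffset => true
      case _ => false
    }
  }
}
```

Sealing the trait lets the compiler check that a handler covers every message, which is the main reason to model the table this way.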
[[logging]]
[TIP]
====
Enable ALL logging level for org.apache.spark.sql.execution.streaming.continuous.EpochCoordinatorRef* logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.continuous.EpochCoordinatorRef*=ALL

Refer to <>.
====
=== [[receive]] Receiving Messages (Fire-And-Forget One-Way Mode) -- receive Method

[source, scala]
receive: PartialFunction[Any, Unit]

NOTE: receive is part of the RpcEndpoint Contract in Apache Spark to receive messages in fire-and-forget one-way mode.

receive handles the following messages:
- <>
- <>
With the <receive simply swallows messages and does nothing.
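To make the drop-or-handle behaviour concrete, here is a minimal sketch of a one-way handler guarded by a flag. It assumes that the flag in question is the `queryWritesStopped` internal property and that the one-way messages are the two the messages table marks as asynchronous. The `OneWaySketch` object, the `Coordinator` class, the `String` payloads and the two bookkeeping maps are hypothetical stand-ins, not Spark code.

```scala
object OneWaySketch {
  import scala.collection.mutable

  // Simplified stand-ins for the one-way messages (payloads reduced to strings).
  final case class CommitPartitionEpoch(partitionId: Int, epoch: Long, message: String)
  final case class ReportPartitionOffset(partitionId: Int, epoch: Long, offset: String)

  class Coordinator {
    // Mirrors the queryWritesStopped flag; off (false) by default.
    var queryWritesStopped: Boolean = false

    // Hypothetical bookkeeping, keyed by (epoch, partitionId).
    val commits = mutable.Map.empty[(Long, Int), String]
    val offsets = mutable.Map.empty[(Long, Int), String]

    def receive: PartialFunction[Any, Unit] = {
      // While the flag is on, every message is swallowed and nothing is recorded.
      case _ if queryWritesStopped => ()
      case CommitPartitionEpoch(partitionId, epoch, message) =>
        commits((epoch, partitionId)) = message
      case ReportPartitionOffset(partitionId, epoch, offset) =>
        offsets((epoch, partitionId)) = offset
    }
  }
}
```

Putting the guard first keeps the drop rule in one place instead of repeating it in every message case.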
=== [[receiveAndReply]] Receiving Messages (Request-Response Two-Way Mode) -- receiveAndReply Method

[source, scala]
receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]

NOTE: receiveAndReply is part of the RpcEndpoint Contract in Apache Spark to receive and reply to messages in request-response two-way mode.

receiveAndReply handles the following messages:
- <>
- <>
- <>
- <>
- <>
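The request-response side can be sketched the same way. This is an illustration under stated assumptions: the five messages are taken to be the ones the messages table marks as synchronous, and what each handler does (return the epoch, bump it, store a partition count, turn on the stop flag) is inferred from the message names rather than from this page. `RequestResponseSketch`, `CallContext`, `RecordingContext` and the field names are hypothetical; `CallContext` models only the reply side of Spark's `RpcCallContext`.

```scala
object RequestResponseSketch {
  case object GetCurrentEpoch
  case object IncrementAndGetEpoch
  final case class SetReaderPartitions(numPartitions: Int)
  final case class SetWriterPartitions(numPartitions: Int)
  case object StopContinuousExecutionWrites

  // Minimal stand-in for an RPC call context: only replying is modelled.
  trait CallContext {
    def reply(response: Any): Unit
  }

  class Coordinator(startEpoch: Long) {
    var currentEpoch: Long = startEpoch
    var numReaderPartitions: Int = 0
    var numWriterPartitions: Int = 0
    var queryWritesStopped: Boolean = false

    def receiveAndReply(context: CallContext): PartialFunction[Any, Unit] = {
      case GetCurrentEpoch =>
        context.reply(currentEpoch)
      case IncrementAndGetEpoch =>
        currentEpoch += 1
        context.reply(currentEpoch)
      case SetReaderPartitions(n) =>
        numReaderPartitions = n
        context.reply(())
      case SetWriterPartitions(n) =>
        numWriterPartitions = n
        context.reply(())
      case StopContinuousExecutionWrites =>
        queryWritesStopped = true
        context.reply(())
    }
  }

  // Captures the most recent reply so that a caller can inspect it.
  class RecordingContext extends CallContext {
    var last: Option[Any] = None
    def reply(response: Any): Unit = {
      last = Some(response)
    }
  }
}
```

Every case replies exactly once, which is what lets a synchronous caller unblock even when the reply carries no value.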
==== [[resolveCommitsAtEpoch]] resolveCommitsAtEpoch Internal Method

[source, scala]
resolveCommitsAtEpoch(epoch: Long): Unit

resolveCommitsAtEpoch...FIXME

NOTE: resolveCommitsAtEpoch is used exclusively when EpochCoordinator is requested to handle <
==== [[commitEpoch]] commitEpoch Internal Method

[source, scala]
commitEpoch(
  epoch: Long,
  messages: Iterable[WriterCommitMessage]): Unit

commitEpoch...FIXME

NOTE: commitEpoch is used exclusively when EpochCoordinator is requested to <
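This page does not yet describe either internal method, so the following is only a rough sketch of how the two could fit together, not a description of Spark's implementation. It assumes that an epoch counts as resolved once every writer partition has sent a commit and every reader partition has reported an offset for it. The `EpochResolutionSketch` object, the `Tracker` class, the `recordCommit` and `recordOffset` helpers, and the `String` payloads are hypothetical.

```scala
object EpochResolutionSketch {
  import scala.collection.mutable

  class Tracker(numWriterPartitions: Int, numReaderPartitions: Int) {
    // (epoch, partitionId) -> payload, one map per message kind
    private val commits = mutable.Map.empty[(Long, Int), String]
    private val offsets = mutable.Map.empty[(Long, Int), String]

    // Epochs handed to commitEpoch, in the order they were committed
    val committed = mutable.ArrayBuffer.empty[Long]

    def recordCommit(partitionId: Int, epoch: Long, message: String): Unit = {
      commits((epoch, partitionId)) = message
      resolveCommitsAtEpoch(epoch)
    }

    def recordOffset(partitionId: Int, epoch: Long, offset: String): Unit = {
      offsets((epoch, partitionId)) = offset
      resolveCommitsAtEpoch(epoch)
    }

    // Assumed rule: commit once all writers and all readers have reported for the epoch.
    def resolveCommitsAtEpoch(epoch: Long): Unit = {
      val epochCommits =
        commits.filter { case ((e, _), _) => e == epoch }.values.toList
      val reportedReaders = offsets.keys.count { case (e, _) => e == epoch }
      val complete =
        epochCommits.size == numWriterPartitions && reportedReaders == numReaderPartitions
      if (complete && !committed.contains(epoch)) commitEpoch(epoch, epochCommits)
    }

    // Only records the epoch here; the messages are accepted to match the documented signature.
    def commitEpoch(epoch: Long, messages: Iterable[String]): Unit = {
      committed += epoch
    }
  }
}
```

The sketch ignores ordering between epochs; a real coordinator would likely also need to decide what to do when a later epoch completes before an earlier one.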
=== Creating Instance
EpochCoordinator takes the following to be created:

- [[writer]] StreamWriter
- [[reader]] ContinuousReader
- [[query]] ContinuousExecution
- [[startEpoch]] Start epoch
- [[session]] SparkSession
- [[rpcEnv]] RpcEnv
=== [[create]] Registering EpochCoordinator RPC Endpoint -- create Factory Method

[source, scala]
create(
  writer: StreamWriter,
  reader: ContinuousReader,
  query: ContinuousExecution,
  epochCoordinatorId: String,
  startEpoch: Long,
  session: SparkSession,
  env: SparkEnv): RpcEndpointRef

create simply <RpcEnv to register an RPC endpoint as EpochCoordinator-[id] (where id is the given epochCoordinatorId).

create prints out the following INFO message to the logs:

Registered EpochCoordinator endpoint

NOTE: create is used exclusively when ContinuousExecution is requested to <
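The naming and registration step can be illustrated with a toy registry. This is a sketch under assumptions, not Spark's `RpcEnv`: `RegistrationSketch`, `ToyRpcEnv`, its `setupEndpoint` and `isRegistered` methods, and the use of a plain `String` as the endpoint reference are all hypothetical. Only the `EpochCoordinator-[id]` naming scheme and the log message come from the text above.

```scala
object RegistrationSketch {
  import scala.collection.mutable

  // Toy stand-in for an RPC environment: a name -> endpoint registry.
  class ToyRpcEnv {
    private val endpoints = mutable.Map.empty[String, AnyRef]

    def setupEndpoint(name: String, endpoint: AnyRef): String = {
      require(!endpoints.contains(name), s"Endpoint $name is already registered")
      endpoints(name) = endpoint
      name // in this sketch the name doubles as the endpoint reference
    }

    def isRegistered(name: String): Boolean = endpoints.contains(name)
  }

  // Naming scheme from the text: EpochCoordinator-[id]
  def endpointName(epochCoordinatorId: String): String =
    s"EpochCoordinator-$epochCoordinatorId"

  // Registers the coordinator under its derived name and logs the registration.
  def create(env: ToyRpcEnv, epochCoordinatorId: String, coordinator: AnyRef): String = {
    val ref = env.setupEndpoint(endpointName(epochCoordinatorId), coordinator)
    println("Registered EpochCoordinator endpoint")
    ref
  }
}
```

Deriving the endpoint name from the coordinator ID means any component that knows the ID can look the endpoint up without being handed a reference.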
=== [[internal-properties]] Internal Properties
[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description

| queryWritesStopped
| [[queryWritesStopped]] Flag that indicates whether to drop messages (true) or not (false) when requested to <

Default: false

Turned on (true) when requested to <