== StateStoreCoordinator RPC Endpoint
StateStoreCoordinator keeps track of the StateStores on Spark executors (per host and executor ID).

StateStoreCoordinator is used by StateStoreRDD when requested for the location preferences of partitions (based on the location of the stores).

StateStoreCoordinator is a ThreadSafeRpcEndpoint RPC endpoint that manages the <<instances, instances>> internal registry.
[[messages]]
.StateStoreCoordinator RPC Endpoint's Messages and Message Handlers
[cols="30m,70",options="header",width="100%"]
|===
| Message
| Message Handler
| DeactivateInstances
a| [[DeactivateInstances]] Removes the StateStoreProviderIds of a streaming query (by a given runId)

Internally, StateStoreCoordinator finds the StateStoreProviderIds of the streaming query (i.e. those whose queryRunId matches the given runId) and removes them from the <<instances, instances>> registry.
StateStoreCoordinator prints out the following DEBUG message to the logs:
Deactivating instances related to checkpoint location [runId]: [storeIdsToRemove]
| GetLocation
a| [[GetLocation]] Gives the location of a state store (by a given StateStoreProviderId) from the <<instances, instances>> registry
You should see the following DEBUG message in the logs:
Got location of the state store [id]: [executorId]
| ReportActiveInstance
a| [[ReportActiveInstance]] One-way asynchronous (fire-and-forget) message to register a new StateStoreProviderId (on a given host and executorId).
Sent out exclusively when StateStoreCoordinatorRef RPC endpoint reference is requested to reportActiveInstance (when StateStore utility is requested to look up the StateStore by provider ID when the StateStore and a corresponding StateStoreProvider were just created and initialized).
Internally, StateStoreCoordinator prints out the following DEBUG message to the logs:
Reported state store [id] is active at [executorId]
In the end, StateStoreCoordinator adds the StateStoreProviderId to the <<instances, instances>> registry.
| StopCoordinator a| [[StopCoordinator]] Stops StateStoreCoordinator RPC Endpoint
You should see the following DEBUG message in the logs:
StateStoreCoordinator stopped
| VerifyIfInstanceActive
a| [[VerifyIfInstanceActive]] Verifies if a given StateStoreProviderId is active (registered) on a given executorId

You should see the following DEBUG message in the logs:

Verified that state store [id] is active: [response]
|===
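Stripped of Spark's RPC machinery, the four message handlers above boil down to simple operations over the registry of store locations. The following is a minimal sketch for illustration only: `StateStoreCoordinatorSketch` and the simplified `StateStoreProviderId` / `ExecutorCacheTaskLocation` case classes are stand-ins, not Spark's actual types.

```scala
import java.util.UUID
import scala.collection.mutable

// Simplified stand-ins for Spark's types (assumption: sketch only)
case class StateStoreProviderId(queryRunId: UUID, operatorId: Long, partitionId: Int)
case class ExecutorCacheTaskLocation(host: String, executorId: String)

class StateStoreCoordinatorSketch {
  // mirrors the instances internal registry
  private val instances =
    mutable.HashMap.empty[StateStoreProviderId, ExecutorCacheTaskLocation]

  // ReportActiveInstance: register a new state store (fire-and-forget)
  def reportActiveInstance(id: StateStoreProviderId, host: String, executorId: String): Unit =
    instances.put(id, ExecutorCacheTaskLocation(host, executorId))

  // VerifyIfInstanceActive: is the store registered on the given executor?
  def verifyIfInstanceActive(id: StateStoreProviderId, executorId: String): Boolean =
    instances.get(id).exists(_.executorId == executorId)

  // GetLocation: the registered location of a state store, if any
  def getLocation(id: StateStoreProviderId): Option[ExecutorCacheTaskLocation] =
    instances.get(id)

  // DeactivateInstances: drop every provider of the streaming query with this runId
  def deactivateInstances(runId: UUID): Unit = {
    val storeIdsToRemove = instances.keys.filter(_.queryRunId == runId).toList
    storeIdsToRemove.foreach(instances.remove)
  }
}
```

A short walk-through: registering a store makes it visible to `verifyIfInstanceActive` and `getLocation`, and deactivating its query's runId removes it again, matching the DeactivateInstances handler described above.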
[[logging]]
[TIP]
====
Enable ALL logging level for org.apache.spark.sql.execution.streaming.state.StateStoreCoordinator logger to see what happens inside.

Add the following line to conf/log4j.properties:

[source]
----
log4j.logger.org.apache.spark.sql.execution.streaming.state.StateStoreCoordinator=ALL
----

Refer to <>.
====
=== [[instances]] instances Internal Registry
[source,scala]
----
instances: HashMap[StateStoreProviderId, ExecutorCacheTaskLocation]
----

instances is an internal registry of StateStoreProviderIds and their ExecutorCacheTaskLocations (with a host and an executorId).
* A new StateStoreProviderId is added when StateStoreCoordinator is requested to <<ReportActiveInstance, handle a ReportActiveInstance message>>

* All StateStoreProviderIds of a streaming query are removed when StateStoreCoordinator is requested to <<DeactivateInstances, handle a DeactivateInstances message>>
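The registry is what ultimately feeds partition location preferences back to StateStoreRDD. The following is a sketch, not Spark's code, of how a caller could turn a registered `ExecutorCacheTaskLocation` into a preferred-location string; the `executor_<host>_<executorId>` encoding and the simplified case classes are assumptions for illustration.

```scala
import java.util.UUID
import scala.collection.mutable

// Simplified stand-ins for the registry's key and value types (assumption: sketch only)
case class StateStoreProviderId(queryRunId: UUID, operatorId: Long, partitionId: Int)
case class ExecutorCacheTaskLocation(host: String, executorId: String) {
  // assumed encoding of an executor location as a single string
  override def toString: String = s"executor_${host}_$executorId"
}

val instances = mutable.HashMap.empty[StateStoreProviderId, ExecutorCacheTaskLocation]

// What a StateStoreRDD-like caller could do with the registry:
// a partition prefers the executor its state store is registered on,
// or has no preference when the store is not registered yet
def preferredLocation(id: StateStoreProviderId): Seq[String] =
  instances.get(id).map(_.toString).toSeq
```

An unregistered store yields no preference, so the scheduler is free to place the partition anywhere; once ReportActiveInstance has registered the store, the partition prefers that executor.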