== StateStoreCoordinator RPC Endpoint
`StateStoreCoordinator` keeps track of `StateStores` on Spark executors (per host and executor ID).

`StateStoreCoordinator` is used by `StateStoreRDD` when requested to get the location preferences of partitions (based on the location of the stores).
`StateStoreCoordinator` is a `ThreadSafeRpcEndpoint` RPC endpoint that manipulates the <<instances, instances>> internal registry.
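The coordinator's message protocol can be sketched as a small set of case classes. This is a simplified model, not Spark's actual (private) definitions: the `storeId: String` fields stand in for full `StateStoreProviderId` values.

```scala
import java.util.UUID

// Hypothetical sketch of the messages StateStoreCoordinator handles.
// Field types are simplified for illustration.
sealed trait StateStoreCoordinatorMessage
final case class ReportActiveInstance(storeId: String, host: String, executorId: String)
  extends StateStoreCoordinatorMessage
final case class VerifyIfInstanceActive(storeId: String, executorId: String)
  extends StateStoreCoordinatorMessage
final case class GetLocation(storeId: String)
  extends StateStoreCoordinatorMessage
final case class DeactivateInstances(runId: UUID)
  extends StateStoreCoordinatorMessage
case object StopCoordinator extends StateStoreCoordinatorMessage
```

Each message type is described in the table below, together with how the coordinator handles it.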
[[messages]]
.StateStoreCoordinator RPC Endpoint's Messages and Message Handlers
[cols="30m,70",options="header",width="100%"]
|===
| Message
| Message Handler
| DeactivateInstances
a| [[DeactivateInstances]] Removes the `StateStoreProviderIds` of a streaming query (given `runId`).
Internally, `StateStoreCoordinator` finds the `StateStoreProviderIds` of the streaming query (those whose `queryRunId` matches the given `runId`) and removes them from the <<instances, instances>> internal registry.
`StateStoreCoordinator` prints out the following DEBUG message to the logs:

----
Deactivating instances related to checkpoint location [runId]: [storeIdsToRemove]
----
| GetLocation
a| [[GetLocation]] Gives the location (`ExecutorCacheTaskLocation`) of a registered <<instances, StateStoreProviderId>> (if any).

You should see the following DEBUG message in the logs:

----
Got location of the state store [id]: [executorId]
----
| ReportActiveInstance
a| [[ReportActiveInstance]] One-way asynchronous (fire-and-forget) message to register a new <<instances, StateStoreProviderId>> on an executor (given `host` and `executorId`).

Sent out exclusively when `StateStoreCoordinatorRef` RPC endpoint reference is requested to reportActiveInstance (when the `StateStore` utility is requested to look up a `StateStore` by provider ID and the `StateStore` and a corresponding `StateStoreProvider` have just been created and initialized).
Internally, `StateStoreCoordinator` prints out the following DEBUG message to the logs:

----
Reported state store [id] is active at [executorId]
----
In the end, `StateStoreCoordinator` adds the `StateStoreProviderId` to the <<instances, instances>> internal registry.
| StopCoordinator
a| [[StopCoordinator]] Stops the `StateStoreCoordinator` RPC endpoint.

You should see the following DEBUG message in the logs:

----
StateStoreCoordinator stopped
----
| VerifyIfInstanceActive
a| [[VerifyIfInstanceActive]] Verifies whether a given <<instances, StateStoreProviderId>> is active (i.e. registered on the given `executorId`).

You should see the following DEBUG message in the logs:

----
Verified that state store [id] is active: [response]
----
|===
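The handlers above boil down to lookups and updates on a single map. The following sketch is a hypothetical in-memory model of that behavior: `ProviderId` and `Location` are simplified stand-ins for Spark's `StateStoreProviderId` and `ExecutorCacheTaskLocation`.

```scala
import java.util.UUID
import scala.collection.mutable

// Stand-in for StateStoreProviderId: identifies a store within a query run.
final case class ProviderId(queryRunId: UUID, storeName: String)
// Stand-in for ExecutorCacheTaskLocation.
final case class Location(host: String, executorId: String)

// Hypothetical model of the coordinator's message handlers.
class CoordinatorModel {
  private val instances = mutable.HashMap.empty[ProviderId, Location]

  // ReportActiveInstance: register (or refresh) a store's location.
  def reportActiveInstance(id: ProviderId, host: String, executorId: String): Unit =
    instances.put(id, Location(host, executorId))

  // VerifyIfInstanceActive: active only if registered on that very executor.
  def verifyIfInstanceActive(id: ProviderId, executorId: String): Boolean =
    instances.get(id).exists(_.executorId == executorId)

  // GetLocation: the location of a registered store, if any.
  def getLocation(id: ProviderId): Option[Location] =
    instances.get(id)

  // DeactivateInstances: drop every store of the streaming query (by runId).
  def deactivateInstances(runId: UUID): Unit = {
    val storeIdsToRemove = instances.keys.filter(_.queryRunId == runId).toSeq
    storeIdsToRemove.foreach(instances.remove)
  }
}
```

For example, after `reportActiveInstance(id, "host-a", "2")`, `verifyIfInstanceActive(id, "2")` holds while `verifyIfInstanceActive(id, "9")` does not, and `deactivateInstances(id.queryRunId)` clears the entry again.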
[[logging]]
[TIP]
====
Enable `ALL` logging level for the `org.apache.spark.sql.execution.streaming.state.StateStoreCoordinator` logger to see what happens inside.

Add the following line to `conf/log4j.properties`:

----
log4j.logger.org.apache.spark.sql.execution.streaming.state.StateStoreCoordinator=ALL
----

Refer to Logging.
====
=== [[instances]] instances Internal Registry

[source, scala]
----
instances: HashMap[StateStoreProviderId, ExecutorCacheTaskLocation]
----
`instances` is an internal registry of `StateStoreProviderIds` and their `ExecutorCacheTaskLocations` (with a `host` and an `executorId`).
* A new `StateStoreProviderId` is added when `StateStoreCoordinator` is requested to handle a <<ReportActiveInstance, ReportActiveInstance>> message.

* All `StateStoreProviderIds` of a streaming query are removed when `StateStoreCoordinator` is requested to handle a <<DeactivateInstances, DeactivateInstances>> message.
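The `ExecutorCacheTaskLocation` values held in the registry are what ultimately become partition location preferences for `StateStoreRDD`. A minimal sketch, assuming the `executor_[host]_[executorId]` string form (the case class below is a stand-in for Spark's class of the same name):

```scala
// Stand-in for Spark's ExecutorCacheTaskLocation, which identifies a
// specific executor and renders as "executor_[host]_[executorId]".
final case class ExecutorCacheTaskLocation(host: String, executorId: String) {
  override def toString: String = s"executor_${host}_$executorId"
}

val loc = ExecutorCacheTaskLocation("10.0.0.5", "3")
// StateStoreRDD would report this string as the partition's preferred location.
val preferred: Seq[String] = Seq(loc.toString)
```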