# SQLExecution
## spark.sql.execution.id
`SQLExecution` defines `spark.sql.execution.id` as the key of the Spark local property that is used to track the multiple Spark jobs that should all together be considered part of a single execution of a structured query.
```scala
import org.apache.spark.sql.execution.SQLExecution

scala> println(SQLExecution.EXECUTION_ID_KEY)
spark.sql.execution.id
```
`spark.sql.execution.id` allows "stitching" different Spark jobs (especially those executed on separate threads) into one structured query (that you can then see in the web UI's SQL tab).
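Since local properties travel with every task of the query's jobs, the current execution ID can even be read from inside a task. A minimal spark-shell sketch (the UDF is purely illustrative, assuming the default `spark` session):

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.functions.udf

// Reads the execution ID local property from within a running task
val currentExecutionId = udf { () =>
  TaskContext.get.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
}

spark.range(1).select(currentExecutionId() as "executionId").show()
```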
!!! tip
    Use a `SparkListener` (Spark Core) to listen to `SparkListenerSQLExecutionStart` events and know the execution IDs of structured queries that have been executed in a Spark SQL application.
// "SQLAppStatusListener" idea is borrowed from
// Spark SQL's org.apache.spark.sql.execution.ui.SQLAppStatusListener
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.{SparkListenerDriverAccumUpdates, SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
public class SQLAppStatusListener extends SparkListener {
override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
case e: SparkListenerDriverAccumUpdates => onDriverAccumUpdates(e)
case _ => // Ignore
}
def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
// Find the QueryExecution for the Dataset action that triggered the event
// This is the SQL-specific way
import org.apache.spark.sql.execution.SQLExecution
queryExecution = SQLExecution.getQueryExecution(event.executionId)
}
def onJobStart(jobStart: SparkListenerJobStart): Unit = {
// Find the QueryExecution for the Dataset action that triggered the event
// This is a general Spark Core way using local properties
import org.apache.spark.sql.execution.SQLExecution
val executionIdStr = jobStart.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
// Note that the Spark job may or may not be a part of a structured query
if (executionIdStr != null) {
queryExecution = SQLExecution.getQueryExecution(executionIdStr.toLong)
}
}
def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {}
def onDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): Unit = {}
}
val sqlListener = new SQLAppStatusListener()
spark.sparkContext.addSparkListener(sqlListener)
Spark jobs without the `spark.sql.execution.id` local property set are not considered part of a SQL query execution.
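A quick spark-shell check confirms this (a sketch, assuming the default `spark` session): on a driver thread that is not executing a structured query the property is simply not set.

```scala
import org.apache.spark.sql.execution.SQLExecution

// No structured query is running on this thread, so no execution ID is set
assert(spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) == null)

// This is a plain Spark Core job and gets no entry in the SQL tab
spark.sparkContext.parallelize(0 to 5).count()
```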
## withNewExecutionId
```scala
withNewExecutionId[T](
  queryExecution: QueryExecution,
  name: Option[String] = None)(
  body: => T): T
```
withNewExecutionId executes the body query action with the next available execution ID.
withNewExecutionId replaces an existing execution ID, if defined, until the entire body finishes.
withNewExecutionId posts SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd events right before and right after executing the body action, respectively.
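That makes it possible (though rarely needed in user code) to wrap a custom action around a `QueryExecution` so its Spark jobs show up as a single query in the SQL tab. A minimal spark-shell sketch, with the illustrative name `my-action`:

```scala
import org.apache.spark.sql.execution.SQLExecution

val df = spark.range(100).selectExpr("sum(id) as total")
SQLExecution.withNewExecutionId(df.queryExecution, Some("my-action")) {
  // Jobs triggered here run under the freshly-assigned execution ID
  df.queryExecution.toRdd.count()
}
```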
withNewExecutionId is used when:
- `Dataset` is requested to withNewExecutionId, withNewRDDExecutionId and Dataset.withAction
- `QueryExecution` is requested to eagerlyExecuteCommands
- others (in Spark Thrift Server and Spark Structured Streaming)
## withExecutionId
```scala
withExecutionId[T](
  sparkSession: SparkSession,
  executionId: String)(
  body: => T): T
```
withExecutionId executes the body with the given executionId set as the `spark.sql.execution.id` local property (and restores the previous value when the body finishes).
```scala
val rdd = sc.parallelize(0 to 5, numSlices = 2)

import org.apache.spark.TaskContext
def func(ctx: TaskContext, ns: Iterator[Int]): Int = {
  ctx.partitionId
}

def runSparkJobs = {
  sc.runJob(rdd, func _)
}

import org.apache.spark.sql.execution.SQLExecution
SQLExecution.withExecutionId(spark, executionId = "100")(body = runSparkJobs)
```
withExecutionId is used when:
- `BroadcastExchangeExec` physical operator is requested to prepare for execution (and initializes `relationFuture`)
- `SubqueryExec` physical operator is requested to prepare for execution (and initializes `relationFuture`)
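These operators use withExecutionId to carry the current execution ID over to the separate thread that computes the broadcast or subquery result (local properties do not cross thread boundaries on their own). A rough sketch of that pattern (not the actual operator code; the single-thread executor is illustrative):

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}
import org.apache.spark.sql.execution.SQLExecution

// Capture the execution ID on the driver thread (may be null outside a query)
val executionId = spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)

implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())

Future {
  // Re-attach the execution ID so jobs submitted from this thread
  // are stitched to the original structured query
  SQLExecution.withExecutionId(spark, executionId) {
    spark.sparkContext.parallelize(0 to 5).count()
  }
}
```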