
SQLExecution

spark.sql.execution.id

SQLExecution defines spark.sql.execution.id (as the EXECUTION_ID_KEY constant) that is used as the key of a Spark local property.

spark.sql.execution.id is used to track multiple Spark jobs that should all together be considered part of a single execution of a structured query.

import org.apache.spark.sql.execution.SQLExecution
scala> println(SQLExecution.EXECUTION_ID_KEY)
spark.sql.execution.id

spark.sql.execution.id allows "stitching" different Spark jobs (esp. executed on separate threads) as part of one structured query (that you can then see in web UI's SQL tab).

Tip

Use SparkListener (Spark Core) to listen to SparkListenerSQLExecutionStart events and know the execution IDs of structured queries that have been executed in a Spark SQL application.

// The "SQLAppStatusListener" idea is borrowed from
// Spark SQL's org.apache.spark.sql.execution.ui.SQLAppStatusListener
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.ui.{SparkListenerDriverAccumUpdates, SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
class SQLAppStatusListener extends SparkListener {
  // The QueryExecution seen most recently (null until a structured query is executed)
  @volatile var queryExecution: QueryExecution = _
  // SQL events are not part of the core SparkListener callbacks and arrive via onOtherEvent
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
    case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
    case e: SparkListenerDriverAccumUpdates => onDriverAccumUpdates(e)
    case _ => // Ignore
  }
  def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
    // Find the QueryExecution for the Dataset action that triggered the event
    // This is the SQL-specific way
    queryExecution = SQLExecution.getQueryExecution(event.executionId)
  }
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    // Find the QueryExecution for the Dataset action that triggered the event
    // This is a general Spark Core way using local properties
    // (properties may be null, hence the Option wrappers)
    val executionIdStr = Option(jobStart.properties)
      .flatMap(props => Option(props.getProperty(SQLExecution.EXECUTION_ID_KEY)))
    // Note that the Spark job may or may not be a part of a structured query
    executionIdStr.foreach { id =>
      queryExecution = SQLExecution.getQueryExecution(id.toLong)
    }
  }
  def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {}
  def onDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): Unit = {}
}

val sqlListener = new SQLAppStatusListener()
spark.sparkContext.addSparkListener(sqlListener)

Spark jobs with the spark.sql.execution.id local property set can then be considered to belong to a structured query execution (jobs without it were submitted outside one).
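As an illustration of telling such jobs apart, the following sketch records the execution ID of every job that carries the property and skips the rest. JobToExecutionListener and executionIdByJob are hypothetical names (not part of Spark), and the snippet assumes a spark-shell session where spark is predefined.

```scala
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.execution.SQLExecution

// Hypothetical helper (not part of Spark): remembers which execution a job belongs to
class JobToExecutionListener extends SparkListener {
  // job ID -> execution ID; jobs started outside a structured query are not recorded
  val executionIdByJob = new ConcurrentHashMap[Int, Long]()

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    // properties may be null and the key is absent for plain RDD jobs
    for {
      props <- Option(jobStart.properties)
      id <- Option(props.getProperty(SQLExecution.EXECUTION_ID_KEY))
    } executionIdByJob.put(jobStart.jobId, id.toLong)
  }
}

val jobListener = new JobToExecutionListener
spark.sparkContext.addSparkListener(jobListener)

spark.range(10).count()                        // structured query: its jobs carry the property
spark.sparkContext.parallelize(1 to 3).count() // plain RDD job: no property set
```

Listener events are delivered asynchronously, so jobListener.executionIdByJob may be populated slightly after the actions return.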

withNewExecutionId

withNewExecutionId[T](
  queryExecution: QueryExecution,
  name: Option[String] = None)(
  body: => T): T

withNewExecutionId executes the body query action under the next available execution ID.

withNewExecutionId replaces an existing execution ID, if defined, until the entire body finishes.

withNewExecutionId posts SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd events right before and right after executing the body action, respectively.
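The behaviour described above (take the next ID, replace any existing one for the duration of the body, post start and end events around it) can be pictured with a toy model. This is not Spark's implementation: ToyExecution, withNewId, localProps and events are made-up stand-ins for SQLExecution, the local properties of the current thread and the listener bus, and the snippet needs no Spark on the classpath.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable

// Toy model only: none of these names are Spark's
object ToyExecution {
  val Key = "spark.sql.execution.id"
  private val nextId = new AtomicLong(0)
  // Stand-in for the local properties of the current thread
  val localProps = mutable.Map.empty[String, String]
  // Stand-in for events posted to the listener bus
  val events = mutable.Buffer.empty[String]

  def withNewId[T](body: => T): T = {
    val previous = localProps.get(Key)   // remember an existing ID, if defined
    val id = nextId.getAndIncrement()    // next available execution ID
    localProps(Key) = id.toString        // replace it while the body runs
    events += s"start($id)"              // right before the body
    try body
    finally {
      events += s"end($id)"              // right after the body
      previous match {                   // put the previous state back
        case Some(old) => localProps(Key) = old
        case None      => localProps.remove(Key)
      }
    }
  }
}

// The body observes the ID assigned for its execution
val seen = ToyExecution.withNewId { ToyExecution.localProps(ToyExecution.Key) }
```

The try/finally shape is the point of the sketch: the end event and the restore step run even when the body throws.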


withNewExecutionId is used when:

withExecutionId

withExecutionId[T](
  sparkSession: SparkSession,
  executionId: String)(
  body: => T): T

withExecutionId executes the body under the given executionId execution identifier.

val rdd = sc.parallelize(0 to 5, numSlices = 2)

import org.apache.spark.TaskContext
def func(ctx: TaskContext, ns: Iterator[Int]): Int = {
  ctx.partitionId
}

def runSparkJobs = {
  sc.runJob(rdd, func _)
}

import org.apache.spark.sql.execution.SQLExecution
SQLExecution.withExecutionId(spark, executionId = "100")(body = runSparkJobs)
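
To check which execution ID the body actually runs under, the local property can be read from inside the body. This is a minimal sketch that assumes a spark-shell session (spark predefined) and the withExecutionId signature shown above; the value "100" is arbitrary.

```scala
import org.apache.spark.sql.execution.SQLExecution

val key = SQLExecution.EXECUTION_ID_KEY

// Read the local property while the body is running
val inside = SQLExecution.withExecutionId(spark, "100") {
  spark.sparkContext.getLocalProperty(key)
}

// Read it again once the body has finished
val after = spark.sparkContext.getLocalProperty(key)

println(s"inside=$inside, after=$after")
```

Spark jobs submitted from the body (such as runSparkJobs above) inherit the same local property, which is how they end up associated with that execution ID.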

withExecutionId is used when: