InMemoryTableScanExec Leaf Physical Operator

InMemoryTableScanExec is a leaf physical operator that represents an InMemoryRelation logical operator at execution time.

InMemoryTableScanExec is <<creating-instance, created>> when an InMemoryRelation logical operator is planned for execution (by the InMemoryScans execution planning strategy).

[[creating-instance]] InMemoryTableScanExec takes the following to be created:

- [[attributes]] Attribute expressions
- [[predicates]] Predicate expressions
- [[relation]] InMemoryRelation logical operator

InMemoryTableScanExec is a ColumnarBatchScan that <<supportsBatch, supports batch decoding>>.

InMemoryTableScanExec supports <<partitionFilters, partition batch pruning>> (only when the spark.sql.inMemoryColumnarStorage.partitionPruning internal configuration property is enabled).
[source, scala]
----
// Sample DataFrames
val tokens = Seq(
  (0, "playing"),
  (1, "with"),
  (2, "InMemoryTableScanExec")
).toDF("id", "token")
val ids = spark.range(10)

// Cache DataFrames
tokens.cache
ids.cache

val q = tokens.join(ids, Seq("id"), "outer")
scala> q.explain
== Physical Plan ==
*Project [coalesce(cast(id#5 as bigint), id#10L) AS id#33L, token#6]
+- SortMergeJoin [cast(id#5 as bigint)], [id#10L], FullOuter
   :- *Sort [cast(id#5 as bigint) ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(cast(id#5 as bigint), 200)
   :     +- InMemoryTableScan [id#5, token#6]
   :           +- InMemoryRelation [id#5, token#6], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   :                 +- LocalTableScan [id#5, token#6]
   +- *Sort [id#10L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#10L, 200)
         +- InMemoryTableScan [id#10L]
               +- InMemoryRelation [id#10L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *Range (0, 10, step=1, splits=8)
----

[source, scala]
----
val q = spark.range(4).cache
val plan = q.queryExecution.executedPlan

import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
val inmemoryScan = plan.collectFirst { case exec: InMemoryTableScanExec => exec }.get

assert(inmemoryScan.supportCodegen == inmemoryScan.supportsBatch)
----
[[supportCodegen]] InMemoryTableScanExec supports Java code generation (codegen) only if <<supportsBatch, batch decoding is supported>>.

[[inputRDDs]] InMemoryTableScanExec gives the single <<inputRDD, inputRDD>> as the only input RDD of internal rows (when the WholeStageCodegenExec physical operator is WholeStageCodegenExec.md#doExecute[executed]).
[[enableAccumulatorsForTest]][[spark.sql.inMemoryTableScanStatistics.enable]] InMemoryTableScanExec uses the spark.sql.inMemoryTableScanStatistics.enable flag (default: false) to enable accumulators (which seem to exist exclusively for testing purposes).
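A minimal sketch of enabling the flag (it is an internal property and has to be set before the query is planned; the accumulators themselves are only inspected in Spark's own test suites):

[source, scala]
----
// Sketch only: enable the internal testing flag before caching and planning a query
spark.conf.set("spark.sql.inMemoryTableScanStatistics.enable", true)

val q = spark.range(4).cache
q.count  // the accumulators are then updated while scanning the cached batches
----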
[[internal-registries]]
.InMemoryTableScanExec's Internal Properties (e.g. Registries, Counters and Flags)
[cols="1,2",options="header",width="100%"]
|===
| Name
| Description

| [[columnarBatchSchema]] columnarBatchSchema
| Schema of a columnar batch

Used exclusively when InMemoryTableScanExec is requested to <<createAndDecompressColumn, createAndDecompressColumn>>.

| [[stats]] stats
| PartitionStatistics of the <<relation, InMemoryRelation>>

Used when InMemoryTableScanExec is requested for <<statsFor, statsFor>> and <<filteredCachedBatches, filteredCachedBatches>>.
|===
=== [[vectorTypes]] vectorTypes Method

[source, scala]
----
vectorTypes: Option[Seq[String]]
----

NOTE: vectorTypes is part of the spark-sql-ColumnarBatchScan.md#vectorTypes[ColumnarBatchScan Contract] to...FIXME.

vectorTypes uses the spark.sql.columnVector.offheap.enabled internal configuration property to select the name of the concrete column vector class (OnHeapColumnVector or OffHeapColumnVector).

vectorTypes gives as many column vector class names as there are <<attributes, attribute expressions>>.
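A quick way to see vectorTypes in action (a sketch that assumes the default on-heap setting, i.e. spark.sql.columnVector.offheap.enabled disabled):

[source, scala]
----
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec

val q = spark.range(4).cache
val inmemoryScan = q.queryExecution.executedPlan
  .collectFirst { case e: InMemoryTableScanExec => e }
  .get

// One column vector class name per attribute expression (a single id column here)
val types = inmemoryScan.vectorTypes.get
assert(types.length == inmemoryScan.output.length)

// With off-heap disabled (the default), OnHeapColumnVector should be selected
assert(types.forall(_.contains("OnHeapColumnVector")))
----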
[[supportsBatch]] supportsBatch Flag

[source, scala]
----
supportsBatch: Boolean
----

supportsBatch is part of the ColumnarBatchScan abstraction.

supportsBatch is enabled (true) when all of the following hold (see the sketch after this list):

- The spark.sql.inMemoryColumnarStorage.enableVectorizedReader configuration property is enabled

- The output schema of the <<relation, InMemoryRelation>> uses primitive data types only, i.e. BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType

- The number of nested fields in the output schema of the InMemoryRelation is at most the spark.sql.codegen.maxFields internal configuration property
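A sketch that exercises the first condition by disabling the vectorized cache reader (it assumes the spark.sql.inMemoryColumnarStorage.enableVectorizedReader property from the list above and a fresh, not-yet-cached Dataset):

[source, scala]
----
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec

spark.conf.set("spark.sql.inMemoryColumnarStorage.enableVectorizedReader", false)

val q = spark.range(8).cache
val inmemoryScan = q.queryExecution.executedPlan
  .collectFirst { case e: InMemoryTableScanExec => e }
  .get

// Even though the schema is a single primitive (long) column,
// supportsBatch should be off with the vectorized reader disabled
assert(!inmemoryScan.supportsBatch)

// Restore the default
spark.conf.set("spark.sql.inMemoryColumnarStorage.enableVectorizedReader", true)
----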
=== [[partitionFilters]] partitionFilters Property

[source, scala]
----
partitionFilters: Seq[Expression]
----

NOTE: partitionFilters is a Scala lazy value which is computed once when accessed and cached afterwards.

partitionFilters ...FIXME

NOTE: partitionFilters is used when...FIXME
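Although the details are left as FIXME above, the effect of the <<predicates, predicate expressions>> can be observed in the physical plan, where they are displayed next to the InMemoryTableScan operator (a sketch; the exact expression IDs will differ):

[source, scala]
----
import spark.implicits._

val ids = spark.range(10).cache

// The predicate shows up as the second argument list of InMemoryTableScan,
// e.g. InMemoryTableScan [id#0L], [(id#0L > 5)]
ids.where($"id" > 5).explain
----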
=== [[filteredCachedBatches]] Applying Partition Batch Pruning to Cached Column Buffers (Creating MapPartitionsRDD of Filtered CachedBatches) -- filteredCachedBatches Internal Method

[source, scala]
----
filteredCachedBatches(): RDD[CachedBatch]
----

filteredCachedBatches requests the <<stats, PartitionStatistics>> for the schema and the <<relation, InMemoryRelation>> for the cached column buffers (as an RDD[CachedBatch]).

filteredCachedBatches then transforms the cached column buffers (the RDD[CachedBatch]) per partition with index (i.e. RDD.mapPartitionsWithIndexInternal) as follows:

. Creates a partition filter as a new GenPredicate for the <<partitionFilters, partitionFilters>> expressions (concatenated together using the And binary operator) and the schema

. Requests the generated partition filter Predicate to initialize

. Uses the spark.sql.inMemoryColumnarStorage.partitionPruning internal configuration property to enable partition batch pruning, i.e. filtering out (skipping) CachedBatches in a partition based on column stats and the generated partition filter Predicate

NOTE: When the spark.sql.inMemoryColumnarStorage.partitionPruning internal configuration property is disabled, filteredCachedBatches does nothing and simply passes all CachedBatch elements along (see the sketch after this section).

filteredCachedBatches is used when InMemoryTableScanExec is requested for the <<inputRDD, inputRDD>> internal property.
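A sketch of turning partition batch pruning off and back on (it assumes the spark.sql.inMemoryColumnarStorage.partitionPruning property name from the note above); with the flag disabled, every CachedBatch is passed along regardless of the <<partitionFilters, partitionFilters>>:

[source, scala]
----
import spark.implicits._

// Disable partition batch pruning (internal configuration property)
spark.conf.set("spark.sql.inMemoryColumnarStorage.partitionPruning", false)

val ids = spark.range(100).cache
// With pruning off, all cached batches are scanned even for a selective predicate
ids.where($"id" === 50).count

// Restore the default (pruning enabled)
spark.conf.set("spark.sql.inMemoryColumnarStorage.partitionPruning", true)
----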
=== [[statsFor]] statsFor Internal Method

[source, scala]
----
statsFor(a: Attribute)
----

statsFor ...FIXME

NOTE: statsFor is used when...FIXME
=== [[createAndDecompressColumn]] createAndDecompressColumn Internal Method

[source, scala]
----
createAndDecompressColumn(
  cachedColumnarBatch: CachedBatch): ColumnarBatch
----

createAndDecompressColumn takes the number of rows in the input CachedBatch.

createAndDecompressColumn requests OffHeapColumnVector or OnHeapColumnVector to allocate column vectors (with the number of rows and the <<columnarBatchSchema, columnarBatchSchema>>) per the spark.sql.columnVector.offheap.enabled internal configuration flag (see the sketch after this section).

createAndDecompressColumn creates a ColumnarBatch for the allocated column vectors (as an array of ColumnVector).

createAndDecompressColumn sets the number of rows in the columnar batch.

For every <<attributes, attribute expression>>, createAndDecompressColumn requests ColumnAccessor to decompress the column.

createAndDecompressColumn registers a callback to be executed on task completion that will close the ColumnarBatch.

In the end, createAndDecompressColumn returns the ColumnarBatch.

NOTE: createAndDecompressColumn is used exclusively when InMemoryTableScanExec is requested for the <<inputRDD, inputRDD>> (with the <<supportsBatch, supportsBatch>> flag enabled).
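The on-heap vs off-heap decision is driven by spark.sql.columnVector.offheap.enabled and can be observed indirectly through <<vectorTypes, vectorTypes>> (a sketch; the property is internal and the column vector class names are assumed from the description above):

[source, scala]
----
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec

spark.conf.set("spark.sql.columnVector.offheap.enabled", true)

val q = spark.range(16).cache
val inmemoryScan = q.queryExecution.executedPlan
  .collectFirst { case e: InMemoryTableScanExec => e }
  .get

// With the off-heap flag on, OffHeapColumnVectors are expected to be allocated
assert(inmemoryScan.vectorTypes.exists(_.forall(_.contains("OffHeapColumnVector"))))

// Restore the default
spark.conf.set("spark.sql.columnVector.offheap.enabled", false)
----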
=== [[inputRDD]] Creating Input RDD of Internal Rows -- inputRDD Internal Property

[source, scala]
----
inputRDD: RDD[InternalRow]
----

NOTE: inputRDD is a Scala lazy value which is computed once when accessed and cached afterwards.

inputRDD firstly <<filteredCachedBatches, applies partition batch pruning to the cached column buffers>> (and creates a filtered RDD[CachedBatch]).

With the <<supportsBatch, supportsBatch>> flag enabled, inputRDD finishes with a new MapPartitionsRDD (using RDD.map) by <<createAndDecompressColumn, creating and decompressing a ColumnarBatch>> for every filtered CachedBatch.

The following demos show inputRDD with the supportsBatch flag enabled and disabled.

[source, scala]
----
// Demo: A MapPartitionsRDD in the RDD lineage
val q = spark.range(4).cache
val plan = q.queryExecution.executedPlan

import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
val inmemoryScan = plan.collectFirst { case exec: InMemoryTableScanExec => exec }.get

// supportsBatch flag is on since the schema is a single column of longs
assert(inmemoryScan.supportsBatch)

val rdd = inmemoryScan.inputRDDs.head

scala> rdd.toDebugString
res2: String = (8) MapPartitionsRDD[5] at inputRDDs at ...
----

With the <<supportsBatch, supportsBatch>> flag disabled, inputRDD firstly <<filteredCachedBatches, applies partition batch pruning to the cached column buffers>> (and creates a filtered RDD[CachedBatch]).

NOTE: Indeed, inputRDD <<filteredCachedBatches, applies partition batch pruning to the cached column buffers>> (creating a filtered RDD[CachedBatch]) twice, which seems unnecessary.

In the end, inputRDD creates a new MapPartitionsRDD (using RDD.map) with a ColumnarIterator applied to all cached columnar batches. The ColumnarIterator is created as follows:

. For every CachedBatch in the partition iterator, adds the total number of rows in the batch to the numOutputRows SQL metric

. Requests GenerateColumnAccessor to generate the Java code for a ColumnarIterator to perform expression evaluation for the given <<attributes, column types>>

. Requests the ColumnarIterator to initialize

[source, scala]
----
// Demo: A MapPartitionsRDD in the RDD lineage (supportsBatch flag off)
import java.sql.Date
import java.time.LocalDate
val q = Seq(Date.valueOf(LocalDate.now)).toDF("date").cache
val plan = q.queryExecution.executedPlan

import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
val inmemoryScan = plan.collectFirst { case exec: InMemoryTableScanExec => exec }.get

// supportsBatch flag is off since the schema uses java.sql.Date
assert(inmemoryScan.supportsBatch == false)

val rdd = inmemoryScan.inputRDDs.head

scala> rdd.toDebugString
res2: String = (1) MapPartitionsRDD[12] at inputRDDs at ...
----

NOTE: inputRDD is used when InMemoryTableScanExec is requested for the <<inputRDDs, input RDDs>> and to <<doExecute, execute>>.
=== [[doExecute]] Executing Physical Operator (Generating RDD[InternalRow]) -- doExecute Method

[source, scala]
----
doExecute(): RDD[InternalRow]
----

doExecute branches off per the <<supportsBatch, supportsBatch>> flag.

With the <<supportsBatch, supportsBatch>> flag enabled, doExecute creates a WholeStageCodegenExec.md#creating-instance[WholeStageCodegenExec] (with the InMemoryTableScanExec physical operator as the WholeStageCodegenExec.md#child[child] and the WholeStageCodegenExec.md#codegenStageId[codegenStageId] as 0) and requests it to SparkPlan.md#execute[execute].

Otherwise, with the <<supportsBatch, supportsBatch>> flag disabled, doExecute simply gives the <<inputRDD, inputRDD>>.

doExecute is part of the SparkPlan abstraction.
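A sketch of executing the operator directly; with supportsBatch on, the resulting RDD lineage comes from the WholeStageCodegenExec wrapper described above:

[source, scala]
----
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec

val q = spark.range(4).cache
val inmemoryScan = q.queryExecution.executedPlan
  .collectFirst { case e: InMemoryTableScanExec => e }
  .get

// SparkPlan.execute eventually calls doExecute and gives an RDD[InternalRow]
val rdd = inmemoryScan.execute()
println(rdd.toDebugString)
----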
=== [[buildFilter]] buildFilter Property

[source, scala]
----
buildFilter: PartialFunction[Expression, Expression]
----

NOTE: buildFilter is a Scala lazy value which is computed once when accessed and cached afterwards.

buildFilter is a Scala https://www.scala-lang.org/api/2.11.11/#scala.PartialFunction[PartialFunction] that accepts an expressions/Expression.md[Expression] and produces an expressions/Expression.md[Expression], i.e. PartialFunction[Expression, Expression].

[[buildFilter-expressions]]
.buildFilter's Expressions
[cols="1,2",options="header",width="100%"]
|===
| Input Expression
| Description

| And
|

| Or
|

| EqualTo
|

| EqualNullSafe
|

| LessThan
|

| LessThanOrEqual
|

| GreaterThan
|

| GreaterThanOrEqual
|

| IsNull
|

| IsNotNull
|

| In with a non-empty spark-sql-Expression-In.md#list[list] of spark-sql-Expression-Literal.md[Literal] expressions
| For every Literal expression in the expression list, buildFilter creates an And expression with the lower and upper bounds of the <<statsFor, partition statistics>> for the attribute and the Literal.

In the end, buildFilter joins the And expressions with Or expressions.
|===

NOTE: buildFilter is used exclusively when InMemoryTableScanExec is requested for the <<partitionFilters, partitionFilters>>.
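A sketch that triggers the In case from the table above; the derived filter shows up next to InMemoryTableScan in the physical plan (exact expression IDs will differ):

[source, scala]
----
import spark.implicits._

val ids = spark.range(10).cache

// An In predicate with Literal values that buildFilter can turn into
// a disjunction of lower/upper-bound checks against partition statistics
ids.where($"id".isin(1, 2, 3)).explain
----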
=== [[innerChildren]] innerChildren Method

[source, scala]
----
innerChildren: Seq[QueryPlan[_]]
----

NOTE: innerChildren is part of the catalyst/QueryPlan.md#innerChildren[QueryPlan Contract] to...FIXME.

innerChildren ...FIXME
Performance Metrics

| Key | Name (in web UI) | Description |
|---|---|---|
| numOutputRows | number of output rows | Number of output rows |
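A sketch of reading the numOutputRows metric after an action (the metric lives on the InMemoryTableScanExec instance of the executed plan, so the operator is collected from that same plan before running the query):

[source, scala]
----
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec

val q = spark.range(4).cache
val inmemoryScan = q.queryExecution.executedPlan
  .collectFirst { case e: InMemoryTableScanExec => e }
  .get

q.collect()  // run the query so the metric gets updated

// Expect the number of rows scanned from the cache (4 here)
println(inmemoryScan.metrics("numOutputRows").value)
----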