ColumnarBatch¶
ColumnarBatch allows working with multiple ColumnVectors as a row-wise table for Columnar Scan and Vectorized Query Execution.
Creating Instance¶
ColumnarBatch takes the following to be created:

- ColumnVectors (ColumnVector[])

ColumnarBatch immediately creates an internal ColumnarBatchRow.
ColumnarBatch is created when:

- RowToColumnarExec physical operator is requested to doExecuteColumnar
- InMemoryTableScanExec leaf physical operator is requested for a RDD[ColumnarBatch]
- OrcColumnarBatchReader is requested to initBatch
- VectorizedParquetRecordReader is requested to init a batch
- others (PySpark and SparkR)
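A minimal construction sketch (the single-column setup here is illustrative; see the Demo below for a fuller example):

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

// A single int column with room for 1024 values
val vectors = Array[ColumnVector](new OnHeapColumnVector(1024, IntegerType))
val batch = new ColumnarBatch(vectors)
assert(batch.numRows == 0) // numRows starts at 0
// Spark 3.x also offers a two-argument constructor: new ColumnarBatch(vectors, numRows)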
ColumnarBatchRow¶
ColumnarBatch creates a ColumnarBatchRow when created.
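For illustration, a minimal sketch showing that getRow returns this single internal (mutable) ColumnarBatchRow positioned at the requested rowId, so a row should be copied if it must survive the next getRow call:

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

val v = new OnHeapColumnVector(2, IntegerType)
v.putInt(0, 10)
v.putInt(1, 20)
val batch = new ColumnarBatch(Array[ColumnVector](v))
batch.setNumRows(2)
val r0 = batch.getRow(0)
val r1 = batch.getRow(1)
assert(r0 eq r1) // the internal ColumnarBatchRow is reused across getRow calls
assert(r1.getInt(0) == 20)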
Number of Rows¶
int numRows
ColumnarBatch is given the number of rows (numRows) when created.

numRows can also be (re)set using setNumRows (often to reset a ColumnarBatch to 0 rows before the final row count is known).

numRows is available using the numRows accessor.
numRows¶
int numRows()
numRows returns the number of rows.

numRows is used when:

- FileScanRDD is requested to compute a partition
- ColumnarToRowExec physical operator is requested to execute
- InMemoryTableScanExec physical operator is requested for the columnarInputRDD
- MetricsBatchIterator is requested for next (ColumnarBatch)
- DataSourceV2ScanExecBase and FileSourceScanExec physical operators are requested to doExecuteColumnar
- others (PySpark)
setNumRows¶
void setNumRows(
  int numRows)

setNumRows sets the numRows registry to the given numRows.
setNumRows is used when:

- OrcColumnarBatchReader is requested to nextBatch
- VectorizedParquetRecordReader is requested to nextBatch
- RowToColumnarExec physical operator is requested to doExecuteColumnar
- InMemoryTableScanExec physical operator is requested for the columnarInputRDD (and uses DefaultCachedBatchSerializer to convertCachedBatchToColumnarBatch)
- others (PySpark and SparkR)
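Below is a sketch of the fill-then-publish pattern those callers follow (the values and the row count are illustrative only):

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

val vector = new OnHeapColumnVector(1024, IntegerType)
val batch = new ColumnarBatch(Array[ColumnVector](vector))

// Reset the batch before (re)filling the vector
batch.setNumRows(0)
vector.reset()

// Fill the vector with three illustrative values
Seq(5, 7, 9).zipWithIndex.foreach { case (value, i) => vector.putInt(i, value) }

// Publish how many rows are now valid
batch.setNumRows(3)
assert(batch.numRows == 3)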
rowIterator¶
Iterator<InternalRow> rowIterator()
rowIterator returns an iterator over the rows in this batch that reuses a single ColumnarBatchRow (see the Demo below for an example).

rowIterator is used when:

- SparkResult is requested for iterator
- ColumnarToRowExec physical operator is executed
- others (SparkR and PySpark)
Demo¶
import org.apache.spark.sql.types._
val schema = new StructType()
.add("intCol", IntegerType)
.add("doubleCol", DoubleType)
.add("intCol2", IntegerType)
.add("string", BinaryType)
val capacity = 4 * 1024 // 4k
import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
val columns = schema.fields.map { field =>
new OnHeapColumnVector(capacity, field.dataType)
}
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
val batch = new ColumnarBatch(columns.toArray[ColumnVector])
// Add a single row: [1, 1.1, NULL, "Hello"]
columns(0).putInt(0, 1)
columns(1).putDouble(0, 1.1)
columns(2).putNull(0)
columns(3).putByteArray(0, "Hello".getBytes(java.nio.charset.StandardCharsets.UTF_8))
batch.setNumRows(1)
assert(batch.getRow(0).numFields == 4)
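As a follow-up, rowIterator gives row-wise access to the same batch (reusing a single ColumnarBatchRow), and close releases the memory of the underlying ColumnVectors:

val rows = batch.rowIterator
while (rows.hasNext) {
  val row = rows.next
  val s = new String(row.getBinary(3), java.nio.charset.StandardCharsets.UTF_8)
  println(s"intCol=${row.getInt(0)} doubleCol=${row.getDouble(1)} intCol2.isNull=${row.isNullAt(2)} string=$s")
}
batch.close()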