ColumnarBatch¶
ColumnarBatch allows working with multiple ColumnVectors as a row-wise table, and is used in Columnar Scan and Vectorized Query Execution.
Creating Instance¶
ColumnarBatch takes the following to be created:

- ColumnVectors (ColumnVector[])
- Number of rows
ColumnarBatch immediately creates an internal ColumnarBatchRow.
ColumnarBatch is created when:
- RowToColumnarExec physical operator is requested to doExecuteColumnar
- InMemoryTableScanExec leaf physical operator is requested for a RDD[ColumnarBatch]
- OrcColumnarBatchReader is requested to initBatch
- VectorizedParquetRecordReader is requested to init a batch
- others (PySpark and SparkR)
ColumnarBatchRow¶
ColumnarBatch creates a ColumnarBatchRow when created.
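Below is a minimal sketch (with illustrative column data) of how that single ColumnarBatchRow surfaces through getRow: every call repositions and returns the same mutable row object.

```scala
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

val col = new OnHeapColumnVector(2, IntegerType)
col.putInt(0, 10)
col.putInt(1, 20)

val batch = new ColumnarBatch(Array[ColumnVector](col), 2)

// getRow repositions the internal ColumnarBatchRow at the given rowId
// and returns it, so consecutive calls return the very same object.
val row0 = batch.getRow(0)
assert(row0.getInt(0) == 10)

val row1 = batch.getRow(1)
assert(row1.getInt(0) == 20)
assert(row0 eq row1) // one shared, mutable ColumnarBatchRow
```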
Number of Rows¶
```java
int numRows
```
ColumnarBatch is given the number of rows (numRows) when created.
numRows can also be (re)set using setNumRows (often to reset a ColumnarBatch to 0 rows before the final row count is known).

numRows is available using the numRows accessor.
numRows¶
```java
int numRows()
```
numRows returns the number of rows.
numRows is used when:
- FileScanRDD is requested to compute a partition
- ColumnarToRowExec physical operator is requested to execute
- InMemoryTableScanExec physical operator is requested for the columnarInputRDD
- MetricsBatchIterator is requested for next (ColumnarBatch)
- DataSourceV2ScanExecBase and FileSourceScanExec physical operators are requested to doExecuteColumnar
- others (PySpark)
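For illustration, a sketch of the consumer-side pattern behind these usages, assuming a small on-heap batch with made-up values: the positional scan over the batch is bounded by numRows.

```scala
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

val col = new OnHeapColumnVector(16, IntegerType)
(0 until 3).foreach(i => col.putInt(i, i + 1))
val batch = new ColumnarBatch(Array[ColumnVector](col), 3)

// numRows bounds the positional scan over the batch.
var sum = 0
var i = 0
while (i < batch.numRows()) {
  sum += batch.getRow(i).getInt(0)
  i += 1
}
assert(sum == 6) // 1 + 2 + 3
```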
setNumRows¶
```java
void setNumRows(
  int numRows)
```
setNumRows sets the numRows registry to the given numRows.
setNumRows is used when:
- OrcColumnarBatchReader is requested to nextBatch
- VectorizedParquetRecordReader is requested to nextBatch
- RowToColumnarExec physical operator is requested to doExecuteColumnar
- InMemoryTableScanExec physical operator is requested for the columnarInputRDD (and uses DefaultCachedBatchSerializer to convertCachedBatchToColumnarBatch)
- others (PySpark and SparkR)
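A minimal sketch of the reset-then-set pattern (loosely mirroring what the vectorized readers above do per batch; the data is illustrative):

```scala
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

val col = new OnHeapColumnVector(1024, IntegerType)
val batch = new ColumnarBatch(Array[ColumnVector](col))

// Reset the vectors and zero the row count before loading the next batch...
col.reset()
batch.setNumRows(0)

col.putInt(0, 1)
col.putInt(1, 2)

// ...and publish the final row count once the data is in place.
batch.setNumRows(2)
assert(batch.numRows() == 2)
```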
rowIterator¶
```java
Iterator<InternalRow> rowIterator()
```
rowIterator returns a java.util.Iterator[InternalRow] over the rows of this batch. The iterator repositions a single ColumnarBatchRow from one rowId to the next, so a row has to be copied if it is to be retained across iterations (see the sketch after the list below).
rowIterator is used when:
- SparkResult is requested for iterator
- ColumnarToRowExec physical operator is executed
- others (SparkR and PySpark)
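A minimal sketch of draining a batch through rowIterator, with illustrative data; note the copy() before retaining a row, since the iterator reuses one row object:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

val col = new OnHeapColumnVector(3, IntegerType)
(0 until 3).foreach(i => col.putInt(i, i + 1))
val batch = new ColumnarBatch(Array[ColumnVector](col), 3)

// rowIterator is a java.util.Iterator[InternalRow] that reuses one row
// object, so copy() any row that must outlive the current step.
val it = batch.rowIterator()
val copied = scala.collection.mutable.ArrayBuffer.empty[InternalRow]
while (it.hasNext) copied += it.next().copy()
assert(copied.map(_.getInt(0)) == Seq(1, 2, 3))
```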
Demo¶
```scala
import org.apache.spark.sql.types._
val schema = new StructType()
  .add("intCol", IntegerType)
  .add("doubleCol", DoubleType)
  .add("intCol2", IntegerType)
  .add("string", BinaryType)

val capacity = 4 * 1024 // 4k values per column

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
val columns = schema.fields.map { field =>
  new OnHeapColumnVector(capacity, field.dataType)
}

import org.apache.spark.sql.vectorized.ColumnarBatch
val batch = new ColumnarBatch(columns.toArray)

// Add a single row [1, 1.1, NULL, "Hello"]
columns(0).putInt(0, 1)
columns(1).putDouble(0, 1.1)
columns(2).putNull(0)
columns(3).putByteArray(0, "Hello".getBytes(java.nio.charset.StandardCharsets.UTF_8))
batch.setNumRows(1)

assert(batch.getRow(0).numFields == 4)
```