
ColumnarBatch

ColumnarBatch lets you work with multiple ColumnVectors as a row-wise table for Columnar Scan and Vectorized Query Execution.

Creating Instance

ColumnarBatch takes the following to be created:

  • ColumnVectors
  • Number of Rows

ColumnarBatch immediately creates an internal ColumnarBatchRow.

ColumnarBatch is created when:

ColumnarBatchRow

ColumnarBatch creates a ColumnarBatchRow when created.
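The following minimal Scala sketch illustrates what the internal ColumnarBatchRow gives a batch. It uses hypothetical toy classes (ToyColumnarBatch, ToyRow), not Spark's API. It assumes, as in Spark's getRow, that the batch creates one mutable row when it is constructed and repoints that row at a rowId instead of copying values out of the column vectors.

```scala
// Hypothetical toy model (not Spark's API): columns are plain arrays and the
// row is a mutable view that reads column values at its current rowId.
final class ToyColumnarBatch(columns: Array[Array[Any]]) {
  // Plays the role of ColumnarBatchRow: created once, together with the batch
  final class ToyRow {
    var rowId: Int = 0
    def numFields: Int = columns.length
    def get(ordinal: Int): Any = columns(ordinal)(rowId)
  }

  private val row = new ToyRow

  // Repoints the single shared row at rowId and returns it (no copying)
  def getRow(rowId: Int): ToyRow = {
    row.rowId = rowId
    row
  }
}

val batch = new ToyColumnarBatch(Array(Array[Any](1, 2, 3), Array[Any]("a", "b", "c")))
val r0 = batch.getRow(0)
val first = r0.get(1) // read while r0 still points at row 0
val r2 = batch.getRow(2)
// r0 and r2 are the same object, so r0 now reads row 2 as well
```

Because every call returns the same row object, code that needs to keep a row beyond the next call has to copy its values.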

Number of Rows

int numRows

ColumnarBatch is given the number of rows (numRows) when created.

numRows can also be (re)set using setNumRows, which is often used to reset a ColumnarBatch to 0 rows before the final row count is set.

numRows is available using the numRows accessor.
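This sketch shows how numRows relates to the capacity of the underlying vectors. It uses a hypothetical ToyBatch class, not Spark's API. The vectors are allocated up front, and numRows alone decides how many leading slots count as rows. The usage follows the reset-to-0, write, then set pattern described above.

```scala
// Hypothetical toy model (not Spark's API): one pre-allocated "column vector"
// plus a row count that decides how many leading slots are valid rows.
final class ToyBatch(val capacity: Int) {
  val ints = new Array[Int](capacity)
  private var rows = 0

  def numRows: Int = rows                              // cf. numRows()
  def setNumRows(numRows: Int): Unit = rows = numRows  // plain assignment, no checks
  // Only the first numRows slots count, whatever the capacity
  def visible: Seq[Int] = ints.take(rows).toSeq
}

val b = new ToyBatch(capacity = 8)
b.setNumRows(0)                                        // reset before writing new values
Seq(10, 20, 30).zipWithIndex.foreach { case (v, i) => b.ints(i) = v }
b.setNumRows(3)                                        // then set the final row count
```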

Used when:

numRows

int numRows()

numRows returns the number of rows.


numRows is used when:

setNumRows

void setNumRows(
  int numRows)

setNumRows sets the numRows registry to the given numRows.


setNumRows is used when:

  • OrcColumnarBatchReader is requested to nextBatch
  • VectorizedParquetRecordReader is requested to nextBatch
  • RowToColumnarExec physical operator is requested to doExecuteColumnar
  • InMemoryTableScanExec physical operator is requested for the columnarInputRDD (and uses DefaultCachedBatchSerializer to convertCachedBatchToColumnarBatch)
  • others (PySpark and SparkR)
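Readers such as VectorizedParquetRecordReader reuse a single batch across nextBatch calls. The sketch below is a hypothetical toy model of that pattern. ToyBatch, ToyReader and their members are made up for illustration and are not Spark's API. Each call resets numRows to 0, writes up to capacity values, then calls setNumRows with the number actually written.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical toy model (not Spark's API) of a batch that is reused across reads
final class ToyBatch(val capacity: Int) {
  val values = new Array[Int](capacity)
  var numRows = 0
  def setNumRows(n: Int): Unit = numRows = n
}

// Hypothetical reader whose nextBatch refills the same batch on every call
final class ToyReader(source: Seq[Int], val batch: ToyBatch) {
  private var offset = 0

  // Returns false once the source is exhausted
  def nextBatch(): Boolean = {
    batch.setNumRows(0)                    // reset before writing the next chunk
    if (offset >= source.length) return false
    val n = math.min(batch.capacity, source.length - offset)
    for (i <- 0 until n) batch.values(i) = source(offset + i)
    offset += n
    batch.setNumRows(n)                    // expose exactly the rows written
    true
  }
}

val reader = new ToyReader(1 to 10, new ToyBatch(capacity = 4))
val sizes = ListBuffer.empty[Int]
while (reader.nextBatch()) sizes += reader.batch.numRows
// sizes: ListBuffer(4, 4, 2); afterwards numRows is back to 0
```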

rowIterator

Iterator<InternalRow> rowIterator()

rowIterator...FIXME
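Here is a minimal sketch of row-wise iteration over a batch, using hypothetical toy classes (ToyBatch, ToyRow) that are not Spark's API. It assumes, as in Spark's source, that rowIterator walks rowIds from 0 until numRows and reuses a single mutable row object. Values should therefore be read, or the row copied, before the iterator advances.

```scala
// Hypothetical toy model (not Spark's API) of row-wise iteration over columns
final class ToyBatch(columns: Array[Array[Int]], val numRows: Int) {
  final class ToyRow {
    var rowId = 0
    def getInt(ordinal: Int): Int = columns(ordinal)(rowId)
  }

  // Yields rowIds 0 until numRows, reusing one mutable row object
  def rowIterator(): Iterator[ToyRow] = {
    val row = new ToyRow
    Iterator.range(0, numRows).map { id => row.rowId = id; row }
  }
}

// Three values per column, but only numRows = 2 rows are visible
val batch = new ToyBatch(Array(Array(1, 2, 3), Array(10, 20, 30)), numRows = 2)

// Safe: read values while the row points at the current rowId
val sums = batch.rowIterator().map(r => r.getInt(0) + r.getInt(1)).toList

// Pitfall: collecting the rows keeps two references to one object (rowId = 1)
val kept = batch.rowIterator().toList
```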


rowIterator is used when:

Demo

import org.apache.spark.sql.types._
val schema = new StructType()
  .add("intCol", IntegerType)
  .add("doubleCol", DoubleType)
  .add("intCol2", IntegerType)
  .add("binaryCol", BinaryType)

val capacity = 4 * 1024 // 4k rows per column vector
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
// One writable on-heap column vector per field of the schema
val columns = schema.fields.map { field =>
  new OnHeapColumnVector(capacity, field.dataType)
}

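import org.apache.spark.sql.vectorized.ColumnarBatch
// columns.toArray widens the element type to ColumnVector (Scala arrays are invariant).
// No rows are visible until setNumRows is called.
val batch = new ColumnarBatch(columns.toArray)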

// Add a row [1, 1.1, NULL, "Hello" (as UTF-8 bytes)]
columns(0).putInt(0, 1)
columns(1).putDouble(0, 1.1)
columns(2).putNull(0)
columns(3).putByteArray(0, "Hello".getBytes(java.nio.charset.StandardCharsets.UTF_8))
// Make the row visible: the batch now has 1 row
batch.setNumRows(1)

assert(batch.numRows() == 1)
assert(batch.getRow(0).numFields == 4)