GenerateUnsafeProjection
GenerateUnsafeProjection is a CodeGenerator that generates the bytecode for creating an UnsafeProjection from the given Expressions (i.e., a CodeGenerator[Seq[Expression], UnsafeProjection]).
GenerateUnsafeProjection: Seq[Expression] => UnsafeProjection
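The type signature above reads as a function from expressions to a projection. As a minimal sketch of that idea only, the toy below interprets a hypothetical Expr tree (BoundRef, Lit, Add and projectionFor are illustrative names, not Spark classes) instead of generating and compiling bytecode, and uses a plain IndexedSeq[Int] in place of an InternalRow or UnsafeRow.

```scala
// Toy, interpreted model of Seq[Expression] => UnsafeProjection.
// Expr, BoundRef, Lit, Add and projectionFor are hypothetical names, not Spark classes.
sealed trait Expr
final case class BoundRef(ordinal: Int) extends Expr   // reads one column of the input row
final case class Lit(value: Int) extends Expr          // a constant
final case class Add(left: Expr, right: Expr) extends Expr

type Row = IndexedSeq[Int]          // stands in for InternalRow / UnsafeRow
type Projection = Row => Row

def eval(e: Expr, row: Row): Int = e match {
  case BoundRef(i) => row(i)
  case Lit(v)      => v
  case Add(l, r)   => eval(l, row) + eval(r, row)
}

// The "generator": one output column per input expression (no bytecode involved).
def projectionFor(exprs: Seq[Expr]): Projection =
  row => exprs.map(e => eval(e, row)).toIndexedSeq

val proj = projectionFor(Seq(BoundRef(1), Add(BoundRef(0), Lit(10))))
println(proj(IndexedSeq(1, 2)))  // Vector(2, 11)
```

The real generator returns compiled code, but the contract is the same: the expressions are fixed once, and the resulting projection is applied to many rows.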
Binding Expressions to Schema
bind(
  in: Seq[Expression],
  inputSchema: Seq[Attribute]): Seq[Expression]
bind binds the given in expressions to the given inputSchema, i.e., replaces attribute references with BoundReferences to their ordinals in the schema.
bind is part of the CodeGenerator abstraction.
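A toy model of what binding does, assuming a hypothetical Expr tree: each attribute reference becomes a reference to its ordinal in the input schema. Spark's binding matches attributes by expression ID rather than by name; matching by name here is a simplification, and AttrRef, BoundRef and bindOne are illustrative names.

```scala
// Toy model of binding expressions to an input schema.
// AttrRef, BoundRef, Add and bindOne are hypothetical names, not Spark classes;
// matching attributes by name (not by expression ID) is a simplification.
sealed trait Expr
final case class AttrRef(name: String) extends Expr      // position not yet known
final case class BoundRef(ordinal: Int) extends Expr     // position in the input row
final case class Add(left: Expr, right: Expr) extends Expr

def bindOne(e: Expr, inputSchema: Seq[AttrRef]): Expr = e match {
  case a: AttrRef =>
    val ordinal = inputSchema.indexOf(a)
    require(ordinal >= 0, s"${a.name} is not in the input schema")
    BoundRef(ordinal)
  case Add(l, r)   => Add(bindOne(l, inputSchema), bindOne(r, inputSchema))
  case b: BoundRef => b   // already bound
}

def bind(in: Seq[Expr], inputSchema: Seq[AttrRef]): Seq[Expr] =
  in.map(bindOne(_, inputSchema))

val schema = Seq(AttrRef("id"), AttrRef("amount"))
println(bind(Seq(Add(AttrRef("amount"), AttrRef("id"))), schema))
```

Binding up front means the evaluation code only ever indexes into the row and never has to look attributes up by name.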
Creating UnsafeProjection (for Expressions)
create(
  references: Seq[Expression]): UnsafeProjection // (1)
create(
  expressions: Seq[Expression],
  subexpressionEliminationEnabled: Boolean): UnsafeProjection
(1) Calls the two-argument create with the subexpressionEliminationEnabled flag off (false)
create creates a new CodegenContext.
create ...FIXME
The single-argument create is part of the CodeGenerator abstraction.
Logging
Enable ALL logging level for the org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection logger to see what happens inside.
Add the following lines to conf/log4j2.properties:
logger.GenerateUnsafeProjection.name = org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
logger.GenerateUnsafeProjection.level = all
Refer to Logging.
Review Me
Generating UnsafeProjection (generate Method)
generate(
  expressions: Seq[Expression],
  subexpressionEliminationEnabled: Boolean): UnsafeProjection
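generate canonicalizes the input expressions and then creates an UnsafeProjection from the result (using create with the given subexpressionEliminationEnabled flag).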
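The two-step composition can be sketched with a toy expression tree. Everything here is hypothetical (Expr, Alias, Concat, ToyProjection); canonicalize simply strips Alias nodes and create only records what it was given, so the sketch shows the order of the two steps rather than any code generation.

```scala
// Toy sketch of generate = create(canonicalize(expressions), flag).
// Expr, Lit, Alias, Concat and ToyProjection are hypothetical, not Spark classes.
sealed trait Expr
final case class Lit(value: String) extends Expr
final case class Alias(child: Expr, name: String) extends Expr
final case class Concat(left: Expr, right: Expr) extends Expr

def eval(e: Expr): String = e match {
  case Lit(v)       => v
  case Alias(c, _)  => eval(c)
  case Concat(l, r) => eval(l) + eval(r)
}

// Stand-in for canonicalize: strip Alias nodes, which do not affect the value.
def strip(e: Expr): Expr = e match {
  case Alias(c, _)  => strip(c)
  case Concat(l, r) => Concat(strip(l), strip(r))
  case lit: Lit     => lit
}
def canonicalize(in: Seq[Expr]): Seq[Expr] = in.map(strip)

// Stand-in for create: records its inputs and evaluates them on demand (no codegen).
final case class ToyProjection(exprs: Seq[Expr], subexpressionEliminationEnabled: Boolean) {
  def apply(): Seq[String] = exprs.map(eval)
}
def create(exprs: Seq[Expr], subexpressionEliminationEnabled: Boolean): ToyProjection =
  ToyProjection(exprs, subexpressionEliminationEnabled)

def generate(expressions: Seq[Expr], subexpressionEliminationEnabled: Boolean): ToyProjection =
  create(canonicalize(expressions), subexpressionEliminationEnabled)

val p = generate(Seq(Concat(Alias(Lit("hello"), "greeting"), Lit("!"))), subexpressionEliminationEnabled = false)
println(p.exprs)  // List(Concat(Lit(hello),Lit(!)))
println(p())      // List(hello!)
```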
generate is used when:
- The UnsafeProjection factory object is requested for an UnsafeProjection
- ExpressionEncoder is requested to initialize the internal UnsafeProjection
- FileFormat is requested to build a data reader with partition column values appended
- OrcFileFormat is requested to buildReaderWithPartitionValues
- ParquetFileFormat is requested to build a data reader with partition column values appended
- GroupedIterator is requested for keyProjection
- ObjectOperator is requested to serializeObjectToRow
- (Spark MLlib) LibSVMFileFormat is requested to buildReader
- (Spark Structured Streaming) StateStoreRestoreExec, StateStoreSaveExec and StreamingDeduplicateExec are requested to execute
canonicalize Method
canonicalize(in: Seq[Expression]): Seq[Expression]
canonicalize removes unnecessary Alias expressions.
Internally, canonicalize uses the ExpressionCanonicalizer rule executor, which in turn uses just one expression rule, CleanExpressions.
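Since ExpressionCanonicalizer is a rule executor, canonicalize amounts to running a rule over each expression until it stops changing. A minimal sketch, assuming a hypothetical Expr tree and a tiny executor with an iteration cap; cleanExpressions stands in for CleanExpressions and removes every Alias node, which is an assumption about what counts as "unnecessary".

```scala
// Toy model of canonicalize as a rule executor running an Alias-removing rule.
// Expr, Attr, Alias, Upper, cleanExpressions and execute are hypothetical names.
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class Alias(child: Expr, name: String) extends Expr
final case class Upper(child: Expr) extends Expr

type Rule = Expr => Expr

// Stand-in for CleanExpressions: removes every Alias node in the tree.
val cleanExpressions: Rule = {
  def go(e: Expr): Expr = e match {
    case Alias(c, _) => go(c)
    case Upper(c)    => Upper(go(c))
    case a: Attr     => a
  }
  e => go(e)
}

// Minimal executor: apply the rules in order until the result stops changing
// or maxIterations is reached.
def execute(rules: Seq[Rule], maxIterations: Int)(e: Expr): Expr = {
  @annotation.tailrec
  def loop(current: Expr, iteration: Int): Expr = {
    val next = rules.foldLeft(current)((acc, rule) => rule(acc))
    if (next == current || iteration >= maxIterations) next
    else loop(next, iteration + 1)
  }
  loop(e, 1)
}

def canonicalize(in: Seq[Expr]): Seq[Expr] =
  in.map(e => execute(Seq(cleanExpressions), maxIterations = 10)(e))

println(canonicalize(Seq(Alias(Upper(Alias(Attr("a"), "x")), "y"))))  // List(Upper(Attr(a)))
```

Dropping aliases before code generation is harmless because an alias only names a value; it does not change what gets computed.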
Generating JVM Bytecode for UnsafeProjection from Expressions, with Optional Subexpression Elimination (create Method)
create(
  expressions: Seq[Expression],
  subexpressionEliminationEnabled: Boolean): UnsafeProjection
create(references: Seq[Expression]): UnsafeProjection // <1>
<1> Calls the former create with the subexpressionEliminationEnabled flag off
create first creates a CodegenContext and an ExprCode for the input expressions (using createCode).
create creates a code body with a public java.lang.Object generate(Object[] references) method that creates a SpecificUnsafeProjection.
public java.lang.Object generate(Object[] references) {
  return new SpecificUnsafeProjection(references);
}
class SpecificUnsafeProjection extends UnsafeProjection { ... }
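The code body is assembled as Java source text before it is compiled. A sketch of that templating step, assuming a hypothetical codeBody helper; only the generate method mirrors the snippet above, and the class body is a caller-supplied placeholder.

```scala
// Hypothetical helper (not Spark's API) that assembles Java source text for a projection class.
def codeBody(projectionClassName: String, classBody: String): String =
  s"""public java.lang.Object generate(Object[] references) {
     |  return new $projectionClassName(references);
     |}
     |
     |class $projectionClassName extends UnsafeProjection {
     |$classBody
     |}""".stripMargin

val body = codeBody("SpecificUnsafeProjection", "  // constructor, fields and apply method would go here")
println(body)
```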
create creates a CodeAndComment with the code body and comment placeholders.
You should see the following DEBUG message in the logs:
DEBUG GenerateUnsafeProjection: code for [expressions]:
[code]
TIP: Enable DEBUG logging level for the org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection logger to see the message above.
logger.GenerateUnsafeProjection.name = org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
logger.GenerateUnsafeProjection.level = debug
See CodeGenerator.
create requests CodeGenerator to compile the Java source code to JVM bytecode (using Janino).
create requests the CodegenContext for its references and requests the compiled class to create a SpecificUnsafeProjection for them, which is the final UnsafeProjection.
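Objects that the generated code does not inline (the literals in the createCode example, for instance, are read from references[0] and references[1]) are collected during code generation and handed to the compiled class as one array. A toy model of that contract, assuming hypothetical ToyContext, generateReaders and instantiate names: indices are fixed at generation time, so the array passed at instantiation must keep the same order.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model of the references contract; ToyContext, generateReaders and instantiate are hypothetical.
final class ToyContext {
  private val refs = ArrayBuffer.empty[Any]
  // Registers an object and returns its index, like a slot in Object[] references.
  def addReference(obj: Any): Int = { refs += obj; refs.length - 1 }
  def references: Array[Any] = refs.toArray
}

// "Code generation": each literal turns into a read of references(i) at an index fixed now.
def generateReaders(ctx: ToyContext, literals: Seq[Any]): Seq[Array[Any] => Any] =
  literals.map { lit =>
    val i = ctx.addReference(lit)
    (references: Array[Any]) => references(i)
  }

// "Instantiation": the compiled class receives the whole array once, like generate(Object[] references).
def instantiate(readers: Seq[Array[Any] => Any], references: Array[Any]): () => Seq[Any] =
  () => readers.map(read => read(references))

val ctx = new ToyContext
val readers = generateReaders(ctx, Seq("hello", 42))
val projection = instantiate(readers, ctx.references)
println(projection())  // List(hello, 42)
```

Passing objects through an array keeps the generated source independent of the concrete values, which is what lets identical source be compiled once and reused.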
(Single-argument) create is part of the CodeGenerator abstraction.
Creating ExprCode for Expressions, with Optional Subexpression Elimination (createCode Method)
createCode(
  ctx: CodegenContext,
  expressions: Seq[Expression],
  useSubexprElimination: Boolean = false): ExprCode
createCode requests the input CodegenContext to generate Java source code for the code-generated evaluation of every expression in the input expressions.
createCode ...FIXME
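With useSubexprElimination enabled, an expression that occurs more than once among the input expressions is meant to be evaluated only once per row. A toy interpreter illustrating that effect, assuming a hypothetical Expr tree; it memoizes non-leaf nodes per row, whereas Spark generates Java code for the common subexpressions, so only the observable effect (fewer evaluations, same results) is modeled.

```scala
// Toy illustration of subexpression elimination; Expr, Col, Add, Mul and evalAll are hypothetical.
sealed trait Expr
final case class Col(ordinal: Int) extends Expr
final case class Add(left: Expr, right: Expr) extends Expr
final case class Mul(left: Expr, right: Expr) extends Expr

// Evaluates all expressions against one row and counts how many non-leaf nodes were computed.
def evalAll(exprs: Seq[Expr], row: IndexedSeq[Long], useSubexprElimination: Boolean): (Seq[Long], Int) = {
  var evaluations = 0
  val cache = scala.collection.mutable.Map.empty[Expr, Long]  // per-row results of common subtrees

  def compute(e: Expr): Long = e match {
    case Col(i)    => row(i)
    case Add(l, r) => eval(l) + eval(r)
    case Mul(l, r) => eval(l) * eval(r)
  }

  def eval(e: Expr): Long = e match {
    case Col(_) => compute(e)  // leaves are cheap and never cached
    case _ if useSubexprElimination =>
      cache.get(e) match {
        case Some(v) => v      // seen before in this row: reuse the result
        case None =>
          evaluations += 1
          val v = compute(e)
          cache(e) = v
          v
      }
    case _ =>
      evaluations += 1
      compute(e)
  }

  (exprs.map(eval), evaluations)
}

val row = IndexedSeq(3L, 4L)
// Mul(Col(0), Col(1)) appears in both output expressions
val exprs = Seq(Add(Mul(Col(0), Col(1)), Col(0)), Mul(Col(0), Col(1)))
println(evalAll(exprs, row, useSubexprElimination = false))  // (List(15, 12),3)
println(evalAll(exprs, row, useSubexprElimination = true))   // (List(15, 12),2)
```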
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
val ctx = new CodegenContext

// Use Catalyst DSL
import org.apache.spark.sql.catalyst.dsl.expressions._
val expressions = "hello".expr.as("world") :: "hello".expr.as("world") :: Nil

import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
val eval = GenerateUnsafeProjection.createCode(ctx, expressions, useSubexprElimination = true)

scala> println(eval.code)
mutableStateArray1[0].reset();
mutableStateArray2[0].write(0, ((UTF8String) references[0] /* literal */));
mutableStateArray2[0].write(1, ((UTF8String) references[1] /* literal */));
mutableStateArray[0].setTotalSize(mutableStateArray1[0].totalSize());

scala> println(eval.value)
mutableStateArray[0]
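The generated code above resets a row buffer, writes each string value at an ordinal, and then records the total size. A simplified model of that sequence, assuming a hypothetical ToyRowWriter over a java.nio.ByteBuffer and following the UnsafeRow layout: a null bitset, one 8-byte slot per field, and an 8-byte-aligned variable-length region where a string's slot holds its offset in the upper 32 bits and its size in the lower 32 bits. Null handling and buffer growth are left out.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

// Simplified, hypothetical model of an UnsafeRow-style writer (not Spark's UnsafeRowWriter).
// Layout: [null bitset][one 8-byte slot per field][variable-length region, 8-byte aligned].
// Nulls are not supported and the buffer has a fixed capacity (no growth).
final class ToyRowWriter(numFields: Int, capacity: Int = 256) {
  private val bitsetBytes = ((numFields + 63) / 64) * 8
  private val fixedBytes  = bitsetBytes + numFields * 8
  private val buf         = ByteBuffer.allocate(capacity)
  private var cursor      = fixedBytes  // next free byte in the variable-length region

  private def slot(ordinal: Int): Int = bitsetBytes + ordinal * 8

  // Like reset(): start a new row by clearing the fixed-length part and rewinding the cursor.
  def reset(): Unit = {
    java.util.Arrays.fill(buf.array(), 0, fixedBytes, 0.toByte)
    cursor = fixedBytes
  }

  // Like write(ordinal, UTF8String): copy the bytes into the variable-length region
  // and store (offset << 32) | size in the field's slot.
  def write(ordinal: Int, value: String): Unit = {
    val bytes  = value.getBytes(UTF_8)
    val padded = ((bytes.length + 7) / 8) * 8
    val offset = cursor
    java.util.Arrays.fill(buf.array(), offset, offset + padded, 0.toByte)  // zero the padding
    System.arraycopy(bytes, 0, buf.array(), offset, bytes.length)
    buf.putLong(slot(ordinal), (offset.toLong << 32) | bytes.length.toLong)
    cursor += padded
  }

  // Like totalSize(): the number of bytes used by the row so far.
  def totalSize: Int = cursor

  def readString(ordinal: Int): String = {
    val offsetAndSize = buf.getLong(slot(ordinal))
    val offset = (offsetAndSize >>> 32).toInt
    val size   = offsetAndSize.toInt
    new String(buf.array(), offset, size, UTF_8)
  }
}

val writer = new ToyRowWriter(numFields = 2)
writer.reset()
writer.write(0, "hello")
writer.write(1, "world")
println(writer.totalSize)       // 40
println(writer.readString(1))   // world
```

With two fields the fixed part is 8 bytes of bitset plus 16 bytes of slots, and each 5-byte string is padded to 8, which gives the total of 40 bytes.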
createCode is used when:
- CreateNamedStructUnsafe is requested to generate Java source code
- GenerateUnsafeProjection is requested to create an UnsafeProjection
- CodegenSupport is requested to prepareRowVar (to generate Java source code to consume generated columns or a row from a physical operator)
- HashAggregateExec is requested to doProduceWithKeys and doConsumeWithKeys
- BroadcastHashJoinExec is requested to genStreamSideJoinKey (when generating the Java source code for joins)