GenerateUnsafeProjection
GenerateUnsafeProjection is a CodeGenerator that generates the bytecode for creating an UnsafeProjection from the given Expressions (i.e. a CodeGenerator[Seq[Expression], UnsafeProjection]).
GenerateUnsafeProjection: Seq[Expression] => UnsafeProjection
Binding Expressions to Schema
bind(
in: Seq[Expression],
inputSchema: Seq[Attribute]): Seq[Expression]
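bind binds the given in expressions to the given inputSchema (using BindReferences.bindReferences), so that every attribute reference becomes a BoundReference to the attribute's ordinal in inputSchema.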
bind is part of the CodeGenerator abstraction.
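Binding can be pictured with a toy expression tree. The sketch below is a simplified model, not Spark's bind or BindReferences code: Attr, BoundRef, Lit and Add are hypothetical case classes that only mimic Catalyst's AttributeReference, BoundReference, Literal and Add. Every attribute is swapped for its position in inputSchema, and an attribute missing from the schema fails fast.

```scala
// A toy model of binding, NOT Catalyst's API: Attr, BoundRef, Lit and Add are
// hypothetical stand-ins for AttributeReference, BoundReference, Literal and Add.
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class BoundRef(ordinal: Int) extends Expr
final case class Lit(value: Int) extends Expr
final case class Add(left: Expr, right: Expr) extends Expr

object BindSketch {
  // Replaces every attribute with a reference to its position in inputSchema.
  def bind(in: Seq[Expr], inputSchema: Seq[Attr]): Seq[Expr] = {
    val ordinals: Map[Attr, Int] = inputSchema.zipWithIndex.toMap

    def bindOne(e: Expr): Expr = e match {
      case a: Attr =>
        val ordinal = ordinals.getOrElse(
          a, throw new IllegalStateException(s"Couldn't find $a in $inputSchema"))
        BoundRef(ordinal)
      case Add(l, r) => Add(bindOne(l), bindOne(r))
      case other     => other // BoundRef and Lit need no binding
    }

    in.map(bindOne)
  }
}
```

For example, binding Add(Attr("b"), Lit(1)) against the schema Attr("a"), Attr("b") gives Add(BoundRef(1), Lit(1)).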
Creating UnsafeProjection (for Expressions)
create(
references: Seq[Expression]): UnsafeProjection // (1)
create(
expressions: Seq[Expression],
subexpressionEliminationEnabled: Boolean): UnsafeProjection
(1) subexpressionEliminationEnabled flag is false
create creates a new CodegenContext.
create...FIXME
create is part of the CodeGenerator abstraction.
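The subexpressionEliminationEnabled flag decides whether a common subexpression is evaluated once or every time it occurs. The toy interpreter below is a hypothetical model (Node, Input, Plus and Times are made-up classes, not Spark's generated code) that makes the difference visible by counting the operator nodes it actually computes. Its two-argument overload mirrors the single-argument create, which leaves the flag off.

```scala
import scala.collection.mutable

// A toy model of what subexpressionEliminationEnabled controls (hypothetical
// names; Spark's real machinery is considerably more involved).
sealed trait Node
final case class Input(ordinal: Int) extends Node
final case class Plus(left: Node, right: Node) extends Node
final case class Times(left: Node, right: Node) extends Node

object SubexprSketch {
  // Mirrors the single-argument create: elimination is off by default.
  def evalAll(exprs: Seq[Node], row: IndexedSeq[Int]): (Seq[Int], Int) =
    evalAll(exprs, row, subexpressionEliminationEnabled = false)

  // Evaluates every expression against one row. Returns the results and the
  // number of Plus/Times nodes that were actually computed.
  def evalAll(
      exprs: Seq[Node],
      row: IndexedSeq[Int],
      subexpressionEliminationEnabled: Boolean): (Seq[Int], Int) = {
    var computed = 0
    val cache = mutable.Map.empty[Node, Int] // one entry per distinct subtree

    def compute(n: Node): Int = n match {
      case Input(i)    => row(i)
      case Plus(l, r)  => computed += 1; eval(l) + eval(r)
      case Times(l, r) => computed += 1; eval(l) * eval(r)
    }

    def eval(n: Node): Int = {
      if (!subexpressionEliminationEnabled) compute(n)
      else {
        cache.get(n) match {
          case Some(v) => v // common subexpression: reuse the earlier result
          case None =>
            val v = compute(n)
            cache(n) = v
            v
        }
      }
    }

    val results = exprs.map(eval)
    (results, computed)
  }
}
```

With shared = Plus(Input(0), Input(1)) appearing three times across the expressions Times(shared, shared) and shared, the sketch computes four Plus/Times nodes with the flag off and two with it on, while the results are the same.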
Logging
Enable ALL logging level for org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection logger to see what happens inside.
Add the following lines to conf/log4j2.properties:
logger.GenerateUnsafeProjection.name = org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
logger.GenerateUnsafeProjection.level = all
Refer to Logging.
Review Me
Generating UnsafeProjection
generate(
expressions: Seq[Expression],
subexpressionEliminationEnabled: Boolean): UnsafeProjection
generate canonicalizes the input expressions and then creates an UnsafeProjection for them (using create with the given subexpressionEliminationEnabled flag).
generate is used when:
- UnsafeProjection factory object is requested for an UnsafeProjection
- ExpressionEncoder is requested to initialize the internal UnsafeProjection
- FileFormat is requested to build a data reader with partition column values appended
- OrcFileFormat is requested to buildReaderWithPartitionValues
- ParquetFileFormat is requested to build a data reader with partition column values appended
- GroupedIterator is requested for keyProjection
- ObjectOperator is requested to serializeObjectToRow
- (Spark MLlib) LibSVMFileFormat is requested to buildReader
- (Spark Structured Streaming) StateStoreRestoreExec, StateStoreSaveExec and StreamingDeduplicateExec are requested to execute
canonicalize Method
canonicalize(in: Seq[Expression]): Seq[Expression]
canonicalize removes Alias expressions (every Alias is replaced with its child).
Internally, canonicalize executes the ExpressionCanonicalizer rule executor (with a single CleanExpressions rule) on every input expression.
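canonicalize, and the way generate runs it before create, can be modelled in a few lines. This is a sketch under stated assumptions: the case classes are hypothetical toys named after Catalyst's Literal and Alias (not the real classes), create is a placeholder that only echoes its input, and generate is assumed to be create(canonicalize(expressions), subexpressionEliminationEnabled).

```scala
// Toy case classes named after Catalyst's Literal and Alias (hypothetical
// stand-ins, not the real classes), plus a generic Concat node with children.
sealed trait E
final case class Literal(value: String) extends E
final case class Alias(child: E, name: String) extends E
final case class Concat(children: Seq[E]) extends E

object CanonicalizeSketch {
  // Like the CleanExpressions rule: every Alias, at any depth, is replaced by its child.
  def cleanExpressions(e: E): E = e match {
    case Alias(child, _)  => cleanExpressions(child)
    case Concat(children) => Concat(children.map(cleanExpressions))
    case lit: Literal     => lit
  }

  def canonicalize(in: Seq[E]): Seq[E] = in.map(cleanExpressions)

  // Placeholder for create: it only returns what it received, so that the
  // canonicalize-then-create order of generate is visible.
  def create(expressions: Seq[E], subexpressionEliminationEnabled: Boolean): (Seq[E], Boolean) =
    (expressions, subexpressionEliminationEnabled)

  def generate(expressions: Seq[E], subexpressionEliminationEnabled: Boolean): (Seq[E], Boolean) =
    create(canonicalize(expressions), subexpressionEliminationEnabled)
}
```

Because the aliases only rename outputs, dropping them lets expressions that differ only in their aliases produce the same canonical form.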
Generating JVM Bytecode for UnsafeProjection for Given Expressions (With Optional Subexpression Elimination)
create(
expressions: Seq[Expression],
subexpressionEliminationEnabled: Boolean): UnsafeProjection
create(references: Seq[Expression]): UnsafeProjection // (1)
(1) Calls the former create with the subexpressionEliminationEnabled flag off
create first creates a CodegenContext and an ExprCode for the input expressions (using createCode).
create then creates a code body with a public java.lang.Object generate(Object[] references) method that creates a SpecificUnsafeProjection:
public java.lang.Object generate(Object[] references) {
  return new SpecificUnsafeProjection(references);
}

class SpecificUnsafeProjection extends UnsafeProjection { ... }
create creates a CodeAndComment with the code body and comment placeholders.
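The code body amounts to string templating around the generated class. The sketch below is hypothetical and much smaller than Spark's template: BodyAndComments stands in for CodeAndComment, classBody stands in for everything generated inside SpecificUnsafeProjection, and the /*comment_0*/ placeholder format is made up for illustration.

```scala
// A hypothetical sketch of the source text create assembles; Spark's real
// template and its CodeAndComment class differ in detail.
object CodeBodySketch {
  // Stand-in for CodeAndComment: the body holds comment placeholders,
  // the actual comment text is kept in a separate map.
  final case class BodyAndComments(body: String, comments: Map[String, String])

  // Wraps the class body in the generate(Object[] references) factory outline.
  def codeBody(classBody: String): String =
    s"""public java.lang.Object generate(Object[] references) {
       |  return new SpecificUnsafeProjection(references);
       |}
       |
       |class SpecificUnsafeProjection extends UnsafeProjection {
       |$classBody
       |}""".stripMargin

  def withComment(classBody: String, comment: String): BodyAndComments = {
    val placeholder = "/*comment_0*/" // hypothetical placeholder format
    BodyAndComments(
      codeBody(s"$placeholder\n$classBody"),
      Map(placeholder -> s"/* $comment */"))
  }
}
```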
You should see the following DEBUG message in the logs:
DEBUG GenerateUnsafeProjection: code for [expressions]:
[code]
Tip: Enable DEBUG logging level for org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection logger to see the message above.
logger.GenerateUnsafeProjection.name = org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
logger.GenerateUnsafeProjection.level = debug
Refer to Logging.
create requests CodeGenerator to compile the Java source code to JVM bytecode (using Janino).
In the end, create requests the CodegenContext for the references and requests the compiled class to generate a SpecificUnsafeProjection for them. That SpecificUnsafeProjection is the final UnsafeProjection.
(Single-argument) create is part of the CodeGenerator abstraction.
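The references array connects the generated source to the objects it needs at runtime. The toy below is named after CodegenContext.addReferenceObj, but it is a simplified, hypothetical model whose class names and signature are assumptions. Registering an object appends it to references and returns the Java expression that reads it back, the same shape as references[0] /* literal */ in the createCode example. ReferencesSketch.generate stands in for the compiled generate(Object[] references) factory.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy modelled on CodegenContext.addReferenceObj: objects that are
// not inlined into the Java source go into a references array, and the source
// reads them back by index.
final class ToyCodegenContext {
  val references: ArrayBuffer[Any] = ArrayBuffer.empty[Any]

  // Registers obj and returns the Java expression that reads it from references.
  def addReferenceObj(objName: String, obj: Any, className: String): String = {
    val idx = references.length
    references += obj
    s"(($className) references[$idx] /* $objName */)"
  }
}

object ReferencesSketch {
  // Stand-in for the compiled generate(Object[] references) factory: the
  // "projection" it returns looks values up in the very same array.
  def generate(references: Array[Any]): Int => Any = idx => references(idx)
}
```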
Creating ExprCode for Expressions (With Optional Subexpression Elimination)
createCode(
ctx: CodegenContext,
expressions: Seq[Expression],
useSubexprElimination: Boolean = false): ExprCode
createCode requests the input CodegenContext to generate the Java source code that evaluates every expression in the input expressions.
createCode...FIXME
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
val ctx = new CodegenContext

// Use Catalyst DSL
import org.apache.spark.sql.catalyst.dsl.expressions._
val expressions = "hello".expr.as("world") :: "hello".expr.as("world") :: Nil

import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
val eval = GenerateUnsafeProjection.createCode(ctx, expressions, useSubexprElimination = true)

scala> println(eval.code)
mutableStateArray1[0].reset();
mutableStateArray2[0].write(0, ((UTF8String) references[0] /* literal */));
mutableStateArray2[0].write(1, ((UTF8String) references[1] /* literal */));
mutableStateArray[0].setTotalSize(mutableStateArray1[0].totalSize());

scala> println(eval.value)
mutableStateArray[0]
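The generated code follows a pattern of reset, one write per field, then setTotalSize. A toy writer makes that pattern concrete. ToyRowWriter is hypothetical (not Spark's UnsafeRowWriter) and assumes the UnsafeRow layout: a null bit set, one 8-byte slot per field, and variable-length bytes padded to 8-byte words, with each string's offset and size packed into its slot.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy of the reset / write / totalSize pattern in the generated code
// above (not Spark's UnsafeRowWriter). The layout assumed here follows UnsafeRow:
// a null bit set, one 8-byte slot per field, then variable-length data padded
// to 8-byte words.
final class ToyRowWriter(numFields: Int) {
  private val nullBitsSize = ((numFields + 63) / 64) * 8
  private val fixedSize = nullBitsSize + 8 * numFields
  private val slots = new Array[Long](numFields)
  private val variable = ArrayBuffer.empty[Byte]

  // Starts a new row: forget the previous row's slots and variable-length data.
  def reset(): Unit = {
    java.util.Arrays.fill(slots, 0L)
    variable.clear()
  }

  // Appends the UTF-8 bytes (word-aligned) and stores (offset << 32 | size) in the slot.
  def write(ordinal: Int, value: String): Unit = {
    val bytes = value.getBytes(UTF_8)
    val offset = fixedSize + variable.length
    variable ++= bytes
    variable ++= Array.fill[Byte]((8 - bytes.length % 8) % 8)(0)
    slots(ordinal) = (offset.toLong << 32) | bytes.length
  }

  def totalSize(): Int = fixedSize + variable.length

  def offsetAndSize(ordinal: Int): (Int, Int) =
    ((slots(ordinal) >>> 32).toInt, slots(ordinal).toInt)
}
```

For two "hello" fields the sketch reports a total size of 40 bytes: 8 bytes of null bits, 16 bytes of slots and two 8-byte words of string data.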
createCode is used when:
- CreateNamedStructUnsafe is requested to generate the Java source code
- GenerateUnsafeProjection is requested to create an UnsafeProjection
- CodegenSupport is requested to prepareRowVar (to generate the Java source code to consume generated columns or a row from a physical operator)
- HashAggregateExec is requested to doProduceWithKeys and doConsumeWithKeys
- BroadcastHashJoinExec is requested to genStreamSideJoinKey (when generating the Java source code for joins)