StreamingJoinHelper¶
Calculating State Watermark for Values¶
getStateValueWatermark(
attributesToFindStateWatermarkFor: AttributeSet,
attributesWithEventWatermark: AttributeSet,
joinCondition: Option[Expression],
eventWatermark: Option[Long]): Option[Long]
getStateValueWatermark returns None when one of the following holds:
- Either joinCondition or eventWatermark is empty
- There are no attributes with watermark delay metadata marker among the given attributesWithEventWatermark
getStateValueWatermark splits And predicates in the given joinCondition expression into a series of the following expressions (and skips the others):
- LessThan
- LessThanOrEqual
- GreaterThan
- GreaterThanOrEqual
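The splitting step can be pictured with a small stand-alone model. This is only a sketch and does not use Catalyst: the case classes below are hypothetical stand-ins for the Catalyst expressions of the same names, and splitConjuncts and comparisons are assumed helper names chosen for illustration.

```scala
object SplitSketch {
  // Hypothetical stand-ins for Catalyst expressions (not the real classes)
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr
  final case class Or(left: Expr, right: Expr) extends Expr
  final case class EqualTo(left: Expr, right: Expr) extends Expr
  final case class LessThan(left: Expr, right: Expr) extends Expr
  final case class LessThanOrEqual(left: Expr, right: Expr) extends Expr
  final case class GreaterThan(left: Expr, right: Expr) extends Expr
  final case class GreaterThanOrEqual(left: Expr, right: Expr) extends Expr

  // Flatten nested And nodes into a flat sequence of conjuncts
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  // Keep only the four comparison predicates; every other conjunct is skipped
  def comparisons(e: Expr): Seq[Expr] = splitConjuncts(e).collect {
    case p: LessThan           => p
    case p: LessThanOrEqual    => p
    case p: GreaterThan        => p
    case p: GreaterThanOrEqual => p
  }
}
```

For a condition such as `a = b AND t1 < t2 AND (x OR y)`, only the `t1 < t2` conjunct would survive the filter in this model.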
getStateValueWatermark...FIXME
getStateValueWatermark is used when:
- UnsupportedOperationChecker is requested to checkForStreamStreamJoinWatermark
- StreamingSymmetricHashJoinHelper is requested for state watermark predicates
getStateWatermarkSafely¶
getStateWatermarkSafely(
l: Expression,
r: Expression): Option[Long]
getStateWatermarkSafely calls getStateWatermarkFromLessThenPredicate (with the given l and r expressions).
In case of a non-fatal exception, getStateWatermarkSafely prints out the following WARN message to the logs and returns None (and hence the "Safely" suffix):
Error trying to extract state constraint from condition [joinCondition]
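The "Safely" behaviour follows a common Scala pattern: run a computation and turn any non-fatal exception into None. A minimal sketch of that pattern is shown below. The safely helper and its parameters are hypothetical names, and System.err.println stands in for the WARN log call.

```scala
import scala.util.control.NonFatal

object SafelySketch {
  // Hypothetical wrapper: evaluate `extract` and map non-fatal failures to None
  def safely(condition: String)(extract: => Option[Long]): Option[Long] =
    try extract
    catch {
      case NonFatal(e) =>
        // Stand-in for the WARN message written to the logs
        System.err.println(
          s"WARN Error trying to extract state constraint from condition $condition: $e")
        None
    }
}
```

Fatal errors (for example OutOfMemoryError) are not matched by NonFatal and still propagate to the caller.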
getStateWatermarkFromLessThenPredicate¶
getStateWatermarkFromLessThenPredicate(
leftExpr: Expression,
rightExpr: Expression,
attributesToFindStateWatermarkFor: AttributeSet,
attributesWithEventWatermark: AttributeSet,
eventWatermark: Option[Long]): Option[Long]
getStateWatermarkFromLessThenPredicate prints out the following DEBUG message to the logs:
All on Left:
[allOnLeftExpr.tree]
[allOnLeftExpr.asCode]
getStateWatermarkFromLessThenPredicate prints out the following DEBUG message to the logs:
Terms extracted from join condition:
[terms]
getStateWatermarkFromLessThenPredicate prints out the following DEBUG message to the logs:
Constraint term from join condition: [constraintTerm]
getStateWatermarkFromLessThenPredicate prints out the following DEBUG message to the logs:
Final expression to evaluate constraint: [exprWithWatermarkSubstituted]
getStateWatermarkFromLessThenPredicate evaluates the exprWithWatermarkSubstituted expression and converts the resulting value from microseconds to milliseconds.
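The arithmetic behind the constraint can be illustrated with a simplified model. The sketch assumes a condition of the form `rightTime + c1 < leftTime + c2`, where rightTime carries the event-time watermark and the state watermark is wanted for leftTime. The stateWatermarkForLeft function and the constants c1 and c2 are hypothetical, and unit conversion is left out, so all values are in the same unit.

```scala
object ConstraintSketch {
  // Hypothetical model of `rightTime + c1 < leftTime + c2`:
  // moving the constants to one side gives rightTime + c1 - c2 < leftTime,
  // and rightTime is then replaced with the event-time watermark value
  def stateWatermarkForLeft(
      eventWatermark: Option[Long],
      c1: Long,
      c2: Long): Option[Long] =
    eventWatermark.map(w => w + c1 - c2)
}

// rightTime < leftTime + 5000 with an event-time watermark of 10000
val example = ConstraintSketch.stateWatermarkForLeft(Some(10000L), 0L, 5000L) // Some(5000)
```

In this model, left-side rows with leftTime at or below 5000 can no longer match any future right-side row, which is what makes the value usable as a state watermark.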
Logging¶
Enable ALL logging level for org.apache.spark.sql.catalyst.analysis.StreamingJoinHelper logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.catalyst.analysis.StreamingJoinHelper=ALL
Refer to Logging.