StreamingJoinHelper¶
Calculating State Watermark for Values¶
getStateValueWatermark(
attributesToFindStateWatermarkFor: AttributeSet,
attributesWithEventWatermark: AttributeSet,
joinCondition: Option[Expression],
eventWatermark: Option[Long]): Option[Long]
getStateValueWatermark returns None when one of the following holds:
- Either joinCondition or eventWatermark is empty
- There are no attributes with watermark delay metadata marker among the given attributesWithEventWatermark
getStateValueWatermark splits the given joinCondition expression on its And predicates and keeps only the following expressions (skipping all the others):
- LessThan
- LessThanOrEqual
- GreaterThan
- GreaterThanOrEqual
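As an illustration of the splitting step, here is a minimal sketch in plain Scala. It assumes a tiny, hypothetical expression model (Expr, Attr, And and so on) rather than Spark's Catalyst classes, and the helper names splitConjuncts and rangePredicates are made up for this example.

```scala
// Hypothetical, simplified expression model (NOT Spark's Catalyst classes).
sealed trait Expr
case class Attr(name: String) extends Expr
case class And(left: Expr, right: Expr) extends Expr
case class Or(left: Expr, right: Expr) extends Expr
case class EqualTo(left: Expr, right: Expr) extends Expr
case class LessThan(left: Expr, right: Expr) extends Expr
case class LessThanOrEqual(left: Expr, right: Expr) extends Expr
case class GreaterThan(left: Expr, right: Expr) extends Expr
case class GreaterThanOrEqual(left: Expr, right: Expr) extends Expr

// Flatten nested And nodes into a flat list of conjuncts.
// Any other node (e.g. Or) is kept whole and not descended into.
def splitConjuncts(e: Expr): List[Expr] = e match {
  case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
  case other     => List(other)
}

// Keep only the four range comparisons; every other conjunct is skipped.
// An empty join condition yields no predicates.
def rangePredicates(joinCondition: Option[Expr]): List[Expr] =
  joinCondition.toList.flatMap(splitConjuncts).collect {
    case p @ (_: LessThan | _: LessThanOrEqual |
              _: GreaterThan | _: GreaterThanOrEqual) => p
  }
```

For a condition such as a = b AND t1 < t2, only t1 < t2 would survive the filter in this sketch.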
getStateValueWatermark
...FIXME
getStateValueWatermark is used when:
- UnsupportedOperationChecker is requested to checkForStreamStreamJoinWatermark
- StreamingSymmetricHashJoinHelper is requested for state watermark predicates
getStateWatermarkSafely¶
getStateWatermarkSafely(
l: Expression,
r: Expression): Option[Long]
getStateWatermarkSafely calls getStateWatermarkFromLessThenPredicate.
In case of a non-fatal exception, getStateWatermarkSafely prints out the following WARN message to the logs and returns None (hence the "Safely" suffix):
Error trying to extract state constraint from condition [joinCondition]
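The try-and-fall-back behaviour can be sketched as follows. This is only an illustration under assumptions: extractSafely is a hypothetical name, the extraction logic is passed in as a by-name argument, and printing to standard error stands in for the WARN log call.

```scala
import scala.util.control.NonFatal

// Hypothetical helper: evaluates `extract` and turns any non-fatal
// exception into None, after reporting it.
def extractSafely(joinCondition: String)(extract: => Option[Long]): Option[Long] =
  try extract
  catch {
    case NonFatal(e) =>
      // Stand-in for a WARN-level log call (assumption of this sketch).
      Console.err.println(
        s"WARN Error trying to extract state constraint from condition $joinCondition: $e")
      None
  }
```

Fatal errors (for example OutOfMemoryError) are not matched by NonFatal and would still propagate to the caller.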
getStateWatermarkFromLessThenPredicate¶
getStateWatermarkFromLessThenPredicate(
leftExpr: Expression,
rightExpr: Expression,
attributesToFindStateWatermarkFor: AttributeSet,
attributesWithEventWatermark: AttributeSet,
eventWatermark: Option[Long]): Option[Long]
getStateWatermarkFromLessThenPredicate prints out the following DEBUG message to the logs:
All on Left:
[allOnLeftExpr.tree]
[allOnLeftExpr.asCode]
getStateWatermarkFromLessThenPredicate prints out the following DEBUG message to the logs:
Terms extracted from join condition:
[terms]
getStateWatermarkFromLessThenPredicate prints out the following DEBUG message to the logs:
Constraint term from join condition: [constraintTerm]
getStateWatermarkFromLessThenPredicate prints out the following DEBUG message to the logs:
Final expression to evaluate constraint: [exprWithWatermarkSubstituted]
getStateWatermarkFromLessThenPredicate requests the exprWithWatermarkSubstituted to produce a value (evaluate) that is then divided by 1000 to convert it from microseconds to milliseconds.
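The names in the DEBUG messages (allOnLeftExpr, terms, constraintTerm, exprWithWatermarkSubstituted) suggest a move-everything-to-one-side, collect-terms, substitute-and-evaluate flow. The following is a rough sketch of that idea, not the actual implementation: the expression model (Expr, Attr, Lit, Plus, Minus) and the functions collectTerms and stateWatermark are hypothetical, only one state attribute and one event-time attribute are supported, and all values use a single time unit, so any final unit rescaling is left out.

```scala
// Hypothetical, simplified arithmetic model (NOT Spark's Catalyst classes).
sealed trait Expr
case class Attr(name: String) extends Expr
case class Lit(value: Long) extends Expr
case class Plus(left: Expr, right: Expr) extends Expr
case class Minus(left: Expr, right: Expr) extends Expr

// Flatten a tree of Plus/Minus into signed leaf terms.
def collectTerms(e: Expr, sign: Int): List[(Int, Expr)] = e match {
  case Plus(l, r)  => collectTerms(l, sign) ++ collectTerms(r, sign)
  case Minus(l, r) => collectTerms(l, sign) ++ collectTerms(r, -sign)
  case leaf        => List((sign, leaf))
}

// Given `leftExpr < rightExpr`, derive a lower bound on `stateAttr`.
def stateWatermark(
    leftExpr: Expr,
    rightExpr: Expr,
    stateAttr: String,
    eventAttr: String,
    eventWatermark: Long): Option[Long] = {
  // "All on left": leftExpr - rightExpr < 0
  val terms = collectTerms(Minus(leftExpr, rightExpr), 1)
  // The constraint term is the one that mentions the state attribute.
  val (constraint, rest) = terms.partition {
    case (_, Attr(n)) => n == stateAttr
    case _            => false
  }
  constraint match {
    // Only `-stateAttr + rest < 0` can be solved as `rest < stateAttr`.
    case List((-1, _)) =>
      // Substitute the event-time watermark and evaluate the other terms.
      val values = rest.map {
        case (s, Attr(n)) if n == eventAttr => Some(s * eventWatermark)
        case (s, Lit(v))                    => Some(s * v)
        case _                              => None // unknown attribute
      }
      if (values.forall(_.isDefined)) Some(values.flatten.sum) else None
    case _ => None
  }
}
```

With this sketch, the condition rightTime - 10000 < leftTime and an event watermark of 110000 on rightTime give a bound of 100000 on leftTime, while leftTime < rightTime gives None, since a lower bound on rightTime says nothing about leftTime.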
Logging¶
Enable ALL logging level for org.apache.spark.sql.catalyst.analysis.StreamingJoinHelper logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.catalyst.analysis.StreamingJoinHelper=ALL
Refer to Logging.