
StreamingJoinHelper

Calculating State Watermark for Values

getStateValueWatermark(
  attributesToFindStateWatermarkFor: AttributeSet,
  attributesWithEventWatermark: AttributeSet,
  joinCondition: Option[Expression],
  eventWatermark: Option[Long]): Option[Long]

getStateValueWatermark returns None when one of the following holds:

  • Either joinCondition or eventWatermark is empty
  • None of the given attributesWithEventWatermark carries the watermark delay metadata marker
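The two early-exit conditions above can be sketched in plain Scala. This is a minimal, hypothetical model: Attr stands in for a Catalyst attribute and EarlyExitSketch.cannotCompute is a made-up name. Only the metadata key spark.watermarkDelayMs (the delay marker key of Spark's EventTimeWatermark) is taken from Spark.

```scala
// Hypothetical stand-in for a Catalyst attribute (not Spark's API).
final case class Attr(name: String, metadata: Map[String, Long] = Map.empty)

object EarlyExitSketch {
  // Metadata key that marks an attribute as carrying a watermark delay.
  val DelayKey = "spark.watermarkDelayMs"

  /** True when no state value watermark can be computed at all (so the result is None). */
  def cannotCompute(
      attributesWithEventWatermark: Set[Attr],
      joinCondition: Option[Any], // the condition's shape does not matter for this check
      eventWatermark: Option[Long]): Boolean =
    joinCondition.isEmpty ||
      eventWatermark.isEmpty ||
      !attributesWithEventWatermark.exists(_.metadata.contains(DelayKey))
}
```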

getStateValueWatermark splits the given joinCondition expression into its And-ed (conjunctive) predicates and keeps only the following comparison expressions (skipping all others):

  • LessThan
  • LessThanOrEqual
  • GreaterThan
  • GreaterThanOrEqual
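The splitting step can be sketched with a tiny, hypothetical expression tree. The case classes below only mimic the names of the Catalyst expressions and are not Spark's classes. The sketch rewrites `a > b` as `b < a` so that every kept comparison becomes a (smaller, larger) pair. How strict and non-strict comparisons are treated afterwards is left out (the source elides it).

```scala
// Hypothetical, minimal expression tree (stand-ins for Catalyst expressions).
sealed trait Expr
final case class Col(name: String) extends Expr
final case class And(left: Expr, right: Expr) extends Expr
final case class EqualTo(left: Expr, right: Expr) extends Expr
final case class LessThan(left: Expr, right: Expr) extends Expr
final case class LessThanOrEqual(left: Expr, right: Expr) extends Expr
final case class GreaterThan(left: Expr, right: Expr) extends Expr
final case class GreaterThanOrEqual(left: Expr, right: Expr) extends Expr

object SplitSketch {
  // Flattens nested And nodes into a flat list of conjuncts.
  def splitConjuncts(e: Expr): List[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => List(other)
  }

  // Keeps only the four comparisons as (smaller, larger) pairs; skips the rest (e.g. EqualTo).
  def comparisons(cond: Expr): List[(Expr, Expr)] =
    splitConjuncts(cond).collect {
      case LessThan(l, r)           => (l, r)
      case LessThanOrEqual(l, r)    => (l, r)
      case GreaterThan(l, r)        => (r, l)
      case GreaterThanOrEqual(l, r) => (r, l)
    }
}
```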

getStateValueWatermark...FIXME


getStateValueWatermark is used when:

getStateWatermarkSafely

getStateWatermarkSafely(
  l: Expression,
  r: Expression): Option[Long]

getStateWatermarkSafely calls getStateWatermarkFromLessThenPredicate with the given l and r expressions (as leftExpr and rightExpr).

If getStateWatermarkFromLessThenPredicate throws a non-fatal exception, getStateWatermarkSafely prints out the following WARN message to the logs and returns None (hence the "Safely" suffix):

Error trying to extract state constraint from condition [joinCondition]
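The "safely" pattern can be sketched with Scala's standard NonFatal extractor. SafelySketch and its in-memory warnings buffer are hypothetical stand-ins for the real method and logger, and the computation is passed in by name rather than being getStateWatermarkFromLessThenPredicate itself.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object SafelySketch {
  // Collected WARN lines; a stand-in for a real logger (hypothetical).
  val warnings: ArrayBuffer[String] = ArrayBuffer.empty[String]

  // Runs `compute`; on a non-fatal exception, records a warning and returns None.
  def stateWatermarkSafely(joinCondition: String)(compute: => Option[Long]): Option[Long] =
    try compute
    catch {
      case NonFatal(_) =>
        warnings += s"Error trying to extract state constraint from condition $joinCondition"
        None
    }
}
```

NonFatal deliberately does not match errors such as VirtualMachineError or InterruptedException, so those still propagate instead of being turned into None.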

getStateWatermarkFromLessThenPredicate

getStateWatermarkFromLessThenPredicate(
  leftExpr: Expression,
  rightExpr: Expression,
  attributesToFindStateWatermarkFor: AttributeSet,
  attributesWithEventWatermark: AttributeSet,
  eventWatermark: Option[Long]): Option[Long]

getStateWatermarkFromLessThenPredicate prints out the following DEBUG message to the logs:

All on Left:
[allOnLeftExpr.tree]
[allOnLeftExpr.asCode]

getStateWatermarkFromLessThenPredicate prints out the following DEBUG message to the logs:

Terms extracted from join condition:
  [terms]

getStateWatermarkFromLessThenPredicate prints out the following DEBUG message to the logs:

Constraint term from join condition:  [constraintTerm]

getStateWatermarkFromLessThenPredicate prints out the following DEBUG message to the logs:

Final expression to evaluate constraint:  [exprWithWatermarkSubstituted]

getStateWatermarkFromLessThenPredicate evaluates exprWithWatermarkSubstituted and converts the resulting value from microseconds to milliseconds (the unit of the event-time watermark), which becomes the state value watermark.
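The canonicalization steps can be sketched with a simplified linear model. This is a hypothetical illustration, not Spark's implementation. It assumes the condition is linear in one state attribute and one watermark attribute, that time values are in microseconds (as Catalyst timestamps are), and that the event watermark is in milliseconds, so the final value is converted from microseconds to milliseconds. Linear, attr, lit and fromLessThan are made-up names.

```scala
object LessThanSketch {
  // Linear combination of attributes plus a constant, in microseconds (hypothetical model).
  final case class Linear(coeffs: Map[String, Double], constant: Double) {
    def -(that: Linear): Linear = {
      val names = coeffs.keySet ++ that.coeffs.keySet
      val diff = names.map(n => n -> (coeffs.getOrElse(n, 0.0) - that.coeffs.getOrElse(n, 0.0)))
      Linear(diff.toMap.filter { case (_, c) => c != 0.0 }, constant - that.constant)
    }
  }
  def attr(name: String): Linear = Linear(Map(name -> 1.0), 0.0)
  def lit(micros: Double): Linear = Linear(Map.empty, micros)

  /** State watermark (ms) for `stateAttr` implied by `leftExpr < rightExpr`, if derivable. */
  def fromLessThan(
      leftExpr: Linear,
      rightExpr: Linear,
      stateAttr: String,
      watermarkAttr: String,
      eventWatermarkMs: Long): Option[Long] = {
    // Step 1: move everything to the left, i.e. leftExpr - rightExpr < 0
    val allOnLeft = leftExpr - rightExpr
    // Step 2: the constraint term must be -stateAttr, so that
    // -stateAttr + rest < 0 reads as rest < stateAttr
    val others = allOnLeft.coeffs - stateAttr
    val canonical =
      allOnLeft.coeffs.get(stateAttr).contains(-1.0) && others == Map(watermarkAttr -> 1.0)
    if (!canonical) None
    else {
      // Step 3: substitute the watermark attribute with the event watermark (ms -> us)
      val restMicros = eventWatermarkMs.toDouble * 1000.0 + allOnLeft.constant
      // Step 4: convert microseconds back to milliseconds
      Some((restMicros / 1000.0).toLong)
    }
  }
}
```

For `rightTime - 1 hour < leftTime` with an event watermark of 10,000,000 ms, the sketch yields 6,400,000 ms: left-side rows whose leftTime is at or below that value can no longer match any future right-side row. A condition such as `leftTime < rightTime` yields None, because the state attribute does not end up negated after canonicalization.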

Logging

Enable ALL logging level for org.apache.spark.sql.catalyst.analysis.StreamingJoinHelper logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.catalyst.analysis.StreamingJoinHelper=ALL

Refer to Logging.