Skip to content


Calculating State Watermark for Values

  attributesToFindStateWatermarkFor: AttributeSet,
  attributesWithEventWatermark: AttributeSet,
  joinCondition: Option[Expression],
  eventWatermark: Option[Long]): Option[Long]

getStateValueWatermark returns None when one of the following holds:

  • Either joinCondition or eventWatermark is empty
  • There are no attributes with watermark delay metadata marker among the given attributesWithEventWatermark

getStateValueWatermark splits And predicates in the given joinCondition expression into a series of the following expressions (and skips the others):

  • LessThan
  • LessThanOrEqual
  • GreaterThan
  • GreaterThanOrEqual


getStateValueWatermark is used when:


  l: Expression,
  r: Expression): Option[Long]

getStateWatermarkSafely getStateWatermarkFromLessThenPredicate.

In case of a non-fatal exception, getStateWatermarkSafely prints out the following WARN message to the logs and returns None (and hence the "Safely" suffix):

Error trying to extract state constraint from condition [joinCondition]


  leftExpr: Expression,
  rightExpr: Expression,
  attributesToFindStateWatermarkFor: AttributeSet,
  attributesWithEventWatermark: AttributeSet,
  eventWatermark: Option[Long]): Option[Long]

getStateWatermarkFromLessThenPredicate prints out the following DEBUG message to the logs:

All on Left:

getStateWatermarkFromLessThenPredicate prints out the following DEBUG message to the logs:

Terms extracted from join condition:

getStateWatermarkFromLessThenPredicate prints out the following DEBUG message to the logs:

Constraint term from join condition:  [constraintTerm]

getStateWatermarkFromLessThenPredicate prints out the following DEBUG message to the logs:

Final expression to evaluate constraint:  [exprWithWatermarkSubstituted]

getStateWatermarkFromLessThenPredicate requests the exprWithWatermarkSubstituted to produce a value (evaluate) that is then converted to seconds.


Enable ALL logging level for org.apache.spark.sql.catalyst.analysis.StreamingJoinHelper logger to see what happens inside.

Add the following line to conf/

Refer to Logging.