
StreamMetadata

StreamMetadata is the metadata associated with a StreamExecution (and thus with a StreamingQuery).

StreamMetadata can be loaded from a metadata file (e.g., when a StreamExecution is restarted) or created from scratch (e.g., when the streaming query is started for the first time).

StreamMetadata uses the json4s-jackson library for JSON persistence.

import org.apache.spark.sql.execution.streaming.StreamMetadata
import org.apache.hadoop.fs.Path

// Path of the metadata file (relative to the working directory here)
val metadataPath = new Path("metadata")

// spark is the SparkSession, e.g. the one predefined in spark-shell
assert(spark.isInstanceOf[org.apache.spark.sql.SparkSession])

val hadoopConf = spark.sessionState.newHadoopConf()
// sm is None when the metadata file does not exist
val sm = StreamMetadata.read(metadataPath, hadoopConf)

assert(sm.isInstanceOf[Option[org.apache.spark.sql.execution.streaming.StreamMetadata]])

Creating Instance

StreamMetadata takes the following to be created:

  • ID (default: a randomly generated UUID)

StreamMetadata is created when:

  • StreamExecution is created (and the metadata checkpoint file is not available)

Loading Stream Metadata

read(
  metadataFile: Path,
  hadoopConf: Configuration): Option[StreamMetadata]

read creates a CheckpointFileManager (with the parent directory of the given metadataFile).

read asks the CheckpointFileManager whether the given metadataFile exists.

If the metadata file exists, read tries to load (deserialize) a StreamMetadata from it and, on success, returns it wrapped in Some. If the metadata file does not exist, read returns None. If the file exists but cannot be read or parsed as JSON, read logs the error and rethrows the exception.

json4s-jackson

read uses org.json4s.jackson.Serialization.read for JSON deserialization.
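
The steps above can be sketched against the local file system. This is a minimal illustration, not Spark's implementation: QueryMeta and QueryMetaReader are hypothetical stand-ins, java.nio.file takes the place of the CheckpointFileManager, and json4s-jackson is assumed to be on the classpath (with the code compiled as a regular Scala source file rather than pasted into a REPL).

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

import org.json4s.{Formats, NoTypeHints}
import org.json4s.jackson.Serialization

// Hypothetical stand-in for StreamMetadata (not the Spark class)
case class QueryMeta(id: String)

object QueryMetaReader {
  // json4s needs an implicit Formats for (de)serialization
  implicit val formats: Formats = Serialization.formats(NoTypeHints)

  // Returns Some(metadata) if the file exists, None otherwise.
  // Malformed JSON surfaces as an exception; error handling is
  // left out of this sketch.
  def read(file: Path): Option[QueryMeta] =
    if (Files.exists(file)) {
      val json = new String(Files.readAllBytes(file), StandardCharsets.UTF_8)
      Some(Serialization.read[QueryMeta](json))
    } else {
      None
    }
}
```

Returning an Option lets the caller distinguish "no metadata yet" from an existing query without resorting to null or exceptions.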


read is used when:

  • StreamExecution is created (and the metadata checkpoint file is available)

Persisting Metadata

write(
  metadata: StreamMetadata,
  metadataFile: Path,
  hadoopConf: Configuration): Unit

write persists (saves) the given StreamMetadata to the given metadataFile in JSON format.

json4s-jackson

write uses org.json4s.jackson.Serialization.write for JSON serialization.
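
To illustrate the JSON produced by this kind of serialization, here is a small sketch that assumes json4s-jackson is on the classpath. MetaRecord and MetaWriter are hypothetical names used only for this example, and the file is written with java.nio.file rather than through Spark's own file handling.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

import org.json4s.{Formats, NoTypeHints}
import org.json4s.jackson.Serialization

// Hypothetical stand-in for StreamMetadata (not the Spark class)
case class MetaRecord(id: String)

object MetaWriter {
  implicit val formats: Formats = Serialization.formats(NoTypeHints)

  // Serializes the record to a compact JSON string, e.g. {"id":"abc"}
  def toJson(meta: MetaRecord): String = Serialization.write(meta)

  // Saves the JSON to the given file (overwriting any existing content)
  def write(meta: MetaRecord, file: Path): Unit = {
    Files.write(file, toJson(meta).getBytes(StandardCharsets.UTF_8))
    ()
  }
}
```

For a record with the single field id, the output is a one-field JSON object, so the persisted file stays small and human-readable.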


write is used when:

  • StreamExecution is created (and the metadata checkpoint file is not available)
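
Taken together, the two usage notes for read and write suggest a read-or-create pattern. The following is a sketch of that pattern for spark-shell (where spark is predefined), not a copy of what StreamExecution does. The metadata path is a hypothetical location, and the example assumes that StreamMetadata can be constructed directly from an ID string.

```scala
import java.util.UUID

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.streaming.StreamMetadata

val hadoopConf = spark.sessionState.newHadoopConf()

// Hypothetical location of the metadata file
val metadataFile = new Path("/tmp/stream-metadata-demo/metadata")

// Reuse the persisted metadata if present; otherwise create and persist it
val metadata = StreamMetadata.read(metadataFile, hadoopConf).getOrElse {
  // Assumption: the ID is a randomly generated UUID string
  val fresh = StreamMetadata(UUID.randomUUID.toString)
  StreamMetadata.write(fresh, metadataFile, hadoopConf)
  fresh
}
```

Running the snippet a second time should take the read branch and yield the same ID, which is what lets a restarted query keep its identity.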