DatasetManager

DatasetManager is a global manager that materializes datasets (tables and persistent views) right after a pipeline update.

Materialization

Materialization is the process of publishing tables to the session TableCatalog (Spark SQL) and persistent views to the SessionCatalog (Spark SQL).

Scala object

DatasetManager is a Scala object, i.e. a class that has exactly one instance (itself). A Scala object is created lazily when it is referenced for the first time.

Learn more in Tour of Scala.
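
The lazy initialization of a Scala object can be observed with a minimal sketch like the one below. LazyObjectDemo and Manager are hypothetical names made up for this illustration and are not part of Spark.

```scala
import scala.collection.mutable.ListBuffer

object LazyObjectDemo {
  // Records what happened, in order.
  val events: ListBuffer[String] = ListBuffer.empty

  // A stand-in singleton (hypothetical). Its body runs on first reference only.
  object Manager {
    events += "Manager initialized"
    def name: String = "manager"
  }

  def run(): List[String] = {
    events += "before first reference"
    Manager.name // the first reference triggers the initialization
    Manager.name // later references do not initialize the object again
    events += "after references"
    events.toList
  }
}
```

The initialization event shows up only after the first reference to Manager, and only once.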

Materialize Datasets

materializeDatasets(
  resolvedDataflowGraph: DataflowGraph,
  context: PipelineUpdateContext): DataflowGraph

materializeDatasets finds the tables to be refreshed among the tables of the given resolved DataflowGraph.

First collection of tables to be refreshed ignored

materializeDatasets completely ignores the first collection found (all the tables to be refreshed, incl. a full refresh).

materializeDatasets materializes every table found to be refreshed or fully refreshed (that are referred to as materialized in the code).

In the end, materializeDatasets materializes the views.


materializeDatasets is used when:

Materialize Table

materializeTable(
  resolvedDataflowGraph: DataflowGraph,
  table: Table,
  isFullRefresh: Boolean,
  context: PipelineUpdateContext): Table

materializeTable uses TableCatalog (Spark SQL) for the given Table to make necessary changes:

  • Loading and altering the table (even truncating it for a full refresh), if available
  • Creating the table, otherwise

In the end, materializeTable returns a Table that may have the normalized table storage path property set to be the location property of this table.


materializeTable prints out the following INFO message to the logs:

Materializing metadata for table [identifier].

materializeTable uses the given PipelineUpdateContext to find the CatalogManager (Spark SQL) (in the SparkSession).

materializeTable finds the TableCatalog (Spark SQL) for the table.

TableCatalog

materializeTable uses the catalog part of the table identifier, if defined, to find the table's catalog (Spark SQL). Otherwise, materializeTable defaults to the current catalog (Spark SQL).
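
The catalog lookup rule above boils down to an Option with a default. The following is a minimal sketch only: Ident is a simplified, hypothetical stand-in for a table identifier, and currentCatalog is assumed to be the name of the session's current catalog.

```scala
object CatalogResolutionSketch {
  // Hypothetical stand-in for a table identifier with an optional catalog part.
  final case class Ident(
      catalog: Option[String],
      database: Option[String],
      table: String)

  // Use the catalog part when defined, or fall back to the current catalog.
  def resolveCatalogName(ident: Ident, currentCatalog: String): String =
    ident.catalog.getOrElse(currentCatalog)
}
```

For example, an identifier with no catalog part and a current catalog of spark_catalog resolves to spark_catalog.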

materializeTable requests the TableCatalog (Spark SQL) to load the table if it exists (Spark SQL) already.

For an existing table, materializeTable wipes data out (TRUNCATE TABLE) if isFullRefresh is enabled or the table is not a streaming table.

For an existing table, materializeTable requests the TableCatalog (Spark SQL) to alter the table if there are any changes in the schema or table properties.

Unless created already, materializeTable requests the TableCatalog (Spark SQL) to create the table.

In the end, materializeTable requests the TableCatalog (Spark SQL) to load the materialized table and returns the given Table back (with the normalized table storage path updated to the location property of the materialized table).
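
The load, truncate, alter or create decisions described above can be sketched against a toy in-memory catalog. This is an illustration under stated assumptions, not the Spark implementation: TableSpec, StoredTable and InMemoryCatalog are hypothetical types, and the recorded operation names merely mirror the steps in the text.

```scala
import scala.collection.mutable

object MaterializeTableSketch {
  // Hypothetical, simplified stand-ins (not Spark classes).
  final case class TableSpec(
      name: String,
      schema: Seq[String],
      properties: Map[String, String],
      isStreaming: Boolean)
  final case class StoredTable(
      schema: Seq[String],
      properties: Map[String, String],
      rows: Vector[String])

  // In-memory model of a catalog that records the operations requested.
  final class InMemoryCatalog {
    val tables: mutable.Map[String, StoredTable] = mutable.Map.empty
    val operations: mutable.ListBuffer[String] = mutable.ListBuffer.empty
  }

  def materialize(
      catalog: InMemoryCatalog,
      spec: TableSpec,
      isFullRefresh: Boolean): StoredTable = {
    catalog.tables.get(spec.name) match {
      case Some(existing) =>
        // Wipe data out for a full refresh or a non-streaming table.
        val truncated =
          if (isFullRefresh || !spec.isStreaming) {
            catalog.operations += "truncate"
            existing.copy(rows = Vector.empty)
          } else existing
        // Alter only when the schema or the properties changed.
        val altered =
          if (truncated.schema != spec.schema || truncated.properties != spec.properties) {
            catalog.operations += "alter"
            truncated.copy(schema = spec.schema, properties = spec.properties)
          } else truncated
        catalog.tables(spec.name) = altered
      case None =>
        // The table does not exist yet, so create it empty.
        catalog.operations += "create"
        catalog.tables(spec.name) = StoredTable(spec.schema, spec.properties, Vector.empty)
    }
    // Load the materialized table in the end.
    catalog.tables(spec.name)
  }
}
```

In this model, materializing a streaming table a second time without a full refresh and without schema or property changes requests no operation at all and keeps the rows intact.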

Materialize Views

materializeViews(
  virtualizedConnectedGraphWithTables: DataflowGraph,
  context: PipelineUpdateContext): Unit

materializeViews requests the given DataflowGraph for the persisted views to materialize (publish or refresh).

Publish (Materialize) Views

To publish a view, it is required that all the input sources must exist in the metastore. If a Persisted View target reads another Persisted View source, the source must be published first.

materializeViews...FIXME

For each view to be persisted (with no pending inputs), materializeViews materializes the view.
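
The "source must be published first" requirement amounts to a dependency ordering. The sketch below shows one possible way to compute such an order and is not taken from the Spark code: publishOrder is a hypothetical helper, views are identified by plain names, and the deps map lists, for every persisted view, the inputs it reads. Inputs that are not persisted views to be published (e.g. tables) are assumed to exist already.

```scala
import scala.annotation.tailrec

object ViewOrderingSketch {
  // deps: persisted view name -> names of the inputs the view reads from.
  def publishOrder(deps: Map[String, Set[String]]): List[String] = {
    @tailrec
    def loop(
        pending: Map[String, Set[String]],
        published: List[String]): List[String] = {
      if (pending.isEmpty) published.reverse
      else {
        // A view is ready when none of its inputs is still pending.
        val ready = pending.collect {
          case (view, inputs) if inputs.forall(input => !pending.contains(input)) => view
        }.toList.sorted
        require(ready.nonEmpty, "Views depend on each other in a cycle")
        loop(pending -- ready, ready.reverse ::: published)
      }
    }
    loop(deps, Nil)
  }
}
```

Every round publishes the views with no pending inputs, so a view that reads another persisted view is always published after its source.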

Materialize View

materializeView(
  view: View,
  flow: ResolvedFlow,
  spark: SparkSession): Unit

materializeView executes a CreateViewCommand (Spark SQL) logical command (with PersistedView view type).


materializeView creates a CreateViewCommand (Spark SQL) logical command (as a PersistedView with allowExisting and replace flags enabled).

materializeView requests the given ResolvedFlow for the QueryContext to set the current catalog and current database, if defined, in the session CatalogManager (Spark SQL).

In the end, materializeView executes the CreateViewCommand (Spark SQL).

Find Tables to Refresh (incl. Full Refresh)

constructFullRefreshSet(
  graphTables: Seq[Table],
  context: PipelineUpdateContext): (Seq[Table], Seq[TableIdentifier], Seq[TableIdentifier])

constructFullRefreshSet determines the following table (identifiers) collections:

  1. Tables to be refreshed (incl. a full refresh)
  2. TableIdentifiers of the tables to be refreshed (excl. fully refreshed)
  3. TableIdentifiers of the tables to be fully refreshed only

First return value ignored by materializeDatasets

It appears that the first collection of tables to be refreshed is completely ignored while materializing datasets (the only place in the codebase that uses it).

If there are tables to be fully refreshed yet not allowed for a full refresh (per pipelines.reset.allowed table property), constructFullRefreshSet prints out the following INFO message to the logs:

Skipping full refresh on some tables
because pipelines.reset.allowed was set to false.
Tables: [fullRefreshNotAllowed]
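
How the three collections relate to each other can be sketched with simplified types. Tbl and the two sets of requested table names are hypothetical. The sketch also makes two assumptions that the text above does not confirm: a missing pipelines.reset.allowed property means a full refresh is allowed, and tables that are not allowed a full refresh fall back to a regular refresh.

```scala
object FullRefreshSetSketch {
  // Hypothetical stand-in for a table with its table properties.
  final case class Tbl(name: String, properties: Map[String, String])

  // Assumption: a missing property means a full refresh (reset) is allowed.
  def resetAllowed(table: Tbl): Boolean =
    table.properties.get("pipelines.reset.allowed").forall(_.toBoolean)

  def construct(
      graphTables: Seq[Tbl],
      refreshRequested: Set[String],
      fullRefreshRequested: Set[String]): (Seq[Tbl], Seq[String], Seq[String]) = {
    val requestedFull = graphTables.filter(t => fullRefreshRequested.contains(t.name))
    val requestedRefresh = graphTables.filter(t => refreshRequested.contains(t.name))
    val (fullAllowed, fullNotAllowed) = requestedFull.partition(resetAllowed)
    // Assumption: tables not allowed a full refresh get a regular refresh.
    val refresh = (requestedRefresh ++ fullNotAllowed).distinct.filterNot(fullAllowed.contains)
    // 1. all tables to refresh, 2. regular refresh only, 3. full refresh only
    (fullAllowed ++ refresh, refresh.map(_.name), fullAllowed.map(_.name))
  }
}
```

With tables a and b requested for a full refresh, b marked with pipelines.reset.allowed set to false, and c requested for a regular refresh, the sketch yields a as the only fully refreshed table, while c and b are refreshed.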

constructFullRefreshSet is used when: