

Dataset[T] is a strongly-typed data structure that represents a structured query over rows of type T.

Dataset is created using SQL or Dataset high-level declarative "languages".

Dataset's Internals

It is fair to say that Dataset is a Spark SQL developer-friendly layer over the following two low-level entities:

  1. QueryExecution (with the parsed yet unanalyzed LogicalPlan of a structured query)

  2. Encoder (of the type of the records for fast serialization and deserialization to and from InternalRow)

Creating Instance

Dataset takes the following when created:


Dataset can also be created from a LogicalPlan that is first executed using the SessionState.

When created, Dataset requests QueryExecution to assert analyzed phase is successful.

Dataset is created when:


```scala
repartition(
  partitionExprs: Column*): Dataset[T] // (1)!
repartition(
  numPartitions: Int): Dataset[T] // (2)!
repartition(
  numPartitions: Int,
  partitionExprs: Column*): Dataset[T]  // (3)!
```

  1. An alias of repartitionByExpression with undefined numPartitions
  2. Creates a Repartition logical operator with shuffle enabled
  3. An alias of repartitionByExpression


```scala
repartitionByExpression(
  numPartitions: Option[Int],
  partitionExprs: Seq[Column]): Dataset[T]
```

repartitionByExpression creates a Dataset (using withTypedPlan) with a new RepartitionByExpression logical operator (with the given numPartitions and partitionExprs, and the logicalPlan).

Example: Number of Partitions Only

```scala
val numsRepd = nums.repartition(numPartitions = 4)
assert(numsRepd.rdd.getNumPartitions == 4, "Number of partitions should be 4")
```

```text
scala> numsRepd.explain(extended = true)
== Parsed Logical Plan ==
Repartition 4, true
+- RepartitionByExpression [id#4L ASC NULLS FIRST]
   +- Range (0, 10, step=1, splits=Some(16))

== Analyzed Logical Plan ==
id: bigint
Repartition 4, true
+- RepartitionByExpression [id#4L ASC NULLS FIRST]
   +- Range (0, 10, step=1, splits=Some(16))

== Optimized Logical Plan ==
Repartition 4, true
+- Range (0, 10, step=1, splits=Some(16))

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange RoundRobinPartitioning(4), REPARTITION_BY_NUM, [id=#75]
   +- Range (0, 10, step=1, splits=16)
```


```scala
repartitionByRange(
  partitionExprs: Column*): Dataset[T]
repartitionByRange(
  numPartitions: Int,
  partitionExprs: Column*): Dataset[T]
repartitionByRange(
  numPartitions: Option[Int],
  partitionExprs: Seq[Column]): Dataset[T] // (1)!
```

  1. A private method

repartitionByRange creates SortOrder expressions (with Ascending sort direction, i.e. ascending nulls first) for the given partitionExprs with no explicit sort order specified.

In the end, repartitionByRange creates a Dataset with a RepartitionByExpression (with the SortOrders, the logicalPlan and the given numPartitions).

Example: Partition Expressions Only

```scala
val nums = spark.range(10).repartitionByRange($"id".asc)
```

```text
scala> println(nums.queryExecution.logical.numberedTreeString)
00 'RepartitionByExpression ['id ASC NULLS FIRST]
01 +- Range (0, 10, step=1, splits=Some(16))
```


Adaptive Query Execution is enabled.

```text
scala> println(nums.queryExecution.toRdd.getNumPartitions)
1

scala> println(nums.queryExecution.toRdd.toDebugString)
(1) SQLExecutionRDD[17] at toRdd at <console>:24 []
 |  ShuffledRowRDD[16] at toRdd at <console>:24 []
 +-(16) MapPartitionsRDD[15] at toRdd at <console>:24 []
    |   MapPartitionsRDD[11] at toRdd at <console>:24 []
    |   MapPartitionsRDD[10] at toRdd at <console>:24 []
    |   ParallelCollectionRDD[9] at toRdd at <console>:24 []
```

Example: Number of Partitions and Partition Expressions

```scala
val q = spark.range(10).repartitionByRange(numPartitions = 5, $"id")
```

```text
scala> println(q.queryExecution.logical.numberedTreeString)
00 'RepartitionByExpression ['id ASC NULLS FIRST], 5
01 +- Range (0, 10, step=1, splits=Some(16))

scala> println(q.queryExecution.toRdd.getNumPartitions)
5

scala> println(q.queryExecution.toRdd.toDebugString)
(5) SQLExecutionRDD[8] at toRdd at <console>:24 []
 |  ShuffledRowRDD[7] at toRdd at <console>:24 []
 +-(16) MapPartitionsRDD[6] at toRdd at <console>:24 []
    |   MapPartitionsRDD[2] at toRdd at <console>:24 []
    |   MapPartitionsRDD[1] at toRdd at <console>:24 []
    |   ParallelCollectionRDD[0] at toRdd at <console>:24 []
```


```scala
logicalPlan: LogicalPlan
```

Dataset initializes an internal LogicalPlan when created.

logicalPlan requests the QueryExecution for a LogicalPlan (with commands executed per the command execution mode).

With SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED enabled, logicalPlan...FIXME. Otherwise, logicalPlan returns the LogicalPlan intact.

Lazy Values

The following are Scala lazy values of Dataset. Using lazy values guarantees that the code to initialize them is executed once only (when accessed for the first time) and the computed value never changes afterwards.

Learn more in the Scala Language Specification.
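The once-only semantics can be seen in plain Scala (a sketch of the language feature, not Spark code):

```scala
// Plain-Scala sketch of lazy-val semantics (not Spark code): the
// initializer body runs once, on first access, and the computed
// value is cached for every later access.
var initCount = 0

lazy val expensive: Int = {
  initCount += 1 // side effect so initialization can be observed
  42
}

val before = initCount // 0: nothing initialized yet
val first  = expensive // first access triggers the initializer
val second = expensive // cached; the initializer does not run again
val after  = initCount // 1: initialized exactly once
```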


```scala
rdd: RDD[T]
```

rdd ...FIXME

rdd is used when:


```scala
rddQueryExecution: QueryExecution
```

rddQueryExecution creates a deserializer for type T and the logical query plan of this Dataset.

In other words, rddQueryExecution simply adds a new DeserializeToObject unary logical operator as the parent of the current logical query plan (that in turn becomes a child operator).

In the end, rddQueryExecution requests the SparkSession for the SessionState to create a QueryExecution for the query plan with the top-most DeserializeToObject.

rddQueryExecution is used when:


```scala
withNewRDDExecutionId[U](
  body: => U): U
```

withNewRDDExecutionId executes the input body action (that produces the result of type U) under a new execution id (with the QueryExecution).

withNewRDDExecutionId requests the QueryExecution for the SparkPlan to resetMetrics.

withNewRDDExecutionId is used when the following Dataset operators are used:


```scala
writeTo(
  table: String): DataFrameWriterV2[T]
```

writeTo creates a DataFrameWriterV2 for the input table and this Dataset.


```scala
write: DataFrameWriter[T]
```

write creates a DataFrameWriter for this Dataset.


```scala
collectToPython(): Array[Any]
```


collectToPython is used when:

  • DataFrame (PySpark) is requested to collect


```scala
withTypedPlan[U : Encoder](
  logicalPlan: LogicalPlan): Dataset[U]
```

Final Method

withTypedPlan is annotated with the @inline annotation, which requests the Scala compiler to try especially hard to inline it.



```scala
withSetOperator[U: Encoder](
  logicalPlan: LogicalPlan): Dataset[U]
```

Final Method

withSetOperator is annotated with the @inline annotation, which requests the Scala compiler to try especially hard to inline it.
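For reference, @inline is a plain Scala annotation; a minimal sketch (unrelated to Spark's code, with a made-up helper):

```scala
// Plain-Scala sketch of the @inline annotation (not Spark's code):
// @inline only *requests* inlining; the optimizer honors it when
// inliner flags (e.g. -opt:inline:** in Scala 2) are enabled, and
// program semantics are identical either way.
@inline def withDoubled(n: Int)(f: Int => Int): Int = f(n * 2)

val result = withDoubled(21)(identity) // same result, inlined or not
```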



```scala
apply[T: Encoder](
  sparkSession: SparkSession,
  logicalPlan: LogicalPlan): Dataset[T]
```


apply is used when:

Review Me

Datasets are lazy: structured query operators and expressions are only evaluated when an action is invoked.

```scala
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

scala> val dataset = spark.range(5)
dataset: org.apache.spark.sql.Dataset[Long] = [id: bigint]

// Variant 1: filter operator accepts a Scala function
dataset.filter(n => n % 2 == 0).count

// Variant 2: filter operator accepts a Column-based SQL expression
dataset.filter('id % 2 === 0).count

// Variant 3: filter operator accepts a SQL query
dataset.filter("id % 2 = 0").count
```

The Dataset API offers declarative and type-safe operators that make for an improved experience for data processing (compared to DataFrames, which were a set of index- or column name-based Rows).

Dataset offers the convenience of RDDs with the performance optimizations of DataFrames and the strong static type-safety of Scala. The last feature, bringing strong type-safety to DataFrames, is what makes Dataset so appealing. Together, these features give you a more functional programming interface to work with structured data.

```text
scala> spark.range(1).filter('id === 0).explain(true)
== Parsed Logical Plan ==
'Filter ('id = 0)
+- Range (0, 1, splits=8)

== Analyzed Logical Plan ==
id: bigint
Filter (id#51L = cast(0 as bigint))
+- Range (0, 1, splits=8)

== Optimized Logical Plan ==
Filter (id#51L = 0)
+- Range (0, 1, splits=8)

== Physical Plan ==
*Filter (id#51L = 0)
+- *Range (0, 1, splits=8)

scala> spark.range(1).filter(_ == 0).explain(true)
== Parsed Logical Plan ==
'TypedFilter <function1>, class java.lang.Long, [StructField(value,LongType,true)], unresolveddeserializer(newInstance(class java.lang.Long))
+- Range (0, 1, splits=8)

== Analyzed Logical Plan ==
id: bigint
TypedFilter <function1>, class java.lang.Long, [StructField(value,LongType,true)], newInstance(class java.lang.Long)
+- Range (0, 1, splits=8)

== Optimized Logical Plan ==
TypedFilter <function1>, class java.lang.Long, [StructField(value,LongType,true)], newInstance(class java.lang.Long)
+- Range (0, 1, splits=8)

== Physical Plan ==
*Filter <function1>.apply
+- *Range (0, 1, splits=8)
```

Only Datasets offer syntax and analysis checks at compile time (which is not possible using DataFrame, regular SQL queries or even RDDs).

Using Dataset objects turns DataFrames of Row instances into DataFrames of case classes with proper names and types (following their equivalents in the case classes). Instead of using indices to access respective fields in a DataFrame and casting them to a type, Datasets handle all this automatically, checked by the Scala compiler.
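The difference can be sketched in plain Scala (an analogy with made-up types, not the Spark API): field access on a case class is verified by the compiler, while positional access on an untyped row only fails at runtime.

```scala
// Plain-Scala analogy (not the Spark API): typed vs Row-like access.
final case class Person(id: Long, name: String)

val typed = Person(0L, "Ada")
val name: String = typed.name // field name and type checked at compile time
// typed.nmae                 // a typo like this would not even compile

val row: Seq[Any] = Seq(0L, "Ada")        // Row-like: positions and Any values
val fromRow = row(1).asInstanceOf[String] // cast required; a wrong index or
                                          // cast fails only at runtime
```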

If however a LogicalPlan is used to create a Dataset, the logical plan is first executed (using the current SessionState in the SparkSession), which yields the QueryExecution plan.

A Dataset is Queryable and Serializable, i.e. can be saved to persistent storage.

NOTE: SparkSession and QueryExecution are transient attributes of a Dataset and therefore do not participate in Dataset serialization. The only firmly-tied feature of a Dataset is the Encoder.
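A minimal plain-JVM sketch (the Record class is made up, not Spark's Dataset) of how @transient fields drop out of Java serialization:

```scala
import java.io._

// Hypothetical Record class (not Spark's Dataset): @transient fields
// are skipped by Java serialization, just as the SparkSession and
// QueryExecution attributes of a Dataset are.
class Record(@transient val session: AnyRef, val payload: String) extends Serializable

val bytes: Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(new Record(new Object, "data"))
  oos.close()
  bos.toByteArray
}

val restored =
  new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject().asInstanceOf[Record]
// restored.session is null after the round trip; restored.payload survives
```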

You can request the "untyped" view of a Dataset or access the RDD that is generated after executing the query. It is supposed to give you a more pleasant experience while transitioning from the legacy RDD-based or DataFrame-based APIs you may have used in earlier versions of Spark SQL, or to encourage migrating from Spark Core's RDD API to Spark SQL's Dataset API.

The default storage level for Datasets is MEMORY_AND_DISK because recomputing the in-memory columnar representation of the underlying table is expensive. You can however persist a Dataset.

NOTE: Spark 2.0 introduced a new query model called Structured Streaming for continuous incremental execution of structured queries. That made it possible to consider Datasets both static and bounded as well as streaming and unbounded data sets, with a single unified API for different execution models.

A Dataset is local if it was created from local collections using the SparkSession.emptyDataset or SparkSession.createDataset methods and their derivatives. If so, the queries on the Dataset can be optimized and run locally, i.e. without using Spark executors.

NOTE: Dataset makes sure that the underlying QueryExecution is analyzed and checked.

Dataset's Properties

deserializer

Deserializer expression to convert internal rows to objects of type T

Created lazily by requesting the <> to resolveAndBind

Used when:

  • Dataset is <> (for a logical plan in a given SparkSession)

  • Dataset.toLocalIterator operator is used (to create a Java Iterator of objects of type T)

  • Dataset is requested to <>

logicalPlan

```scala
logicalPlan: LogicalPlan
```

Analyzed logical plan with all <> executed and turned into a <>.

When initialized, logicalPlan requests the <> for the analyzed logical plan. If the plan is a <> or a union thereof, logicalPlan <> (using <>).

planWithBarrier

```scala
planWithBarrier: AnalysisBarrier
```

rdd

```scala
rdd: RDD[T]
```

(Lazily-created) RDD of JVM objects of type T (as converted from rows in the Dataset in the internal binary row format).

NOTE: rdd gives an RDD with an extra execution step to convert rows from their internal binary row format to JVM objects, which will impact JVM memory (as the objects now live inside the JVM, while before they were outside). You should not use rdd directly.

Internally, rdd creates a deserializer for the Dataset's logical plan.

[source, scala]

val dataset = spark.range(5).withColumn("group", 'id % 2) scala> dataset.rdd.toDebugString res1: String = (8) MapPartitionsRDD[8] at rdd at :26 [] // ← extra deserialization step | MapPartitionsRDD[7] at rdd at :26 [] | MapPartitionsRDD[6] at rdd at :26 [] | MapPartitionsRDD[5] at rdd at :26 [] | ParallelCollectionRDD[4] at rdd at :26 []

// Compare with a more memory-optimized alternative // Avoids copies and has no schema scala> dataset.queryExecution.toRdd.toDebugString res2: String = (8) MapPartitionsRDD[11] at toRdd at :26 [] | MapPartitionsRDD[10] at toRdd at :26 [] | ParallelCollectionRDD[9] at toRdd at :26 []

rdd then requests the SessionState to execute the logical plan to get the corresponding RDD of binary rows.

NOTE: rdd uses the SparkSession to access the SessionState.

rdd then requests the Dataset's deserializer for the data type of the rows and maps over them (per partition) to create records of the expected type T.

NOTE: rdd is at the "boundary" between the internal binary row format and the JVM type of the dataset. Avoid the extra deserialization step to lower JVM memory requirements of your Spark application.


Getting Input Files of Relations (in Structured Query) -- inputFiles Method

```scala
inputFiles: Array[String]
```

inputFiles requests the QueryExecution for the optimized logical plan and collects the following logical operators:

inputFiles then requests the logical operators for their underlying files:

resolve Internal Method

```scala
resolve(colName: String): NamedExpression
```


Is Dataset Local? -- isLocal Method

```scala
isLocal: Boolean
```

isLocal flag is enabled (i.e. true) when operators like collect or take could be run locally, i.e. without using executors.

Internally, isLocal checks whether the logical query plan of the Dataset is a LocalRelation.

Is Dataset Streaming? -- isStreaming Method

```scala
isStreaming: Boolean
```

isStreaming is enabled (i.e. true) when the logical plan is streaming.

Internally, isStreaming takes the Dataset's logical plan and gives whether the plan is streaming or not.

Queryable


Creating DataFrame (For Logical Query Plan and SparkSession)

```scala
ofRows(
  sparkSession: SparkSession,
  logicalPlan: LogicalPlan): DataFrame
```


ofRows is part of the Dataset Scala object and is marked private[sql], so it can only be accessed from code in the org.apache.spark.sql package.

ofRows returns DataFrame (which is the type alias for Dataset[Row]). ofRows uses RowEncoder to convert the schema (based on the input logicalPlan logical plan).

Internally, ofRows prepares the input logicalPlan for execution and creates a Dataset[Row] with the current SparkSession, the QueryExecution and RowEncoder.
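The type-alias mechanics can be shown in plain Scala with simplified stand-in types (a sketch; in Spark the alias `type DataFrame = Dataset[Row]` lives in the org.apache.spark.sql package object):

```scala
// Plain-Scala sketch of the alias mechanics with simplified stand-in
// types (not Spark's real definitions).
object sql {
  type Row = Map[String, Any]
  final case class Dataset[T](rows: Seq[T])
  type DataFrame = Dataset[Row] // an alias, not a separate class
}

import sql._
val df: DataFrame = Dataset(Seq(Map("id" -> 0L))) // a DataFrame *is* a Dataset[Row]
```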

ofRows is used when:

  • DataFrameReader is requested to load data from a data source

  • Dataset is requested to execute <>, mapPartitionsInR, <> and <>

  • RelationalGroupedDataset is requested to create a DataFrame from aggregate expressions, flatMapGroupsInR and flatMapGroupsInPandas

  • SparkSession is requested to <>, <>, <>, <> and <>

  • CacheTableCommand, <>, <> and SaveIntoDataSourceCommand logical commands are executed (run)

  • DataSource is requested to writeAndRead (for a CreatableRelationProvider)

  • FrequentItems is requested to singlePassFreqItems

  • StatFunctions is requested to crossTabulate and summary

  • Spark Structured Streaming's DataStreamReader is requested to load

  • Spark Structured Streaming's DataStreamWriter is requested to start

  • Spark Structured Streaming's FileStreamSource is requested to getBatch

  • Spark Structured Streaming's MemoryStream is requested to toDF

Tracking Multi-Job Structured Query Execution (PySpark) -- withNewExecutionId Internal Method

```scala
withNewExecutionId[U](body: => U): U
```

withNewExecutionId executes the input body action under a new execution id.

NOTE: withNewExecutionId sets a unique execution id so that all Spark jobs belong to the Dataset action execution.

Executing Action Under New Execution ID -- withAction Internal Method

```scala
withAction[U](action: SparkPlan => U)
```

withAction requests the QueryExecution for the optimized physical query plan and resets the metrics of every physical operator (in the physical plan).

withAction requests SQLExecution to execute the input action with the executable physical plan (tracked under a new execution id).

In the end, withAction notifies ExecutionListenerManager that the name action has finished successfully or with an exception.

NOTE: withAction uses <> to access the ExecutionListenerManager.


withAction is used when Dataset is requested for the following:

  • <> (and executing a logical command or their Union)

  • Dataset operators: <>, <>, <> and <>

Collecting All Rows From Spark Plan -- collectFromPlan Internal Method

```scala
collectFromPlan(plan: SparkPlan): Array[T]
```

NOTE: collectFromPlan is used for the Dataset.head, Dataset.collect and Dataset.collectAsList operators.

selectUntyped Internal Method

```scala
selectUntyped(columns: TypedColumn[_, _]*): Dataset[_]
```


NOTE: selectUntyped is used exclusively when <> typed transformation is used.

sortInternal Internal Method

```scala
sortInternal(global: Boolean, sortExprs: Seq[Column]): Dataset[T]
```

sortInternal creates a Dataset with a Sort unary logical operator (and the logicalPlan as the child).

[source, scala]

```text
val nums = Seq((0, "zero"), (1, "one")).toDF("id", "name")
// Creates a Sort logical operator:
// - descending sort direction for id column (specified explicitly)
// - name column is wrapped with ascending sort direction
val numsSorted = nums.sort('id.desc, 'name)
val logicalPlan = numsSorted.queryExecution.logical

scala> println(logicalPlan.numberedTreeString)
00 'Sort ['id DESC NULLS LAST, 'name ASC NULLS FIRST], true
01 +- Project [_1#11 AS id#14, _2#12 AS name#15]
02 +- LocalRelation [_1#11, _2#12]
```

Internally, sortInternal first builds ordering expressions for the given sortExprs columns: it takes the sortExprs columns and makes sure that they are already SortOrder expressions (and leaves them untouched) or wraps them into SortOrder expressions with Ascending sort direction.

In the end, sortInternal creates a Dataset with a Sort unary logical operator (with the ordering expressions, the given global flag, and the logicalPlan as the child).

NOTE: sortInternal is used for the <> and <> typed transformations in the Dataset API (with the only change of the global flag being enabled and disabled, respectively).

Helper Method for Untyped Transformations and Basic Actions -- withPlan Internal Method

```scala
withPlan(logicalPlan: LogicalPlan): DataFrame
```

withPlan simply uses the ofRows internal factory method to create a DataFrame for the input logicalPlan and the current SparkSession.

NOTE: withPlan is annotated with Scala's @inline annotation that requests the Scala compiler to try especially hard to inline it.

withPlan is used in untyped transformations

Further Reading and Watching