ML Pipeline Models

Model abstract class is a Transformer with the optional Estimator that has produced it (as a transient parent field).

model: DataFrame =[predict]=> DataFrame (with predictions)
An Estimator is optional and is available only after fit (of an Estimator) has been executed whose result a model is.

As a Transformer it takes a DataFrame and transforms it to a result DataFrame with prediction column added.

There are two direct implementations of the Model class that are not directly related to a concrete ML algorithm:

PipelineModel

PipelineModel is a private[ml] class.

PipelineModel is a Model of Pipeline estimator.

Once fit, you can use the result model as any other models to transform datasets (as DataFrame).

A very interesting use case of PipelineModel is when a Pipeline is made up of Transformer instances.

// Transformer #1
import org.apache.spark.ml.feature.Tokenizer
val tok = new Tokenizer().setInputCol("text")

// Transformer #2
import org.apache.spark.ml.feature.HashingTF
val hashingTF = new HashingTF().setInputCol(tok.getOutputCol).setOutputCol("features")

// Fuse the Transformers in a Pipeline
import org.apache.spark.ml.Pipeline
val pipeline = new Pipeline().setStages(Array(tok, hashingTF))

val dataset = Seq((0, "hello world")).toDF("id", "text")

// Since there's no fitting, any dataset works fine
val featurize = pipeline.fit(dataset)

// Use the pipelineModel as a series of Transformers
scala> featurize.transform(dataset).show(false)
+---+-----------+------------------------+--------------------------------+
|id |text       |tok_8aec9bfad04a__output|features                        |
+---+-----------+------------------------+--------------------------------+
|0  |hello world|[hello, world]          |(262144,[71890,72594],[1.0,1.0])|
+---+-----------+------------------------+--------------------------------+

PredictionModel

PredictionModel is an abstract class to represent a model for prediction algorithms like regression and classification (that have their own specialized models - details coming up below).

PredictionModel is basically a Transformer with predict method to calculate predictions (that end up in prediction column).

PredictionModel belongs to org.apache.spark.ml package.

import org.apache.spark.ml.PredictionModel

The contract of PredictionModel class requires that every custom implementation defines predict method (with FeaturesType type being the type of features).

predict(features: FeaturesType): Double

The direct less-algorithm-specific extensions of the PredictionModel class are:

As a custom Transformer it comes with its own custom transform method.

Internally, transform first ensures that the type of the features column matches the type of the model and adds the prediction column of type Double to the schema of the result DataFrame.

It then creates the result DataFrame and adds the prediction column with a predictUDF function applied to the values of the features column.

FIXME A diagram to show the transformation from a dataframe (on the left) and another (on the right) with an arrow to represent the transformation method.

Enable DEBUG logging level for a PredictionModel implementation, e.g. LinearRegressionModel, to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.ml.regression.LinearRegressionModel=DEBUG

Refer to Logging.

ClassificationModel

ClassificationModel is a PredictionModel that transforms a DataFrame with mandatory features, label, and rawPrediction (of type Vector) columns to a DataFrame with prediction column added.

A Model with ClassifierParams parameters, e.g. ClassificationModel, requires that a DataFrame have the mandatory features, label (of type Double), and rawPrediction (of type Vector) columns.

ClassificationModel comes with its own transform (as Transformer) and predict (as PredictionModel).

The following is a list of the known ClassificationModel custom implementations (as of March, 24th):

  • ProbabilisticClassificationModel (the abstract parent of the following classification models)

    • DecisionTreeClassificationModel (final)

    • LogisticRegressionModel

    • NaiveBayesModel

    • RandomForestClassificationModel (final)

RegressionModel

RegressionModel is a PredictionModel that transforms a DataFrame with mandatory label, features, and prediction columns.

It comes with no own methods or values and so is more a marker abstract class (to combine different features of regression models under one type).

LinearRegressionModel

LinearRegressionModel represents a model produced by a LinearRegression estimator. It transforms the required features column of type org.apache.spark.mllib.linalg.Vector.

It is a private[ml] class so what you, a developer, may eventually work with is the more general RegressionModel, and since RegressionModel is just a marker no-method abstract class, it is more a PredictionModel.

As a linear regression model that extends LinearRegressionParams it expects the following schema of an input DataFrame:

  • label (required)

  • features (required)

  • prediction

  • regParam

  • elasticNetParam

  • maxIter (Int)

  • tol (Double)

  • fitIntercept (Boolean)

  • standardization (Boolean)

  • weightCol (String)

  • solver (String)

(New in 1.6.0) LinearRegressionModel is also a MLWritable (so you can save it to a persistent storage for later reuse).

With DEBUG logging enabled (see above) you can see the following messages in the logs when transform is called and transforms the schema.

16/03/21 06:55:32 DEBUG LinearRegressionModel: Input schema: {"type":"struct","fields":[{"name":"label","type":"double","nullable":false,"metadata":{}},{"name":"features","type":{"type":"udt","class":"org.apache.spark.mllib.linalg.VectorUDT","pyClass":"pyspark.mllib.linalg.VectorUDT","sqlType":{"type":"struct","fields":[{"name":"type","type":"byte","nullable":false,"metadata":{}},{"name":"size","type":"integer","nullable":true,"metadata":{}},{"name":"indices","type":{"type":"array","elementType":"integer","containsNull":false},"nullable":true,"metadata":{}},{"name":"values","type":{"type":"array","elementType":"double","containsNull":false},"nullable":true,"metadata":{}}]}},"nullable":true,"metadata":{}}]}
16/03/21 06:55:32 DEBUG LinearRegressionModel: Expected output schema: {"type":"struct","fields":[{"name":"label","type":"double","nullable":false,"metadata":{}},{"name":"features","type":{"type":"udt","class":"org.apache.spark.mllib.linalg.VectorUDT","pyClass":"pyspark.mllib.linalg.VectorUDT","sqlType":{"type":"struct","fields":[{"name":"type","type":"byte","nullable":false,"metadata":{}},{"name":"size","type":"integer","nullable":true,"metadata":{}},{"name":"indices","type":{"type":"array","elementType":"integer","containsNull":false},"nullable":true,"metadata":{}},{"name":"values","type":{"type":"array","elementType":"double","containsNull":false},"nullable":true,"metadata":{}}]}},"nullable":true,"metadata":{}},{"name":"prediction","type":"double","nullable":false,"metadata":{}}]}

The implementation of predict for LinearRegressionModel calculates dot(v1, v2) of two Vectors - features and coefficients - (of DenseVector or SparseVector types) of the same size and adds intercept.

The coefficients Vector and intercept Double are the integral part of LinearRegressionModel as the required input parameters of the constructor.

LinearRegressionModel Example

// Create a (sparse) Vector
import org.apache.spark.mllib.linalg.Vectors
val indices = 0 to 4
val elements = indices.zip(Stream.continually(1.0))
val sv = Vectors.sparse(elements.size, elements)

// Create a proper DataFrame
val ds = sc.parallelize(Seq((0.5, sv))).toDF("label", "features")

import org.apache.spark.ml.regression.LinearRegression
val lr = new LinearRegression

// Importing LinearRegressionModel and being explicit about the type of model value
// is for learning purposes only
import org.apache.spark.ml.regression.LinearRegressionModel
val model: LinearRegressionModel = lr.fit(ds)

// Use the same ds - just for learning purposes
scala> model.transform(ds).show
+-----+--------------------+----------+
|label|            features|prediction|
+-----+--------------------+----------+
|  0.5|(5,[0,1,2,3,4],[1...|       0.5|
+-----+--------------------+----------+

RandomForestRegressionModel

RandomForestRegressionModel is a PredictionModel with features column of type Vector.

Interestingly, DataFrame transformation (as part of Transformer contract) uses SparkContext.broadcast to send itself to the nodes in a Spark cluster and calls calculates predictions (as prediction column) on features.

KMeansModel

KMeansModel is a Model of KMeans algorithm.

It belongs to org.apache.spark.ml.clustering package.

// See spark-mllib-estimators.adoc#KMeans
val kmeans: KMeans = ???
val trainingDF: DataFrame = ???
val kmModel = kmeans.fit(trainingDF)

// Know the cluster centers
scala> kmModel.clusterCenters
res0: Array[org.apache.spark.mllib.linalg.Vector] = Array([0.1,0.3], [0.1,0.1])

val inputDF = Seq((0.0, Vectors.dense(0.2, 0.4))).toDF("label", "features")

scala> kmModel.transform(inputDF).show(false)
+-----+---------+----------+
|label|features |prediction|
+-----+---------+----------+
|0.0  |[0.2,0.4]|0         |
+-----+---------+----------+