Thursday, 9 July 2015

Distributed Machine Learning Pipelines in Apache Spark

Machine learning pipelines

In machine learning we often want to express some kind of processing pipeline for our data. We load or receive the raw data from some kind of source. We then need to clean the data. We can use several techniques to identify and fix incorrect data points, outliers [6] that could affect the correctness of the result and mend missing data points. The next step may be normalisation of the data which is transformation of values to a common scale [7]. It is often followed by a dimensionality reduction step. Some phenomenons can only be described using a lot of features. This may have negative impact on performance of the machine learning exercise. We may want to reduce the number of features (dimensions) that describe each data point and thus reducing complexity and removing random noise and information that is not important for our use case. But at the same time it should keep the important information, knowledge and patterns encoded in the new representation [8]. Another step coupled with the previous one is feature extraction where we try to build derived data from the original set with intention to get the most informative and non redundant set of data that desribes the data for our use case [9]. A step we should not forget is the training of a machine learning model [10]. Finally we want to used the trained model to process new, unknown data points.
Each of the phases above can be accomplished using many different techniques known from machine learning, statistics, computer vision, pattern recognition, data mining and artificial intelligence literature. Importantly, some approaches play well with others and improve their performance while others don't. This differs between particular use cases and may be difficult to estimate without proper tests and error estimation. We may therefore want to substitute some steps in the pipeline or at least their parameters to improve performance. Another important point is that we want the data to go through the same transformation steps at several ocassions. At least during training of the machine learning model and then testing. Often we want to split the available data to training / testing / validation [11][12] datasets. We may want to split the data differently multiple times using for example cross validation to avoid overfitting [13], find the best model and pipeline and to optimise both parameters of the model and hyperparameters of the learning algorithm. Both the sets need to go through the same pipeline so they are presented in the same form to the model, otherwise obviously the model would not know how to handle them. And finally we may want to do this multiple times with different steps in the pipeline and different parameters of each step to try to figure out the best possible model for the use case. The same pipeline musth then be applied to real life use of the model on new data. A machine learning pipeline would look something like picture below. The top pipeline represents training, the bottom one validation. Multiple models, in this case decision trees, support vector machines and neural networks are displayed to remind that different models with different parameters as well as different stages in the pipeline exist.
spark-pipeline
For these reasons it is very convenient to be able to express the machine learning pipeline in a program, including the ability to chain steps, substitute steps and change parameters.

Spark Machine Learning API

The authors of Spark provided this sort of functionality to the framework's users. When I looked at the examples and explanations (see sources in the beginning) I thought it looked great and that it would be possible to use all the algorithms that already exist in MLLib [14]. It is unfortunately not entirely true as I will try to describe. 

Types and DataFrames

The machine learning Pipeline works with DataFrames [15] (formerly SchemaRDD). It is therefore tighly connected to Spark SQL. It provides convenient syntax for people who may not know Scala or the Spark's RDD abstraction, including SQL queries and more [16].
For me however as a Scala programmer and functional programming enthusiast the first thing that struck me was the lack of strong static types. DataFrame is essentially a table with columns that have types and names. The types are just some objects (DoubleType, StringType, ...) and the names of columns are just Strings. If you print a schema of a DataFrame it may look like the below. 
12// StructType(StructField(result,DoubleType,true))
It means you can do String transformations on Double typed columns. Or you can do transformations on columns that do not even exist. And the program will compile and run. There are some additional runtime checks and validations, but that does not compensate the missing typing. 
Consider the Spark example below. It uses Spark's RDDs [17]. RDDs are strongly statically typed and are therefore compile time type checked. The code reads a file, splits each line to three numbers (representing features (here represented by a matrix) and uses that as an input to a machine learning model (in this case a Neural Network)) and applies forwardRun() function that runs input through a trained Neural Network and produces result - I may discuss this code in more depth sometimes in another blog post - but for purpose of this one, let's just assume it is a function from some parameters to Double.
1234567891011val sc = new SparkContext("local", "Neural Network")

val result = sc.textFile("src/main/resources/data.csv", 3)
  .map { l =>
    val splits = l.split(",")
    val features = splits.map(_.toDouble)

    new DenseMatrix(3, 1, Array(features(0), features(1), features(2)))
  }
  .map(in => forwardRun(topology, in, weights))
The strong typing means that we have RDD[A] and after the map transformation we have RDD[B] and the next phase works with that. If the types don't match the compiler will emit an error. The high level API of Spark RDDs looks as if we were working with Akka Streams[18] or simply collections. But the semantics of what is going on in the background are very different. It does not run on a single thread, but partitions and distributes the computation of forwardRun function across Spark workers, be it in different threads or even distributed on different machines as sketched below.
distributed
We can then take the result and run some more computations on it. In the snippet below we just create DataFrame from the result of above code. Using toDF() function we create a DataFrame with single column called "result". As you can see we can filter on this "result" column in resultDF (which is Double) by comparing it to a String a then select (select is a projection, i.e. map(), just uses sql like naming convention) again by adding a String to the Double result. This compiles just fine and actually also runs fine, just does not return any result. But it should not, because in both cases we are comparing String to a Double.
12345678val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

val resultDF = result.toDF("result")
resultDF
 .filter(resultDF("result") > "String")
 .select(resultDF("result") + "String")
Another option to query data using Spark is to use SparkSQL. First we need to register a table called results. The code below is simply a SQL query which is parsed and executed again on the result above. This time a bit more complex - we use a nested query - but other than that it is very similar to previous example. Again we compare the result with a String and it will compile just fine (the query is just a string after all so it is only checked by Spark's internal SQL parser and not by the compiler). 
12345678resultDF.registerTempTable("results")
val filtered3 = sqlContext.sql(
 "SELECT result + \"String\" " +
 "FROM (" +
   "SELECT result " +
   "FROM results) r " +
 "WHERE r.result >= \"String\"")
Not using strong types may seem like a bad decision. But apparently this was thoroughly discussed among the developers of Spark and the community. And although not everyone will like this controversial decision there are very strong points that support it and it actually makes a lot of sense.
Firstly, many people using this API are people without knowledge of Scala and Spark's internals - i.e. data scientists, statisticians, business analysts etc. They often have experience with tools like R, Matlab or Python instead. It is obvious that they prefer using something similar and not have to work with RDDs. And second and in my opinion more relevant reason is the Spark's optimiser which I will talk about in depth in my next post.

Implementing your own machine learning pipeline

Pipeline consists of multiple differenct types of steps Transformers and Estimators. Transformers transforma a DataFrame to another and Estimators usually fit (train) a model and produce a Transformer representing the trained machine learning model. The programming guide is [19]
Many machine learning approaches in all phases of the machine learning pipeline are already implemented in Spark's MLlib in ml package, including various preprocessing steps, machine learning models or validation approaches such as cross validation. Unfortunately the pipeline can not just use algorithms that already exist in MLlib and they need to be manually implemented by creating some kind of wrapper around them that integrates them into the pipeline. What was a bit surprising for me to see was (in addition to the absence of compile time type checks) that the pipeline code feels quite heavy with a lot of boilerplate. Although the code in the end makes sense I could not stop thinking that there must be a simpler or more elegant way to achieve this functionality. The Spark team anounced they focus more on the functionality, quality and precision of what the code does rather than on the code itself which makes sense, but the two are not mutually exclusive. When I started using the pipeline which was I believe Spark 1.2 very little was implemented and it was not possible to use the pipeline without having to implement a lot manually (which was also very difficult due to a lot of things being not accessible outside Spark due to their private access modifiers). The situation seems to be improving very quickly with new versions. Many algorithms are now implemented in the pipelining API and new ones are being added with each release.
Just to illustrate why I think the implementation feels heavy, let's have a look at an extremely simple Transformer implemented in Spark called Binarizer. It just compares each value in a column with given threshold and returns 1 if the value is higher than the threshold or 0 otherwise. The usage of this Transformer is reasonably convenient. In the below code we distribute a dataset created from an array. Then the binarizer transformation is applied by calling the transform() method and the result retrieved. Another option would be to use the binarizer in a pipeline with other transformations and machine learning models defined as ordered stages as shown on the commented line. xxx
1234567891011121314val data = Array(0.1, -0.5, 0.2, -0.3, 0.8, 0.7, -0.1, -0.4)
val dataFrame: DataFrame = sqlContext.createDataFrame(data.zip(defaultBinarized)).toDF("feature"))

val binarizer: Binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")

binarizer
  .transform(dataFrame)
  .select("binarized_feature")
  .collect()

// val pipeline = new Pipeline().setStages(Array(binarizer, ..., ...)).fit(dataFrame)
The setInputCol() and setOutputCol() methods specify which columns are used as input and output. Therefore we need to use the same string in toDF(), setInputCol() and in setOutputCol() and select(). What happens if we "accidentally" call setInputCol("features") instead? We get a  java.lang.IllegalArgumentException: Field "features" does not exist. at runtime. Similarly if data was Array[String] instead of Array[Double] the result would be  java.lang.IllegalArgumentException: requirement failed: Column feature must be of type DoubleType but was actually StringType.. These are informative errors, but are still at runtime which has all the expected disadvantages including having to run the code to find if it is correct, slower development and increased bug probability.
As mentioned using the binarizer and defining a machine learning pipeline is actually very easy to use. Let's see how the actual code of the Binarizer looks like if you wanted to write your own transformer.
1234567891011121314151617181920212223242526272829303132333435final class Binarizer(override val uid: String) extends Transformer with HasInputCol with HasOutputCol {

  def this() = this(Identifiable.randomUID("binarizer"))
  val threshold: DoubleParam =
    new DoubleParam(this, "threshold", "threshold used to binarize continuous features")
  def getThreshold: Double = $(threshold)
  def setThreshold(value: Double): this.type = set(threshold, value)
  setDefault(threshold -> 0.0)
  def setInputCol(value: String): this.type = set(inputCol, value)
  def setOutputCol(value: String): this.type = set(outputCol, value)

  override def transform(dataset: DataFrame): DataFrame = {
    transformSchema(dataset.schema, logging = true)
    val td = $(threshold)
    val binarizer = udf { in: Double => if (in > td) 1.0 else 0.0 }
    val outputColName = $(outputCol)
    val metadata = BinaryAttribute.defaultAttr.withName(outputColName).toMetadata()
    dataset.select(col("*"),
      binarizer(col($(inputCol))).as(outputColName, metadata))
  }

  override def transformSchema(schema: StructType): StructType = {
    SchemaUtils.checkColumnType(schema, $(inputCol), DoubleType)
    val inputFields = schema.fields
    val outputColName = $(outputCol)
    require(inputFields.forall(_.name != outputColName),
      s"Output column $outputColName already exists.")
    val attr = BinaryAttribute.defaultAttr.withName(outputColName)
    val outputFields = inputFields :+ attr.toStructField()
    StructType(outputFields)
  }
  override def copy(extra: ParamMap): Binarizer = defaultCopy(extra)
}

This in my opinion contains a lot of boilerplate which makes it quite difficult to understand. First parameters are defined, e.g. threshold and then getters and setters for them. The transformSchema() method defines changes to the DataFrame schema during the transformation. In this case it simply adds new column with binary type and with name specified by setOutputCol(). This simple transformation required 7 lines of quite blurred code.  The most important method is however transform(). It calls transform schema and applies the binarization operation (comparison with threshold) and fills the new column. Again, in my opinion it is not very intuitive, especially the last two lines of the method which may be quite difficult for someone who does not know how the DataFrames work. More complex transformators have of course more complex code and Estimators define not only the logic, but also often the Model and Params making them even more complex. 

Exercise prediction

We are building an R&D project called Muvr [20]. It uses data from sensors from wearable devices such as accelerometer in a watch or heartbeat monitor to classify exercises in near real time. A user should see the result of classification almost immediately after the exercise at a gym is finished. An example classification would be “12 bicep curls”. It does real time classification in the mobile devices and in Akka Cluster distributed domain and it uses Apache Spark for more expensive analytics and machine learning tasks such as suggests improvements to user’s exercise routine, tracks compliance to exercise plan, suggests improvement to quality of individual exercises, classify users into groups using some similarity metric or most importantly improve machine learning models for better real time exercise classification and start using it immediately. Spark reads data event sourced by Akka to Cassandra using CQRS approach. I briefly covered some of that in one of my previous posts [21]. Muvr is an exciting project, but this blog post will only focus on use of Spark's machine learning pipelines in it.
Below is an example of pipeline computing linear regression predicting intensity of future exercise that we implemented in our Muvr project using the pipelining API where we defined some of our own stages. We create the individual stages and use them in certain order. First we filter users that we want, normalize the data and then extract features that we use for the training from the raw data. We construct this pipeline using new Pipeline().setStages(). Then by using fit() we train the linear regression model using training data. And finally we use the same machine learning pipeline to preprocess new data and use transform() method on the model to get prediction. If you are interested you can compare the API with pure MLlib implementation withouth the ML pipelines which uses RDDs [22]
1234567891011121314151617181920212223242526272829303132
val userFilter = new UserFilter()
val normalizer = new ZScoreNormalizer()
val intensityFeatureExtractor = 
                 new IntensityFeatureExtractor()
val intensityPredictor = new LinearRegression()
    .setLabelCol("label")
    .setFeaturesCol("features")
    .setPredictionCol("predictions")

implicit val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

val events = sc.eventTable().cache().toDF()

val pipeline = new Pipeline().setStages(Array(
  userFilter,
  normalizer,
  intensityFeatureExtractor,
  intensityPredictor
))

getEligibleUsers(events, sessionEndedBefore)
   .map { user => 
     val model = pipeline.fit(
       events, 
       ParamMap(ParamPair(userIdParam, user)))
     val testData = //prepare test data
     val predictions = model.transform(testData)
     submitResult(userId, predictions, config)
   }

Result of such prediction may look like the picture below where the blue dots represent known exercise intensities, the red line represents fitted linear regression and it continues further to the right past the last known intensity and we predict that the next intensity will lie on the red line.

Conclusion

DataFrames became the recommended way to work with data in Spark. I personally prefer RDDs (and strong static compile time type checks!) and I would like to see them used more, for example in the pipelining API. RDDs also provide more control over the underlying operations and they are increasingly becoming the low level (although they are very high level abstraction themselves!) representation to which DataFrames compile. Unfortunately it was confirmed that there are no plans at the moment to implement a strongly typed machine learning pipelines.
But there are valid reasons why the DataFrame approach was chosen over RDDs, including a speed improvement caused by more applicable optimiser rules. I will focus on Spark's optimiser and the reasons why it works well with DataFrames in my next blog post. I cannot avoid the feeling that the implementation code of Spark's ML for machine learning pipelining is just too heavy. But as demonstrated it is actually very simple, convenient and powerful to use which is the most important thing for most people and use cases and they serve their purpose very well.

No comments:

Post a Comment