Distributed Machine Learning Using PySpark

Introduction

Big Data is not a new thing nowadays, and numerous platforms and applications have been created to take advantage of Big Data. Apache Spark is one of the most popular platforms for handling Machine Learning with Big Data.  By implicitly distribute data into clusters, Spark enables developers to focus more on their analytical tasks without worrying about data parallelism under the hood. To learn how Apache Spark works and how to build a distributed ML pipeline, a local cluster on my desktop is implemented with PySpark. In this post, I will talk about how to make ML pipelines with PySpark and its automated parallel-computing functionality. 

Problem Definition & Data Ingestion

One of the most exciting ML applications would be predictive modeling, which utilizes the data collected in the past to train a model that is capable of predicting the future. A predictive model can be a regression model or a classification model. For the demonstration, I will use Apache Spark to build a predictive classification model. The dataset I use is collected from The U.S. Department of Transportation’s (DOT) Bureau of Transportation Statistics, which contains more than 5 million records of the flights’ information during 2015, including the carrier of the airline, the original airport, destination airport, the distance of the flight, the time spent, the departure delay time, and arrival delay time, etc. I will use this dataset to train a model to predict if a flight will be delayed or not. 

Data Preparation

In order to do some basic EDA, the dataset needs to be preprocessed. The following code sets up a Spark working environment (Spark Session) and loads the data into the Spark Session as a Spark DataFrame:

# instantiate a spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]')
                            .appName('flights_delay')
                            .config('spark.driver.memory', '8g')
                            .getOrCreate()
# load raw data
df = spark.read.csv('/home/shihao/Desktop/flights/flights.csv', header=True,
                    inferSchema=True, nullValue=' ')
# clean data set
to_keep = ['MONTH', 'DAY', 'DAY_OF_WEEK', 'AIRLINE', 'FLIGHT_NUMBER',
           'ORIGIN_AIRPORT', 'DESTINATION_AIRPORT', 'TAXI_OUT',
           'SCHEDULED_DEPARTURE', 'DEPARTURE_DELAY', 'DISTANCE',
           'SCHEDULED_ARRIVAL', 'ARRIVAL_DELAY']
df = df.select(to_keep)
df = df.dropna()

 

When instantiate the Spark session in PySpark, passing 'local[*]' to .master() sets Spark to use all the available devices as executor (8-core CPU hence 8 workers). The original dataset has 31 columns, here I only keep 13 of them, since some columns cannot be acquired beforehand for the prediction, such as the wheels-off time and tail number.

After selecting all the useful columns, drop all the null values in the Spark DataFrame. Since only roughly 20% of the total flights are delayed (flights are delayed if the actual arrival time is more than 15 minutes later than the scheduled arrival time), use the following code to create the label column. The following code also balances the dataset, specifically, makes the number of delayed samples equal to the number of on-time samples.

# create label
df = df.withColumn('label', (df.ARRIVAL_DELAY > 15).cast('integer'))
# balance data set
# get number of delayed flights
num_0 = df.filter(df.label==0).count()
# calculate the sub-sampling ratio for the on-time flights
ratio = (df.count() - num_0)/num_0
# under sample the redundant class
df = df.sampleBy('label', {0: ratio, 1:1})

Explanatory Data Analysis

Now the dataset is clean and balanced, we can do some EDA to examine the dataset. First, take a look at how each airline is performing in terms of delivering flights on time. As shown in Figure 1 (left), Frontier Airlines (F9) has the highest average delay time (68.58 minutes) among 14 airline companies and Hawaiian Airlines (HA) has the lowest average delay time (40.98 minutes). 

Figure 1. The sorted average arrival delay and maximal delay of each airline.
Figure 1. The sorted average arrival delay and maximal delay of each airline.

On the other hand, American Airlines (AA) has the most extended delay with 1546 mins \approx 25.77 hrs, and Virgin America (VX) has the shorted maximal delay with 440 mins \approx 7.33 hrs as shown in Figure 1 (right).

Another interesting fact is illustrated in Figure 2, seems like all the airlines are doing pretty poorly around February and June, and doing better in April, September, and October. Therefore, Month should be a useful indicator for our predictive model. 

Figure 2. Monthly average arrival difference of each airline.
Figure 2. Monthly average arrival difference of each airline.

How about other features? Such as the distance of the flight. Does it have some relationship with the arrival timing? Figure 3 shows the average arrival difference for each airline, along with different flight distances. Arrival difference is defined as the difference between the scheduled arrival time and the actual arrival time. 

Figure 3. Average arrival difference plotted along distance for each airline.
Figure 3. Average arrival difference plotted along distance for each airline.

While there are a lot of outliers in Figure 3, we can still see a small trend that longer distance tends to have longer arrival difference. Figure 4. shows the relationship between arrival difference and taxi-out time, where we can see a more apparent linear relationship. 

Figure 4. Average arrival difference plotted along taxi-out time.
Figure 4. Average arrival difference plotted along taxi-out time.

Finally, as shown in Figure 5, departure-delay is the best predictor for our model.

Figure 5. Average arrival difference plotted along departure-delay.
Figure 5. Average arrival difference plotted along departure-delay.

Data Segregation

Now that the data-preprocessing and EDA are done, we can go ahead and build the model. I created three simple ML models with the dataset and compare their performances. Since the problem is to predict whether a flight will be delayed or not, which is a binary classification problem, I choose the Random Forest Classification model, Logistic Regression model, and Neural Network model, to implement the predictive model. 

First of all, split the dataset into training and test set before building ML pipelines to avoid leakage. Leakage happens when any of the .fit() methods are applied to the test set, which may induce unneglectable bias to our model.  Therefore, extra attention should be paid to only apply .transform() method to the test set. The dataset can be split in PySpark using DataFrame.randomSplit([training_ratio, test_ratio], random_state) method as shown here:

# create training and test set
flights_train, flights_test = df.randomSplit([.8, .2], 5)

Model training

When the data is ready, we can begin to build our machine learning pipeline and train the model on the training set. PySpark provides us powerful sub-modules to create fully functional ML pipeline object with the minimal code. Before putting up a complete pipeline, we need to build each individual part in the pipeline.

First, convert all the categorical data in the dataset into numerical data since the underlying machine is more familiar with numbers than texts. Similar to pandas.get_dummies(), use StringIndexer() combined with OneHotEncoderEstimator() sub-module in PySpark to convert all the categorical data into numerical data. For the flights dataset, there are three categorical columns, including AIRLINE, ORIGINAL_AIRPORT, and DESTINATION_AIRPORT. Therefore, construct three StringIndexer() objects, like so: 

# create parts
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator
arr_indexer = StringIndexer(inputCol='AIRLINE', outputCol='arr_idx').setHandleInvalid("keep")
org_indexer = StringIndexer(inputCol='ORIGIN_AIRPORT', outputCol='org_idx').setHandleInvalid("keep")
des_indexer = StringIndexer(inputCol='DESTINATION_AIRPORT', outputCol='des_idx').setHandleInvalid("keep")
onehot = OneHotEncoderEstimator(inputCols=[arr_indexer.getOutputCol(), org_indexer.getOutputCol(),
                                           des_indexer.getOutputCol()],
                                outputCols=['arr_dummy', 'org_dummy', 'des_dummy'])

 

The OneHotEncoderEstimator() object will convert all the categorical columns into their corresponding numerical columns with sparse vector representations. Basically, a sparse vector representation is a more efficient way to represent a vector if the majority of the vector is zero values. 

Next, merge all the sparse vectors and other numerical values in the dataset into a single sparse vector by concatenating them one by one and call this new vector “features”, which can be done using the PySpark VectorAssembler() sub-module:

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['MONTH', 'DAY', 'DAY_OF_WEEK',
                                       'arr_dummy', 'org_dummy', 'des_dummy',
                                       'SCHEDULED_DEPARTURE','TAXI_OUT',
                                       'DEPARTURE_DELAY', 'DISTANCE',
                                       'SCHEDULED_ARRIVAL'],
                            outputCol='features')

 

Everything so far should be identical for all the three ML models since they all share the same dataset. After this point, however, the workflow will be slightly different for different models in terms of hyperparameter-tuning. For Random Forest Classifier and Logistic Regression Classifier, instantiate the ML model and attach it to the end of our pipeline:

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
rf = RandomForestClassifier(labelCol='label', featuresCol='features', 
                            numTrees=30, maxDepth=30)
pipeline = Pipeline(stages=[arr_indexer, org_indexer, des_indexer,
                            onehot, assembler, rf])

 

For Logistic Regression Model, switch to the following lines of code to instantiate the pipeline:

from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
lr = LogisticRegression(maxIter=30)
pipeline = Pipeline(stages=[arr_indexer, org_indexer, des_indexer, onehot, assembler, lr])

 

During training, it is common practice to implement K-fold Cross-validation and Grid Search. Fortunately, PySpark provides sub-modules for both of them as well. To build a parameter-searching grid, use the ParamGridBuilder() sub-module and pass the parameter map into the CrossValidator(). The code for the Random Forest Classifier is shown here:

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
param = ParamGridBuilder().build()
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=params,
                    evaluator=evaluator,
                    numFolds=5)

 

Do not pass any parameters when building the parameter map for Random Forest (RF) Classifier since its parameters are more straight forward and easy to interpret. The hyperparameters for RF are initialized to the best case when instantiating the RF object previously. Specifically, numTrees stands for the number of single decision trees make up the RF classifier and maxDepth stands for the maximal depth for each tree in the forest. RF does not require a lot of tuning since the higher the values of these two parameters, the better the performance of the model in terms of test-accuracy. On the other hand, of course, with more, deeper trees, the training process takes linearly more time.

Whereas for Logistic Regression (LR) Classifier, build the parameter grid by doing the following:

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
params = ParamGridBuilder()
params = params.addGrid(lr.regParam, [.01, .1, 1, 10]) 
               .addGrid(lr.elasticNetParam, [0, .5, 1])
params = params.build()
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=params,
                    evaluator=evaluator,
                    numFolds=5)

 

As shown in the code, there are two parameters to be tuned for LR classifier. lr.regParam is the regularization coefficient which specifies that, in the range of 0.01 to 10, how much the model should be regularized. The bigger the number, the more the model is regularized. Meanwhile, lr.elasticNetParam specifies the way the model is regularized, 0 means it’s an L2 penalty and 1 means it’s an L1 penalty. And 0.5 means the penalty is a combination of both L1 and L2.

For Neural Network Classifier, the story is a little bit different. Since Spark Mllib does not have predefined models for ANNs, instead, use keras (with tensorflow as backend) to build the ANN model for comparison. Also, keras Dense layer does not take sparse vectors as input, therefore just convert the dataset in a conventional way. In other words, creating dummy variables for categorical data:

import pandas as pd
from sklearn.model_selection import train_test_split
from keras.utils import to_categorical
# convert Spark DataFrame into Pandas DataFrame
df_pd = df.toPandas()
# create feature and label set
X = df_pd.drop(['label', 'ARRIVAL_DELAY'], axis=1)
y = df_pd.label
# convert categorical data into numerical data
X = pd.get_dummies(X)
# split into training and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2,
                                                    random_state=5)
# reshape the dataset to fit ANNs
X_train = X_train.values[...,None]
X_test = X_test.values[...,None]
# convert the label into categorical data
y_train = to_categorical(y_train.values)
y_test = to_categorical(y_test.values)

 

Use the following code the initialize the ANN model:

from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers import BatchNormalization
ann = Sequential()
ann.add(Dense(10, activation='relu', input_shape=(X_train.columns.size, 1,)))
ann.add(BatchNormalization())
ann.add(Flatten())
ann.add(Dense(2, activation='softmax'))
ann.compile(optimizer='adam', metrics=['accuracy'], loss='categorical_crossentropy')

 

A simple ANN structure that only has one hidden layer with ten hidden nodes is modeled for comparison. After the hidden layer, a BatchNormalization() layer normalizes the output of the hidden layer. Since the numerical scales of the features are different, the normalization layer makes it easier for the model to converge.

The last step is to train our models. For ANN, call the following line of code: 

from keras.callbacks import ModelCheckpoint
model_path = 'ann_keras_model'
model_check = ModelCheckpoint(model_path, save_best_only=True)
history = ann.fit(X_train, y_train,
                  batch_size = 200, epochs = 25,
                  validation_split = 0.2,
                  callbacks=[model_check])

 

Add a callback function for the training so that only the best model during training will be saved. For the Random Forest and Logistic Regression models, simply call the following line for training:

cv = cv.fit(flights_train)

Model Evaluation

For a binary classification problem, it is common practice to evaluate the performance of the model by plotting the ROC (Receiver Operating Characteristic) curve or calculating precision, recall, and F1-scores. For Random Forest classifier and Logistic Regression model, use the following code to calculate the AUC (area under ROC curve), precision, and recall, etc:

from pyspark.ml.evaluate import BinaryClassificationEvaluator
# get the prediction
prediction = cv.transform(flights_test)
# initialize the evaluator
evaluator = BinaryClassificationEvaluator()
# calculate AUC
auc = evaluator.evaluate(prediction, {evaluator.metricName: 'areaUnderROC'})
print('AUC: %0.3f' % auc)
# compute TN, TP, FN, and FP
prediction.groupBy('label', 'prediction').count().show()
# Calculate the elements of the confusion matrix
TN = prediction.filter('prediction = 0 AND label = prediction').count()
TP = prediction.filter('prediction = 1 AND label = prediction').count()
FN = prediction.filter('prediction = 0 AND label <> prediction').count()
FP = prediction.filter('prediction = 1 AND label <> prediction').count()
# calculate accuracy, precision, recall, and F1-score
accuracy = (TN + TP) / (TN + TP + FN + FP)
precision = TP / (TP + FP)
recall = TP / (TP + FN)
F =  2 * (precision*recall) / (precision + recall)
print('n precision: %0.3f' % precision)
print('n recall: %0.3f' % recall)
print('n accuracy: %0.3f' % accuracy)
print('n F1 score: %0.3f' % F)

 

Furthermore, take a look at the parameters of the best Logistic Regression model selected by Grid Search:

For the Logistic Regression model, the Grid Search chooses to have a combination of L1 and L2 penalty to get the best performance. And it also chooses to use the least regularization (0.01). For the Neural Network model, Figure 6 (left) shows the training history about training loss, validation loss, training accuracy, and validation accuracy. The training takes 15 seconds for a single epoch on a GPU implemented device. The right-hand figure shows the ROC curves for the three models.

Figure 6. Training history (left) for ANN and ROC curves (right).
Figure 6. Training history (left) for ANN and ROC curves (right).

Call .transform() method to the held-out test set to further evaluate these models :

from pyspark.ml.evaluation import BinaryClassificationEvaluator
# get predictions
prediction = cv.transform(flights_test)
# Calculate the elements of the confusion matrix
TN = prediction.filter('prediction = 0 AND label = prediction').count()
TP = prediction.filter('prediction = 1 AND label = prediction').count()
FN = prediction.filter('prediction = 0 AND label <> prediction').count()
FP = prediction.filter('prediction = 1 AND label <> prediction').count()
# show confusion matrix
prediction.groupBy('label', 'prediction').count().show()
# calculate metrics by the confusion matrix
accuracy = (TN + TP) / (TN + TP + FN + FP)
precision = TP / (TP + FP)
recall = TP / (TP + FN)
F =  2 * (precision*recall) / (precision + recall)
# calculate auc
evaluator = BinaryClassificationEvaluator()
auc = evaluator.evaluate(prediction, {evaluator.metricName: 'areaUnderROC'})
print('n precision: %0.3f' % precision)
print('n recall: %0.3f' % recall)
print('n accuracy: %0.3f' % accuracy)
print('n F1 score: %0.3f' % F)
print('AUC: %0.3f' % auc)

 

The code above prints the confusion matrix of the prediction and also calculates all metrics. And the results are summarized as a table shown below:

table1

 

From the table, all three models can predict the label pretty accurately. While Neural Network gets higher accuracy, its precision and recall are relatively lower. After all, the overall difference between these three models is relatively small. 

Conlusion

With all the models providing similar performances, it is fair to say that, for machine learning, sometimes it is the data that dominates the solution, instead of the model. In other words, with enough dataset, all models can achieve a pretty good score. With that being said, the focus shifted from data to approach in Big Data era. What’s important is no longer the result, but the way to approach the result.

For our experiment, the flights’ dataset contains more than 2 million records, which can take a long time for stand-alone devices to implement the machine learning application. Apache Spark provides an automatic distributed clustering-computing framework, under which a simple Logistic Regression can achieve almost identical performance as a GPU implemented Neural Network model with shorter training time. 

In this post, I talked about how to use PySpark to build machine learning pipelines which are suitable for Big Data analysis. With a smaller size of data, though, using standard machine learning library should be sufficient and more efficient. Nevertheless, Apache Spark is definitely a reliable tool for solving challenging machine learning problems using Big Data.

The author is still learning Apache Spark, and there might be error or mistakes in this post. Welcome to correct me if I’m wrong and let me know if you have any questions!

Thanks for reading 🙂