Practical Apache Spark in 10 minutes (MLlib)

MLlib is Spark’s scalable machine learning library consisting of common learning algorithms and utilities, including classification, regression, clustering, collaborative filtering, dimensionality reduction, as well as underlying optimization primitives.

In this post, we would like to demonstrate one of the most common ML tasks: churn prediction. Churn prediction deals with the risk of a customer leaving for another company. In our case, we have numeric data from user accounts on the site and need to predict whether a customer will stay with the company (0) or leave it, i.e. churn (1). We consider four machine learning models (two regression models and two classification models) to solve this task:

  1. Linear regression.
  2. Logistic Regression.
  3. SVM.
  4. Naive Bayes.

Load Data

data = sc.textFile("data.txt").map(lambda line: [float(x) for x in line.split(' ')])
print(data.take(2))
[ [1.0, 0.0, 2.52078447201548, 0.0, 0.0, 0.0, 2.004684436494304, 2.000347299268466, 0.0, 2.228387042742021, 2.228387042742023, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], [0.0, 2.857738033247042, 0.0, 0.0, 2.619965104088255, 0.0, 2.004684436494304, 2.000347299268466, 0.0, 2.228387042742021, 2.228387042742023, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0] ]
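
As a quick optional sanity check (our own addition, not part of the original pipeline), we can verify that every parsed row has the same length: one label followed by 16 features, as in the sample output above.

# Optional sanity check: each row should contain 1 label + 16 features = 17 values
row_lengths = data.map(lambda row: len(row)).distinct().collect()
print(row_lengths)  # expected: [17]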

Feature Standard Scaling

Standardization can improve the convergence rate during the optimization process and also prevents features with very large variances from exerting an overly large influence during model training. For standardization, MLlib provides the StandardScaler class from pyspark.mllib.feature.

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.feature import StandardScaler

Take the features (all columns except the first, which is the label) from the data by using map().

features = data.map(lambda row: row[1:])
print(features.take(2))
[ [0.0, 2.52078447201548, 0.0, 0.0, 0.0, 2.004684436494304, 2.000347299268466, 0.0, 2.228387042742021, 2.228387042742023, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], [2.857738033247042, 0.0, 0.0, 2.619965104088255, 0.0, 2.004684436494304, 2.000347299268466, 0.0, 2.228387042742021, 2.228387042742023, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0] ]

Use StandardScaler to normalize the features.

standardizer = StandardScaler()
model = standardizer.fit(features)
features_transform = model.transform(features)
features_transform.take(2)
[DenseVector([0.0, 2.5169, 0.0, 0.0, 0.0, 2.0016, 1.9972, 0.0, 2.2249, 2.2249, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]),
 DenseVector([2.8533, 0.0, 0.0, 2.6159, 0.0, 2.0016, 1.9972, 0.0, 2.2249, 2.2249, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])]
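
Note that StandardScaler() with default arguments scales each feature to unit standard deviation without centering it. If you also want zero-mean features, you can pass withMean=True; this is a variant, not used in the rest of this post:

# Variant: scale to unit standard deviation AND subtract the mean of each feature
centering_scaler = StandardScaler(withMean=True, withStd=True)
centered_features = centering_scaler.fit(features).transform(features)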

Zip the labels with the transformed features and create LabeledPoint objects.

lab = data.map(lambda row: row[0])
transformedData = lab.zip(features_transform)
transformedData = transformedData.map(lambda row: LabeledPoint(row[0], row[1]))
transformedData.take(2)
[LabeledPoint(1.0, [0.0,2.51686716666,0.0,0.0,0.0,2.00156915188,1.99723875458,0.0,2.22492412372,2.22492412372,0.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [2.85329710122,0.0,0.0,2.61589367179,0.0,2.00156915188,1.99723875458,0.0,2.22492412372,2.22492412372,0.0,0.0,0.0,0.0,0.0,0.0])]

Model Building

Split the data into train and test sets by using randomSplit().

trainingData, testingData = transformedData.randomSplit([0.8, 0.2], seed=1234)
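
Since every model below makes multiple passes over the training set, it may be worth caching both splits; a small optional sketch:

# Optional: keep both splits in memory, since each train() call iterates over the data
trainingData.cache()
testingData.cache()
print(trainingData.count(), testingData.count())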

Create a function to calculate the Mean Squared Error for each model.

def getMSE(model):
    # Pair each true label with the model's prediction on the test set
    valuesAndPreds = testingData.map(lambda p: (p.label, model.predict(p.features)))
    # Average the squared differences between labels and predictions
    MSE = valuesAndPreds.map(lambda vp: (vp[0] - vp[1])**2).reduce(lambda x, y: x + y) / valuesAndPreds.count()
    return MSE
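
Since three of the four models are classifiers, it can also be informative to look at plain classification accuracy alongside the MSE. A minimal sketch of such a helper (our own addition, not from MLlib):

def getAccuracy(model):
    # Fraction of test points whose predicted label matches the true one
    labelsAndPreds = testingData.map(lambda p: (p.label, float(model.predict(p.features))))
    return labelsAndPreds.filter(lambda vp: vp[0] == vp[1]).count() / float(testingData.count())

You could call getAccuracy(model) after training any of the classifiers below, in the same way getMSE(model) is used.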

Linear Regression

Linear regression is an approach for modeling the relationship between a scalar dependent variable y and one or more explanatory (independent) variables. It fits a linear model that best predicts the dependent variable from the values of the independent variables.

In MLlib, linear regression is implemented in the LinearRegressionWithSGD class.

from pyspark.mllib.regression import LinearRegressionWithSGD
model = LinearRegressionWithSGD.train(trainingData, iterations=10, step=0.1)
print("Mean Squared Error = " + str(getMSE(model)))
0.262944332025
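
Note that LinearRegressionWithSGD predicts a continuous score rather than a 0/1 label. To turn its output into churn decisions, one simple option (an assumption on our part, not part of the original recipe) is to threshold the prediction at 0.5:

# Turn the continuous regression output into a 0/1 churn decision
churn_predictions = testingData.map(lambda p: 1.0 if model.predict(p.features) > 0.5 else 0.0)
print(churn_predictions.take(5))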

Logistic Regression

Logistic regression is a type of probabilistic statistical classification model. It measures the relationship between a categorical dependent variable and one or more independent variables, which are usually (but not necessarily) continuous, by using probability scores as the predicted values of the dependent variable.

The LogisticRegressionWithLBFGS class allows you to build a logistic regression model.

from pyspark.mllib.classification import LogisticRegressionWithLBFGS
model = LogisticRegressionWithLBFGS.train(trainingData, iterations=10)
print("Mean Squared Error = " + str(getMSE(model)))
0.484848484848
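
By default the trained model returns hard 0/1 labels. If you prefer the raw churn probabilities, the model's clearThreshold() method switches predict() to probability output, and setThreshold() restores hard labels:

# Switch predict() from 0/1 labels to raw probabilities
model.clearThreshold()
print(model.predict(testingData.first().features))
# Restore hard 0/1 predictions at the usual 0.5 cutoff
model.setThreshold(0.5)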

SVM

Given a set of training examples, each belonging to one of two class labels, an SVM algorithm builds a model that assigns new examples into one label or another. An SVM model is a representation of the examples as points in space, mapped so that the examples of the separate categories are divided by a clear gap that is as wide as possible. New examples are then mapped into that same space and predicted to belong to a category based on which side of the gap they fall on.

In MLlib, the support vector machine model is implemented in the SVMWithSGD class.

from pyspark.mllib.classification import SVMWithSGD
model = SVMWithSGD.train(trainingData, iterations=100, step=20.0)
print("Mean Squared Error = " + str(getMSE(model)))
0.378787878788

Naive Bayes

Naive Bayes is a simple technique for constructing classifiers: models that assign class labels to problem instances, represented as vectors of feature values, where the class labels are drawn from some finite set. For some types of probability models, naive Bayes classifiers can be trained very efficiently in a supervised learning setting. An advantage of naive Bayes is that it only requires a small amount of training data to estimate the parameters necessary for classification.

In MLlib, this simple method is implemented in the NaiveBayes class.

from pyspark.mllib.classification import NaiveBayes
model = NaiveBayes.train(trainingData, lambda_=1.0)
print("Mean Squared Error = " + str(getMSE(model)))
0.575757576
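
One caveat: NaiveBayes in MLlib requires nonnegative feature values. That holds here because the raw features are nonnegative and StandardScaler was used without mean-centering; a quick optional check:

# NaiveBayes requires nonnegative features; verify the minimum value across the dataset
min_feature = transformedData.map(lambda p: min(p.features.toArray())).min()
print(min_feature >= 0)  # expected: True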

As a result, we obtained the following Mean Squared Error values for our data:

  Model                  MSE
  Linear regression      0.2629
  Logistic Regression    0.4848
  SVM                    0.3788
  Naive Bayes            0.5758

On this dataset, the linear regression model yields the lowest error, while naive Bayes performs the worst.
