Logistic Regression in Apache Spark

Apache Spark is an amazingly fast, large-scale data processing engine that can run on Hadoop, on Mesos, or on your local machine. [1]

In contrast to Mahout on Hadoop, Spark allows not only MapReduce but general programming tasks, which is good for us because ML is primarily not MapReduce. And building an ML algorithm in MapReduce is difficult!

Spark also has an enormous advantage over Mahout: it does its computation in memory. This means that, most of the time, it’s a lot faster than Mahout.

In this post, I’ll be showing you how to run a small example in Spark, in Python! (You can port the code into Java or Scala with ease.)

Okay, so let’s get started with a brief intro to Logistic Regression, and then move on to how to do it in Spark.

Brief intro on Logistic Regression

Logistic Regression is a Machine Learning algorithm for Classification. Classification involves looking at data and assigning a class (or a label) to it. There can be more than two classes, but in our example we’ll be tackling Binary Classification, in which there are two classes: 0 or 1. You can find more about this algorithm here: Logistic Regression (Wikipedia) [2]

Essentially, what we do is draw a ‘line’ through our data and say: ‘If this data point (or sample) falls on one side, assign it the label 0; if it falls on the other side of the line, give it the label 1’. Machine Learning ‘fits’ this line using an optimization algorithm (usually Gradient Descent or some form of it), such that the prediction error is lowered.
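To make the ‘line’ idea concrete, here is a minimal sketch in plain numpy of how a fitted model turns a data point into a label. The weights and intercept below are made up by hand for illustration; they are not fitted to any data, and the function names are my own.

```python
import numpy as np

def sigmoid(z):
    """Squash a real number into the range (0, 1)."""
    return 1.0 / (1.0 + np.exp(-z))

def predict(weights, intercept, features):
    """Return 1 if the point falls on the positive side of the line, else 0."""
    score = np.dot(weights, features) + intercept
    return 1 if sigmoid(score) >= 0.5 else 0

# Hypothetical weights, chosen by hand for illustration (not fitted to any data)
weights = np.array([1.0, -1.0])
intercept = 0.0

label_a = predict(weights, intercept, np.array([2.0, 0.5]))  # score = 1.5
label_b = predict(weights, intercept, np.array([0.5, 2.0]))  # score = -1.5
print("labels: %d %d" % (label_a, label_b))
```

The ‘line’ is the set of points where the score is exactly zero; the sign of the score decides which side a point is on.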


The Dataset

Now let’s take a look at the data. The dataset can be found here: [3]. It’s a bank note authentication dataset, where the features are extracted from images of fake and authentic bank notes. Taking a look at the data:

3.6216,8.6661,-2.8073,-0.44699,0
4.5459,8.1674,-2.4586,-1.4621,0
3.866,-2.6383,1.9242,0.10645,0
3.4566,9.5228,-4.0112,-3.5944,0
0.32924,-4.4552,4.5718,-0.9888,0
.
.
.
.

... we find that each data point is on one line, and is comma separated. As you can probably see, the last number is the class: 0 or 1. The rest of the numbers are ‘features’, which we use to discriminate between the classes. In this data, there are 4 features (computed after a Wavelet Transform [11]):

  • Variance of Wavelet Transformed image (continuous) [12]

  • Skewness of Wavelet Transformed image (continuous) [8]

  • Kurtosis of Wavelet Transformed image (continuous) [7]

  • Entropy of image (continuous) [9] [10]
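As a quick illustration of this layout, here is a small sketch in plain Python (no Spark needed) that splits one line of the file into named features and a label. The short column names are my own shorthand for the four features; they do not appear in the dataset file itself.

```python
# Column names are my own shorthand for the four features listed above
FEATURE_NAMES = ["variance", "skewness", "kurtosis", "entropy"]

def parse_row(line):
    """Split one CSV line into a dict of named features and an integer label."""
    values = line.strip().split(",")
    features = dict(zip(FEATURE_NAMES, [float(v) for v in values[:-1]]))
    label = int(values[-1])
    return features, label

# First line of the sample shown earlier
features, label = parse_row("3.6216,8.6661,-2.8073,-0.44699,0")
print("variance = %s, label = %d" % (features["variance"], label))
```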

So now that we know what the data looks like, we can get started with writing code!

The Code

You can see / get the code from my GitHub repo: Spark-Example

The input format expected by Spark’s LogisticRegression implementation is a numpy array in which the first element is the class (0 or 1) and all remaining elements are the features, which must be of a float datatype.

The language I’m using is Python, but you can easily port this code to Java / Scala.

Spark Config

First, we have to make sure that Python can see the Spark libs:

# Add Spark Python Files to Python Path
import sys
import os
SPARK_HOME = "/usr/local/spark-0.9.1" # Set this to wherever you have compiled Spark
os.environ["SPARK_HOME"] = SPARK_HOME # Add Spark path
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1" # Set Local IP
sys.path.append( SPARK_HOME + "/python") # Add python files to Python Path
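If the path is wrong, the pyspark import further down simply fails with an ImportError, which can be confusing. Here is an optional sketch of a helper that fails early instead. The function name add_spark_to_path is my own invention, and it only checks that a python directory exists under the given path; it makes no other assumptions about how your Spark build is laid out.

```python
import os
import sys

def add_spark_to_path(spark_home):
    """Append <spark_home>/python to sys.path, failing early if it is missing."""
    python_dir = os.path.join(spark_home, "python")
    if not os.path.isdir(python_dir):
        raise IOError("No 'python' directory under %s" % spark_home)
    if python_dir not in sys.path:
        sys.path.append(python_dir)
    return python_dir

try:
    add_spark_to_path("/usr/local/spark-0.9.1")  # same path as in the snippet above
except IOError as err:
    print("Spark not found: %s" % err)
```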

Now we have access to Spark’s MLlib, so we import the relevant modules and set up a method to get the Spark Context:

from pyspark.mllib.classification import LogisticRegressionWithSGD
import numpy as np
from pyspark import SparkConf, SparkContext

def getSparkContext():
    """
    Gets the Spark Context
    """
    conf = (SparkConf()
         .setMaster("local") # run on local
         .setAppName("Logistic Regression") # Name of App
         .set("spark.executor.memory", "1g")) # Set 1 gig of memory
    sc = SparkContext(conf = conf) 
    return sc

Reading the Data

The class that implements Spark’s Logistic Regression uses Stochastic Gradient Descent, which is an algorithm that ‘fits’ a model to the data. You can learn about SGD here: SGD (Wikipedia) [4]

We can now read the file and store it in a Spark RDD (see Spark Basics [5]). Change the path in this line to wherever you have downloaded the dataset:
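To give a feel for what SGD does, here is a toy sketch in plain numpy. It is not Spark’s implementation, just the textbook update rule applied one sample at a time. The dataset, step size and number of epochs are all made up for illustration.

```python
import numpy as np

def sigmoid(z):
    """Squash a real number into the range (0, 1)."""
    return 1.0 / (1.0 + np.exp(-z))

def sgd_logistic(points, labels, step=0.1, epochs=100, seed=0):
    """Fit weights with plain stochastic gradient descent, one sample at a time."""
    rng = np.random.RandomState(seed)
    weights = np.zeros(points.shape[1])
    for _ in range(epochs):
        # Visit the samples in a fresh random order on every pass
        for i in rng.permutation(len(points)):
            error = sigmoid(np.dot(weights, points[i])) - labels[i]
            weights -= step * error * points[i]
    return weights

# Tiny made-up dataset: one feature plus a constant 1.0 acting as the intercept
points = np.array([[-2.0, 1.0], [-1.0, 1.0], [1.0, 1.0], [2.0, 1.0]])
labels = np.array([0.0, 0.0, 1.0, 1.0])

weights = sgd_logistic(points, labels)
predictions = [1 if sigmoid(np.dot(weights, p)) >= 0.5 else 0 for p in points]
print("predictions: %s" % predictions)
```

Each update nudges the weights in the direction that reduces the error on a single sample, which is why the method scales to datasets too large to process in one go.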

sc = getSparkContext()

# Load and parse the data
data = sc.textFile("hdfs://localhost/user/hduser2/data")

We now have to transform the data, which is in CSV format, into numpy arrays, which is the format that LogisticRegressionWithSGD’s train method expects. For that we use the following map function, and apply it to the data:

def mapper(line):
    """
    Mapper that converts an input line to a feature vector
    """    
    feats = line.strip().split(",") 
    # labels must be at the beginning for LRSGD, it's in the end in our data, so 
    # putting it in the right place
    label = feats[len(feats) - 1] 
    feats = feats[: len(feats) - 1]
    feats.insert(0,label)
    features = [ float(feature) for feature in feats ] # need floats
    return np.array(features)

parsedData = data.map(mapper)

Training and testing our model

Now we can train the model, and test it.

# Train model
model = LogisticRegressionWithSGD.train(parsedData)

# Build (label, prediction) pairs: the first element is the actual label
# and the second is the model's prediction
labelsAndPreds = parsedData.map(lambda point: (int(point.item(0)), 
        model.predict(point.take(range(1, point.size)))))

# Evaluating the model on training data
trainErr = labelsAndPreds.filter(lambda vp: vp[0] != vp[1]).count() / float(parsedData.count())

# Print the training error
print("Training Error = " + str(trainErr))

To run this example, use:

python run-spark-ex.py

If everything runs well, you should see an error of 0.0211370262391, i.e. about 2%!
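If the error computation looks cryptic, here is the same idea as a sketch in plain Python, without Spark: count the pairs where the label and the prediction disagree, and divide by the total. The pairs below are made up for illustration and have nothing to do with the bank note data.

```python
def error_rate(labels_and_preds):
    """Fraction of (label, prediction) pairs where the two disagree."""
    pairs = list(labels_and_preds)
    wrong = sum(1 for label, pred in pairs if label != pred)
    return wrong / float(len(pairs))

# Made-up pairs for illustration: one mistake out of four
pairs = [(0, 0), (1, 1), (1, 0), (0, 0)]
print("Training Error = " + str(error_rate(pairs)))
```

Keep in mind that this is the error on the same data the model was trained on, so it is likely to be optimistic compared to the error on unseen data.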

If you have any questions or suggestions, please feel free to comment!

References

[1] Spark: http://spark.apache.org/

[2] Logistic Regression (Wikipedia): http://en.wikipedia.org/wiki/Logistic_regression

[3] The Dataset: https://archive.ics.uci.edu/ml/datasets/banknote+authentication

[4] SGD (Wikipedia): http://en.wikipedia.org/wiki/Stochastic_gradient_descent

[5] Link to Spark Basics: http://spark.apache.org/docs/0.9.1/quick-start.html#basics

[6] Spark Logistic Regression Example: http://spark.apache.org/docs/0.9.0/mllib-guide.html#binary-classification-2

[7] Kurtosis: http://en.wikipedia.org/wiki/Kurtosis

[8] Skewness: http://en.wikipedia.org/wiki/Skewness

[9] Entropy (General): http://en.wikipedia.org/wiki/Entropy_(information_theory)

[10] Image Entropy: http://www.astro.cornell.edu/research/projects/compression/entropy.html

[11] Wavelet Transforms: http://en.wikipedia.org/wiki/Wavelet_transform

[12] Variance: http://en.wikipedia.org/wiki/Variance

Thanks to Jaskaran, Vipin and Dr. Vijay Srinivas!
