Spark is the premier big data tool for data science, and PySpark supports a natural move from the local machine to cluster computing. In fact, you can use PySpark on your local machine in standalone mode just as you would on a cluster. In this post, we provide a refresher for those who have been working on legacy or other systems and want to quickly transition back to Spark.
Environment
When using the pyspark shell, the spark.sparkContext is already available as the sc variable. In Jupyter, however, we have to create our own session. Take note of the version: Spark 3.0 introduced some big changes.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("app.name") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
spark.sparkContext
SparkContext
Spark UI: http://7ffeadb3f712:4040
Version: v2.4.4
Master: local[*]
AppName: app.name
sc = spark.sparkContext
sc.version
'2.4.4'
import pandas as pd
import numpy as np
Raw Data
Always get a sense of the raw data. Often the quickest way to do this is from the command line. Here, we see that there is some metadata as a header before the actual data, which would cause problems if we loaded the file with a simple DataFrame command.
! head -n 15 ./Data/spark_Houses/cadata.txt
S&P Letters Data
We collected information on the variables using all the block groups in California from the 1990 Census. In this sample a block group on average includes 1425.5 individuals living in a geographically compact area. Naturally, the geographical area included varies inversely with the population density. We computed distances among the centroids of each block group as measured in latitude and longitude. We excluded all the block groups reporting zero entries for the independent and dependent variables. The final data contained 20,640 observations on 9 variables. The dependent variable is ln(median house value).
Bols tols
INTERCEPT 11.4939 275.7518
MEDIAN INCOME 0.4790 45.7768
MEDIAN INCOME2 -0.0166 -9.4841
MEDIAN INCOME3 -0.0002 -1.9157
ln(MEDIAN AGE) 0.1570 33.6123
ln(TOTAL ROOMS/ POPULATION) -0.8582 -56.1280
ln(BEDROOMS/ POPULATION) 0.8043 38.0685
ln(POPULATION/ HOUSEHOLDS) -0.4077 -20.8762
ln(HOUSEHOLDS) 0.0477 13.0792
The file cadata.txt contains all of the variables. Specifically, it contains median house value, median income, housing median age, total rooms, total bedrooms, population, households, latitude, and longitude, in that order.
Resilient Distributed Datasets
The Resilient Distributed Dataset (RDD) is a flexible way to tackle string data. It has strong support for functional programming, so you can complete the initial processing in a very clean manner.
It is important to note that operations which require shuffling data among tasks will slow your job considerably.
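To make the shuffle point concrete, here is a minimal sketch with a made-up pair RDD (not the housing data): reduceByKey pre-aggregates on each partition before shuffling, while groupByKey ships every record across the network before aggregating.
pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
# Combines values within each partition first, then shuffles only the partial sums
pairs.reduceByKey(lambda a, b: a + b).collect()
# Moves every record for a key to a single task before aggregating
pairs.groupByKey().mapValues(sum).collect()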
textFile = sc.textFile("./Data/spark_Houses/cadata.txt") #"hdfs://<HDFS loc>/data/*.zip")
notes = textFile.take(27)
#this is incorrect, but a fun exercise
#also possible to use .lookup(1) in place of filter(lambda x: x[0]>4)
headers = textFile.zipWithIndex().\
map(lambda x: (x[1],x[0]) ).\
filter(lambda x: x[0]>4).\
filter(lambda x: x[0]<13).\
map(lambda x: (x[1].split("\t"))[0] ).\
collect()
col_names = [u'longitude: continuous.', u'latitude: continuous.', u'housingMedianAge: continuous. ', u'totalRooms: continuous. ', u'totalBedrooms: continuous. ', u'population: continuous. ', u'households: continuous. ', u'medianIncome: continuous. ', u'medianHouseValue: continuous. ']
header = [x.split(": ")[0] for x in col_names]
header
['longitude',
'latitude',
'housingMedianAge',
'totalRooms',
'totalBedrooms',
'population',
'households',
'medianIncome',
'medianHouseValue']
rdd = textFile.zipWithIndex().filter(lambda x: x[1]>27)
ln = rdd.sample(False, .0001)  #sample(withReplacement, fraction)
import re
ptrn = re.compile(r"\s+")
line = (ln.take(1)[0])[0]
re.split(ptrn, line)
['',
'4.8900000000000000e+004',
'1.7197000000000000e+000',
'3.4000000000000000e+001',
'1.3790000000000000e+003',
'3.3300000000000000e+002',
'1.1560000000000000e+003',
'3.1500000000000000e+002',
'3.5369999999999997e+001',
'-1.1897000000000000e+002']
Once the data is prepared, use the Row class to give it a schema and format that will make the import into a Spark DataFrame clean and reproducible.
from pyspark.sql import Row
#DO NOT fix dtypes before conversion to DF:
# TypeError: not supported type: <class 'numpy.float64'>
rows = rdd.map(lambda x: re.split(ptrn, x[0])).map(lambda x:
Row(longitude= np.float64(x[1]),
latitude= np.float64(x[2]),
housingMedianAge=x[3],
totalRooms=x[4],
totalBedRooms=x[5],
population=x[6],
households=x[7],
medianIncome=x[8],
medianHouseValue=x[9]
))
rows = rdd.map(lambda x: re.split(ptrn, x[0])).map(lambda x:
Row(longitude= x[1],
latitude= x[2],
housingMedianAge=x[3],
totalRooms=x[4],
totalBedRooms=x[5],
population=x[6],
households=x[7],
medianIncome=x[8],
medianHouseValue=x[9]
))
Dataframe
The Spark DataFrame is the workhorse tool for data scientists. Operations on DataFrames are optimized by the Catalyst engine, so it is usually better to express your logic with simple built-in instructions than to roll your own in, say, a User Defined Function (UDF).
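As a rough sketch of the difference (using the housingMedianAge column from the float-typed DataFrame built below, and a made-up output column name), a built-in expression stays inside the optimized engine, while an equivalent Python UDF forces every row through Python:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType
# Built-in expression: handled by the Catalyst optimizer
df_fast = df.withColumn("ageTimesTwo", col("housingMedianAge") * 2)
# Python UDF: same result, but each row is serialized out to Python and back
double_age = udf(lambda x: None if x is None else x * 2.0, FloatType())
df_slow = df.withColumn("ageTimesTwo", double_age(col("housingMedianAge")))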
Ensure your types are correct - if you're coming from an RDD, they will all be strings! Note that the columns also come out in alphabetical order, because Row sorts keyword fields by name in Spark 2.x.
df_raw = rows.toDF()
df_raw.dtypes
[('households', 'string'),
('housingMedianAge', 'string'),
('latitude', 'string'),
('longitude', 'string'),
('medianHouseValue', 'string'),
('medianIncome', 'string'),
('population', 'string'),
('totalBedRooms', 'string'),
('totalRooms', 'string')]
from pyspark.sql.types import *
df = df_raw.withColumn("longitude", df_raw["longitude"].cast(FloatType()) ) \
.withColumn("latitude", df_raw["latitude"].cast(FloatType()) ) \
.withColumn("housingMedianAge", df_raw["housingMedianAge"].cast(FloatType()) ) \
.withColumn("totalRooms", df_raw["totalRooms"].cast(FloatType()) ) \
.withColumn("totalBedRooms", df_raw["totalBedRooms"].cast(FloatType()) ) \
.withColumn("population", df_raw["population"].cast(FloatType()) ) \
.withColumn("households", df_raw["households"].cast(FloatType()) ) \
.withColumn("medianIncome", df_raw["medianIncome"].cast(FloatType()) ) \
.withColumn("medianHouseValue", df_raw["medianHouseValue"].cast(FloatType()) )
# Automate the column conversion
from pyspark.sql.types import *
# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names:
        df = df.withColumn(name, df[name].cast(newType))
    return df
# Assign all column names to `columns`
columns = df_raw.columns
# Convert the `df_raw` columns to `FloatType()`
df = convertColumn(df_raw, columns, FloatType())
df.dtypes
[('households', 'float'),
('housingMedianAge', 'float'),
('latitude', 'float'),
('longitude', 'float'),
('medianHouseValue', 'float'),
('medianIncome', 'float'),
('population', 'float'),
('totalBedRooms', 'float'),
('totalRooms', 'float')]
Once the data starts looking complete, you can begin executing SQL-like commands against the DataFrame.
df.groupBy("housingMedianAge").count().sort("housingMedianAge", ascending=False).show(5)
+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
| 52.0| 1273|
| 51.0| 48|
| 50.0| 136|
| 49.0| 134|
| 48.0| 177|
+----------------+-----+
only showing top 5 rows
df.select('MedianHouseValue').describe().show()
+-------+-------------------+
|summary| MedianHouseValue|
+-------+-------------------+
| count| 20639|
| mean|-119.56957555201876|
| stddev| 2.003494699348379|
| min| -124.35|
| max| -114.31|
+-------+-------------------+
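You can also register the DataFrame as a temporary view and run plain SQL against it. The snippet below is a sketch equivalent to the groupBy above (the view name houses is arbitrary):
df.createOrReplaceTempView("houses")
spark.sql("""
    SELECT housingMedianAge, COUNT(*) AS count
    FROM houses
    GROUP BY housingMedianAge
    ORDER BY housingMedianAge DESC
""").show(5)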
Data Preparation
Preparing data for models can be more involved than in libraries specific to your local machine. These are a few of the steps that will need to be completed; some of the methods look similar to those in scikit-learn.
Add columns
We will add a few more columns using the .withColumn()
method.
df_prep1 = df
from pyspark.sql.functions import *
# Adjust the values of `medianHouseValue`
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)
df.select('MedianHouseValue').describe().show()
+-------+--------------------+
|summary| MedianHouseValue|
+-------+--------------------+
| count| 20639|
| mean|-0.00119569575552...|
| stddev|2.003494699348537...|
| min|-0.00124349998474...|
| max|-0.00114309997558...|
+-------+--------------------+
from pyspark.sql.functions import *
# Add the new columns to `df`
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
.withColumn("populationPerHousehold", col("population")/col("households")) \
.withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))
df.select("roomsPerHousehold", "populationPerHousehold", "bedroomsPerRoom").first()
Row(roomsPerHousehold=6.238137082601054, populationPerHousehold=2.109841827768014, bedroomsPerRoom=0.15579659106916466)
Create a DenseVector
Spark uses Breeze under the hood for high-performance linear algebra in Scala.
In Spark MLlib and ML, some algorithms depend on the org.apache.spark.mllib.linalg.Vector type (org.apache.spark.ml.linalg.Vector in the newer ML package), which is either dense (DenseVector) or sparse (SparseVector).
There is no implicit conversion from a Scala Vector or array to a dense vector, so you must make the conversion explicit before feeding the data to a model.
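For a quick feel of the two representations, a minimal sketch with made-up numbers:
from pyspark.ml.linalg import Vectors
# Dense: stores every entry explicitly
dv = Vectors.dense([1.0, 0.0, 3.0])
# Sparse: stores the size plus (index, value) pairs for the non-zero entries
sv = Vectors.sparse(3, [0, 2], [1.0, 3.0])
dv.toArray(), sv.toArray()  # both expand to [1., 0., 3.]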
df_prep2 = df
# Re-order and select columns
df = df.select("medianHouseValue",
"totalBedRooms",
"population",
"households",
"medianIncome",
"roomsPerHousehold",
"populationPerHousehold",
"bedroomsPerRoom")
from pyspark.ml.linalg import DenseVector
# Define the `input_data`
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))
input_data.take(3)
[(-0.0012222000122070313,
DenseVector([1106.0, 2401.0, 1138.0, 37.86, 6.2381, 2.1098, 0.1558])),
(-0.0012223999786376953,
DenseVector([190.0, 496.0, 177.0, 37.85, 8.2881, 2.8023, 0.1295])),
(-0.0012225,
DenseVector([235.0, 558.0, 219.0, 37.85, 5.8174, 2.5479, 0.1845]))]
# Replace `df` with the new DataFrame
df = spark.createDataFrame(input_data, ["label", "features"])
df.take(3)
[Row(label=-0.0012222000122070313, features=DenseVector([1106.0, 2401.0, 1138.0, 37.86, 6.2381, 2.1098, 0.1558])),
Row(label=-0.0012223999786376953, features=DenseVector([190.0, 496.0, 177.0, 37.85, 8.2881, 2.8023, 0.1295])),
Row(label=-0.0012225, features=DenseVector([235.0, 558.0, 219.0, 37.85, 5.8174, 2.5479, 0.1845]))]
Standardize columns
The StandardScaler is used to scale a feature column to unit standard deviation (and, optionally, to zero mean).
# Import `StandardScaler`
from pyspark.ml.feature import StandardScaler
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled") #initialize
scaler = standardScaler.fit(df) #fit
scaled_df = scaler.transform(df) #scale
# Inspect the result
scaled_df.select("features_scaled").take(2)
[Row(features_scaled=DenseVector([2.6255, 2.1202, 2.9765, 17.7252, 2.5213, 0.2031, 2.6851])),
Row(features_scaled=DenseVector([0.451, 0.438, 0.463, 17.7205, 3.3498, 0.2698, 2.2321]))]
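By default the scaler only divides by the standard deviation (withStd=True) and does not center the data; if you also want zero mean, pass withMean=True, as in this sketch (centering requires dense vectors, which we have here):
centeredScaler = StandardScaler(inputCol="features", outputCol="features_scaled",
                                withMean=True, withStd=True)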
Model
Once the data is prepared, we can choose from a number of different models to apply to the data.
Split and Train
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1234)
from pyspark.ml.regression import LinearRegression
# Initialize `lr`
lr = LinearRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)
linearModel = lr.fit(train_data)
Predict
# Generate predictions
predicted = linearModel.transform(test_data)
# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])
# Zip `predictions` and `labels` into a list
predictionAndLabel = predictions.zip(labels).collect()
# Print out first 5 instances of `predictionAndLabel`
predictionAndLabel[:5]
[(-0.0011958166787652167, -0.001243499984741211),
(-0.0011958166787652167, -0.0012430000305175782),
(-0.0011958166787652167, -0.0012430000305175782),
(-0.0011958166787652167, -0.0012419000244140626),
(-0.0011958166787652167, -0.0012416999816894532)]
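The summary metrics in the next section are computed on the training data. To get a sense of performance on the held-out split, you could also score the test predictions with RegressionEvaluator; this sketch is not part of the original workflow:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction",
                                metricName="rmse")
test_rmse = evaluator.evaluate(predicted)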
Model results
# Coefficients for the model
linearModel.coefficients
DenseVector([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
# Intercept for the model
linearModel.intercept
-0.0011958166787652167
# Get the RMSE
linearModel.summary.rootMeanSquaredError
2.004643282977487e-05
# Get the R2
linearModel.summary.r2
-1.3034018309099338e-13
Conclusion
These are the basic steps you will take in nearly every Spark machine learning application.