The goal of this project is to predict the tip amount paid for a taxi ride in NYC in 2015. The main technical challenge is the size of the data set: 77,106,102 observations! That is obviously too much data to simply download locally and load into memory as a pandas dataframe, so in this project I focus less on typical model-building techniques like data cleaning and parameter tuning and more on exploring the technologies needed to manipulate and explore such a large data set.
For this project I created an Apache Spark cluster consisting of 1 master node and 4 worker nodes on the Google Cloud Platform (GCP). Apache Spark is an open-source platform for fast analytics on large datasets, and fortunately a Python API (PySpark) exists. The core principle of Spark is the notion of the Resilient Distributed Dataset (RDD). In the Spark framework a dataset is represented as an RDD that is divided into partitions and distributed across the compute nodes of a cluster, allowing us to essentially partition a dataset and apply operations to it over a cluster size of our choice.
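To make this concrete, here is a minimal sketch (assuming a live SparkSession named spark, which the Dataproc/Jupyter setup below provides) of how PySpark exposes and controls partitioning:
# inspect how Spark splits a DataFrame into partitions across the cluster
df = spark.range(0, 1000000)        # a toy DataFrame of one million rows
print(df.rdd.getNumPartitions())    # how many partitions Spark chose
# repartition() redistributes the rows, e.g. to match the cluster's core count
print(df.repartition(16).rdd.getNumPartitions())   # now 16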
After setting up a GCP account (thanks for the free $300 trial!), I created a PySpark cluster by running the following command locally:
gcloud dataproc clusters create cluster-name \
--num-workers 4 --worker-machine-type n1-standard-1 \
--master-machine-type n1-standard-2 \
--master-boot-disk-size 128 \
--worker-boot-disk-size 128 \
--scopes 'https://www.googleapis.com/auth/cloud-platform' \
--project thaddeus \
--metadata="MINICONDA_VARIANT=2" \
--initialization-actions \
gs://dataproc-initialization-actions/jupyter/jupyter.sh
Next, I created an SSH tunnel to the cluster:
gcloud compute ssh cluster-name-m -- -D 10000 -N -n
And connected via my browser:
/Applications/Google\ Chrome.app/Contents/MacOS/Google\ Chrome "http://cluster-name-m:8123" \
--proxy-server="socks5://localhost:10000" \
--host-resolver-rules="MAP * 0.0.0.0 , EXCLUDE localhost" \
--user-data-dir=/tmp/
And with that, we are ready to start analyzing the taxi dataset.
from pyspark.sql.functions import mean, min as smin, max as smax, udf, date_format
from pyspark.sql.types import TimestampType, StringType
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
import time
import pyspark.ml.regression
from pyspark.ml.linalg import Vectors
!pip install pandas
!pip install matplotlib
!pip install seaborn
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns; sns.set(style="ticks", color_codes=True)
# load the dataset from the datasci-450 bucket
start_time = time.time()
storage_url = 'gs://datasci-450/resources/datasets/taxi/*'
taxi = spark.read.format('csv').load(storage_url, header=True, inferSchema=True)
print("--- %s seconds ---" % (time.time() - start_time))
# determine the number of rows in the spark DF
print('number of rows: {}'.format(taxi.count()))
# holy smokes, that is a lot of rows. next, take a look at the df
taxi.show(3)
# determine what features are in the df
taxi.columns
# check the data types
taxi.printSchema()
# we will take a snippet of the data for exploration.
start_time = time.time()
taxi_sample = taxi.sample(withReplacement=False, fraction=0.005, seed=123)
taxi_pd = taxi_sample.toPandas()
print('# of rows: {}'.format(taxi_pd.shape[0]))
print("--- %s seconds ---" % (time.time() - start_time))
# write this pd to disk in case i get booted
taxi_pd.to_csv('taxi_pd.csv')
taxi_pd.head()
# start with tip_amount
taxi_pd['tip_amount'].describe()
# how can there be a negative tip? moving on....
# how many vendor IDs are there?
print('unique vendor IDs: {}'.format(len(taxi_pd['vendor_id'].unique())))
# how many observations from each vendor?
vendors = taxi_pd['vendor_id'].unique()
for vendor in vendors:
    count = taxi_pd[taxi_pd['vendor_id'] == vendor].shape[0]
    print('vendor: {} - # of observations: {}'.format(vendor, count))
# next look at the non-categorical, non-location features
discrete_features = ['passenger_count',
                     'trip_distance',
                     'fare_amount',
                     'total_amount',
                     'tolls_amount',
                     'tip_amount',
                     'vendor_id']
taxi_discrete = taxi_pd.loc[:, discrete_features]
g = sns.pairplot(taxi_discrete, hue="vendor_id")
plt.show()
# fare_amount and total_amount appear to be collinear, so i will not include both of them in the model.
# conclusions for the other features are limited b/c of significant outliers.
# trip_distance appears to have some significant outliers, making the plot difficult to interpret.
# tip_amount, tolls_amount, and total_amount also have outliers. let's look at these features in more detail.
print('tip_amount: \n{}'.format(taxi_pd['tip_amount'].describe()))
print('\ntrip_distance: \n{}'.format(taxi_pd['trip_distance'].describe()))
print('\ntolls_amount: \n{}'.format(taxi_pd['tolls_amount'].describe()))
print('\ntotal_amount: \n{}'.format(taxi_pd['total_amount'].describe()))
# yikes! there are negative tolls_amount values, zero trip_distance values (missing data?), and negative tip_amounts.
# this data set is a hot mess.
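For a real model these records would need attention. Here is a minimal sketch of the kind of filtering one might apply on the Spark side (not applied here, since the focus of this project is the tooling; the thresholds are guesses):
# drop rows with obviously invalid values before modeling (a sketch)
taxi_clean = taxi.filter((taxi['tip_amount'] >= 0) &
                         (taxi['tolls_amount'] >= 0) &
                         (taxi['trip_distance'] > 0))
print('rows after filtering: {}'.format(taxi_clean.count()))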
# plot the above features separately w/ different axis limits to examine the data w/o outliers
taxi_pd.plot.scatter('total_amount', 'tip_amount', alpha=0.2, figsize=(8,8), fontsize=14)
plt.xlim([0,400])
plt.ylim([0,200])
plt.show()
taxi_pd.plot.scatter('trip_distance', 'tip_amount', alpha=0.2, figsize=(8,8), fontsize=14)
plt.xlim([0,10])
plt.ylim([0,25])
plt.show()
taxi_pd.plot.scatter('tolls_amount', 'tip_amount', alpha=0.2, figsize=(8,8), fontsize=14)
plt.xlim([0,10])
plt.ylim([0,200])
plt.show()
taxi_pd.plot.scatter('passenger_count', 'tip_amount', alpha=0.2, figsize=(8,8), fontsize=14)
plt.xlim([0,8])
plt.ylim([0,300])
plt.show()
# as shown above, total_amount, trip_distance, and passenger_count appear to have a linear (maybe?)
# relationship with tip_amount, so these will be included in the model.
# also, most of the tolls_amount values are zero, so we will remove this feature from the model.
# in summary, we will include the following discrete features in the model:
# trip_distance, total_amount, passenger_count
# next we evaluate the categorical data:
def unique_snowflakes(feature):
    uniques = taxi_pd[feature].unique()
    print('\nfeature: {}'.format(feature))
    for value in uniques:
        count = taxi_pd[taxi_pd[feature] == value].shape[0]
        print('value: {} - # of observations: {}'.format(value, count))
cats = ['payment_type', 'store_and_fwd_flag']
for cat in cats:
    unique_snowflakes(cat)
# the majority of the payments are either crd or csh (card or cash?)
# also, 99.1% of the store_and_fwd_flag values are 'N', so we will not use this feature.
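The 99.1% figure comes from the counts printed above; on the pandas sample it can also be computed directly:
# fraction of sampled rides with store_and_fwd_flag == 'N'
print((taxi_pd['store_and_fwd_flag'] == 'N').mean())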
# let's determine if payment_type and tip_amount are related
plt.rcParams['figure.figsize'] = (10, 8)
sns.boxplot(x='payment_type', y='tip_amount', data=taxi_pd)
plt.show()
# hard to see anything b/c of outliers. zoom in.
plt.rcParams['figure.figsize'] = (10, 8)
sns.boxplot(x='payment_type', y='tip_amount', data=taxi_pd)
plt.ylim([0, 7.5])
plt.show()
# people seem to tip quite a bit more when they pay with card.
taxi_pd.groupby('payment_type')['tip_amount'].mean()
# payment type will certainly be included in our model!
# finally we turn to the date. date formats are painful, so i will practice with a small
# chunk of the spark dataframe (2 rows, to be exact)
taxi_small = taxi.limit(2)
taxi_small.show()
# there are two date components: pickup_datetime and dropoff_datetime
taxi_small.select('pickup_datetime').show()
taxi_small.select('dropoff_datetime').show()
# write a udf to extract day of the week (dow) from the pickup dates
def dates_are_painful(stripper):
    day, time, timezone = stripper.strip().split()
    return ' '.join([day, time])
udf_myFunction = udf(dates_are_painful)
taxi_dow = taxi_sample.withColumn("pickup_udf", udf_myFunction("pickup_datetime").cast('timestamp'))
taxi_dow2 = taxi_dow.withColumn('dow', date_format('pickup_udf', 'E'))
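As an aside, the Python UDF is probably avoidable: assuming the raw strings look like '2015-01-15 19:05:39 UTC' (which the split above implies), Spark's built-in unix_timestamp can parse them with an explicit format string, keeping the work in the JVM:
# a sketch of the same extraction without a Python UDF
from pyspark.sql.functions import unix_timestamp
taxi_dow_alt = taxi_sample.withColumn(
    'pickup_ts',
    unix_timestamp('pickup_datetime', 'yyyy-MM-dd HH:mm:ss zzz').cast('timestamp'))
taxi_dow_alt = taxi_dow_alt.withColumn('dow', date_format('pickup_ts', 'E'))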
taxi_dow2_pd = taxi_dow2.toPandas()
plt.rcParams['figure.figsize'] = (10, 8)
sns.boxplot(x='dow', y='tip_amount', data=taxi_dow2_pd)
plt.show()
# zoom in for a better view
plt.rcParams['figure.figsize'] = (10, 8)
sns.boxplot(x='dow', y='tip_amount', data=taxi_dow2_pd)
plt.ylim([0, 7.5])
plt.show()
# calculate the summary statistics
print('mean tip amount:')
taxi_dow2_pd.groupby('dow')['tip_amount'].mean()
print('median tip amount:')
taxi_dow2_pd.groupby('dow')['tip_amount'].median()
# Saturday and Sunday have the lowest mean and median tip_amounts. Of the categorical values
# we will include payment_type and dow as variables. We are now ready to format the full data
# set for model building.
# But first, we need to transform our data via one-hot encoding. again, practice with the small snippet of data
# first, convert payment type to a numeric index
payment_indexer = StringIndexer(inputCol='payment_type', outputCol='payment_typeIndex')
payment_model = payment_indexer.fit(taxi_sample)
payment_indexed = payment_model.transform(taxi_sample)
payment_indexed
# next, one hot encode the indexed payment_type
payment_encoder = OneHotEncoder(inputCol='payment_typeIndex', outputCol='paymentVec')
payment_encoded = payment_encoder.transform(payment_indexed)
payment_encoded.show()
# beautiful! (not really). now we can one hot encode the entire original dataframe.
# we will use the following features to build our model:
# numeric: trip_distance, total_amount, passenger_count
# categorical: payment_type, pickup day of week (calculated from pickup_datetime)
# so, we will select just these features from our original data
taxi_features = taxi.select('trip_distance',
                            'total_amount',
                            'passenger_count',
                            'payment_type',
                            'pickup_datetime',
                            'tip_amount')
taxi_features.show()
# next, extract day of week from pickup_datetime
taxi_dow = taxi_features.withColumn("pickup_udf", udf_myFunction("pickup_datetime").cast('timestamp'))
taxi_dowE = taxi_dow.withColumn('dow', date_format('pickup_udf', 'E'))
taxi_dowE.show()
# one hot encode payment type
payment_indexer = StringIndexer(inputCol='payment_type', outputCol='payment_typeIndex')
payment_model = payment_indexer.fit(taxi_dowE)
payment_indexed = payment_model.transform(taxi_dowE)
payment_encoder = OneHotEncoder(inputCol='payment_typeIndex', outputCol='paymentVec')
payment_encoded = payment_encoder.transform(payment_indexed)
payment_encoded.show()
# one hot encode pickup day of week
dow_indexer = StringIndexer(inputCol='dow', outputCol='dowIndex')
dow_model = dow_indexer.fit(payment_encoded)
dow_indexed = dow_model.transform(payment_encoded)
dow_encoder = OneHotEncoder(inputCol='dowIndex', outputCol='dowVec')
dow_payment_encoded = dow_encoder.transform(dow_indexed)
dow_payment_encoded.show()
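As an aside, these index-then-encode stages could also be chained with Spark's Pipeline API, which avoids the intermediate dataframes; a sketch of the same two encodings:
# the same two encodings expressed as a single Pipeline
from pyspark.ml import Pipeline
stages = [StringIndexer(inputCol='payment_type', outputCol='payment_typeIndex'),
          OneHotEncoder(inputCol='payment_typeIndex', outputCol='paymentVec'),
          StringIndexer(inputCol='dow', outputCol='dowIndex'),
          OneHotEncoder(inputCol='dowIndex', outputCol='dowVec')]
encoded_alt = Pipeline(stages=stages).fit(taxi_dowE).transform(taxi_dowE)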
# select only the features that we need moving forward
taxi_final = dow_payment_encoded.select('trip_distance',
                                        'total_amount',
                                        'passenger_count',
                                        'dowVec',
                                        'paymentVec',
                                        'tip_amount')
taxi_final.show()
# now we build a linear regression model to predict tip amount from the features we have created
feature_cols = ['trip_distance', 'total_amount', 'passenger_count', 'dowVec', 'paymentVec']
label_col = 'tip_amount'
# vector assembler to combine the feature columns into a single vector column
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
# Reduce dataframe to essential columns for model building
taxi_xform = assembler.transform(
    taxi_final.select(feature_cols + [label_col]))
taxi_xform.printSchema()
taxi_xform.show()
taxi_xform.count()
# build LR model
taxi_train = taxi_xform.withColumnRenamed('tip_amount', 'label').select('label', 'features')
taxi_train
taxi_train.show()
# split into test/train sets
taxi_full_train, taxi_full_test = taxi_train.randomSplit([0.5, 0.5])
taxi_full_train.count(), taxi_full_test.count()
# fit the model to the data
start_time = time.time()
lr = pyspark.ml.regression.LinearRegression(maxIter=1)
lrmodel = lr.fit(taxi_full_train)
print("--- %s seconds ---" % (time.time() - start_time))
# make predictions
taxi_pred = lrmodel.transform(taxi_full_test)
taxi_pred
# show a few predictions
taxi_pred.show(3)
# model coefficients
lrmodel.coefficients, lrmodel.intercept
# check that the residuals are (roughly) normally distributed about zero
lrmodel.summary.residuals.agg({'residuals': 'avg'}).show()
lrmodel.summary.residuals.show()
# how much variance in the response variable is predicted by the independent variables?
lrmodel.summary.r2
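Note that summary.r2 above is computed on the training data. To score the held-out test set, Spark provides RegressionEvaluator; a quick sketch using the taxi_pred predictions from above:
# score the test-set predictions with RMSE and r2
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol='label', predictionCol='prediction',
                                metricName='rmse')
print('test RMSE: {}'.format(evaluator.evaluate(taxi_pred)))
print('test r2: {}'.format(evaluator.evaluate(taxi_pred, {evaluator.metricName: 'r2'})))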
# get a snippet of the predictions for visualization
pred_sample = taxi_pred.sample(withReplacement=False, fraction=0.005, seed=123)
pred_pd = pred_sample.toPandas()
# visualize predictions versus actual
pred_pd.plot.scatter('label', 'prediction', figsize=(12, 12), fontsize=16, alpha=0.2)
plt.xlabel('Actual Tip Amount', fontsize=24)
plt.ylabel('Predicted Tip Amount', fontsize=24)
plt.xlim([0,200])
plt.ylim([0,400])
plt.show()
# plot the residuals
pred_pd['residual'] = pred_pd['prediction'] - pred_pd['label']
pred_pd.plot.scatter('label', 'residual', figsize=(12, 12), fontsize=16, alpha=0.2)
plt.xlabel('Actual Tip Amount', fontsize=24)
plt.ylabel('Residual', fontsize=24)
plt.xlim([0,200])
plt.ylim([-100,200])
plt.show()
# histogram of residuals
font = {'weight': 'bold',
        'size': 24}
plt.rc('font', **font)
resids = list(pred_pd['residual'])
plt.figure(figsize=(12, 12))
plt.hist(resids, bins=100)
plt.xlabel('Residual', fontsize=24)
plt.ylabel('Count', fontsize=24)
plt.xlim([-100, 100])
plt.show()
# As shown above, the distribution of the residuals is right-skewed.
# This seems to come from the fact that our model predicts a tip when no tip was actually paid.
# Moving forward, we should explore the data more deeply and find features or ways to exploit the
# data to predict when no tip will be paid. Next time!
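One possible direction (a sketch only, nothing validated here) is a two-stage model: a classifier that first predicts whether a tip will be paid at all, with the regression above applied only to the predicted tippers. Stage one might look like:
# stage one sketch: classify zero-tip vs. non-zero-tip rides
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import when
taxi_labeled = taxi_train.withColumn(
    'tipped', when(taxi_train['label'] > 0, 1.0).otherwise(0.0))
tip_clf = LogisticRegression(labelCol='tipped', featuresCol='features')
tip_clf_model = tip_clf.fit(taxi_labeled)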