Apache Spark is a popular framework for distributed computing and big data. It can be used with Java, Scala, R and Python via its high-level APIs. The techniques and patterns of the Python API (PySpark) are quite similar to those of Pandas and Scikit-Learn as previously explored. In this post, we use PySpark to build a recommender system.

What is Spark?

Spark is an open-source computational engine for analyzing data sets which cannot fit into memory (or even on disk) on a single node. It started as a research project in 2009 at the University of California, Berkeley's AMPLab. It was open-sourced in 2010 and became part of the Apache Software Foundation in 2013. It is written in Scala and hence needs a JVM to run.

It is fast and general purpose. One of the main reasons it is so popular is its speed: it has been reported to run workloads up to 100 times faster than Hadoop MapReduce in memory and around 10 times faster on disk.

The Spark stack consists of Spark Core, MLlib, Spark SQL, Spark Streaming, GraphX and the cluster managers.

Spark Core

Spark Core handles distributing tasks across worker nodes and other basic duties such as fault recovery. This core engine powers the higher-level libraries MLlib, Spark SQL, Spark Streaming, and GraphX. The libraries are designed to integrate closely and can be used together seamlessly in projects.

[Image: the Spark stack]

MLlib

MLlib is the Spark analytical library for scalable machine learning. It contains a set of typical algorithms built-in such as those for linear regression, clustering, and recommender systems. It also features pipelines, utilities and functionality for persistence. The pattern for using the algorithms bears many similarities to Scikit-Learn.

A point to note is that features data has to be transformed to a vector with a VectorAssembler before fitting the models. This is not the case for other libraries like Scikit-Learn where all the features go in as a matrix.
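
As a minimal sketch of this pattern (the DataFrame houses_df and its columns are hypothetical, purely for illustration):

from pyspark.ml.feature import VectorAssembler

# Combine the individual numeric feature columns into the single vector
# column that MLlib estimators expect.
assembler = VectorAssembler(
    inputCols=['bedrooms', 'bathrooms', 'sqft'],  # hypothetical feature columns
    outputCol='features')
prepared_df = assembler.transform(houses_df)      # adds the 'features' vector column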

Spark SQL

Spark SQL provides a SQL interface which can be used in addition to the Spark query syntax to manipulate and filter data in DataFrames. An example is shown below where a DataFrame is created from a table called ratings_full via a SQL select query.

df = sqlContext.sql("select * from ratings_full")
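
In Spark 2.0 and later, the same kind of query can also be issued through the SparkSession. A minimal sketch, assuming a DataFrame df already exists (the view name ratings is arbitrary):

# Register the DataFrame as a temporary view so it can be queried with SQL.
df.createOrReplaceTempView('ratings')
top_ratings = spark.sql('select * from ratings where rating >= 4.0')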

Spark Streaming

Spark Streaming is for data arriving from a live source. It is scalable and fault-tolerant. Data can come in from a variety of sources such as Flume, Kafka and Twitter. Spark Streaming splits the incoming data into batches, which are then processed by the Spark engine.

With Structured Streaming you write queries as you normally would, without having to worry about handling new data yourself. Spark runs the query continuously as new data arrives. Each new data item is treated like a new row appended to the end of an unbounded table.
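
A hedged sketch of this idea using the built-in rate source, which continuously emits rows (the 10-second window here is an arbitrary choice):

from pyspark.sql import functions as F

# Read a continuous stream of (timestamp, value) rows from the rate source.
stream_df = spark.readStream.format('rate').load()

# Count the rows that fall into each 10-second window of event time.
counts = stream_df.groupBy(F.window('timestamp', '10 seconds')).count()

# Continuously write the full, updated result table to the console.
query = (counts.writeStream
         .outputMode('complete')
         .format('console')
         .start())
# query.awaitTermination()  # block until the stream is stopped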

GraphX

GraphX is a library for working with graphs (such as social network graphs) and running graph computations in parallel. Like MLlib, it contains common graph algorithms.

Cluster Managers

Spark can run on several cluster managers. These include the simple Standalone Scheduler which ships with Spark, Hadoop YARN, and Apache Mesos. This is how Spark scales out to many nodes.

Distributed Computing

Scaling can be done either vertically or horizontally. In vertical scaling, more resources (like RAM) are added to a single node. This eventually reaches a hardware limit and is not fault tolerant.

In horizontal scaling, more relatively low-powered nodes are added to a cluster. A master node controls how tasks are split across the worker nodes. This approach is fault tolerant and does not hit the expansion limits of vertical scaling.

[Image: distributed Spark architecture]

In Spark, a driver program on the master node controls the parallel operations via a SparkContext. The worker processes in Spark are called executors, and the parallel operations are split across them. Spark automatically distributes the tasks and data of the RDDs (see below) across the worker nodes.
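
A minimal sketch of this driver/executor split, assuming a SparkSession named spark already exists:

# The SparkContext is the driver-side handle for parallel operations.
sc = spark.sparkContext

# Distribute a local collection across the cluster as an RDD, then run a
# transformation and an action on it in parallel.
rdd = sc.parallelize(range(1000000), numSlices=8)
total = rdd.map(lambda x: x * 2).sum()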

DataFrames and RDDs

Since version 2.0, DataFrames have been the main interface to Spark. They are similar in function and appearance to Pandas and R DataFrames, and can also be compared to an Excel or Google Sheets spreadsheet.

Columns represent features (like a movie rating, the size of a house, or the gradient of terrain) and rows represent samples (all the features of a particular movie, house or coordinate on earth).

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    110|   1.0|1425941529|
|     1|    147|   4.5|1425942435|
|     1|    858|   5.0|1425941523|
|     1|   1221|   5.0|1425941546|
|     1|   1246|   5.0|1425941556|

They can be used to import, explore and manipulate data. A nice feature is that SQL queries can be used in addition to the Spark syntax, as mentioned before. This means that if you are familiar with SQL you can get going with Spark without having to learn its specific querying syntax. Like Pandas DataFrames, they can also be used interactively for exploration in a notebook environment.
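
As an illustration, the same filter can be expressed either with the DataFrame API or with plain SQL (a sketch reusing the ratings view registered earlier; df is the ratings DataFrame used later in this post):

# DataFrame API
high_ratings = df.filter(df.rating >= 4.0)

# Equivalent SQL query
high_ratings_sql = spark.sql('select * from ratings where rating >= 4.0')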

Prior to this, RDDs (Resilient Distributed Datasets) were the main interface. The DataFrame syntax is more intuitive, but the functionality under the hood is still the same. DataFrames are part of the Structured APIs, while RDDs are part of the low-level API (Chambers and Zaharia, 2018).

Data Sources

Spark can use multiple storage systems such as HDFS, AWS S3 and Cassandra. This is in contrast to Hadoop MapReduce, which can only use HDFS, the Hadoop Distributed File System. In the Hadoop MapReduce ecosystem, HDFS splits the data across nodes and MapReduce performs computations via key-value pairs; the two parts are closely interlinked.

Hence, Spark can be seen as extending this solution, since it can work with many different types of data sources.
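
A hedged sketch of how the unified reader API covers different storage systems (the paths below are hypothetical, purely for illustration):

# The same spark.read interface works across storage backends.
df_hdfs = spark.read.parquet('hdfs://namenode:8020/data/ratings.parquet')
df_s3 = spark.read.csv('s3a://my-bucket/ratings.csv', header=True, inferSchema=True)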

Getting a Workspace

Databricks

The simplest way I have found to get started is with Databricks. Databricks is a platform which was created by the founders of Spark and is relatively easy to use. It runs on AWS and has a free community edition as well as a paid version. All the necessary tools are already installed. It uses a notebook environment similar to Jupyter Notebooks. This is the platform used in this post.

AWS

Alternatively, an AWS instance can also be used. The instance will have to be set up with all the necessary tools of your choice like an OS, Python, Anaconda, Spark etc.

Locally

Spark can also be run locally with a Jupyter Notebook as before. However, performance will be limited to the resources available.

Recommender System Example

Recommender systems should be quite familiar to most users. They are used by companies like Netflix, Facebook and Amazon to determine the best movies, friends or products to recommend to users based on their browsing history or other people's activity. They are critical to the success of these applications, since even a small improvement in recommendation performance can increase revenue significantly.

Types of Recommender Systems

There are three types of recommender systems: content-based, collaborative filtering and knowledge-based. There is also the hybrid approach, where these are combined to get the best features of each and mitigate the drawbacks of using them separately. This has a parallel to ensemble methods like random forests.

Content-Based

In a content-based system, the algorithm looks at the features of the items and determines similarity based on whether or not these features are present. Features can include any characteristic of a movie, such as the year of release or whether a certain actor appears in it. A major problem with this type of system is that the user can get stuck in a bubble where no serendipitous items are recommended. Additionally, human experts are required to describe the item features, which can be expensive.

Collaborative Filtering

In a collaborative filtering system, similarities between users and items are used simultaneously. Domain knowledge is not required, and recommendations can go outside the usual content-based bubble since they can be based on what other users like. These recommendations use latent factors and generalize them across users. For example, the length of time spent watching a movie can signal that a user liked it (longer watch time) or disliked it (shorter watch time).

However, the sparsity of these matrices can lead to scalability problems. The approach also suffers from the cold start problem when there have not been many interactions for a particular item or user. Implicit feedback can also turn out to be wrong, so there can be questions about the quality of the data.

The algorithm used by Spark is of the collaborative filtering type and is based on Alternating Least Squares (ALS).
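
Roughly speaking, ALS factorizes the sparse user-item rating matrix into low-rank user and item factor matrices. A common form of the regularized objective (a sketch of the standard formulation) is

\min_{U,V} \sum_{(u,i) \in K} \left( r_{ui} - \mathbf{u}_u^{\top} \mathbf{v}_i \right)^2 + \lambda \left( \sum_u \lVert \mathbf{u}_u \rVert^2 + \sum_i \lVert \mathbf{v}_i \rVert^2 \right)

where K is the set of observed ratings, u_u and v_i are the latent user and item factors, and lambda is the regularization parameter. The algorithm alternates between holding the item factors fixed while solving least squares for the user factors, and vice versa.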

Knowledge-Based

In a knowledge-based system, recommendations are driven by explicit knowledge about the items and the user's stated requirements rather than by interaction history, which makes it useful when little or no rating data is available.

Hybrid

Hybrid versions, which combine the positive aspects of these systems and can be built on neural networks, tend to perform the best.

Dataset

In this example, we use the well-known MovieLens dataset. There are different versions of the dataset available; here we use a version with 26M samples (there are also 100K, 10M and 20M versions). Each sample contains a userId identifying a particular user, a movieId identifying a particular movie, and the rating given by that user.

Notebook

The code is available on Databricks for 6 months from the date of this post here. You can import and run it by clicking Import Notebook.

Set up the environment

If using Databricks, an account is first needed. The community edition is used in this post. To get the system ready, a cluster has to be created, the data imported and a new notebook started.

Create a cluster

A cluster is needed to run the notebook code.

  1. Click Clusters in the left hand navigation bar.
  2. Click the blue Create Cluster button.
  3. Create a cluster with the desired settings and click the blue Create Cluster button.

[Image: creating a cluster]

Import data

To use a particular data source in Databricks the data has to be uploaded into the system.

  1. Click Data in the left hand navigation bar.
  2. Click the + button next to Tables.
  3. Click browse in the file box.
  4. Navigate to the file on the system and upload it. In this case it is the MovieLens data downloaded from Kaggle located here.
  5. Note the location /FileStore/tables/ratings_full.csv which appears under the grey file box after the upload as this will be used to import the data into the notebook.

[Image: importing data]

Create a notebook

  1. Go to the dashboard by clicking the Databricks icon in the left navigation bar.
  2. Click Notebook under New.
  3. Type a name for the notebook in the name text field, then choose a language and a cluster.
  4. A new notebook should open.

[Image: creating a notebook]

Code

Set up and Imports

To start, create a SparkSession, then import the recommendation and evaluation algorithms. SparkSession was introduced in Spark 2.0. It creates a SparkConf, SparkContext and SQLContext under the hood so they do not have to be created manually. A SparkSession is needed to work with Spark DataFrames.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('recommender').getOrCreate()
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator


Next, import the data into a Spark DataFrame. Note the path is specific to the location on the Databricks platform and may vary. inferSchema=True means Spark will automatically determine the data types. header=True sets the first row to the header of the DataFrame. Otherwise it will default to arbitrary column names like _c0, _c1, _c2.

df = spark.read.csv('/FileStore/tables/ratings_full.csv', inferSchema=True, header=True)


Preparing and Exploring the Data

printSchema() shows the column names and the data types. It also indicates that it is possible for the values in each column to be null.

df.printSchema()
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)
 

To get a list of just the column names, use the columns attribute.

df.columns
['userId', 'movieId', 'rating', 'timestamp']

To remove unneeded columns, use the select function with a list of the columns to keep in the DataFrame.

df = df.select(['userId', 'movieId', 'rating'])


To view the data, df.head(5) can be used. The output is a list of Row objects.

df.head(5)
[Row(userId=1, movieId=110, rating=1.0),
 Row(userId=1, movieId=147, rating=4.5),
 Row(userId=1, movieId=858, rating=5.0),
 Row(userId=1, movieId=1221, rating=5.0),
 Row(userId=1, movieId=1246, rating=5.0)]

Alternatively, df.show() shows the first 20 rows of the DataFrame in tabular format by default. Adding a numerical parameter to the show function can adjust the number of rows in the table. For example, df.show(100) will show the first 100 rows.

df.show()
+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    110|   1.0|1425941529|
|     1|    147|   4.5|1425942435|
|     1|    858|   5.0|1425941523|
|     1|   1221|   5.0|1425941546|
|     1|   1246|   5.0|1425941556|
|     1|   1968|   4.0|1425942148|
|     1|   2762|   4.5|1425941300|
|     1|   2918|   5.0|1425941593|
|     1|   2959|   4.0|1425941601|
|     1|   4226|   4.0|1425942228|
|     1|   4878|   5.0|1425941434|
|     1|   5577|   5.0|1425941397|
|     1|  33794|   4.0|1425942005|
|     1|  54503|   3.5|1425941313|
|     1|  58559|   4.0|1425942007|
|     1|  59315|   5.0|1425941502|
|     1|  68358|   5.0|1425941464|
|     1|  69844|   5.0|1425942139|
|     1|  73017|   5.0|1425942699|
|     1|  81834|   5.0|1425942133|
+------+-------+------+----------+
only showing top 20 rows

df.describe().show() gives summary statistics on the data. There are 26,024,289 data points for each feature. The ratings range from 0.5 to 5.0, with the average rating at about 3.53.

df.describe().show()
+-------+-----------------+------------------+------------------+
|summary|           userId|           movieId|            rating|
+-------+-----------------+------------------+------------------+
|  count|         26024289|          26024289|          26024289|
|   mean| 135037.090248114|15849.109677040553|3.5280903543608817|
| stddev|78176.19722170352| 31085.25753139175|1.0654427636662234|
|    min|                1|                 1|               0.5|
|    max|           270896|            176275|               5.0|
+-------+-----------------+------------------+------------------+

Training the Model and Making Predictions

To train the model and make predictions, we need a training and an evaluation set. Here the training set is 80% of randomly selected samples and the rest are for evaluation.

Note that the split is random, so these results will not be exactly reproducible. randomSplit does, however, accept an optional seed argument (comparable to Scikit-Learn's random_state) if a repeatable split is needed, as sketched below.

training, test = df.randomSplit([0.8,0.2])
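
If a repeatable split is needed, a seed can be supplied (a minor variation on the line above; the seed value is arbitrary):

training, test = df.randomSplit([0.8, 0.2], seed=42)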


The following line creates the alternating least squares (ALS) model. The model is then fitted on the training data and predictions are made on the test set.

als = ALS(maxIter=5, regParam=0.01, userCol='userId', itemCol='movieId', ratingCol='rating')

model = als.fit(training)

predictions = model.transform(test)

Cold Start Predictions

When there are cold start users or items to make predictions on (ones not seen by the model during training), the predictions come out as NaN, as shown in the summary below. This also causes evaluation with the root mean squared error to produce a NaN.

To solve this problem, the rows can be dropped with predictions.na.drop(). A more streamlined way is to add coldStartStrategy="drop" as a model parameter, as sketched after the output below.

predictions.describe().show()
+-------+------------------+------------------+------------------+----------+
|summary|            userId|           movieId|            rating|prediction|
+-------+------------------+------------------+------------------+----------+
|  count|           5204702|           5204702|           5204702|   5204702|
|   mean|135081.15589038527|15842.569995938287| 3.527940312432873|       NaN|
| stddev| 78157.14981042777|31084.371710509196|1.0650888379810377|       NaN|
|    min|                 1|                 1|               0.5|-10.415044|
|    max|            270896|            176273|               5.0|       NaN|
+-------+------------------+------------------+------------------+----------+
predictions = predictions.na.drop()
predictions.describe().show()
+-------+-----------------+------------------+------------------+------------------+
|summary|           userId|           movieId|            rating|        prediction|
+-------+-----------------+------------------+------------------+------------------+
|  count|          5201214|           5201214|           5201214|           5201214|
|   mean|135086.1509872503|15792.209085801891| 3.528117858638387| 3.414847603497772|
| stddev|78155.50018207252|30982.096920031716|1.0649537415123238|0.7465019197404218|
|    min|                1|                 1|               0.5|        -10.415044|
|    max|           270896|            176249|               5.0|          15.47142|
+-------+-----------------+------------------+------------------+------------------+
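
With the more streamlined approach, the cold start rows are dropped by the model itself. A sketch of the same ALS constructor with this parameter added (available in Spark 2.2 and later):

als = ALS(maxIter=5, regParam=0.01, userCol='userId', itemCol='movieId',
          ratingCol='rating', coldStartStrategy='drop')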

Evaluation

The RMSE is 0.8387864404987665 for these 26,024,289 data points. On an earlier run with only 100,004 data points it was 1.1244220. Thus, adding more data points improved prediction performance.

evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating')
rmse = evaluator.evaluate(predictions)
rmse
0.8387864404987665
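
Beyond evaluating held-out ratings, the fitted model can also produce top-N recommendations. A brief sketch, not part of the original notebook output:

# Generate the top 10 movie recommendations for every user.
user_recs = model.recommendForAllUsers(10)
user_recs.show(5, truncate=False)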

Conclusion

This post gave a brief overview of Apache Spark and implemented a recommender system with the Python API using the Databricks platform. Spark is quite intuitive and makes a fairly complex algorithm very easy to use. To find out more, check out the Spark documentation.

References

  1. Chambers, Bill, and Matei Zaharia. Spark: The Definitive Guide: Big Data Processing Made Simple. O'Reilly Media, Inc., 2018.

  2. Google Cloud. Hybrid Recommendation Systems. Coursera, https://www.coursera.org/learn/recommendation-models-gcp/lecture/1QtFl/hybrid-recommendation-systems.

  3. Karau, Holden, et al. Learning Spark: Lightning-Fast Big Data Analysis. O'Reilly Media, Inc., 2015.

  4. Portilla, Jose. Recommender System: Code Along Project. Udemy, https://www.udemy.com/spark-and-python-for-big-data-with-pyspark/learn/v4/t/lecture/7047202?start=0.

  5. Spark.apache.org. (2018). Collaborative Filtering - Spark 2.2.0 Documentation. [online] Available at: https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html [Accessed 12 Feb. 2018].