Difference between revisions of "Apache Spark"

From Christoph's Personal Wiki
Jump to: navigation, search
(Resilient Distributed Datasets (RDDs))
(Blog posts)
 
(24 intermediate revisions by the same user not shown)
Line 97: Line 97:
  
 
;Example - Spark at Pinterest
 
;Example - Spark at Pinterest
* Business Problem: Provide a recommendation and visual bookmarking tool that lets users discover, save, and share ideas. Also get an immediate view of Pinterest engagement activity with high-thoughput and minimal latency.
+
* Business Problem: Provide a recommendation and visual bookmarking tool that lets users discover, save, and share ideas. Also get an immediate view of Pinterest engagement activity with high-throughput and minimal latency.
 
* Requirements:
 
* Requirements:
 
** Real-time analytics to process user's activity.
 
** Real-time analytics to process user's activity.
Line 205: Line 205:
 
* The driver program divides the user program into tasks and sends them to the executors.
 
* The driver program divides the user program into tasks and sends them to the executors.
 
* The executors run the tasks, computes, saves results (either in-memory or to disk), and returns results to the driver.
 
* The executors run the tasks, computes, saves results (either in-memory or to disk), and returns results to the driver.
* When the driver's <code>main()</code> function exits or <main>SparkContext.stop()</code> is called, the executors are terminated and the cluster manager releases the resources.
+
* When the driver's <code>main()</code> function exits or <code>SparkContext.stop()</code> is called, the executors are terminated and the cluster manager releases the resources.
  
 
===Resilient Distributed Datasets (RDDs)===
 
===Resilient Distributed Datasets (RDDs)===
Line 236: Line 236:
  
 
;Operations on RDDs
 
;Operations on RDDs
 +
[[File:Spark lineage graph.png|right|Spark lineage graph]]
 
* Transformations:
 
* Transformations:
 
** Create a new dataset from an existing one
 
** Create a new dataset from an existing one
Line 256: Line 257:
 
**:<pre>badLinesRDD.take(10)</pre>
 
**:<pre>badLinesRDD.take(10)</pre>
  
[[File:Spark lineage graph.png|right|Spark lineage graph]]
 
 
;Lazy evaluation and lineage graph of RDDs
 
;Lazy evaluation and lineage graph of RDDs
 
* When an RDD is created, a lineage graph is created (a DAG);
 
* When an RDD is created, a lineage graph is created (a DAG);
Line 281: Line 281:
 
* Spark applications are essentially the manipulation of RDDs through transformations and actions.
 
* Spark applications are essentially the manipulation of RDDs through transformations and actions.
 
* Operations are invoked on RDDs by executing functions on each element of the RDD.
 
* Operations are invoked on RDDs by executing functions on each element of the RDD.
* Spark offers over 80 high-level operations beyond Map and Reduce.
+
* Spark offers over 80 high-level operations (for both transformations and actions) beyond Map and Reduce.
 
* Common transformation operators: map, file, distinct, flatMap
 
* Common transformation operators: map, file, distinct, flatMap
 
* Common action operators: collect, count, take, reduce
 
* Common action operators: collect, count, take, reduce
Line 292: Line 292:
  
 
; Transformation examples on RDD
 
; Transformation examples on RDD
*:<pre>my_rdd = {4,5,10,10}</pre>
+
* Note: The following examples use Scala syntax.
*Apply a function to each element in the RDD and return new RDD
+
*:<pre>rdd = {4,5,10,10}</pre>
 +
* Apply a function to each element in the RDD and return new RDD:
 
*:<pre>rdd.map(x => x + 1)  #=> {5,6,11,11}</pre>
 
*:<pre>rdd.map(x => x + 1)  #=> {5,6,11,11}</pre>
 +
* Apply a function to each element in the RDD and return elements that are true for the function:
 +
*:<pre>rdd.filter(x => x < 10)  #=> {4,5}</pre>
 +
* Remove duplicate elements in the RDD and return new RDD:
 +
*:<pre>rdd.distinct()  #=> {4,5,10}</pre>
 +
* Apply a function to each element in the RDD and then flatten results (similar to map, but the function returns a sequence rather than a single element):
 +
*:<pre>rdd.flatmap(x => list(x-1, x+1)  #=> {3,5,4,6,9,11,9,11}</pre>
 +
 +
; Action examples on RDD
 +
*:<pre>rdd = {4,5,10,10}</pre>
 +
* Return all elements from the RDD to the driver:
 +
*:<pre>rdd.collect()  #=> {4,5,10,10}</pre>
 +
* Return number of elements in the RDD:
 +
*:<pre>rdd.count()  #=> 4</pre>
 +
* Return ''n'' elements from the RDD:
 +
*:<pre>rdd.take(2)  #=> {4,5}</pre>
 +
* Combine elements in the RDD in parallel:
 +
*:<pre>rdd.reduce((x,y) => x+y)  #=> 29</pre>
 +
 +
; Example of simple transformations and actions together:
 +
<pre>
 +
data = xrange(1,30)  # initialize data
 +
 +
#== SparkContext ==
 +
xrangeRDD = sc.parallelize(data, 4)
 +
# Dataset is broken into 4 partitions by driver and sent to workers' memory. E.g.:
 +
# RDD_p1 = {1,2,3,4,5,6} => worker #1's memory
 +
# RDD_p2 = {7,8,9,10} => worker #1's memory
 +
# RDD_p3 = {11,12,13,14,15,16,17,18,19,20,21,22,23,24 => worker #2's memory
 +
# RDD_p4 = {25,26,27,28,29,30} => worker #3's memory
 +
 +
#== Transformations ==
 +
subRDD = xrangeRDD.map(lambda x: x-1)
 +
# {1,2,3,4,5,6} => {0,1,2,3,4,5}
 +
# {7,8,9,10} => {6,7,8,9}
 +
# Etc.
 +
 +
filteredRDD = xrangeRDD.filter(lambda x: x<10)
 +
# {1,2,3,4,5,6} => {1,2,3,4,5,6}
 +
# {7,8,9,10} => {7,8,9}
 +
 +
#== Actions ==
 +
subRDD.collect() # Gathers the entries from all partitions into the driver
 +
# {0,1,2,...,29}
 +
subRDD.count() # Results sent to SparkContext where they are summed => 30
 +
</pre>
 +
 +
===Key-value pair RDDs===
 +
 +
* Pair RDDs are RDDs containing key-value pairs.
 +
* Commonly used in applications involving a customerID or eventID, or some unique identifier.
 +
* Operations are invoked on RDDs based on the keys (e.g., group data across nodes by key).
 +
* Pair RDDs can be created by directly loading key-value data.
 +
* Pair RDDs can also be created using <code>map()</code>, which returns the key-value pair.
 +
* Operations like shuffle are only available on Pair RDDs.
 +
 +
; Example - Word count
 +
* The goal is to count the frequency of each word in a document.
 +
* Used in the [[:wikipedia:Bag-of-words model|bag-of-words model]] in Natural Language Processing (NLP) and information retrieval.
 +
* Applications include:
 +
** Document classification
 +
** Finding matching documents on web searches (e.g., looking for plagarism)
 +
** Email spam classification
 +
* Input: "Bob likes to watch films. Alice likes films too."
 +
* Output: <code>[("Bob",1),("likes",2),("to",1),("watch",1),("films",2),("Alice",1),("too",1)]</code>
 +
<pre>
 +
# Python method #1:
 +
input = sc.textFile("data.txt")
 +
lines = input.flatMap(lambda line: line.split())
 +
pairs = lines.map(lambda word: (word, 1))
 +
counts = pairs.reduceByKey(lambda a, b: a + b)
 +
 +
# Python method #2 (generally slower than method #1):
 +
input = sc.textFile("data.txt")
 +
lines = input.flatMap(labmda line: line.split())
 +
pairs = lines.map(lambda word: (word, 1))
 +
groups = pairs.groupByKey()
 +
counts = groups.map(lambda (word, count): (word, sum(count))
 +
 +
# Scala
 +
val input = sc.textFile("data.txt")
 +
val lines = input.flatMap(l => l.split(" "))
 +
val pairs = lines.map(word => (word, 1))
 +
val counts = pairs.reduceByKey(_ + _)
 +
</pre>
 +
 +
; Compare <code>reduceByKey</code> with <code>groupByKey</code>
 +
* Aggregate operations on Pair RDDs trigger a shuffle of data.
 +
* Shuffle occurs to transfer all data with the same key to the same worker node.
 +
* <code>reduceByKey</code> combines data on each worker node such that only one value per key is sent of the network to the reducer worker nodes.
 +
* <code>groupByKey</code> sends all data over the network to reducer worker nodes and is, therefore, less efficient.
 +
* Thus, <code>reduceByKey</code> is generally preferred over <code>groupByKey</code>.
 +
 +
==Installing Spark==
 +
 +
; Overview of install steps
 +
* Install Java and verify installation (since Spark runs inside a JVM)
 +
* Download and extract Spark package
 +
* Configure Spark environment
 +
* Invoke the Spark shell and verify Spark installation
 +
 +
===Install Oracle Java 8 JDK===
 +
 +
; CentOS
 +
 +
Note: Visit the [http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html Oracle Java 8 JDK Downloads] page, accept the license agreement, and copy the download link for the Linux 64-bit RPM.
 +
 +
$ RPM=<nowiki>http://download.oracle.com/otn-pub/java/jdk/8u121-b13/e9e7ea248e2c4826b92b3f075a80e441/jdk-8u121-linux-x64.rpm</nowiki>
 +
 +
Replace the above link with the latest link from Oracle's website.
 +
 +
$ wget --no-cookies --no-check-certificate \
 +
    --header "Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" \
 +
    "${RPM}"
 +
$ sudo yum localinstall jdk-8u121-linux-x64.rpm
 +
$ java -version
 +
 +
Java should now be installed at <code>/usr/java/jdk1.8.0_121/jre/bin/java</code> and linked from <code>/usr/bin/java</code>.
 +
 +
; Ubuntu
 +
 +
$ sudo apt-add-repository ppa:webupd8team/java
 +
$ sudo apt-get update
 +
$ sudo apt-get install oracle-java8-installer
 +
$ java -version
 +
 +
Java should now be installed at <code>/usr/lib/jvm/java-8-oracle/jre/bin/java</code> and linked from <code>/usr/bin/java</code>.
 +
 +
===Install Apache Spark===
 +
 +
* Download the Spark tarball from the [http://spark.apache.org/downloads.html Apache Spark Download] page.
 +
 +
$ sudo mkdir -p /opt/spark
 +
$ tar zxvf spark-2.1.0-bin-hadoop2.7.tgz -O /opt/spark
 +
 +
* Add the following lines to your <code>~/.bashrc</code> file:
 +
export SPARK_HOME=/opt/spark
 +
export PATH=$PATH:/opt/spark/bin
 +
 +
$ source ~/.bashrc
 +
$ cd $SPARK_HOME
 +
$ vi ./bin/load_env.sh  # add the following line to the top of the file:
 +
export SPARK_LOCAL_IP=127.0.0.1
 +
 +
==Using Spark==
 +
 +
; REPL (Read–eval–print loop)
 +
* Python REPL invoked by executing <code>pyspark</code>
 +
* Scala REPL invoked by executing <code>spark-shell</code>
 +
* SparkContext is initialized automatically in the REPL and available as "<code>sc</code>"
 +
* As of March 2017, Java does not support REPL
 +
 +
; Spark applications
 +
* These are what are used in a production environment.
 +
* The <code>./bin/spark-submit</code> script is used to launch bundled Spark applications.
 +
* Scala applications are typically bundled using <code>sbt</code> (simple build tool).
 +
* Java applications are typically bundled using Maven.
 +
* Python programs are not required to be packaged, since the <code>pyspark</code> script takes care of setting up dependencies.
 +
* SparkContext must be manually initialized in Spark applications, unlike in the REPL.
 +
 +
; Spark submit example
 +
* Run application locally on 8 cores:
 +
<pre>
 +
$ ./bin/spark-submit \
 +
    --class org.apache.spark.examples.SparkPi \
 +
    --master local[8] \
 +
    ./examples/jars/spark-examples_2.11-2.1.0.jar 2>/tmp/run.log
 +
</pre>
 +
 +
Running the above should return:
 +
Pi is roughly 3.1431557157785788
 +
 +
The <code>/tmp/run.log</code> file will have the details for the Spark run log.
 +
 +
===Pyspark===
 +
NOTE: If you have IPython installed, set the following to force pyspark to use IPython for its REPL:
 +
export PYSPARK_DRIVER_PYTHON=ipython
 +
 +
* Navigate to the Spark installation directory and invoke the Spark Python REPL (interactive shell) using the "pyspark" command:
 +
cd $SPARK_HOME
 +
./bin/pyspark
 +
 +
* Create an RDD of strings using method <code>textfile()</code>, count the number of lines, and print the first line using <code>first()</code>:
 +
input = sc.textFile("README.md")
 +
input.count()
 +
input.first()
 +
 +
* Call the <code>filter()</code> transformation to filter lines containing the word "Python":
 +
pythonLines = input.filter(lambda line : "Python" in line)
 +
pythonLines.collect()
 +
 +
* Create an RDD using <code>parallelize()</code> on an array of words and check its contents calling the <code>collect()</code> method:
 +
words = sc.parallelize(['pencil', 'paper', 'computer', 'mouse'])
 +
words.collect()
 +
 +
* Write a map function to append the letter "s" to all the words in the above RDD:
 +
pluralWords = words.map(lambda word: word + 's')
 +
pluralWords.collect()
 +
 +
* Create an RDD using <code>parallelize()</code> on an array of integers:
 +
nums = sc.parallelize([1, 2, 3, 4, 5])
 +
nums.collect()
 +
 +
* Write a <code>map()</code> function to compute the square of each integer in the above RDD:
 +
squaredNums = nums.map(lambda num : num * num)
 +
squaredNums.collect()
 +
 +
* Write a <code>reduce()</code> function that computes the sum of all squared integers:
 +
squaredNums.reduce(lambda x, y : x + y)
 +
 +
* Example word count script:
 +
lines = sc.textFile("README.md")
 +
words = lines.flatMap(lambda line: line.split(" "))
 +
counts = words.map(lambda word: (word, 1))
 +
total = counts.reduceByKey(lambda x, y : x + y)
 +
# Filter words where the frequency is greater than 10
 +
total.filter(lambda x : x[1] > 10).collect()
 +
# [(u'', 72), (u'Spark', 16), (u'for', 12), (u'the', 24), (u'to', 17)]
 +
 +
* A more complete word count script:
 +
$ cat data.txt
 +
Bob likes to watch films. Alice likes films too.
 +
<pre>
 +
# Python script v1
 +
def remove(x):
 +
    if x.endswith('.'):
 +
        x = x.replace('.', '')
 +
    return x
 +
 +
infile = sc.textFile("data.txt")
 +
lines = infile.flatMap(lambda line: line.split())
 +
pairs = lines.map(lambda word: (remove(word), 1))
 +
groups = pairs.groupByKey()
 +
counts = groups.map(lambda (word, count): (word, sum(count)))
 +
output = counts.collect()
 +
for (word, count) in output:
 +
    print("%s: %i" % (word, count))
 +
</pre>
 +
<pre>
 +
# Python script v2
 +
from operator import add
 +
lines = spark.read.text("data.txt").rdd.map(lambda r: r[0])
 +
counts = lines.flatMap(lambda line: line.split()) \
 +
              .map(lambda word: (remove(word), 1)) \
 +
              .reduceByKey(add)
 +
output = counts.collect()
 +
for (word, count) in output:
 +
    print("%s: %i" % (word, count))
 +
#watch: 1
 +
#Alice: 1
 +
#to: 1
 +
#likes: 2
 +
#films: 2
 +
#Bob: 1
 +
#too: 1
 +
</pre>
 +
 +
==Spark MLlib==
 +
 +
; Spark MLlib, a Machine Learning library
 +
* The goal is to make practical Machine Learning (ML) scalable and easy.
 +
* Includes common ML algorithms (e.g., classification, regression, clustering, collaborative filtering, etc.).
 +
* Provides utilities for feature extraction, transformation, dimensionality reduction, and selection.
 +
* Provides tools for constructing ML pipelines and evaluating and tuning them.
 +
* Supports persistence of models and pipelines.
 +
* Includes convenient utilities for linear algebra, statistics, data handling, etc.
 +
 +
; Example use cases for Spark MLlib
 +
* Fraud detection: Spark Streaming and ML applied to prevent fraud.
 +
* Network intrusion detection: ML applied to detect cyber attacks.
 +
* Customer segmentation and personalization: Spark SQL and ML applied to maximize customer lifetime value.
 +
* Real-time ad targeting: Spark used to maximize Online ad revenues.
 +
* Predictive healthcare: Spark used to optimize healthcare costs.
 +
* Genomics analysis to provide precision medicine.
 +
 +
; Spark ML pipelines
 +
Provides a high-level API that helps users create and tune practical ML pipelines. Allows one to combine multiple ML algorithms and utilities into a single pipeline.
 +
 +
Key concepts in the Pipeline API:
 +
* DataFrame: Is the ML dataset and can hold a variety of data types.
 +
* Transformer: Is an algorithm that can transform one DataFrame into another DataFrame.
 +
* Estimator: Is an algorithm that can be fit on a DataFrame to produce a Transformer.
 +
* Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.
 +
* Parameter: This part of the API allows specifying parameters on all Transformers and Estimators.
 +
 +
; DataFrame
 +
* ML can be applied to a wide variety of data types (e.g., images, text, audio clips, numerical data, etc.). The ML dataset can hold a variety of data types.
 +
* The DataFrame API supports a variety of data types and is, therefore, well-suited for ML.
 +
* They are conceptually equivalent to a table in a relational database or a data frame in R/Python (e.g., Pandas).
 +
* Columns in a DataFrame are named.
 +
* Can be constructed from a wide array of sources (e.g., data files, tables in Hive, external databases, or existing RDDs).
 +
* DataFrames contain optimizations under-the-hood for better performance.
 +
 +
; Transformer
 +
* Is an algorithm that transforms one DataFrame into another, generally by appending one or more columns.
 +
* Can be a feature transformer that converts a column in a DataFrame to another type and appends the new column.
 +
* Can be a learning model that reads the features in the DataFrame, makes predictions, and appends the predicted label to the DataFrame.
 +
* Implements the <code>transform()</code> method (i.e., perform the transformation).
 +
* Feature Extraction utilities are Transformers that transform the input DataFrame by appending a column that contains the new feature. Feature Extraction involves extracting features from raw data.
 +
 +
; Estimator
 +
* Abstracts the concept of a learning algorithm that trains on the data.
 +
* Implements the method <code>fit()</code>, which accepts a DataFrame and produces a Model, which is a Transformer.
 +
* For example, a learning algorithm like Logistic Regression is an Estimator and calling <code>fit()</code> trains a Logistic Regression Model, which is a Model and a Transformer.
 +
 +
; Pipeline
 +
* Combines multiple transformers and estimators into a pipeline.
 +
* A Pipeline is specified as a sequence of stages and each stage is a Transformer or an Estimator.
 +
* Stages are specified as an ordered array.
 +
* In a linear pipeline, each stage uses data produced by the previous stage.
 +
* A non-linear pipeline is valid as long as it forms a Direct Acyclic Graph (DAG). A graph is specified based on the input and output column names of each stage.
 +
* Pipelines help ensure that training and test data go through identical feature extraction steps.
 +
* Tools used to tune ML pipelines include: CrossValidator, TrainValidationSplit
 +
 +
; Parameters
 +
* Parameters can be specified on Estimators and Transformers for tuning the algorithms and models.
 +
* Example: If <code>lr</code> is an instance of Logistic Regression, call <code>lr.setMaxIter(10)</code> to make <code>lr.fit()</code> use at most 10 iterations.
 +
* A <code>Param</code> is a named parameter, while a <code>ParamMap</code> is a set of <code>(parameter, value)</code> pairs.
 +
 +
; Spark ML Pipeline example
 +
A text document classification pipeline has the following workflow:
 +
* Training workflow: Input is a set of text documents, where each document is labelled. Stages while training the ML model are:
 +
** Split each text document into words;
 +
** Convert each document's words into a numerical feature vector; and
 +
** Create a prediction model using the feature vectors and labels.
 +
* Test/prediction workflow: Input is a set of text documents and the goal is to predict a label for each document. Stages while testing or making predictions with the ML model are:
 +
** Split each text document into words;
 +
** Convert each document's words into a numerical feature vector; and
 +
** Use the trained model to make predictions on the feature vector.
 +
 +
===Spark MLlib example===
 +
 +
''Note: The following example is taken from the <code>examples/src/main/python/ml/pipeline_example.py</code> example script in the Spark tarball.''
 +
 +
* First, set the <code>PYTHONPATH</code> (add the following to your <code>~/.bashrc</code> file):
 +
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
 +
 +
<pre>
 +
from pyspark.ml import Pipeline
 +
from pyspark.ml.classification import LogisticRegression
 +
from pyspark.ml.feature import HashingTF, Tokenizer
 +
from pyspark.sql import SparkSession
 +
 +
spark = SparkSession\
 +
    .builder\
 +
    .appName("PipelineExample")\
 +
    .getOrCreate()
 +
 +
# Prepare training documents from a list of (id, text, label) tuples.
 +
training = spark.createDataFrame([
 +
    (0, "a b c d e spark", 1.0),
 +
    (1, "b d", 0.0),
 +
    (2, "spark f g h", 1.0),
 +
    (3, "hadoop mapreduce", 0.0)
 +
], ["id", "text", "label"])
 +
 +
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
 +
tokenizer = Tokenizer(inputCol="text", outputCol="words")
 +
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
 +
lr = LogisticRegression(maxIter=10, regParam=0.001)
 +
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
 +
 +
# Fit the pipeline to training documents.
 +
model = pipeline.fit(training)
 +
 +
# Prepare test documents, which are unlabeled (id, text) tuples.
 +
test = spark.createDataFrame([
 +
    (4, "spark i j k"),
 +
    (5, "l m n"),
 +
    (6, "spark hadoop spark"),
 +
    (7, "apache hadoop")
 +
], ["id", "text"])
 +
 +
# Make predictions on test documents and print columns of interest.
 +
prediction = model.transform(test)
 +
selected = prediction.select("id", "text", "probability", "prediction")
 +
for row in selected.collect():
 +
    rid, text, prob, prediction = row
 +
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))
 +
 +
spark.stop()
 +
</pre>
 +
 +
The above simple script should return the following for Spark MLlib predictions for the test documents:
 +
<pre>
 +
(4, spark i j k) --> prob=[0.159640773879,0.840359226121], prediction=1.000000
 +
(5, l m n) --> prob=[0.837832568548,0.162167431452], prediction=0.000000
 +
(6, spark hadoop spark) --> prob=[0.0692663313298,0.93073366867], prediction=1.000000
 +
(7, apache hadoop) --> prob=[0.982157533344,0.0178424666556], prediction=0.000000
 +
</pre>
 +
 +
So, for this ''very simple'' example, Spark MLlib made perfect predictions (i.e., all documents with the word "spark" in them were correctly labeled).
 +
 +
==Spark SQL==
 +
 +
Spark SQL is:
 +
* A Spark module acting as a distributed SQL query engine
 +
* Used for structured data processing and allows running SQL-like queries
 +
* Data from different formats (e.g., JSON, Parquet, Hive tables, etc.) can be consumed
 +
* Allows combining SQL with Spark's programming API
 +
* Can perform ETL on data and then run ''ad hoc'' querying
 +
* SQLContext encapsulates all relational functionality in Spark
 +
* Starting with Spark 2.0, SparkSession replaces SparkContext and SQLContext
 +
 +
;Spark SQL DataFrame and DataSet API
 +
* Used the DataFrame API (introduced in Spark 1.3) or the DataSet API (introduced in Spark 1.6)
 +
* The DataFrame API introduces the concept of a schema to describe data, similar to a relational table
 +
* The DataFrame API provides further optimization of query plans using Spark's Catalyst optimizer
 +
* The DataSet API combines the best of RDDs and DataFrame APIs. It allows object oriented style programming like RDDs and utilizes performance benefits of the Catalyst query optimizer
 +
* Inter-operates with RDDs. Can convert existing RDDs into DataFrames.
 +
 +
;Spark SQL example using the DataFrame API
 +
''Note: This example was taken from the <code>examples/src/main/python/sql/basic.py</code> script in the Spark tarball.''
 +
<pre>
 +
$ cat people.json
 +
{"name":"Michael"}
 +
{"name":"Andy", "age":30}
 +
{"name":"Justin", "age":19}
 +
</pre>
 +
<pre>
 +
spark = SparkSession \
 +
    .builder \
 +
    .appName("Python Spark SQL basic example") \
 +
    .config("spark.some.config.option", "some-value") \
 +
    .getOrCreate()
 +
 +
df = spark.read.json("people.json")
 +
# Displays the content of the DataFrame to stdout
 +
df.show()
 +
# +----+-------+
 +
# | age|  name|
 +
# +----+-------+
 +
# |null|Michael|
 +
# |  30|  Andy|
 +
# |  19| Justin|
 +
# +----+-------+
 +
 +
# spark, df are from the previous example
 +
# Print the schema in a tree format
 +
df.printSchema()
 +
# root
 +
# |-- age: long (nullable = true)
 +
# |-- name: string (nullable = true)
 +
 +
# Select only the "name" column
 +
df.select("name").show()
 +
# +-------+
 +
# |  name|
 +
# +-------+
 +
# |Michael|
 +
# |  Andy|
 +
# | Justin|
 +
# +-------+
 +
 +
# Select everybody, but increment the age by 1
 +
df.select(df['name'], df['age'] + 1).show()
 +
# +-------+---------+
 +
# |  name|(age + 1)|
 +
# +-------+---------+
 +
# |Michael|    null|
 +
# |  Andy|      31|
 +
# | Justin|      20|
 +
# +-------+---------+
 +
 +
# Select people older than 21
 +
df.filter(df['age'] > 21).show()
 +
# +---+----+
 +
# |age|name|
 +
# +---+----+
 +
# | 30|Andy|
 +
# +---+----+
 +
 +
# Count people by age
 +
df.groupBy("age").count().show()
 +
# +----+-----+
 +
# | age|count|
 +
# +----+-----+
 +
# |  19|    1|
 +
# |null|    1|
 +
# |  30|    1|
 +
# +----+-----+
 +
 +
# Register the DataFrame as a SQL temporary view
 +
df.createOrReplaceTempView("people")
 +
 +
sqlDF = spark.sql("SELECT * FROM people")
 +
sqlDF.show()
 +
# +----+-------+
 +
# | age|  name|
 +
# +----+-------+
 +
# |null|Michael|
 +
# |  30|  Andy|
 +
# |  19| Justin|
 +
# +----+-------+
 +
 +
# Register the DataFrame as a global temporary view
 +
df.createGlobalTempView("people")
 +
 +
# Global temporary view is tied to a system preserved database `global_temp`
 +
spark.sql("SELECT * FROM global_temp.people").show()
 +
# +----+-------+
 +
# | age|  name|
 +
# +----+-------+
 +
# |null|Michael|
 +
# |  30|  Andy|
 +
# |  19| Justin|
 +
# +----+-------+
 +
 +
# Global temporary view is cross-session
 +
spark.newSession().sql("SELECT * FROM global_temp.people").show()
 +
# +----+-------+
 +
# | age|  name|
 +
# +----+-------+
 +
# |null|Michael|
 +
# |  30|  Andy|
 +
# |  19| Justin|
 +
# +----+-------+
 +
</pre>
 +
 +
;Running SQL queries programmatically
 +
''Note: This example was taken from the <code>examples/src/main/python/sql/basic.py</code> script in the Spark tarball.''
 +
The <code>sql</code> function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a DataFrame:
 +
<pre>
 +
$ cat people.txt
 +
Michael, 29
 +
Andy, 30
 +
Justin, 19
 +
</pre>
 +
<pre>
 +
spark = SparkSession \
 +
    .builder \
 +
    .appName("Python Spark SQL basic example") \
 +
    .config("spark.some.config.option", "some-value") \
 +
    .getOrCreate()
 +
 +
sc = spark.sparkContext
 +
 +
# Load a text file and convert each line to a Row.
 +
lines = sc.textFile("people.txt")
 +
parts = lines.map(lambda l: l.split(","))
 +
# Each line is converted to a tuple.
 +
people = parts.map(lambda p: (p[0], p[1].strip()))
 +
 +
# The schema is encoded in a string.
 +
schemaString = "name age"
 +
 +
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
 +
schema = StructType(fields)
 +
 +
# Apply the schema to the RDD.
 +
schemaPeople = spark.createDataFrame(people, schema)
 +
 +
# Creates a temporary view using the DataFrame
 +
schemaPeople.createOrReplaceTempView("people")
 +
 +
# SQL can be run over DataFrames that have been registered as a table.
 +
results = spark.sql("SELECT name FROM people")
 +
 +
results.show()
 +
# +-------+
 +
# |  name|
 +
# +-------+
 +
# |Michael|
 +
# |  Andy|
 +
# | Justin|
 +
# +-------+
 +
</pre>
 +
 +
==Spark Streaming==
 +
 +
;Spark Streaming is
 +
* An extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.
 +
* Simple Big Data analytics is no longer sufficient. What is needed is fast, real-time analysis of streaming Big Data sources.
 +
* Data can be ingested from many sources (e.g., Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets).
 +
* Data can further be processed using MLlib, Graph processing, or high-level functions like <code>map</code>, <code>reduce</code>, <code>join</code>, <code>window</code>, etc.
 +
* Processed data can be pushed out to file systems, databases, and live dashboards:
 +
<pre>
 +
Kafka
 +
Flum                          HDFS
 +
HDFS/S3 => Spark Streaming => Databases
 +
Kinesis                      Dashboards
 +
Twitter
 +
</pre>
 +
 +
;Internal process of Spark Streaming
 +
* Spark Streaming receives live input data streams and divides data into batches.
 +
* Batches of data are processed by the Spark engine to produce batches of results.
 +
* A high-level API, called DStream (discretized stream), represents a continuous stream of data.
 +
* Internally, DStream is represented as a sequence of RDDs.
 +
* DStreams can be created either from input data streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams:
 +
<pre>
 +
input data stream => Spark Streaming => batches of input data => Spark Engine => batches of processed data
 +
</pre>
 +
 +
;Example - Network word count using Spark Streaming
 +
* Create a StreamingContext (the main entry point to all Streaming functionality):
 +
<pre>
 +
from pyspark import SparkContext
 +
from pyspark.streaming import StreamingContext
 +
 +
# Create a local StreamingContext with two local working threads and a batch interval of 1:
 +
second sc = SparkContext("local[2]", "NetworkWordCount")
 +
ssc = StreamingContext(sc, 1)
 +
</pre>
 +
* Create a DStream that represents streaming data from a TCP source:
 +
<pre>
 +
# Create a DStream that will connect to hostname:port (e.g., localhost:9999):
 +
lines = ssc.socketTextStream("localhost", 9999)
 +
</pre>
 +
* Define Map Reduce functions to count the number of words from the streaming data:
 +
<pre>
 +
# Split each line into words
 +
words = lines.flatMap(lambda line: line.split(" "))
 +
 +
# Count each word in each batch
 +
pairs = words.map(lambda word: (word, 1))
 +
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
 +
 +
# Print the first 10 elements of each RDD generated in this DStream to the console
 +
wordCounts.pprint()
 +
</pre>
 +
* Start processing after all the transformations have been setup:
 +
<pre>
 +
ssc.start()
 +
ssc.awaitTermination()
 +
</pre>
 +
<pre>
 +
# Complete script
 +
# Source: examples/src/main/python/streaming/network_wordcount.py
 +
from __future__ import print_function
 +
 +
import sys
 +
 +
from pyspark import SparkContext
 +
from pyspark.streaming import StreamingContext
 +
 +
if __name__ == "__main__":
 +
    if len(sys.argv) != 3:
 +
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
 +
        exit(-1)
 +
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
 +
    ssc = StreamingContext(sc, 1)
 +
 +
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
 +
    counts = lines.flatMap(lambda line: line.split(" "))\
 +
                  .map(lambda word: (word, 1))\
 +
                  .reduceByKey(lambda a, b: a+b)
 +
    counts.pprint()
 +
 +
    ssc.start()
 +
    ssc.awaitTermination()
 +
</pre>
 +
* Start a netcat server in one terminal:
 +
$ nc -lk 9999
 +
* Submit the following Spark command in another terminal:
 +
$ ./bin/spark-submit network_wordcount.py localhost 9999 2>/dev/null
 +
* Any lines typed in the terminal running the netcat server will be counted and printed on the screen:
 +
<pre>
 +
terminal1> nc -lk 9999
 +
foo bar baz foo
 +
 +
terminal2> ./bin/spark-submit network_wordcount.py localhost 9999 2>/tmp/streaming.log
 +
-------------------------------------------
 +
Time: 2017-03-27 20:57:56
 +
-------------------------------------------
 +
(u'bar', 1)
 +
(u'baz', 1)
 +
(u'foo', 2)
 +
</pre>
 +
 +
==Spark GraphX==
 +
 +
;What is Spark GraphX?
 +
* It is a graph processing library with APIs to manipulate graphs and performing graph-parallel computations.
 +
* It extends the Spark RDD API by introducing a new Graph abstraction, a directed multi-graph with properties attached to each vertex and edge.
 +
* Like RDDs, property graphs are immutable, distributed, and fault-tolerant.
 +
* It provides various operators for manipulating graphs (e.g., subgraph and mapVertices).
 +
* It provides a library of common graph algorithms (e.g., PageRank and triangle counting).
 +
* It has a growing collection of algorithms and builders to simplify graph analytics tasks.
 +
 +
; GraphX example
 +
* Define a vertex array and edge array.
 +
* Construct RDDs from vertex and edge arrays.
 +
* Build a property graph using the RDDs of vertices and edges.
 +
* Perform filter operations on the graph (e.g., list users at least 30 years old).
 +
* Use the <code>graph.triplets</code> view to display relationships in the graph.
 +
* Example use cases: Graph social networks (e.g., LinkedIn, Facebook, Twitter, etc.)
 +
 +
==SparkR==
 +
 +
;What is SparkR?
 +
* SparkR is an R package that provides a light-weight frontend to use Apache Spark from within R.
 +
* SparkR provides a distributed data frame implementation, which supports operations like selection, filtering, aggregation, etc. on large datasets.
 +
* SparkR also supports distributed machine learning using MLlib.
 +
* SparkR uses the SparkDataFrame API. SparkDataFrames can be constructed from a wide array of sources (e.g., structured data files, tables in Hive, external databases, or existing local R data frames).
 +
* The SparkR shell can be invoked using the <code>./bin/sparkR</code> command.
 +
* You can also connect to SparkR from Rstudio or other R IDEs.
 +
* Allows converting R data frames to SparkDataFrames.
 +
* Can read from JSON, Parquet files, Hive, and other data sources.
 +
* <code>spark.lapply</code> (similar to <code>lapply</code> in native R) runs a function over a list of elements and distributes the computations with Spark.
 +
* A SparkDataFram can also be registered as a temporary view in Spark SQL and allows you to run SQL queries over its data.
 +
* SparkR support the following machine learning algorithms (as of March 2017):
 +
** Generalized Linear Model
 +
** Accelerated Failure Time (AFT)
 +
** Survival Regression Model
 +
** Naive Bayes Model
 +
** KMeans Model
 +
* Under the hood, SparkR uses MLlib to train the model.
  
 
==See also==
 
==See also==
Line 305: Line 1,013:
 
*[https://spark.apache.org/ Official website]
 
*[https://spark.apache.org/ Official website]
  
 +
===Blog posts===
 +
*[https://www.redapt.com/blog/apache-spark-for-machine-learning-part-1 Apache Spark for Machine Learning - Part 1] &mdash; by Christoph Champ
 +
*[https://www.redapt.com/blog/using-apache-spark-for-machine-learning-part-2 Apache Spark for Machine Learning - Part 2] &mdash; by Christoph Champ
 +
*[https://www.redapt.com/blog/using-apache-spark-for-machine-learning-part-3 Apache Spark for Machine Learning - Part 3] &mdash; by Christoph Champ
 +
*[https://www.redapt.com/resource/spark-on-amazon-emr Spark on Amazon AMR] &mdash; by Jonathan Dawson
 +
 +
[[Category:Machine Learning]]
 
[[Category:Technical and Specialized Skills]]
 
[[Category:Technical and Specialized Skills]]

Latest revision as of 18:29, 27 November 2019

Apache Spark is an open-source cluster-computing framework. Originally developed at the University of California, Berkeley's AMPLab, the Spark codebase was later donated to the Apache Software Foundation, which has maintained it since. Spark provides an interface for programming entire clusters with implicit data parallelism and fault-tolerance.

Introduction

What is Spark?
  • Fast and general engine for large-scale data processing and analysis
  • Parallel distributed processing on commodity hardware
  • Easy to use
  • A comprehensive, unified framework for Big Data analytics
  • Open source and a top-level Apache project
Spark use cases
  • Big Data use case like intrusion detection, product recommendations, estimating financial risks, detecting genomic associations to a disease, etc. require analysis on large-scale data.
  • In depth analysis requires a combination of tools like SQL, statistics, and machine learning to gain meaningful insights from data.
  • Historical choices like R and Octave operate only on single node machines and are, therefore, not suitable for large volume data sets.
  • Spark allows rich programming APIs like SQL, machine learning, graph processing, etc. to run on clusters of computers to achieve large-scale data processing and analysis.
Spark and distributed processing
  • Challenges of distributed processing:
    • Distributed programming is much more complex than single node programming
    • Data must be partitioned across servers, increasing the latency if data has to be shared between servers over the network
    • Chances of failure increases with the increase in the number of servers
  • Spark makes distributed processing easy:
    • Provides a distributed and parallel processing framework
    • Provides scalability
    • Provides fault-tolerance
    • Provides a programming paradigm that makes it easy to write code in a parallel manner
Spark and its speed
  • Lightning fast speeds due to in-memory caching and a DAG-based processing engine.
  • 100 times faster than Hadoop's MapReduce for in-memory computations and 10 time faster for on-disk.
  • Well suited for iterative algorithms in machine learning
  • Fast, real-time response to user queries on large in-memory data sets.
  • Low latency data analysis applied to processing live data streams
Spark is easy to use
  • General purpose programming model using expressive languages like Scala, Python, and Java.
  • Existing libraries and API makes it easy to write programs combining batch, streaming, interactive machine learning and complex queries in a single application.
  • An interactive shell is available for Python and Scala
  • Built for performance and reliability, written in Scala and runs on top of Java Virtual Machine (JVM).
  • Operational and debugging tools from the Java stack are available for programmers.
Spark components
  • Spark SQL
  • Spark Streaming
  • MLib (machine learning)
  • GraphX (graph)
  • Spark R
Spark is a comprehensive unified framework for Big Data analytics
  • Collapses the data science pipeline by allowing pre-processing of data to model evaluation in one single system.
  • Spark provides an API for data munging, Extract, transform, load (ETL), machine learning, graph processing, streaming, interactive, and batch processing. Can replace several SQL, streaming, and complex analytics systems with one unified environment.
  • Simplifies application development, deployment, and maintenance.
  • Strong integration with a variety of tools in the Hadoop ecosystem.
  • Can read and write to different data formats and data sources, including HDFS, Cassandra, S3, and HBase.
Spark is not a data storage system
  • Spark is not a data store but is versatile in reading from and writing to a variety of data sources.
  • Can access traditional business intelligence (BI) tools using a server mode that provides standard Java Database Connectivity (JDBC) and Open Database Connectivity (ODBC).
  • The DataFrame API provides a pluggable mechanism to access structured data using Spark SQL.
  • Its API provides tight optimization integration, thereby enhances the speed of the Spark jobs that process vast amounts of data.
History of Spark
  • Originated as a research project in 2009 at UC Berkeley AMPLab.
  • Motivated by MapReduce and the need to apply machine learning in a scalable fashion.
  • Open sourced in 2010 and transferred to Apache in 2013.
  • A top-level Apache project, as of 2017.
  • Spark is winner of Daytona GraySort contesting 2014, sorting a petabyte 3 times faster and using 10 times less hardware than Hadoop's MapReduce.
  • "Apache Spark is the Taylor Swift of big data software. The open source technology has been around and popular for a few years. But 2015 was the year Spark went from an ascendant technology to a bona fide superstar".[1]

Spark use cases

  • Fraud detection: Spark streaming an machine learning applied to prevent fraud
  • Network intrusion detection: Machine learning applied to detect cyber hacks
  • Customer segmentation and personalization: Spark SQL and machine learning applied to maximize customer lifetime value
  • Social media sentiment analysis: Spark streaming, Spark SQL, and Stanford's CoreNLP wrapper helps achieve sentiment analysis
  • Real-time ad targeting: Spark used to maximize Online ad revenues
  • Predictive healthcare: Spark used to optimize healthcare costs
Example - Spark at Uber
  • Business Problem: A simple problem of getting people around a city with an army of more than 100,000 drivers and to use data to intelligently perfect the business in an automated and real-time fashion.
  • Requirements:
    • Accurately pay drivers per trips in the dataset
    • Maximize profits by positioning vehicles optimally
    • Help drivers avoid accidents
    • Calculate surge pricing
  • Solution: Use Spark Streaming and Spark SQL as the ELT system and Spark MLlib and GraphX for advanced analytics
  • Reference: Talk by Uber engineers at Apache Spark meetup: https://youtu.be/zKbds9ZPjLE
Example - Spark at Netflix
  • Business Problem: A video streaming service with emphasis on data quality, agility, and availability. Using analytics to help users discover films and TV shows that they like is key to Netflix's success.
  • Requirements:
    • Streaming applications are long-running tasks that need to be resilient in Cloud deployments.
    • Optimize content buying
    • Renowned personalization algorithms
  • Solution: Use Spark Streaming in AWS and Spark GraphX for recommendation system.
  • Reference: Talk by Netflix engineers at Apache Spark meetup: https://youtu.be/gqgPtcDmLGs
Example - Spark at Pinterest
  • Business Problem: Provide a recommendation and visual bookmarking tool that lets users discover, save, and share ideas. Also get an immediate view of Pinterest engagement activity with high-throughput and minimal latency.
  • Requirements:
    • Real-time analytics to process user's activity.
    • Process petabytes of data to provide recommendations and personalization.
    • Apply sophisticated deep learning techniques to a Pin image in order to suggest related Pins.
  • Solution: Use Spark Streaming, Spark SQL, MemSSQL's Spark connector for real-time analytics, and Spark MLlib for machine learning use cases.
  • Reference: https://medium.com/@Pinterest_Engineering/real-time-analytics-at-pinterest-1ef11fdb1099#.y1cdhb9c3
Example - Spark at ADAM (a Big Data genomics project)
  • Business Problem: ADAM is a genomics analysis platform that provides large-scale analyses to support population-based genomics studies, which is essential for precision medicine.
  • Requirements:
    • Parallelize genomics analysis in the Cloud
    • Replace developing, custom distributed computing code.
    • Support for file formats well suited for genomic data, like Apache Parquet and Avro.
    • Support for languages like R, which are popular in the genomics community.
  • Solution: Use Spark on Amazon EMR
  • Reference: https://github.com/bigdatagenomics/adam and http://bdgenomics.org/
Example - Spark at Yahoo
  • Business Problem: Deep learning is critical for Yahoo's product teams to acquire intelligence from huge amounts of Online data. Examples are image recognition and speech recognition for improved search on photo sharing service Flickr.
  • Requirements:
    • Run deep learning software on existing infrastructure.
    • Distribute deep learning processes across multiple Big Data clusters.
    • Handle potential system failures on long running deep learning jobs.
  • Solution: Create a way to run deep learning system CaffeOnSpark.
  • Reference: https://youtu.be/bqj7nML-aHk

Architecture and components of Spark

The Spark stack:

+-----------------+ +-----------------+ +------------------+ +------------------+
|    Spark SQL    | | Spark Streaming | |      MLlib       | |      GraphX      |  <= APIs
| structured data | |    real-time    | | machine learning | | graph processing |
+-----------------+ +-----------------+ +------------------+ +------------------+

+-------------------------------------------------------------------------------+
|                                Spark Core                                     |
+-------------------------------------------------------------------------------+

+---------------------------+    +------------------+    +-----------------+
|   Stand alone scheduler   |    |   Apache Mesos   |    |   Hadoop YARN   |  <= Cluster managers
+---------------------------+    +------------------+    +-----------------+
Spark Core (the base engine)
  • Distributes workloads
  • Monitors applications across the cluster
  • Schedules tasks
  • Memory management
  • Fault recovery
  • Interacts with storage systems
  • Houses API that defines Resilient Distributed Datasets (RDDs) - the primary data abstraction in Spark
Spark API libraries
  • Spark SQL: Provides structured data processing
  • Spark Streaming: Enables processing of live streams of data
  • Spark MLlib: A library containing common machine learning functionality
  • GraphX: A library for manipulating graphs and performing graph-parallel computations
  • SparkR: An R package that provides a lightweight frontend to use Spark from within R
Spark cluster managers
  • Cluster managers allocate resources across applications on a cluster.
  • As of March 2017, the following are the cluster managers Spark supports:
    • Standalone — a simple cluster manager included with Spark that makes it easy to setup a cluster.
    • Apache Mesos — a general cluster manager that can also run Hadoop MapReduce and service applications.
    • Hadoop YARN — the resource manager in Hadoop 2.
Spark runtime architecture
  • In distributed mode, Spark uses a master/slave architecture.
  • The master is called the "driver" and the slaves are the "executors" (live on worker nodes)
  • Drivers and executors run in their own Java process.
  • A driver and its executors put together form a Spark application.
  • A Spark application is launched using the cluster manager.
SparkContext
  • Main entry point to everything Spark.
  • Defined in the main/driver program.
  • Tells Spark how and where to access a cluster.
  • Connects to cluster managers.
  • Coordinates Spark processes running on different cluster nodes.
  • Use to create RDDs and shared variables on the cluster.
The Driver
  • The process where the main() method of the Spark program runs.
  • Responsible for converting a user program into tasks.
  • The driver schedules the tasks on executors (live on worker nodes).
Executors (aka worker processes)
  • Launched once at the beginning of the application and typically run for the entire lifetime of an application.
  • Executors register themselves with the driver, thereby allowing the driver to schedule tasks on the executors.
  • Worker/executor processes run the individual tasks and return results to the driver.
  • Provide in-memory storage for RDDs, as well as disk storage.
Spark running on clusters
  • Application: User program built on Spark; consists of a driver program and executors on the cluster.
  • Cluster Manager: A pluggable service for acquiring resources on the cluster.
  • WOrker node: Any node that can run application code in the cluster.
  • Driver program: The process running the main() function of the application and creating the SparkContext.
  • Executor: A process launched on a worker node that runs tasks.
  • Task: The smallest unit of work sent to one executor. Tasks are bundled into "stages".
Flow of execution in a Spark program
  • User submits an application, which launches the driver program.
  • The driver program invokes the main() function specified by the user.
  • The driver program contacts the cluster manager to ask for resources to launch executors.
  • The cluster manager launches executors.
  • The driver program divides the user program into tasks and sends them to the executors.
  • The executors run the tasks, computes, saves results (either in-memory or to disk), and returns results to the driver.
  • When the driver's main() function exits or SparkContext.stop() is called, the executors are terminated and the cluster manager releases the resources.

Resilient Distributed Datasets (RDDs)

Data API in Spark
  • RDD API: The core data API (introduced in Spark 1.0).
  • DataFrame API: Uses a schema to describe the data (introduced in Spark 1.3). More suitable for query building.
  • DataSet API: Combines the best of the RDD and DataFrame APIs (released via Spark 2.0).
RDD basics
  • Core concept in Spark.
  • Abstraction for working with data in Spark.
  • RDD API in Python, Scala, and Java.
  • Simply described as a distributed collection of elements.
  • Spark automatically distributes RDDs across nodes in a cluster and can parallelize operations.
  • A fault-tolerant collection of elements, since they can be re-built from the lineage (i.e., it keeps track of how the RDD was constructed and what operations were performed on it).
  • An RDD is a read-only collection of elements, immutable once constructed.
Creating RDDs
  • Three methods for creating an RDD:
  1. Parallelize or distributing an existing collection (e.g., a list or a set)
    lines = sc.parallelize(["I", "am", "learning", "Spark"])
  2. Loading an external dataset from files on an external storage system
    lines = sc.textFile("README.md")
    or,
    text_file = spark.textFile("hdfs://...")
  3. Operations on an existing RDD
     newLines = lines.transform(...)
where sc = SparkContext
Operations on RDDs
Spark lineage graph
  • Transformations:
    • Create a new dataset from an existing one
    • Returns an RDD
    • Lazy evaluation, not computed immediately
    • Generally works element-wise
    • Relationships between transformations are recorded in a lineage graph, which is DAG
    • Examples:
      inputRDD = sc.textFile("log.txt")
      errorsRDD = inputRDD.filter(lambda x: "error" in x)
      warningRDD = inputRDD.filter(lambda x: "warning" in x)
      badLinesRDD = errorsRDD.union(warningRDD)
  • Actions:
    • RDDs are evaluated when an action is called (i.e., nothing happens on the above transformations until the action is called)
    • They are a mechanism to get results out of Spark
    • They return result(s) to the driver or to persistent storage
    • When an action is called, the entire RDD is computed from scratch. Therefore, it is a good practice to persist intermediate results.
    • Examples:
      print "Number of problems in log file: " + badLinesRDD.count()
      badLinesRDD.take(10)
Lazy evaluation and lineage graph of RDDs
  • When an RDD is created, a lineage graph is created (a DAG);
  • DAG is not evaluated until an action is called, thus a "lazy evaluation";
  • When more transformations are called and new RDDs are derived, the lineage graph is updated and keeps track of the relationships between RDDs; and
  • If a failure occurs in a given task, the DAG/lineage graph is replicated and each RDD can be computed on demand. Thus, it is fault-tolerant.
A Spark program lifecycle
  1. Create RDD from external data or existing collection;
  2. Lazily transform into new RDD;
  3. Cache() some RDD for repeated usage; and
  4. Perform actions on RDD to execute parallel operations and to compute results.
Where RDD operations are run
  1. RDD created at the driver and distributed to executors;
  2. Driver lazily evaluates transformations and creates a lineage graph;
  3. Upon encountering an action, the driver schedules tasks on the executors;
  4. Executors run in parallel and execute transformations and/or actions on RDDs;
  5. Executors have more memory and, therefore, can cache an RDD if necessary;
  6. Executors run actions to compute results and return results to the driver; and
  7. Drivers run actions on the results returned from executors.
RDD operations
  • Spark applications are essentially the manipulation of RDDs through transformations and actions.
  • Operations are invoked on RDDs by executing functions on each element of the RDD.
  • Spark offers over 80 high-level operations (for both transformations and actions) beyond Map and Reduce.
  • Common transformation operators: map, file, distinct, flatMap
  • Common action operators: collect, count, take, reduce
  • For a complete set of operators, refer to the Spark documentation on RDD (Python).
Passing functions to Spark
  • Operations are invoked on RDDs by passing functions to each element of the RDD.
  • In Python, pass lambda expressions for shorter functions or locally defined functions or top-level functions.
  • Note: While passing a function to Spark, if it is a member of an object or contains references to fields of an object, the entire object gets sent to worker nodes. A best practice is to extract fields into a local variable and pass that in.
Transformation examples on RDD
  • Note: The following examples use Scala syntax.
    rdd = {4,5,10,10}
  • Apply a function to each element in the RDD and return new RDD:
    rdd.map(x => x + 1)  #=> {5,6,11,11}
  • Apply a function to each element in the RDD and return elements that are true for the function:
    rdd.filter(x => x < 10)  #=> {4,5}
  • Remove duplicate elements in the RDD and return new RDD:
    rdd.distinct()  #=> {4,5,10}
  • Apply a function to each element in the RDD and then flatten results (similar to map, but the function returns a sequence rather than a single element):
    rdd.flatmap(x => list(x-1, x+1)  #=> {3,5,4,6,9,11,9,11}
Action examples on RDD
  • rdd = {4,5,10,10}
  • Return all elements from the RDD to the driver:
    rdd.collect()  #=> {4,5,10,10}
  • Return number of elements in the RDD:
    rdd.count()  #=> 4
  • Return n elements from the RDD:
    rdd.take(2)  #=> {4,5}
  • Combine elements in the RDD in parallel:
    rdd.reduce((x,y) => x+y)  #=> 29
Example of simple transformations and actions together
data = xrange(1,30)  # initialize data

#== SparkContext ==
xrangeRDD = sc.parallelize(data, 4)
# Dataset is broken into 4 partitions by driver and sent to workers' memory. E.g.:
# RDD_p1 = {1,2,3,4,5,6} => worker #1's memory
# RDD_p2 = {7,8,9,10} => worker #1's memory
# RDD_p3 = {11,12,13,14,15,16,17,18,19,20,21,22,23,24 => worker #2's memory
# RDD_p4 = {25,26,27,28,29,30} => worker #3's memory

#== Transformations ==
subRDD = xrangeRDD.map(lambda x: x-1)
# {1,2,3,4,5,6} => {0,1,2,3,4,5}
# {7,8,9,10} => {6,7,8,9}
# Etc.

filteredRDD = xrangeRDD.filter(lambda x: x<10)
# {1,2,3,4,5,6} => {1,2,3,4,5,6}
# {7,8,9,10} => {7,8,9}

#== Actions ==
subRDD.collect() # Gathers the entries from all partitions into the driver
# {0,1,2,...,29}
subRDD.count() # Results sent to SparkContext where they are summed => 30

Key-value pair RDDs

  • Pair RDDs are RDDs containing key-value pairs.
  • Commonly used in applications involving a customerID or eventID, or some unique identifier.
  • Operations are invoked on RDDs based on the keys (e.g., group data across nodes by key).
  • Pair RDDs can be created by directly loading key-value data.
  • Pair RDDs can also be created using map(), which returns the key-value pair.
  • Operations like shuffle are only available on Pair RDDs.
Example - Word count
  • The goal is to count the frequency of each word in a document.
  • Used in the bag-of-words model in Natural Language Processing (NLP) and information retrieval.
  • Applications include:
    • Document classification
    • Finding matching documents on web searches (e.g., looking for plagarism)
    • Email spam classification
  • Input: "Bob likes to watch films. Alice likes films too."
  • Output: [("Bob",1),("likes",2),("to",1),("watch",1),("films",2),("Alice",1),("too",1)]
# Python method #1:
input = sc.textFile("data.txt")
lines = input.flatMap(lambda line: line.split())
pairs = lines.map(lambda word: (word, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

# Python method #2 (generally slower than method #1):
input = sc.textFile("data.txt")
lines = input.flatMap(labmda line: line.split())
pairs = lines.map(lambda word: (word, 1))
groups = pairs.groupByKey()
counts = groups.map(lambda (word, count): (word, sum(count))

# Scala
val input = sc.textFile("data.txt")
val lines = input.flatMap(l => l.split(" "))
val pairs = lines.map(word => (word, 1))
val counts = pairs.reduceByKey(_ + _)
Compare reduceByKey with groupByKey
  • Aggregate operations on Pair RDDs trigger a shuffle of data.
  • Shuffle occurs to transfer all data with the same key to the same worker node.
  • reduceByKey combines data on each worker node such that only one value per key is sent of the network to the reducer worker nodes.
  • groupByKey sends all data over the network to reducer worker nodes and is, therefore, less efficient.
  • Thus, reduceByKey is generally preferred over groupByKey.

Installing Spark

Overview of install steps
  • Install Java and verify installation (since Spark runs inside a JVM)
  • Download and extract Spark package
  • Configure Spark environment
  • Invoke the Spark shell and verify Spark installation

Install Oracle Java 8 JDK

CentOS

Note: Visit the Oracle Java 8 JDK Downloads page, accept the license agreement, and copy the download link for the Linux 64-bit RPM.

$ RPM=http://download.oracle.com/otn-pub/java/jdk/8u121-b13/e9e7ea248e2c4826b92b3f075a80e441/jdk-8u121-linux-x64.rpm

Replace the above link with the latest link from Oracle's website.

$ wget --no-cookies --no-check-certificate \
    --header "Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" \
    "${RPM}"
$ sudo yum localinstall jdk-8u121-linux-x64.rpm
$ java -version

Java should now be installed at /usr/java/jdk1.8.0_121/jre/bin/java and linked from /usr/bin/java.

Ubuntu
$ sudo apt-add-repository ppa:webupd8team/java
$ sudo apt-get update
$ sudo apt-get install oracle-java8-installer
$ java -version

Java should now be installed at /usr/lib/jvm/java-8-oracle/jre/bin/java and linked from /usr/bin/java.

Install Apache Spark

$ sudo mkdir -p /opt/spark
$ tar zxvf spark-2.1.0-bin-hadoop2.7.tgz -O /opt/spark
  • Add the following lines to your ~/.bashrc file:
export SPARK_HOME=/opt/spark
export PATH=$PATH:/opt/spark/bin
$ source ~/.bashrc
$ cd $SPARK_HOME
$ vi ./bin/load_env.sh  # add the following line to the top of the file:
export SPARK_LOCAL_IP=127.0.0.1

Using Spark

REPL (Read–eval–print loop)
  • Python REPL invoked by executing pyspark
  • Scala REPL invoked by executing spark-shell
  • SparkContext is initialized automatically in the REPL and available as "sc"
  • As of March 2017, Java does not support REPL
Spark applications
  • These are what are used in a production environment.
  • The ./bin/spark-submit script is used to launch bundled Spark applications.
  • Scala applications are typically bundled using sbt (simple build tool).
  • Java applications are typically bundled using Maven.
  • Python programs are not required to be packaged, since the pyspark script takes care of setting up dependencies.
  • SparkContext must be manually initialized in Spark applications, unlike in the REPL.
Spark submit example
  • Run application locally on 8 cores:
$ ./bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master local[8] \
    ./examples/jars/spark-examples_2.11-2.1.0.jar 2>/tmp/run.log

Running the above should return:

Pi is roughly 3.1431557157785788

The /tmp/run.log file will have the details for the Spark run log.

Pyspark

NOTE: If you have IPython installed, set the following to force pyspark to use IPython for its REPL:

export PYSPARK_DRIVER_PYTHON=ipython
  • Navigate to the Spark installation directory and invoke the Spark Python REPL (interactive shell) using the "pyspark" command:
cd $SPARK_HOME
./bin/pyspark
  • Create an RDD of strings using method textfile(), count the number of lines, and print the first line using first():
input = sc.textFile("README.md")
input.count()
input.first()
  • Call the filter() transformation to filter lines containing the word "Python":
pythonLines = input.filter(lambda line : "Python" in line)
pythonLines.collect()
  • Create an RDD using parallelize() on an array of words and check its contents calling the collect() method:
words = sc.parallelize(['pencil', 'paper', 'computer', 'mouse'])
words.collect()
  • Write a map function to append the letter "s" to all the words in the above RDD:
pluralWords = words.map(lambda word: word + 's')
pluralWords.collect()
  • Create an RDD using parallelize() on an array of integers:
nums = sc.parallelize([1, 2, 3, 4, 5])
nums.collect()
  • Write a map() function to compute the square of each integer in the above RDD:
squaredNums = nums.map(lambda num : num * num)
squaredNums.collect()
  • Write a reduce() function that computes the sum of all squared integers:
squaredNums.reduce(lambda x, y : x + y)
  • Example word count script:
lines = sc.textFile("README.md")
words = lines.flatMap(lambda line: line.split(" "))
counts = words.map(lambda word: (word, 1))
total = counts.reduceByKey(lambda x, y : x + y)
# Filter words where the frequency is greater than 10
total.filter(lambda x : x[1] > 10).collect()
# [(u, 72), (u'Spark', 16), (u'for', 12), (u'the', 24), (u'to', 17)]
  • A more complete word count script:
$ cat data.txt
Bob likes to watch films. Alice likes films too.
# Python script v1
def remove(x):
    if x.endswith('.'):
        x = x.replace('.', '')
    return x

infile = sc.textFile("data.txt")
lines = infile.flatMap(lambda line: line.split())
pairs = lines.map(lambda word: (remove(word), 1))
groups = pairs.groupByKey()
counts = groups.map(lambda (word, count): (word, sum(count)))
output = counts.collect()
for (word, count) in output:
    print("%s: %i" % (word, count))
# Python script v2
from operator import add
lines = spark.read.text("data.txt").rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda line: line.split()) \
              .map(lambda word: (remove(word), 1)) \
              .reduceByKey(add)
output = counts.collect()
for (word, count) in output:
    print("%s: %i" % (word, count))
#watch: 1
#Alice: 1
#to: 1
#likes: 2
#films: 2
#Bob: 1
#too: 1

Spark MLlib

Spark MLlib, a Machine Learning library
  • The goal is to make practical Machine Learning (ML) scalable and easy.
  • Includes common ML algorithms (e.g., classification, regression, clustering, collaborative filtering, etc.).
  • Provides utilities for feature extraction, transformation, dimensionality reduction, and selection.
  • Provides tools for constructing ML pipelines and evaluating and tuning them.
  • Supports persistence of models and pipelines.
  • Includes convenient utilities for linear algebra, statistics, data handling, etc.
Example use cases for Spark MLlib
  • Fraud detection: Spark Streaming and ML applied to prevent fraud.
  • Network intrusion detection: ML applied to detect cyber attacks.
  • Customer segmentation and personalization: Spark SQL and ML applied to maximize customer lifetime value.
  • Real-time ad targeting: Spark used to maximize Online ad revenues.
  • Predictive healthcare: Spark used to optimize healthcare costs.
  • Genomics analysis to provide precision medicine.
Spark ML pipelines

Provides a high-level API that helps users create and tune practical ML pipelines. Allows one to combine multiple ML algorithms and utilities into a single pipeline.

Key concepts in the Pipeline API:

  • DataFrame: Is the ML dataset and can hold a variety of data types.
  • Transformer: Is an algorithm that can transform one DataFrame into another DataFrame.
  • Estimator: Is an algorithm that can be fit on a DataFrame to produce a Transformer.
  • Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.
  • Parameter: This part of the API allows specifying parameters on all Transformers and Estimators.
DataFrame
  • ML can be applied to a wide variety of data types (e.g., images, text, audio clips, numerical data, etc.). The ML dataset can hold a variety of data types.
  • The DataFrame API supports a variety of data types and is, therefore, well-suited for ML.
  • They are conceptually equivalent to a table in a relational database or a data frame in R/Python (e.g., Pandas).
  • Columns in a DataFrame are named.
  • Can be constructed from a wide array of sources (e.g., data files, tables in Hive, external databases, or existing RDDs).
  • DataFrames contain optimizations under-the-hood for better performance.
Transformer
  • Is an algorithm that transforms one DataFrame into another, generally by appending one or more columns.
  • Can be a feature transformer that converts a column in a DataFrame to another type and appends the new column.
  • Can be a learning model that reads the features in the DataFrame, makes predictions, and appends the predicted label to the DataFrame.
  • Implements the transform() method (i.e., perform the transformation).
  • Feature Extraction utilities are Transformers that transform the input DataFrame by appending a column that contains the new feature. Feature Extraction involves extracting features from raw data.
Estimator
  • Abstracts the concept of a learning algorithm that trains on the data.
  • Implements the method fit(), which accepts a DataFrame and produces a Model, which is a Transformer.
  • For example, a learning algorithm like Logistic Regression is an Estimator and calling fit() trains a Logistic Regression Model, which is a Model and a Transformer.
Pipeline
  • Combines multiple transformers and estimators into a pipeline.
  • A Pipeline is specified as a sequence of stages and each stage is a Transformer or an Estimator.
  • Stages are specified as an ordered array.
  • In a linear pipeline, each stage uses data produced by the previous stage.
  • A non-linear pipeline is valid as long as it forms a Direct Acyclic Graph (DAG). A graph is specified based on the input and output column names of each stage.
  • Pipelines help ensure that training and test data go through identical feature extraction steps.
  • Tools used to tune ML pipelines include: CrossValidator, TrainValidationSplit
Parameters
  • Parameters can be specified on Estimators and Transformers for tuning the algorithms and models.
  • Example: If lr is an instance of Logistic Regression, call lr.setMaxIter(10) to make lr.fit() use at most 10 iterations.
  • A Param is a named parameter, while a ParamMap is a set of (parameter, value) pairs.
Spark ML Pipeline example

A text document classification pipeline has the following workflow:

  • Training workflow: Input is a set of text documents, where each document is labelled. Stages while training the ML model are:
    • Split each text document into words;
    • Convert each document's words into a numerical feature vector; and
    • Create a prediction model using the feature vectors and labels.
  • Test/prediction workflow: Input is a set of text documents and the goal is to predict a label for each document. Stages while testing or making predictions with the ML model are:
    • Split each text document into words;
    • Convert each document's words into a numerical feature vector; and
    • Use the trained model to make predictions on the feature vector.

Spark MLlib example

Note: The following example is taken from the examples/src/main/python/ml/pipeline_example.py example script in the Spark tarball.

  • First, set the PYTHONPATH (add the following to your ~/.bashrc file):
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("PipelineExample")\
    .getOrCreate()

# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr. 
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(training)

# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row 
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))

spark.stop()

The above simple script should return the following for Spark MLlib predictions for the test documents:

(4, spark i j k) --> prob=[0.159640773879,0.840359226121], prediction=1.000000
(5, l m n) --> prob=[0.837832568548,0.162167431452], prediction=0.000000
(6, spark hadoop spark) --> prob=[0.0692663313298,0.93073366867], prediction=1.000000
(7, apache hadoop) --> prob=[0.982157533344,0.0178424666556], prediction=0.000000

So, for this very simple example, Spark MLlib made perfect predictions (i.e., all documents with the word "spark" in them were correctly labeled).

Spark SQL

Spark SQL is:

  • A Spark module acting as a distributed SQL query engine
  • Used for structured data processing and allows running SQL-like queries
  • Data from different formats (e.g., JSON, Parquet, Hive tables, etc.) can be consumed
  • Allows combining SQL with Spark's programming API
  • Can perform ETL on data and then run ad hoc querying
  • SQLContext encapsulates all relational functionality in Spark
  • Starting with Spark 2.0, SparkSession replaces SparkContext and SQLContext
Spark SQL DataFrame and DataSet API
  • Used the DataFrame API (introduced in Spark 1.3) or the DataSet API (introduced in Spark 1.6)
  • The DataFrame API introduces the concept of a schema to describe data, similar to a relational table
  • The DataFrame API provides further optimization of query plans using Spark's Catalyst optimizer
  • The DataSet API combines the best of RDDs and DataFrame APIs. It allows object oriented style programming like RDDs and utilizes performance benefits of the Catalyst query optimizer
  • Inter-operates with RDDs. Can convert existing RDDs into DataFrames.
Spark SQL example using the DataFrame API

Note: This example was taken from the examples/src/main/python/sql/basic.py script in the Spark tarball.

$ cat people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.json("people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# |   name|(age + 1)|
# +-------+---------+
# |Michael|     null|
# |   Andy|       31|
# | Justin|       20|
# +-------+---------+

# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+

# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# |  19|    1|
# |null|    1|
# |  30|    1|
# +----+-----+

# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

# Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+
Running SQL queries programmatically

Note: This example was taken from the examples/src/main/python/sql/basic.py script in the Spark tarball. The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a DataFrame:

$ cat people.txt
Michael, 29
Andy, 30
Justin, 19
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")

results.show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

Spark Streaming

Spark Streaming is
  • An extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.
  • Simple Big Data analytics is no longer sufficient. What is needed is fast, real-time analysis of streaming Big Data sources.
  • Data can be ingested from many sources (e.g., Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets).
  • Data can further be processed using MLlib, Graph processing, or high-level functions like map, reduce, join, window, etc.
  • Processed data can be pushed out to file systems, databases, and live dashboards:
Kafka
Flum                          HDFS
HDFS/S3 => Spark Streaming => Databases
Kinesis                       Dashboards
Twitter
Internal process of Spark Streaming
  • Spark Streaming receives live input data streams and divides data into batches.
  • Batches of data are processed by the Spark engine to produce batches of results.
  • A high-level API, called DStream (discretized stream), represents a continuous stream of data.
  • Internally, DStream is represented as a sequence of RDDs.
  • DStreams can be created either from input data streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams:
input data stream => Spark Streaming => batches of input data => Spark Engine => batches of processed data
Example - Network word count using Spark Streaming
  • Create a StreamingContext (the main entry point to all Streaming functionality):
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two local working threads and a batch interval of 1:
second sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
  • Create a DStream that represents streaming data from a TCP source:
# Create a DStream that will connect to hostname:port (e.g., localhost:9999):
lines = ssc.socketTextStream("localhost", 9999)
  • Define Map Reduce functions to count the number of words from the streaming data:
# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first 10 elements of each RDD generated in this DStream to the console
wordCounts.pprint()
  • Start processing after all the transformations have been setup:
ssc.start()
ssc.awaitTermination()
# Complete script
# Source: examples/src/main/python/streaming/network_wordcount.py
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 1)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" "))\
                  .map(lambda word: (word, 1))\
                  .reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
  • Start a netcat server in one terminal:
$ nc -lk 9999
  • Submit the following Spark command in another terminal:
$ ./bin/spark-submit network_wordcount.py localhost 9999 2>/dev/null
  • Any lines typed in the terminal running the netcat server will be counted and printed on the screen:
terminal1> nc -lk 9999
foo bar baz foo

terminal2> ./bin/spark-submit network_wordcount.py localhost 9999 2>/tmp/streaming.log
-------------------------------------------
Time: 2017-03-27 20:57:56
-------------------------------------------
(u'bar', 1)
(u'baz', 1)
(u'foo', 2)

Spark GraphX

What is Spark GraphX?
  • It is a graph processing library with APIs to manipulate graphs and performing graph-parallel computations.
  • It extends the Spark RDD API by introducing a new Graph abstraction, a directed multi-graph with properties attached to each vertex and edge.
  • Like RDDs, property graphs are immutable, distributed, and fault-tolerant.
  • It provides various operators for manipulating graphs (e.g., subgraph and mapVertices).
  • It provides a library of common graph algorithms (e.g., PageRank and triangle counting).
  • It has a growing collection of algorithms and builders to simplify graph analytics tasks.
GraphX example
  • Define a vertex array and edge array.
  • Construct RDDs from vertex and edge arrays.
  • Build a property graph using the RDDs of vertices and edges.
  • Perform filter operations on the graph (e.g., list users at least 30 years old).
  • Use the graph.triplets view to display relationships in the graph.
  • Example use cases: Graph social networks (e.g., LinkedIn, Facebook, Twitter, etc.)

SparkR

What is SparkR?
  • SparkR is an R package that provides a light-weight frontend to use Apache Spark from within R.
  • SparkR provides a distributed data frame implementation, which supports operations like selection, filtering, aggregation, etc. on large datasets.
  • SparkR also supports distributed machine learning using MLlib.
  • SparkR uses the SparkDataFrame API. SparkDataFrames can be constructed from a wide array of sources (e.g., structured data files, tables in Hive, external databases, or existing local R data frames).
  • The SparkR shell can be invoked using the ./bin/sparkR command.
  • You can also connect to SparkR from Rstudio or other R IDEs.
  • Allows converting R data frames to SparkDataFrames.
  • Can read from JSON, Parquet files, Hive, and other data sources.
  • spark.lapply (similar to lapply in native R) runs a function over a list of elements and distributes the computations with Spark.
  • A SparkDataFram can also be registered as a temporary view in Spark SQL and allows you to run SQL queries over its data.
  • SparkR support the following machine learning algorithms (as of March 2017):
    • Generalized Linear Model
    • Accelerated Failure Time (AFT)
    • Survival Regression Model
    • Naive Bayes Model
    • KMeans Model
  • Under the hood, SparkR uses MLlib to train the model.

See also

References

  1. Survey shows huge popularity spike for Apache Spark. Fortune.com. 2015-09-25.

External links

Blog posts