Apache Spark

From Christoph's Personal Wiki
Revision as of 21:25, 24 March 2017 by Christoph (Talk | contribs) (Using Spark)

Apache Spark is an open-source cluster-computing framework. Originally developed at the University of California, Berkeley's AMPLab, the Spark codebase was later donated to the Apache Software Foundation, which has maintained it since. Spark provides an interface for programming entire clusters with implicit data parallelism and fault-tolerance.

Introduction

What is Spark?
  • Fast and general engine for large-scale data processing and analysis
  • Parallel distributed processing on commodity hardware
  • Easy to use
  • A comprehensive, unified framework for Big Data analytics
  • Open source and a top-level Apache project
Spark use cases
  • Big Data use cases like intrusion detection, product recommendations, estimating financial risks, and detecting genomic associations to a disease require analysis of large-scale data.
  • In-depth analysis requires a combination of tools like SQL, statistics, and machine learning to gain meaningful insights from data.
  • Historical choices like R and Octave operate only on single node machines and are, therefore, not suitable for large volume data sets.
  • Spark allows rich programming APIs like SQL, machine learning, graph processing, etc. to run on clusters of computers to achieve large-scale data processing and analysis.
Spark and distributed processing
  • Challenges of distributed processing:
    • Distributed programming is much more complex than single node programming
    • Data must be partitioned across servers, increasing the latency if data has to be shared between servers over the network
    • The chance of failure increases with the number of servers
  • Spark makes distributed processing easy:
    • Provides a distributed and parallel processing framework
    • Provides scalability
    • Provides fault-tolerance
    • Provides a programming paradigm that makes it easy to write code in a parallel manner
Spark and its speed
  • Lightning fast speeds due to in-memory caching and a DAG-based processing engine.
  • Up to 100 times faster than Hadoop's MapReduce for in-memory computations and up to 10 times faster on disk, according to the Spark project.
  • Well suited for iterative algorithms in machine learning
  • Fast, real-time response to user queries on large in-memory data sets.
  • Low latency data analysis applied to processing live data streams
Spark is easy to use
  • General purpose programming model using expressive languages like Scala, Python, and Java.
  • Existing libraries and APIs make it easy to write programs combining batch, streaming, interactive, machine learning, and complex queries in a single application.
  • An interactive shell is available for Python and Scala
  • Built for performance and reliability; written in Scala and runs on top of the Java Virtual Machine (JVM).
  • Operational and debugging tools from the Java stack are available for programmers.
Spark components
  • Spark SQL
  • Spark Streaming
  • MLlib (machine learning)
  • GraphX (graph)
  • SparkR
Spark is a comprehensive unified framework for Big Data analytics
  • Collapses the data science pipeline by allowing pre-processing of data to model evaluation in one single system.
  • Spark provides an API for data munging, Extract, transform, load (ETL), machine learning, graph processing, streaming, interactive, and batch processing. Can replace several SQL, streaming, and complex analytics systems with one unified environment.
  • Simplifies application development, deployment, and maintenance.
  • Strong integration with a variety of tools in the Hadoop ecosystem.
  • Can read and write to different data formats and data sources, including HDFS, Cassandra, S3, and HBase.
Spark is not a data storage system
  • Spark is not a data store but is versatile in reading from and writing to a variety of data sources.
  • Traditional business intelligence (BI) tools can connect to Spark through a server mode that provides standard Java Database Connectivity (JDBC) and Open Database Connectivity (ODBC) access.
  • The DataFrame API provides a pluggable mechanism to access structured data using Spark SQL.
  • Its API provides tight optimization integration, thereby enhancing the speed of Spark jobs that process vast amounts of data.
History of Spark
  • Originated as a research project in 2009 at UC Berkeley AMPLab.
  • Motivated by MapReduce and the need to apply machine learning in a scalable fashion.
  • Open sourced in 2010 and transferred to Apache in 2013.
  • A top-level Apache project since February 2014.
  • Spark won the Daytona GraySort contest in 2014, sorting 100 TB of data 3 times faster and on 10 times fewer machines than the previous record set with Hadoop's MapReduce.
  • "Apache Spark is the Taylor Swift of big data software. The open source technology has been around and popular for a few years. But 2015 was the year Spark went from an ascendant technology to a bona fide superstar".[1]

Spark use cases

  • Fraud detection: Spark Streaming and machine learning applied to prevent fraud
  • Network intrusion detection: Machine learning applied to detect cyber hacks
  • Customer segmentation and personalization: Spark SQL and machine learning applied to maximize customer lifetime value
  • Social media sentiment analysis: Spark Streaming, Spark SQL, and Stanford's CoreNLP wrapper help achieve sentiment analysis
  • Real-time ad targeting: Spark used to maximize online ad revenue
  • Predictive healthcare: Spark used to optimize healthcare costs
Example - Spark at Uber
  • Business Problem: Get people around a city with a fleet of more than 100,000 drivers, and use data to improve the business intelligently in an automated, real-time fashion.
  • Requirements:
    • Accurately pay drivers per trip in the dataset
    • Maximize profits by positioning vehicles optimally
    • Help drivers avoid accidents
    • Calculate surge pricing
  • Solution: Use Spark Streaming and Spark SQL as the ETL system and Spark MLlib and GraphX for advanced analytics
  • Reference: Talk by Uber engineers at Apache Spark meetup: https://youtu.be/zKbds9ZPjLE
Example - Spark at Netflix
  • Business Problem: A video streaming service with emphasis on data quality, agility, and availability. Using analytics to help users discover films and TV shows that they like is key to Netflix's success.
  • Requirements:
    • Streaming applications are long-running tasks that need to be resilient in Cloud deployments.
    • Optimize content buying
    • Renowned personalization algorithms
  • Solution: Use Spark Streaming in AWS and Spark GraphX for recommendation system.
  • Reference: Talk by Netflix engineers at Apache Spark meetup: https://youtu.be/gqgPtcDmLGs
Example - Spark at Pinterest
  • Business Problem: Provide a recommendation and visual bookmarking tool that lets users discover, save, and share ideas. Also get an immediate view of Pinterest engagement activity with high throughput and minimal latency.
  • Requirements:
    • Real-time analytics to process user activity.
    • Process petabytes of data to provide recommendations and personalization.
    • Apply sophisticated deep learning techniques to a Pin image in order to suggest related Pins.
  • Solution: Use Spark Streaming, Spark SQL, MemSQL's Spark connector for real-time analytics, and Spark MLlib for machine learning use cases.
  • Reference: https://medium.com/@Pinterest_Engineering/real-time-analytics-at-pinterest-1ef11fdb1099#.y1cdhb9c3
Example - Spark at ADAM (a Big Data genomics project)
  • Business Problem: ADAM is a genomics analysis platform that provides large-scale analyses to support population-based genomics studies, which is essential for precision medicine.
  • Requirements:
    • Parallelize genomics analysis in the Cloud
    • Avoid developing custom distributed computing code.
    • Support for file formats well suited for genomic data, like Apache Parquet and Avro.
    • Support for languages like R, which are popular in the genomics community.
  • Solution: Use Spark on Amazon EMR
  • Reference: https://github.com/bigdatagenomics/adam and http://bdgenomics.org/
Example - Spark at Yahoo
  • Business Problem: Deep learning is critical for Yahoo's product teams to acquire intelligence from huge amounts of online data. Examples are image recognition and speech recognition for improved search on the photo sharing service Flickr.
  • Requirements:
    • Run deep learning software on existing infrastructure.
    • Distribute deep learning processes across multiple Big Data clusters.
    • Handle potential system failures on long running deep learning jobs.
  • Solution: Create CaffeOnSpark, a way to run the Caffe deep learning system on Spark clusters.
  • Reference: https://youtu.be/bqj7nML-aHk

Architecture and components of Spark

The Spark stack:

+-----------------+ +-----------------+ +------------------+ +------------------+
|    Spark SQL    | | Spark Streaming | |      MLlib       | |      GraphX      |  <= APIs
| structured data | |    real-time    | | machine learning | | graph processing |
+-----------------+ +-----------------+ +------------------+ +------------------+

+-------------------------------------------------------------------------------+
|                                Spark Core                                     |
+-------------------------------------------------------------------------------+

+---------------------------+    +------------------+    +-----------------+
|   Stand alone scheduler   |    |   Apache Mesos   |    |   Hadoop YARN   |  <= Cluster managers
+---------------------------+    +------------------+    +-----------------+
Spark Core (the base engine)
  • Distributes workloads
  • Monitors applications across the cluster
  • Schedules tasks
  • Memory management
  • Fault recovery
  • Interacts with storage systems
  • Houses the API that defines Resilient Distributed Datasets (RDDs), the primary data abstraction in Spark
Spark API libraries
  • Spark SQL: Provides structured data processing
  • Spark Streaming: Enables processing of live streams of data
  • Spark MLlib: A library containing common machine learning functionality
  • GraphX: A library for manipulating graphs and performing graph-parallel computations
  • SparkR: An R package that provides a lightweight frontend to use Spark from within R
Spark cluster managers
  • Cluster managers allocate resources across applications on a cluster.
  • As of March 2017, the following are the cluster managers Spark supports:
    • Standalone — a simple cluster manager included with Spark that makes it easy to set up a cluster.
    • Apache Mesos — a general cluster manager that can also run Hadoop MapReduce and service applications.
    • Hadoop YARN — the resource manager in Hadoop 2.
Spark runtime architecture
  • In distributed mode, Spark uses a master/slave architecture.
  • The master is called the "driver" and the slaves are the "executors" (live on worker nodes)
  • The driver and each executor run in their own Java processes.
  • A driver and its executors put together form a Spark application.
  • A Spark application is launched using the cluster manager.
SparkContext
  • Main entry point to everything Spark.
  • Defined in the main/driver program.
  • Tells Spark how and where to access a cluster.
  • Connects to cluster managers.
  • Coordinates Spark processes running on different cluster nodes.
  • Used to create RDDs and shared variables on the cluster.
The Driver
  • The process where the main() method of the Spark program runs.
  • Responsible for converting a user program into tasks.
  • The driver schedules the tasks on executors (live on worker nodes).
Executors (aka worker processes)
  • Launched once at the beginning of the application and typically run for the entire lifetime of an application.
  • Executors register themselves with the driver, thereby allowing the driver to schedule tasks on the executors.
  • Worker/executor processes run the individual tasks and return results to the driver.
  • Provide in-memory storage for RDDs, as well as disk storage.
Spark running on clusters
  • Application: User program built on Spark; consists of a driver program and executors on the cluster.
  • Cluster Manager: A pluggable service for acquiring resources on the cluster.
  • Worker node: Any node that can run application code in the cluster.
  • Driver program: The process running the main() function of the application and creating the SparkContext.
  • Executor: A process launched on a worker node that runs tasks.
  • Task: The smallest unit of work sent to one executor. Tasks are bundled into "stages".
Flow of execution in a Spark program
  • User submits an application, which launches the driver program.
  • The driver program invokes the main() function specified by the user.
  • The driver program contacts the cluster manager to ask for resources to launch executors.
  • The cluster manager launches executors.
  • The driver program divides the user program into tasks and sends them to the executors.
  • The executors run the tasks, compute and save results (either in memory or to disk), and return results to the driver.
  • When the driver's main() function exits or SparkContext.stop() is called, the executors are terminated and the cluster manager releases the resources.

Resilient Distributed Datasets (RDDs)

Data API in Spark
  • RDD API: The core data API (introduced in Spark 1.0).
  • DataFrame API: Uses a schema to describe the data (introduced in Spark 1.3). More suitable for query building.
  • Dataset API: Combines the best of the RDD and DataFrame APIs (released via Spark 2.0).
RDD basics
  • Core concept in Spark.
  • Abstraction for working with data in Spark.
  • RDD API in Python, Scala, and Java.
  • Simply described as a distributed collection of elements.
  • Spark automatically distributes RDDs across nodes in a cluster and can parallelize operations.
  • A fault-tolerant collection of elements, since they can be re-built from the lineage (i.e., it keeps track of how the RDD was constructed and what operations were performed on it).
  • An RDD is a read-only collection of elements, immutable once constructed.
Creating RDDs
  • Three methods for creating an RDD:
  1. Parallelizing (distributing) an existing collection (e.g., a list or a set)
    lines = sc.parallelize(["I", "am", "learning", "Spark"])
  2. Loading an external dataset from files on an external storage system
    lines = sc.textFile("README.md")
    or,
    text_file = sc.textFile("hdfs://...")
  3. Operations on an existing RDD
     newLines = lines.filter(lambda line: "Spark" in line)  # any transformation returns a new RDD
where sc is the SparkContext
Operations on RDDs
Spark lineage graph
  • Transformations:
    • Create a new dataset from an existing one
    • Returns an RDD
    • Lazy evaluation, not computed immediately
    • Generally works element-wise
    • Relationships between transformations are recorded in a lineage graph, which is a directed acyclic graph (DAG)
    • Examples:
      inputRDD = sc.textFile("log.txt")
      errorsRDD = inputRDD.filter(lambda x: "error" in x)
      warningRDD = inputRDD.filter(lambda x: "warning" in x)
      badLinesRDD = errorsRDD.union(warningRDD)
  • Actions:
    • RDDs are evaluated when an action is called (i.e., nothing happens on the above transformations until the action is called)
    • They are a mechanism to get results out of Spark
    • They return result(s) to the driver or to persistent storage
    • Each time an action is called, the RDD is recomputed from scratch through its lineage unless it has been persisted. Therefore, it is good practice to persist intermediate results that are reused.
    • Examples:
      print("Number of problems in log file: " + str(badLinesRDD.count()))  # count() returns an int
      badLinesRDD.take(10)
Lazy evaluation and lineage graph of RDDs
  • When an RDD is created, a lineage graph is created (a DAG);
  • DAG is not evaluated until an action is called, thus a "lazy evaluation";
  • When more transformations are called and new RDDs are derived, the lineage graph is updated and keeps track of the relationships between RDDs; and
  • If a failure occurs in a given task, the lineage graph is used to recompute the lost RDD partitions on demand. Thus, RDDs are fault-tolerant.
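
The lazy-evaluation idea above can be sketched without Spark at all. The following is a minimal pure-Python model, assuming Python 3; the LazyDataset class, its methods, and the add_one helper are hypothetical names made up for illustration and are not part of the Spark API. Transformations only record a lineage step, and the action replays the lineage from the source each time it is called.

```python
class LazyDataset(object):
    """Toy stand-in for an RDD (hypothetical, not Spark): records steps, computes on an action."""

    def __init__(self, source, lineage=None):
        self._source = source                    # zero-argument function yielding the elements
        self._lineage = lineage or ["source"]    # names of the steps that built this dataset

    def map(self, fn):
        # Transformation: nothing is computed here, only a new lineage step is recorded.
        parent = self._source
        return LazyDataset(lambda: (fn(x) for x in parent()), self._lineage + ["map"])

    def filter(self, pred):
        # Transformation: also lazy.
        parent = self._source
        return LazyDataset(lambda: (x for x in parent() if pred(x)), self._lineage + ["filter"])

    def lineage(self):
        return " -> ".join(self._lineage)

    def collect(self):
        # Action: replays the whole lineage, starting again from the source.
        return list(self._source())


calls = []  # every element the map function is actually applied to


def add_one(x):
    calls.append(x)
    return x + 1


base = LazyDataset(lambda: iter([4, 5, 10, 10]))
small = base.map(add_one).filter(lambda x: x < 10)

print(small.lineage())   # source -> map -> filter
print(len(calls))        # 0 (no action has run yet)
print(small.collect())   # [5, 6]
print(len(calls))        # 4 (the action triggered the map)
print(small.collect())   # [5, 6]
print(len(calls))        # 8 (a second action recomputed everything)
```

The second collect() doing the work again is the behaviour that persisting an RDD is meant to avoid.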
A Spark program lifecycle
  1. Create RDD from external data or existing collection;
  2. Lazily transform into new RDD;
  3. cache() some RDDs for repeated usage; and
  4. Perform actions on RDD to execute parallel operations and to compute results.
Where RDD operations are run
  1. RDD created at the driver and distributed to executors;
  2. Driver lazily evaluates transformations and creates a lineage graph;
  3. Upon encountering an action, the driver schedules tasks on the executors;
  4. Executors run in parallel and execute transformations and/or actions on RDDs;
  5. Executors have more memory and, therefore, can cache an RDD if necessary;
  6. Executors run actions to compute results and return results to the driver; and
  7. Drivers run actions on the results returned from executors.
RDD operations
  • Spark applications are essentially the manipulation of RDDs through transformations and actions.
  • Operations are invoked on RDDs by executing functions on each element of the RDD.
  • Spark offers over 80 high-level operations (for both transformations and actions) beyond Map and Reduce.
  • Common transformation operators: map, filter, distinct, flatMap
  • Common action operators: collect, count, take, reduce
  • For a complete set of operators, refer to the Spark documentation on RDD (Python).
Passing functions to Spark
  • Operations are invoked on RDDs by passing functions to each element of the RDD.
  • In Python, pass lambda expressions for shorter functions or locally defined functions or top-level functions.
  • Note: While passing a function to Spark, if it is a member of an object or contains references to fields of an object, the entire object gets sent to worker nodes. A best practice is to extract fields into a local variable and pass that in.
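
The note above can be illustrated in plain Python by looking at which variables a function closes over, since those are what would have to travel with it. This is a sketch under assumptions: the LogSearch class and its method names are hypothetical, no Spark is involved, and the big_table field merely stands in for heavy state you would not want shipped to workers.

```python
class LogSearch(object):
    """Hypothetical helper object holding a search term plus some heavy state."""

    def __init__(self, query):
        self.query = query
        self.big_table = list(range(100000))  # stands in for large, unrelated state

    def matcher_capturing_self(self):
        # The lambda refers to self.query, so it closes over the whole object.
        return lambda line: self.query in line

    def matcher_capturing_field(self):
        # Copy the field into a local first; the lambda closes over the string only.
        query = self.query
        return lambda line: query in line


search = LogSearch("error")
bad = search.matcher_capturing_self()
good = search.matcher_capturing_field()

print(bad.__code__.co_freevars)    # ('self',)
print(good.__code__.co_freevars)   # ('query',)
print(good("disk error on node 3"), bad("disk error on node 3"))  # True True
```

Both functions behave the same; only the second one is free of a reference to the enclosing object.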
Transformation examples on RDD
  • Note: The following examples use Scala syntax.
    rdd = {4,5,10,10}
  • Apply a function to each element in the RDD and return new RDD:
    rdd.map(x => x + 1)  #=> {5,6,11,11}
  • Apply a function to each element in the RDD and return elements that are true for the function:
    rdd.filter(x => x < 10)  #=> {4,5}
  • Remove duplicate elements in the RDD and return new RDD:
    rdd.distinct()  #=> {4,5,10}
  • Apply a function to each element in the RDD and then flatten results (similar to map, but the function returns a sequence rather than a single element):
    rdd.flatMap(x => List(x-1, x+1))  #=> {3,5,4,6,9,11,9,11}
Action examples on RDD
  • rdd = {4,5,10,10}
  • Return all elements from the RDD to the driver:
    rdd.collect()  #=> {4,5,10,10}
  • Return number of elements in the RDD:
    rdd.count()  #=> 4
  • Return n elements from the RDD:
    rdd.take(2)  #=> {4,5}
  • Combine elements in the RDD in parallel:
    rdd.reduce((x,y) => x+y)  #=> 29
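
To check the results quoted in the transformation and action examples above, the same operations can be mimicked on an ordinary Python list. This is only a local sketch of the semantics, assuming Python 3; it does not use Spark, and the variable names are made up for illustration. Note that a real distinct() makes no promise about element order, so the result is sorted here.

```python
from functools import reduce

rdd = [4, 5, 10, 10]  # a local list standing in for the RDD

mapped = [x + 1 for x in rdd]                      # like map(x => x + 1)
filtered = [x for x in rdd if x < 10]              # like filter(x => x < 10)
distinct = sorted(set(rdd))                        # like distinct(), sorted for a stable display
flat = [y for x in rdd for y in (x - 1, x + 1)]    # like flatMap(x => List(x-1, x+1))
first_two = rdd[:2]                                # like take(2)
total = reduce(lambda x, y: x + y, rdd)            # like reduce((x,y) => x+y)

print(mapped)      # [5, 6, 11, 11]
print(filtered)    # [4, 5]
print(distinct)    # [4, 5, 10]
print(flat)        # [3, 5, 4, 6, 9, 11, 9, 11]
print(first_two)   # [4, 5]
print(len(rdd))    # 4
print(total)       # 29
```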
Example of simple transformations and actions together
data = xrange(1, 31)  # initialize data: the integers 1..30 (the end of xrange is exclusive)

#== SparkContext ==
xrangeRDD = sc.parallelize(data, 4)
# Dataset is broken into 4 partitions by driver and sent to workers' memory. E.g.:
# RDD_p1 = {1,2,3,4,5,6} => worker #1's memory
# RDD_p2 = {7,8,9,10} => worker #1's memory
# RDD_p3 = {11,12,13,14,15,16,17,18,19,20,21,22,23,24} => worker #2's memory
# RDD_p4 = {25,26,27,28,29,30} => worker #3's memory

#== Transformations ==
subRDD = xrangeRDD.map(lambda x: x-1)
# {1,2,3,4,5,6} => {0,1,2,3,4,5}
# {7,8,9,10} => {6,7,8,9}
# Etc.

filteredRDD = xrangeRDD.filter(lambda x: x<10)
# {1,2,3,4,5,6} => {1,2,3,4,5,6}
# {7,8,9,10} => {7,8,9}

#== Actions ==
subRDD.collect() # Gathers the entries from all partitions into the driver
# {0,1,2,...,29}
subRDD.count() # Results sent to SparkContext where they are summed => 30
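
The partitioning, per-partition map, and count-by-summing steps in the example above can be modelled in plain Python. This is a rough sketch assuming Python 3 and an even, contiguous split; the slice_evenly function is a hypothetical helper, and Spark's actual slicing may differ in detail (the uneven partition sizes in the comments above are likewise only illustrative).

```python
def slice_evenly(data, num_slices):
    """Split data into num_slices contiguous, near-equal chunks (assumed scheme, not Spark's code)."""
    items = list(data)
    n = len(items)
    return [items[(i * n) // num_slices:((i + 1) * n) // num_slices]
            for i in range(num_slices)]


partitions = slice_evenly(range(1, 31), 4)
print([len(p) for p in partitions])          # [7, 8, 7, 8]

# map: each partition is processed independently of the others
mapped = [[x - 1 for x in p] for p in partitions]

# count: each partition reports its own size and the driver adds them up
partial_counts = [len(p) for p in mapped]
print(partial_counts, sum(partial_counts))   # [7, 8, 7, 8] 30

# collect: the driver concatenates the partitions in order
collected = [x for p in mapped for x in p]
print(collected == list(range(0, 30)))       # True
```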

Key-value pair RDDs

  • Pair RDDs are RDDs containing key-value pairs.
  • Commonly used in applications involving a customerID or eventID, or some unique identifier.
  • Operations are invoked on RDDs based on the keys (e.g., group data across nodes by key).
  • Pair RDDs can be created by directly loading key-value data.
  • Pair RDDs can also be created using map(), which returns the key-value pair.
  • Key-based operations that shuffle data, such as reduceByKey(), groupByKey(), and join(), are only available on Pair RDDs.
Example - Word count
  • The goal is to count the frequency of each word in a document.
  • Used in the bag-of-words model in Natural Language Processing (NLP) and information retrieval.
  • Applications include:
    • Document classification
    • Finding matching documents on web searches (e.g., looking for plagiarism)
    • Email spam classification
  • Input: "Bob likes to watch films. Alice likes films too."
  • Output: [("Bob",1),("likes",2),("to",1),("watch",1),("films",2),("Alice",1),("too",1)]
# Python method #1:
input = sc.textFile("data.txt")
lines = input.flatMap(lambda line: line.split())
pairs = lines.map(lambda word: (word, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

# Python method #2 (generally slower than method #1):
input = sc.textFile("data.txt")
lines = input.flatMap(lambda line: line.split())
pairs = lines.map(lambda word: (word, 1))
groups = pairs.groupByKey()
counts = groups.map(lambda kv: (kv[0], sum(kv[1])))  # kv is a (word, iterable of 1s) pair

# Scala
val input = sc.textFile("data.txt")
val lines = input.flatMap(l => l.split(" "))
val pairs = lines.map(word => (word, 1))
val counts = pairs.reduceByKey(_ + _)
Compare reduceByKey with groupByKey
  • Aggregate operations on Pair RDDs trigger a shuffle of data.
  • Shuffle occurs to transfer all data with the same key to the same worker node.
  • reduceByKey combines data on each worker node before the shuffle, so that only one value per key per partition is sent over the network to the reducer worker nodes.
  • groupByKey sends all data over the network to reducer worker nodes and is, therefore, less efficient.
  • Thus, reduceByKey is generally preferred over groupByKey.
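
The difference in shuffle volume described above can be made concrete with a small pure-Python simulation. This is a sketch assuming Python 3; the two partitions of sample pairs and both function names are made up for illustration, and "records shuffled" is simply counted as the number of pairs that would leave their partition.

```python
from collections import Counter, defaultdict

# Two map-side partitions of (word, 1) pairs; the data is invented for illustration.
partitions = [
    [("likes", 1), ("films", 1), ("likes", 1), ("Bob", 1)],
    [("films", 1), ("likes", 1), ("films", 1), ("Alice", 1)],
]


def group_by_key_style(parts):
    """Ship every pair across the 'network', then sum on the reducer side."""
    shuffled = [pair for part in parts for pair in part]
    totals = defaultdict(int)
    for key, value in shuffled:
        totals[key] += value
    return dict(totals), len(shuffled)


def reduce_by_key_style(parts):
    """Combine within each partition first, then ship one pair per key per partition."""
    shuffled = []
    for part in parts:
        local = Counter()
        for key, value in part:
            local[key] += value
        shuffled.extend(local.items())
    totals = defaultdict(int)
    for key, value in shuffled:
        totals[key] += value
    return dict(totals), len(shuffled)


grouped_counts, grouped_records = group_by_key_style(partitions)
reduced_counts, reduced_records = reduce_by_key_style(partitions)

print(grouped_counts == reduced_counts)   # True
print(grouped_records, reduced_records)   # 8 6
```

Both strategies give the same counts; the saving from combining first grows with the number of repeated keys inside each partition.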

Installing Spark

Overview of install steps
  • Install Java and verify installation (since Spark runs inside a JVM)
  • Download and extract Spark package
  • Configure Spark environment
  • Invoke the Spark shell and verify Spark installation

Install Oracle Java 8 JDK

CentOS

Note: Visit the Oracle Java 8 JDK Downloads page, accept the license agreement, and copy the download link for the Linux 64-bit RPM.

$ RPM=http://download.oracle.com/otn-pub/java/jdk/8u121-b13/e9e7ea248e2c4826b92b3f075a80e441/jdk-8u121-linux-x64.rpm

Replace the above link with the latest link from Oracle's website.

$ wget --no-cookies --no-check-certificate \
    --header "Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" \
    "${RPM}"
$ sudo yum localinstall jdk-8u121-linux-x64.rpm
$ java -version

Java should now be installed at /usr/java/jdk1.8.0_121/jre/bin/java and linked from /usr/bin/java.

Ubuntu
$ sudo apt-add-repository ppa:webupd8team/java
$ sudo apt-get update
$ sudo apt-get install oracle-java8-installer
$ java -version

Java should now be installed at /usr/lib/jvm/java-8-oracle/jre/bin/java and linked from /usr/bin/java.

Install Apache Spark

$ sudo mkdir -p /opt/spark
$ sudo tar zxvf spark-2.1.0-bin-hadoop2.7.tgz -C /opt/spark --strip-components=1
  • Add the following lines to your ~/.bashrc file:
export SPARK_HOME=/opt/spark
export PATH=$PATH:/opt/spark/bin
$ source ~/.bashrc
$ cd $SPARK_HOME
$ vi ./bin/load-spark-env.sh  # add the following line to the top of the file:
export SPARK_LOCAL_IP=127.0.0.1

Using Spark

REPL (Read, Eval, Print, Loop)
  • Python REPL invoked by executing pyspark
  • Scala REPL invoked by executing spark-shell
  • SparkContext is initialized automatically in the REPL and available as "sc"
  • As of March 2017, there is no Spark REPL for Java
Spark applications
  • These are what are used in a production environment.
  • The ./bin/spark-submit script is used to launch bundled Spark applications.
  • Scala applications are typically bundled using sbt (simple build tool).
  • Java applications are typically bundled using Maven.
  • Python programs are not required to be packaged, since the pyspark script takes care of setting up dependencies.
  • SparkContext must be manually initialized in Spark applications, unlike in the REPL.
Spark submit example
  • Run application locally on 8 cores:
$ ./bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master local[8] \
    ./examples/jars/spark-examples_2.11-2.1.0.jar 2>/tmp/run.log

Running the above should return:

Pi is roughly 3.1431557157785788

The /tmp/run.log file will have the details for the Spark run log.

Pyspark

NOTE: If you have IPython installed, set the following to force pyspark to use IPython for its REPL:

export PYSPARK_DRIVER_PYTHON=ipython
  • Navigate to the Spark installation directory and invoke the Spark Python REPL (interactive shell) using the "pyspark" command:
cd $SPARK_HOME
./bin/pyspark
  • Create an RDD of strings using the textFile() method, count the number of lines, and print the first line using first():
input = sc.textFile("README.md")
input.count()
input.first()
  • Call the filter() transformation to filter lines containing the word "Python":
pythonLines = input.filter(lambda line : "Python" in line)
pythonLines.collect()
  • Create an RDD using parallelize() on an array of words and check its contents calling the collect() method:
words = sc.parallelize(['pencil', 'paper', 'computer', 'mouse'])
words.collect()
  • Write a map function to append the letter "s" to all the words in the above RDD:
pluralWords = words.map(lambda word: word + 's')
pluralWords.collect()
  • Create an RDD using parallelize() on an array of integers:
nums = sc.parallelize([1,2,3,4,5])
nums.collect()
  • Write a map() function to compute the square of each integer in the above RDD:
squaredNums = nums.map(lambda num : num * num)
squaredNums.collect()
  • Write a reduce() function that computes the sum of all squared integers:
squaredNums.reduce(lambda x,y : x+y)
  • Example word count script:
lines = sc.textFile("README.md")
words = lines.flatMap(lambda line: line.split(" "))
counts = words.map(lambda word: (word,1))
total = counts.reduceByKey(lambda x,y : x+y)
# Filter words where the frequency is greater than 10
total.filter(lambda x : x[1]>10).collect()
# [(u'', 72), (u'Spark', 16), (u'for', 12), (u'the', 24), (u'to', 17)]

References

  1. Survey shows huge popularity spike for Apache Spark. Fortune.com. 2015-09-25.