Apache Spark

I started working with big data technologies in July 2014. At that point I had hands-on experience with MapReduce code, but in late 2014 I was introduced to another computing engine, Apache Spark, and that is how I got started with Scala, since Spark itself is written in Scala. I began with a fun data science project that recommended items on the basis of their attributes, and it turned out to be a great way of understanding the core concepts of Spark and its programming model.

Later on, I worked on building a search engine based on syntactic similarity, and while doing this I ran into performance issues. I was working on a 30 GB RAM machine with 12 cores in Spark cluster mode, and my dataset was only about 1 GB (not too big), yet the job was taking 1.5 hours. That is how I started reading about tuning Spark for certain behaviours and learned how to influence the tasks Spark creates. In the end my application finished in just 10 seconds. Isn't that awesome?

Early in 2016, things changed: GB became PB. That was the first time I was introduced to such a large cluster and its complex mechanisms. This tutorial is mainly about:

  1. How to get started with Spark.
  2. Working with core functionalities of Spark SQL.
  3. Some troubleshooting steps.
  4. Tuning Spark applications.

Let’s get started

Spark provides four major components:

  1. SQL and DataFrames
  2. MLlib for machine learning
  3. GraphX
  4. Spark Streaming

The beauty of Spark is that these libraries can be combined into a single Spark application, so the sky is the limit: whether you want to perform data wrangling or operate on a streamed dataset, apply a complex algorithm you designed yourself or use well-known machine learning libraries, everything is there in one single project.

Clone this project to get spark ready for your system: https://github.com/horizon23/spark-learning.git

The examples discussed in this tutorial are also part of this project.

Spark Session

It is simply the entry point for interacting with Spark's functionality and getting started with Spark programming.

Creating a SparkSession

import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder()
  .appName("Spark Learning")
  .master("local")
  .getOrCreate()
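From this session you can also obtain the SparkContext, which the RDD examples later in this post refer to as sc (a minimal sketch):

// The RDD examples later in this post assume a SparkContext named sc;
// it is obtained directly from the SparkSession created above.
val sc = spark.sparkContext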

We are using local mode for learning. Besides this, Spark supports deployment on several cluster managers (its own standalone cluster manager, Apache Mesos, and Hadoop YARN).

More about the cluster architecture can be found on the official Spark website.

Our focus will be on usage and practical implementation, which is quite scattered all over the internet. So let's get back to an example. There are certain properties you need to define when creating a SparkSession.

Function usage:

  1. appName(name): sets the name of the application, which will be shown in the Spark web UI.
  2. master(master): sets the Spark master URL to connect to, such as "local" to run locally, "local[4]" to run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.
  3. enableHiveSupport(): enables Hive support by connecting to the Hive metastore. (We will learn more about this in the Spark-SQL tutorial.)
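Putting these together, a builder call might look like the following (a minimal sketch; the local[4] master and the Hive option are illustrative, not required):

import org.apache.spark.sql.SparkSession

// Illustrative combination of the options described above.
val sparkWithHive: SparkSession = SparkSession.builder()
  .appName("Spark Learning")   // name shown in the Spark web UI
  .master("local[4]")          // run locally with 4 cores
  .enableHiveSupport()         // connect to the Hive metastore
  .getOrCreate()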

The whole of Spark revolves around RDDs (Resilient Distributed Datasets): fault-tolerant collections of elements that can be processed in parallel.

 


RDD is a logical reference to a distributed dataset across many servers in the cluster.

There are two ways of creating RDD:

  1. Parallelizing an existing collection in the driver program.

    val collection = Seq("A", "B", "C", "D")
    val rdd = spark.sparkContext.parallelize(collection)
    

    Spark splits the data into partitions and runs one task per partition, so more partitions mean more tasks and more parallelism. Each task occupies one core of an executor, so more cores allow more tasks to run at the same time.
    Although Spark creates partitions automatically based on the cluster, a user can supply their own number of partitions as the second parameter of parallelize (e.g. spark.sparkContext.parallelize(data, 10)); see the partition-count sketch after this list.

  2. External Datasets.
    val rdd = spark.sparkContext.textFile("input_data.txt")
    

    Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
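To tie the two creation paths together, here is a minimal sketch (assuming the SparkSession spark created earlier and an input_data.txt file in the working directory) that checks how many partitions each RDD ends up with:

// Assumes the SparkSession `spark` created earlier; the file name is illustrative.
val fromCollection = spark.sparkContext.parallelize(Seq("A", "B", "C", "D"), 10)
val fromFile = spark.sparkContext.textFile("input_data.txt")

// One task is scheduled per partition.
println(s"collection RDD partitions: ${fromCollection.getNumPartitions}")
println(s"file RDD partitions: ${fromFile.getNumPartitions}")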

RDD supports two major operations:

  1. Transformation: a method that produces a new RDD by modifying the data in the existing partitions (for example map or filter).
  2. Action: a method that demands the result of all the transformations so far and returns it to the driver (for example count or collect). Example:
    case class Data(name: String, id: Int, age: Int)
    case class DataTransform(first_name: String, last_name: String, id: Int, age: Int)

    val data = Seq(
      Data("Ram Kumar", 1201, 24),
      Data("Mukesh Beil", 1202, 24),
      Data("Ramesh Kumar", 1203, 25),
      Data("Sham Jain", 1204, 25),
      Data("Puja Singh", 1205, 20)
    )

    // Parallelizing data (sc is the SparkContext, i.e. spark.sparkContext)
    val rdd = sc.parallelize(data)

    // Transformation => splitting the name into first and last name
    val rdd1 = rdd.map(x => {
      val name = x.name.split(" ")
      DataTransform(name.head, name.last, x.id, x.age)
    })

    // Transformation => filtering people having age greater than or equal to 21
    val rdd2 = rdd1.filter(_.age >= 21)

    // Action => counting all the rows of the transformed RDD
    val numberOfRows = rdd2.count()


Here are some features of RDDs:

  1. Fault-tolerant: if a partition of an RDD is lost, Spark has enough information to recompute that partition, since it logs the lineage of transformations applied across the cluster.
  2. Data storage: RDDs are distributed in the form of partitions, which can reside in memory or on disk (depending on the memory available). If there is enough memory to hold all partitions, they are kept in main memory; otherwise Spark spills partitions out to disk.
  3. Immutable: every transformation generates a new RDD, which helps achieve consistency in computation.
  4. Coarse-grained operations: RDDs support operations that are applied to the whole dataset, like groupBy, map, or filter.
  5. Lazy evaluation: Spark prepares a lineage, which is basically a DAG (Directed Acyclic Graph) of the operations to be performed in order; the figure below shows the DAG for the example above. A DAG keeps extending until an action is reached. In our case count() is the action, so the whole pipeline is executed only when count() is invoked.

    (figure: DAG of the example above, ending in the count() action)

    With the help of this DAG, Spark can recompute a damaged partition, since all the information about a particular transformation, along with its connections to the other transformations, resides in the DAG. You can inspect this lineage yourself, as shown below.
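    For instance, a minimal sketch that prints the recorded lineage of rdd2 from the example above, using the standard toDebugString method on RDDs:

    // Print the lineage (DAG) that Spark has recorded for rdd2.
    // toDebugString returns a textual description of the RDD and its parent RDDs.
    println(rdd2.toDebugString)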

     

Checkpoint

Another beauty of Spark is that it pipelines all the transformations of a single job into one pass, so the data is scanned only once. But if you have a very complex chain of transformations, this can become costly in case of a failure, because to recompute a lost partition Spark has to go back over all those transformations again.
To get rid of this issue we can use a checkpoint in Spark.

A checkpoint breaks the DAG and saves the output of the transformations executed up to that point, so even if a failure occurs, recomputation starts from the checkpoint rather than from the very beginning.
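Checkpointing is enabled by setting a checkpoint directory on the SparkContext and then marking an RDD to be checkpointed. A minimal sketch, reusing rdd1 from the earlier example (the directory path is an illustrative placeholder; on a real cluster it should point to reliable storage such as HDFS):

// Illustrative checkpoint directory; use reliable storage (e.g. HDFS) on a cluster.
sc.setCheckpointDir("/tmp/spark-checkpoints")

// Mark rdd1 for checkpointing; nothing is written yet.
rdd1.checkpoint()

// The next action materializes rdd1, writes it to the checkpoint directory,
// and truncates its lineage from that point onward.
rdd1.count()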

Cache/Persist

Let’s take an example:

  1. Without cache
    case class Data(name: String, id: Int, age: Int)
    case class DataTransform(first_name: String, last_name: String, id: Int, age: Int)

    val data = Seq(
      Data("Ram Kumar", 1201, 24),
      Data("Mukesh Beil", 1202, 24),
      Data("Ramesh Kumar", 1203, 25),
      Data("Sham Jain", 1204, 25),
      Data("Puja Singh", 1205, 20)
    )

    //Parallelizing data
    val rdd = sc.parallelize(data)

    //Transformation => Splitting name to first and last name
    val rdd1 = rdd.map(x => {
      val name = x.name.split(" ")
      DataTransform(name.head, name.last, x.id, x.age)
    })

    //Transformation => Filtering people having age greater than or equal to 21
    val rdd2 = rdd1.filter(_.age >= 21)

    //Action => To save output in a text file
    rdd2.saveAsTextFile("output_>=21")

    //Transformation => Filtering people having age less than 21
    val rdd3 = rdd1.filter(_.age < 21)

    //Action => To save output in a text file
    rdd3.saveAsTextFile("output<21")


    (Spark UI screenshot: the job DAGs without caching, where rdd1 appears in both jobs)
    As is clearly visible, rdd1 is computed twice because it is used independently by two different filters. The problem is that Spark discards a computed RDD once the action that used it has finished.

  2. With Cache
    case class Data(name: String, id: Int, age: Int)
    case class DataTransform(first_name: String, last_name: String, id: Int, age: Int)

    val data = Seq(
      Data("Ram Kumar", 1201, 24),
      Data("Mukesh Beil", 1202, 24),
      Data("Ramesh Kumar", 1203, 25),
      Data("Sham Jain", 1204, 25),
      Data("Puja Singh", 1205, 20)
    )

    //Parallelizing data
    val rdd = sc.parallelize(data)

    //Transformation => Splitting name to first and last name
    val rdd1 = rdd.map(x => {
      val name = x.name.split(" ")
      DataTransform(name.head, name.last, x.id, x.age)
    })

    rdd1.cache()

    //Transformation => Filtering people having age greater than or equal to 21
    val rdd2 = rdd1.filter(_.age >= 21)

    //Action => To save output in a text file
    rdd2.saveAsTextFile("output_>=21")

    //Transformation => Filtering people having age less than 21
    val rdd3 = rdd1.filter(_.age < 21)

    //Action => To save output in a text file
    rdd3.saveAsTextFile("output<21")

    

    (Spark UI screenshot: the job DAG with rdd1 cached, the cached RDD marked in green)

The green marker indicates that this RDD has been cached, so it is not recomputed, which saves a lot of time.

Therefore, caching is useful in situations where one RDD is used multiple times, given the limitation that Spark discards an RDD after each use.
There are two functions, cache() and persist(): cache() simply caches in main memory, whereas persist() takes a parameter specifying where the data will be kept: disk only, memory only, or both memory and disk, as sketched below.
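A minimal sketch of persisting with an explicit storage level, reusing rdd1 from the example above (the storage levels come from Spark's StorageLevel object; for RDDs, cache() is equivalent to persist(StorageLevel.MEMORY_ONLY)):

import org.apache.spark.storage.StorageLevel

// Keep rdd1 in memory and spill to disk if it does not fit.
rdd1.persist(StorageLevel.MEMORY_AND_DISK)

// ... run the actions that reuse rdd1 ...

// Release the cached partitions once they are no longer needed.
rdd1.unpersist()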

 
