Earlier systems developed for Big Data applications, such as MapReduce, offered a powerful but low-level procedural programming interface. To improve the user experience, newer systems such as Pig, Hive, and Shark introduced relational interfaces on top of this model.
These systems use declarative queries to enable richer optimizations. To spare users from having to choose only one paradigm (relational or procedural), Spark SQL, a new module in the Spark ecosystem, was developed to integrate relational processing with Spark's procedural API.
Spark SQL lets users intermix the two models. This is achieved through two contributions. The first is the DataFrame API, a component of Spark that evaluates operations lazily so that relational optimizations can be performed. The second is an extensible optimizer called Catalyst, which makes it easy to add optimization rules, data sources, and data types for domains such as machine learning.
The DataFrame API is more efficient and offers more functionality than Spark's existing procedural API. A DataFrame is a collection of structured records and can be created directly from Spark's distributed collections of objects. All Spark SQL operations go through the Catalyst optimizer, which takes advantage of features of the Scala programming language. Spark SQL provides a number of benefits, including richer optimizations, for example when it processes SQL queries that contain joins and filters.
The DataFrame API is the contribution that makes it possible to mix procedural and relational processing. The SQL interface provided by Spark SQL is accessible through a command-line console or over JDBC/ODBC.
There are multiple ways to interact with Spark SQL, including SQL and the Dataset API. The same execution engine computes the result regardless of which API or language is used to express the computation. This lets developers switch back and forth between APIs, choosing whichever expresses a given transformation most naturally.
Here we discuss the following interface points, through which users can access the full capabilities of the Spark SQL query engine.
1. SQL
One of the main uses of Spark SQL is to execute SQL queries in Spark. Spark SQL acts as a distributed query engine when users connect through its JDBC/ODBC interface or the Spark SQL CLI. In this mode, users or applications interact with Spark SQL directly to run SQL queries, without writing any code. When SQL is instead run from within a programming language, the results are returned as a DataFrame/Dataset.
2. DataFrame API
A DataFrame is a distributed collection of rows that share the same schema. It is the tabular data abstraction of Spark SQL, equivalent to a table in a relational database. DataFrames can be manipulated in the same way as native Spark RDDs. Unlike RDDs, they keep track of their schema and support several relational operators, which allows optimized execution.
A DataFrame is organized into named, typed columns. It can be built from tables (based on external data sources), from structured file formats (Parquet, Avro, etc.), or from existing RDDs of native Java/Python objects. Avro is a self-describing binary format for nested data. Parquet is a columnar file format for which Spark SQL supports column pruning as well as filters. Once constructed, DataFrames can be manipulated with relational clauses such as where and groupBy.
A DataFrame can also be treated like a native RDD, so users can still apply procedural code to it. It is considerably more powerful than an RDD because of two features: custom memory management and optimized execution plans. Spark's DataFrames are evaluated lazily (like RDDs): after the logical plan is built, nothing executes until the user calls an output operation such as saveAs.
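The lazy behaviour described above can be pictured with a toy model in plain Scala that needs no Spark dependency. This is only a sketch: `LazyTable`, `Row`, and the `rowsScanned` counter are hypothetical names invented here, and the code mimics the idea (transformations only record a plan, an output operation triggers the work) rather than Spark's actual implementation.

```scala
// Toy model of lazy evaluation; NOT Spark's implementation.
object LazyPlanSketch {
  type Row = Map[String, Int]

  // Counts how many rows have been read, to show when work actually happens.
  var rowsScanned: Int = 0

  // A "plan" is the source rows plus the predicates recorded so far.
  final case class LazyTable(source: Seq[Row], filters: List[Row => Boolean] = Nil) {
    // Transformation: returns a new plan and touches no data.
    def where(p: Row => Boolean): LazyTable = copy(filters = filters :+ p)

    // Output operation: only here are the recorded filters applied.
    def count(): Long = {
      val matching = source.count { row =>
        rowsScanned += 1
        filters.forall(f => f(row))
      }
      matching.toLong
    }
  }

  def main(args: Array[String]): Unit = {
    val employee = LazyTable(Seq(
      Map("salary" -> 15000), Map("salary" -> 25000), Map("salary" -> 30000)))
    val highPaid = employee.where(row => row("salary") > 20000)
    println(rowsScanned)      // nothing has been scanned yet
    println(highPaid.count()) // the scan happens here
  }
}
```

In this sketch, calling `where` any number of times costs nothing until `count` is invoked, which is the window in which a real optimizer can rearrange the recorded plan.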
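To illustrate, the following Scala code defines two DataFrames:
    // assumes an existing SparkSession named spark
    val employee = spark.table("employee")
    // builds a logical plan only; no data is read yet
    val average = employee.where(employee("salary") > 20000)
    // count() is an output operation and triggers execution
    println(average.count())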
In this code, employee and average are DataFrames. Each DataFrame represents a logical plan (here, read the employee table and filter for salary > 20000). When count (an output operation) is called, Spark SQL builds a physical plan to compute the final result. Optimizations may be applied at this point, such as scanning only the "salary" column if the data is stored in a columnar format. DataFrames support almost all relational operations, which users apply through a domain-specific language (DSL).
The following example illustrates the DSL:
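    import org.apache.spark.sql.functions.count

    // join teachers to their subjects, keep male teachers, count per subject
    teacher.join(subject, teacher("sub_ID") === subject("id"))
      .where(teacher("gender") === "male")
      .groupBy(subject("id"), subject("name"))
      .agg(count("name"))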
Besides applying operations through the DSL, a DataFrame can also be registered as a temporary table in the system catalog, after which SQL queries can be run against it.
The code below illustrates this:
    val averageDF = employee.where(employee("salary") < 20000)
    // createOrReplaceTempView supersedes the deprecated registerTempTable
    averageDF.createOrReplaceTempView("average")
    spark.sql("SELECT count(*), avg(salary) FROM average")
3. Datasets API
A Dataset is another abstraction in Spark SQL. From the user's point of view it produces the same results as a DataFrame, but it differs in performance and in the way operations are executed. In other words, the Dataset API is an extension of the DataFrame API that is more developer-friendly. Datasets also benefit from the Catalyst optimizer by exposing expressions and data fields to the SQL query planner, and they take advantage of Tungsten's fast in-memory encoding.
Datasets are designed to work alongside existing RDDs, but they improve efficiency when data can be represented in a structured form. A Dataset is a strongly typed, immutable collection of objects that are mapped to a relational schema.
The concept that makes Datasets more efficient is the "encoder", which converts between JVM objects and the tabular representation. Encoders use runtime code generation to build custom bytecode for serialization and deserialization. DataFrames, by contrast, are converted into Java bytecode during serialization/deserialization. The custom bytecode produced by encoders is more compact than this Java bytecode, which makes Datasets more efficient than DataFrames.
Encoders also check that the data matches the expected schema and report errors before the user attempts to process a large amount of data incorrectly. The bytecode generated by encoders operates on off-heap data and gives on-demand access to individual attributes without deserializing the whole object. A Scala case class is used to define the schema of a Dataset. RDDs that contain case classes can be converted into DataFrames through the Scala interface: the arguments of the case class are read using reflection and become the column names.
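The two roles of an encoder described above (converting between objects and rows, and rejecting data that does not match the schema) can be sketched in plain Scala. Everything here is hypothetical: `ToyEncoder`, `EmployeeEncoder`, and the `Employee` case class are invented for illustration, the encoder is written by hand rather than generated, and none of it is Spark's Encoder API.

```scala
// Toy encoder sketch; names are hypothetical and this is not Spark's Encoder API.
object EncoderSketch {
  // The case class fields play the role of column names.
  final case class Employee(name: String, salary: Int)

  // Converts between a JVM object and a row (a flat sequence of column values).
  trait ToyEncoder[T] {
    def schema: Seq[(String, String)]           // column name -> type name
    def toRow(value: T): Seq[Any]
    def fromRow(row: Seq[Any]): Either[String, T]
  }

  object EmployeeEncoder extends ToyEncoder[Employee] {
    val schema: Seq[(String, String)] = Seq("name" -> "String", "salary" -> "Int")

    def toRow(e: Employee): Seq[Any] = Seq(e.name, e.salary)

    // Schema check: a row with the wrong shape or types is reported, not processed.
    def fromRow(row: Seq[Any]): Either[String, Employee] = row match {
      case Seq(name: String, salary: Int) => Right(Employee(name, salary))
      case other => Left(s"row $other does not match schema $schema")
    }
  }
}
```

A round trip such as `EmployeeEncoder.fromRow(EmployeeEncoder.toRow(Employee("Ann", 30000)))` yields the original object, while a row like `Seq("Ann", "oops")` produces a `Left` with an error message instead of a failure deep inside a job.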
Apache Spark is a technically sophisticated, open-source big data processing engine that offers additional benefits over MapReduce. The Spark driver and the Spark workers are responsible for executing the tasks of a job. The driver tracks and controls the workflow, and the workers launch the executors for each part of the job submitted to the driver. The RDD, serializer, scheduler, and shuffle are the main components of the Spark driver. Here we discuss only shuffling in detail.
Shuffling is one of the key targets for optimization in Spark. It is the process of partitioning data (map-side shuffle) and aggregating (reduce-side shuffle) the intermediate or final data during the computation. Shuffling is mostly considered part of the reduce phase. During the shuffle, data is split into a large number of partitions and, depending on the scenario, a large number of shuffle files is created, since one shuffle file is maintained for each partition. Normally, the number of shuffle files generated in Spark is M*R, where:
M = total number of map tasks
R = total number of reduce tasks
With Spark's shuffle file consolidation feature, the number of shuffle files drops to E*C*R/T, which no longer depends on the number of map tasks M. Each executor then handles only C*R/T shuffle files (C*R when each task uses one CPU). With this feature, instead of creating a new file for each reducer, Spark creates a pool of output files. When a map task starts producing output, it requests a group of R files from this pool. As soon as the map task finishes, it returns this group of R files to the pool. Each executor runs only C/T tasks in parallel, so it needs only C/T groups of R files. After the first C/T parallel map tasks complete, each subsequent map task reuses an existing group from the pool. As a result, fewer than M*R shuffle files are created. Shuffling is further improved in Spark by a sort-based technique for particular scenarios. In the formula above:
E = total number of executors in the cluster
C = total number of cores per executor
T = total number of CPUs for one task
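The two file counts can be compared with a few lines of arithmetic. The sketch below uses the formulas M*R and E*C*R/T from the text; the helper names and the sample job and cluster sizes are assumptions chosen only for illustration.

```scala
// Hypothetical helper names; the formulas M*R and E*C*R/T come from the text.
object ShuffleFileCount {
  // Without consolidation: one file per (map task, reducer) pair.
  def withoutConsolidation(m: Long, r: Long): Long = m * r

  // With consolidation: E executors, each holding C/T groups of R files.
  def withConsolidation(e: Long, c: Long, r: Long, t: Long): Long = {
    require(t > 0 && c % t == 0, "this sketch assumes C is a multiple of T")
    e * (c / t) * r
  }

  def main(args: Array[String]): Unit = {
    val (m, r) = (1000L, 200L)     // assumed job: 1000 map tasks, 200 reducers
    val (e, c, t) = (10L, 8L, 1L)  // assumed cluster: 10 executors, 8 cores, 1 CPU per task
    println(s"M*R     = ${withoutConsolidation(m, r)}")   // 200000
    println(s"E*C*R/T = ${withConsolidation(e, c, r, t)}") // 16000
  }
}
```

Under these assumed numbers, consolidation reduces the file count from 200000 to 16000, and the second figure stays the same however many map tasks the job has.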
To make this clearer, consider shuffling on the map side and on the reduce side:
1. Map Side Shuffle
Every map task in Spark writes a shuffle file for each reducer. All M*R files must then be delivered to the corresponding reducers, which can produce significant overhead. Spark therefore provides the option to compress map outputs, which significantly reduces the risk of out-of-memory errors.
2. Reduce Side Shuffle
All the output of the map tasks arrives at the reduce side. Each executor assigned to a reduce task keeps the data relevant to it and passes that data to the reduce task, where the final results are computed. Spark requires all the shuffled data for a reduce task to fit in memory when the task asks for it. This happens, for instance, when the reduce task needs all of its shuffled data for a groupByKey or reduceByKey operation. If the memory required by a reduce task exceeds the limit allocated to it, an out-of-memory exception is thrown and the entire job is aborted. To avoid this, the application must specify a high enough value for R, possibly found through trial and error. Shuffling can thus be understood as a redistribution of data between Spark stages, as illustrated in Figure 1. "Shuffle Write" is the sum of all serialized data written on all executors before transmission (normally at the end of a stage), and "Shuffle Read" is the total serialized data read on all executors at the beginning of a stage.
Figure 1. Shuffle Write and Shuffle Read during shuffling
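The map-side and reduce-side steps can be imitated on in-memory collections. The following is a minimal sketch with invented names (`mapSide`, `reduceSide`, `totals`), using hash partitioning and a per-key sum as assumed examples; it writes no files and is not how Spark implements its shuffle.

```scala
// In-memory imitation of a shuffle; names and the sum aggregation are assumptions.
object ShuffleSketch {
  type Record = (String, Int)

  // Map side ("shuffle write"): a map task splits its output into up to R buckets,
  // one per reducer, chosen by the hash of the key.
  def mapSide(taskOutput: Seq[Record], r: Int): Map[Int, Seq[Record]] =
    taskOutput.groupBy { case (key, _) => Math.floorMod(key.hashCode, r) }

  // Reduce side ("shuffle read"): reducer `id` collects its bucket from every
  // map task and aggregates the values per key.
  def reduceSide(id: Int, mapOutputs: Seq[Map[Int, Seq[Record]]]): Map[String, Int] =
    mapOutputs
      .flatMap(output => output.getOrElse(id, Seq.empty[Record]))
      .groupBy(_._1)
      .map { case (key, records) => key -> records.map(_._2).sum }

  // Runs M map tasks and R reducers, then merges the reducers' disjoint results.
  def totals(tasks: Seq[Seq[Record]], r: Int): Map[String, Int] = {
    val mapOutputs = tasks.map(task => mapSide(task, r))
    (0 until r)
      .map(id => reduceSide(id, mapOutputs))
      .foldLeft(Map.empty[String, Int])(_ ++ _)
  }
}
```

Because a given key always hashes to the same bucket, every record for that key ends up at a single reducer, which is why each reducer needs to see output from all M map tasks.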
QUERY PLANNING IN SPARK SQL
The Catalyst optimizer is designed so that new optimization techniques and features can be added to Spark SQL easily, particularly to tackle various problems with big data, such as semi-structured data and advanced analytics. Developers can also extend the optimizer, for example by adding data-source-specific rules that push filtering or aggregation into external storage systems, or by adding support for new data types. The main data type in Catalyst is a tree composed of node objects. These nodes are immutable and are manipulated through functional transformations, which produce new trees. Spark SQL begins processing either from the abstract syntax tree (AST) of a SQL query, produced by the SQL parser, or from a DataFrame object constructed through the API. Catalyst includes libraries specific to relational processing and sets of rules that are used to transform trees. A rule may need to run multiple times to fully transform a tree. Catalyst's tree transformations are used in four phases of query execution: analysis, logical optimization, physical planning, and code generation.
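The tree-and-rule mechanism described above can be sketched in a few lines of plain Scala. The node types (`Literal`, `Attribute`, `Add`), the two rules, and the `fixedPoint` helper are simplified stand-ins invented for this example, not Catalyst's actual classes; the sketch only shows immutable trees, rules as pattern-matching functions, and repeated application until nothing changes.

```scala
// Simplified stand-in for an expression tree and rules; not Catalyst's classes.
object TreeSketch {
  sealed trait Expr
  final case class Literal(value: Int) extends Expr
  final case class Attribute(name: String) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr

  // A rule rewrites only the node shapes it matches.
  type Rule = PartialFunction[Expr, Expr]

  // Applies a rule bottom-up and returns a NEW tree; the input is never mutated.
  def transform(e: Expr, rule: Rule): Expr = {
    val withNewChildren: Expr = e match {
      case Add(l, r) => Add(transform(l, rule), transform(r, rule))
      case leaf      => leaf
    }
    if (rule.isDefinedAt(withNewChildren)) rule(withNewChildren) else withNewChildren
  }

  // Constant folding: 1 + 2 becomes 3.
  val constantFolding: Rule = {
    case Add(Literal(a), Literal(b)) => Literal(a + b)
  }

  // Simplification: x + 0 and 0 + x become x.
  val addZero: Rule = {
    case Add(e, Literal(0)) => e
    case Add(Literal(0), e) => e
  }

  // Runs the batch of rules repeatedly until the tree stops changing.
  def fixedPoint(e: Expr, rules: Seq[Rule]): Expr = {
    val next = rules.foldLeft(e)((tree, rule) => transform(tree, rule))
    if (next == e) e else fixedPoint(next, rules)
  }
}
```

For example, `fixedPoint(Add(Add(Literal(1), Literal(-1)), Attribute("x")), Seq(addZero, constantFolding))` needs two passes: the first folds `1 + (-1)` into `0`, and only then can `addZero` reduce `0 + x` to `x`. This is why a rule may have to run more than once.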
Here we describe each phase:
Phases of query planning in Spark SQL
The abstract syntax tree and DataFrame objects may contain unresolved attribute references or relations. Consider the query SELECT col1 FROM table1. The type of col1, and even whether col1 is a valid column name, is not known until table1 is looked up. A tree that contains such unresolved references is called an "unresolved logical plan". The analysis phase takes an unresolved plan as input and converts it into a resolved logical plan by applying analyzer rules.
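A toy version of this resolution step is sketched below. The `Catalog` type, the two query classes, and the `analyze` function are hypothetical and far simpler than Spark SQL's analyzer; the point is only that column names and types become known after the table is looked up in a catalog, and that a bad name is reported at this stage.

```scala
// Toy analysis step; types and names are hypothetical, not Spark SQL's analyzer.
object AnalysisSketch {
  // Catalog: table name -> (column name -> type name).
  type Catalog = Map[String, Map[String, String]]

  // SELECT <column> FROM <table>, before and after resolution.
  final case class UnresolvedQuery(column: String, table: String)
  final case class ResolvedQuery(column: String, columnType: String, table: String)

  // Looks up the table, then the column; either lookup can fail with a message.
  def analyze(q: UnresolvedQuery, catalog: Catalog): Either[String, ResolvedQuery] =
    for {
      columns <- catalog.get(q.table).toRight(s"table not found: ${q.table}")
      tpe     <- columns.get(q.column).toRight(s"column not found: ${q.column}")
    } yield ResolvedQuery(q.column, tpe, q.table)
}
```

With an assumed catalog `Map("table1" -> Map("col1" -> "Int"))`, the query from the text resolves to `ResolvedQuery("col1", "Int", "table1")`, while a misspelled column yields a `Left` carrying the error message.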
Logical optimization is applied to the analyzed logical plan. It is a rule-based optimization. The rules applied to logical plans in this phase include constant folding, projection pruning, predicate pushdown, null propagation, Boolean expression simplification, and others.
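As one concrete instance of such a rule, the sketch below pushes a filter beneath a projection on a toy logical plan. The plan node types (`Scan`, `Project`, `Filter`) and the restriction to predicates of the form "column > value" are simplifying assumptions made for this example; Catalyst's own rule covers many more cases.

```scala
// Toy predicate pushdown on a made-up plan representation; not Catalyst's rule.
object PushdownSketch {
  sealed trait Plan
  final case class Scan(table: String) extends Plan
  final case class Project(columns: Seq[String], child: Plan) extends Plan
  // Keeps rows where `column` > `minValue`.
  final case class Filter(column: String, minValue: Int, child: Plan) extends Plan

  // Single pass: moves a filter below a projection when the projection keeps
  // the filtered column, so fewer rows reach the projection.
  def pushDown(plan: Plan): Plan = plan match {
    case Filter(col, min, Project(cols, child)) if cols.contains(col) =>
      Project(cols, pushDown(Filter(col, min, child)))
    case Filter(col, min, child) => Filter(col, min, pushDown(child))
    case Project(cols, child)    => Project(cols, pushDown(child))
    case scan: Scan              => scan
  }
}
```

Applied to `Filter("salary", 20000, Project(Seq("name", "salary"), Scan("employee")))`, the rule returns `Project(Seq("name", "salary"), Filter("salary", 20000, Scan("employee")))`: the result is the same, but the filter now sits next to the scan.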
Physical planning is the third phase and takes the optimized logical plan as input. In this phase, one or more physical plans are generated using physical operators, and the best plan is selected using cost-based optimization. Some rule-based optimizations are also applied in this phase. In addition, projection or predicate pushdown is performed in this phase.
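Cost-based selection among candidate physical plans can be pictured as picking the cheapest option under a cost model. In the sketch below, the choice between two join strategies and the cost formulas (rows moved between machines) are assumptions invented for illustration; the text does not specify which operators are compared or how cost is estimated.

```scala
// Toy cost-based choice; strategies and cost formulas are invented for illustration.
object PhysicalPlanningSketch {
  sealed trait JoinStrategy
  case object BroadcastJoin extends JoinStrategy
  case object SortMergeJoin extends JoinStrategy

  // Assumed cost = number of rows sent over the network.
  def cost(s: JoinStrategy, leftRows: Long, rightRows: Long, executors: Int): Long =
    s match {
      // Copy the smaller side to every executor.
      case BroadcastJoin => math.min(leftRows, rightRows) * executors
      // Shuffle both sides once.
      case SortMergeJoin => leftRows + rightRows
    }

  // Generates the candidate plans and keeps the one with the lowest estimated cost.
  def choose(leftRows: Long, rightRows: Long, executors: Int): JoinStrategy = {
    val candidates: Seq[JoinStrategy] = Seq(BroadcastJoin, SortMergeJoin)
    candidates.minBy(s => cost(s, leftRows, rightRows, executors))
  }
}
```

Under this made-up model, joining a million-row table with a hundred-row table on ten executors selects `BroadcastJoin`, whereas joining two million-row tables selects `SortMergeJoin`.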
Code generation is the final phase of query optimization, in which Java bytecode is generated using a feature of the Scala programming language called quasiquotes. The generated Java bytecode runs on each machine.