What is Spark SQL?
Spark SQL is one of the main components of the Apache Spark framework. It is mainly used for structured data processing. It provides various Application Programming Interfaces (APIs) in Python, Java, Scala, and R. Spark SQL integrates relational data processing with the functional programming API of Spark.
Spark SQL provides a programming abstraction called DataFrame and can also act as a distributed query engine (querying on different nodes of a cluster). It supports querying either with Hive Query Language (HiveQL) or with SQL.
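As a quick illustration, the sketch below loads a JSON file into a DataFrame and queries it both through the DataFrame API and with plain SQL. The file path and column names are assumptions made for the example.

```scala
import org.apache.spark.sql.SparkSession

// Entry point for Spark SQL (already available as `spark` in spark-shell)
val spark = SparkSession.builder().appName("SparkSQLIntro").getOrCreate()

// Load semi-structured data into a DataFrame (the path is hypothetical)
val people = spark.read.json("examples/people.json")

// Query with the DataFrame API ...
people.select("name", "age").filter("age > 21").show()

// ... or register a temporary view and query it with SQL
people.createOrReplaceTempView("people")
spark.sql("SELECT name, age FROM people WHERE age > 21").show()
```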
If you are familiar with Relational Database Management Systems (RDBMS) and their tools, you can think of Spark SQL as an extension of relational data processing. Spark SQL can process Big Data workloads that are difficult to handle on a traditional database system.
Why did Spark SQL come into the picture?
Before Spark SQL, there was Apache Hive, which was used for structured data processing. Hive was originally built to run on top of Apache Hadoop using MapReduce, and it had the following limitations:
- Hive uses MapReduce for ad-hoc querying, and MapReduce lags in performance even on medium-sized datasets because each stage reads from and writes to disk.
- During the execution of a workflow, if the processing fails, Hive does not have the capability to resume from the point where it failed.
- Apache Hive does not support real-time data processing; it uses batch processing instead. It collects the data and processes it in bulk later.
Having outlined these drawbacks of Hive, it is clear that there was scope for improvement, which is why Spark SQL came into the picture.
Components of Spark
The different components of Spark are described below:
Apache Spark Core
This underlying general execution engine provides distributed task dispatching, basic I/O functionality, and scheduling. Spark Core offers in-memory computing and the ability to reference datasets stored in external storage systems.
Spark SQL
Spark SQL is a Spark module that sits on top of Spark Core and is responsible for structured data processing. It introduced a data abstraction called SchemaRDD (later renamed DataFrame) that provides support for structured and semi-structured data.
Spark Streaming
Spark Streaming uses the fast scheduling capability of Spark Core to perform streaming analytics. It ingests data in mini-batches and performs Resilient Distributed Dataset (RDD) transformations on those mini-batches.
MLlib (Machine Learning Library)
MLlib is a Machine Learning (ML) library that aims to make practical machine learning scalable and easy. Owing to Spark's distributed, memory-based architecture, MLlib is a distributed machine learning framework that runs on top of Spark, and benchmarks run by the MLlib developers against Alternating Least Squares (ALS) implementations show it to be much faster than disk-based alternatives. At a high level, MLlib provides tools such as ML algorithms for classification, regression, clustering, and collaborative filtering.
GraphX
GraphX is a distributed framework for graph processing. It provides an API for expressing graph computation that can model user-defined graphs using the Pregel abstraction API, and it also provides an optimized runtime for this abstraction.
Features of Spark SQL
Let’s look at the features that make Spark SQL so popular for data processing.
- Easy to Integrate: SQL queries can be mixed with Spark programs easily. Structured data can be queried inside Spark programs using either SQL or the DataFrame API, and this tight integration makes it easy to run SQL queries alongside analytic algorithms.
- Compatibility with Hive: Existing Hive queries can be executed in Spark SQL as they are.
- Unified Data Access: Data can be loaded and queried from a variety of sources through a single interface, as shown in the sketch after this list.
- Standard Connectivity: Spark SQL can be accessed through industry-standard JDBC (Java Database Connectivity) and ODBC (Open Database Connectivity) interfaces, which lets business intelligence tools connect to it.
- Performance and Scalability: To make queries fast, Spark SQL includes a cost-based optimizer, columnar storage, and code generation, and it scales to thousands of nodes using the Spark engine, which provides full mid-query fault tolerance.
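As a hedged sketch of unified data access, the snippet below reads one Parquet file and one JSON file with the same API and joins them with a SQL query run from inside the program. The file paths, column names, and the `spark` session are assumptions.

```scala
// Assumes a SparkSession named `spark`, as in the earlier sketch or spark-shell
val users  = spark.read.parquet("data/users.parquet")  // columnar source
val orders = spark.read.json("data/orders.json")       // semi-structured source

users.createOrReplaceTempView("users")
orders.createOrReplaceTempView("orders")

// Mix SQL with program logic: join data loaded from different sources
val totals = spark.sql(
  """SELECT u.name, SUM(o.amount) AS total
    |FROM users u JOIN orders o ON u.id = o.user_id
    |GROUP BY u.name""".stripMargin)
totals.show()
```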
Spark SQL Example
Consider a scenario where you wish to create and load two tables along with selecting rows from the tables. Let us use Spark SQL to implement this.
- As the first step, copy the Hue sample_07.csv and sample_08.csv files to your object store, in a location that can be easily accessed by the Spark cluster.
- Next, launch the spark-shell.
- Once the launch is complete, create the Hive tables sample_07 and sample_08. Refer to the code below:
scala> spark.sql("CREATE EXTERNAL TABLE sample_07 (code string,description string,total_emp int,salary int) ROW FORMAT DELIMITED FIELDS TERMINATED BY 't' STORED AS TextFile LOCATION 's3a://<bucket_name>/s07/'")
scala> spark.sql("CREATE EXTERNAL TABLE sample_08 (code string,description string,total_emp int,salary int) ROW FORMAT DELIMITED FIELDS TERMINATED BY 't' STORED AS TextFile LOCATION 's3a://<bucket_name>/s08/'")
- Now launch Beeline and show the Hive tables.
- Load the CSV files into the tables and create DataFrames from the contents of sample_07 and sample_08.
- Finally, save a DataFrame as a new Hive table and show the tables again to verify it, as sketched in the code after this list.
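The snippet below sketches those remaining steps. The HiveServer2 host in the Beeline URL, the filter condition, and the sample_07_highpay table name are illustrative assumptions.

```scala
// In a separate terminal, start Beeline and list the Hive tables
// (host and port are placeholders):
//   beeline -u "jdbc:hive2://<hiveserver2_host>:10000"
//   SHOW TABLES;

// The external tables read the CSV files directly from their LOCATION,
// so the data is already queryable. Create DataFrames from their contents:
val df_07 = spark.sql("SELECT * FROM sample_07")
val df_08 = spark.sql("SELECT * FROM sample_08")

// Save a filtered DataFrame as a new Hive table, then list the tables again
df_07.filter(df_07("salary") > 150000).write.saveAsTable("sample_07_highpay")
spark.sql("SHOW TABLES").show()
```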
Spark SQL Functions
Spark SQL comes with several built-in standard functions (org.apache.spark.sql.functions) to work with DataFrame/Dataset and SQL queries. These Spark SQL functions return org.apache.spark.sql.Column type.
Apart from the standard functions, there are a few less commonly used functions that we will not be covering in this blog. You can still access them by passing a SQL expression string to the functions.expr() API.
Spark groups all standard library functions into the below-mentioned categories.
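Before going through the categories, here is a minimal sketch of using the standard functions and the functions.expr() escape hatch; the tiny DataFrame and its columns are assumptions.

```scala
import org.apache.spark.sql.functions._
import spark.implicits._  // for .toDF (already in scope in spark-shell)

val df = Seq(("alice", 3000), ("bob", 4500)).toDF("name", "salary")

// Standard functions return Column values that compose inside select()
df.select(upper(col("name")).alias("name_uc"), col("salary") * 2).show()

// Any built-in SQL function can also be invoked through a SQL expression string
df.select(expr("substring(name, 1, 3) AS short_name")).show()
```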
Spark SQL String Functions
In Spark SQL, string functions are grouped as “string_funcs”. Below is a list of only a few of the many such functions:
| String Function Syntax | String Function Description |
|---|---|
| ascii(e: Column): Column | Computes the numeric value of the first character of the string column and returns the result as an int column |
| instr(str: Column, substring: String): Column | Finds the position of the first occurrence of substring in the given string column; returns null if either argument is null |
| length(e: Column): Column | Gives the character length of a given string or the number of bytes in a binary string; the length includes trailing spaces and binary zeros |
| lower(e: Column): Column | Converts a string column to lowercase |
| trim(e: Column): Column | Trims the spaces from both ends of a string column |
| trim(e: Column, trimString: String): Column | Trims the specified characters from both ends of a given string column |
| upper(e: Column): Column | Converts a string column to uppercase |
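A short, hedged example of a few of these string functions; the sample data is an assumption.

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

val names = Seq("  Alice  ", "bob", "Carol").toDF("name")

names.select(
  trim(col("name")).alias("trimmed"),
  lower(col("name")).alias("lower"),
  upper(col("name")).alias("upper"),
  length(col("name")).alias("len"),
  ascii(col("name")).alias("first_char_code")
).show()
```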
Spark SQL Date and Time Functions
| Date Function Syntax | Date Function Description |
|---|---|
| current_date(): Column | Returns the current date as a date column |
| to_date(e: Column): Column | Converts the column into `DateType` by casting rules to `DateType` |
| to_date(e: Column, fmt: String): Column | Converts the column into a `DateType` with the specified format |
| datediff(end: Column, start: Column): Column | Returns the number of days from `start` to `end` |
| year(e: Column): Column | Extracts the year as an integer from a given date/timestamp/string |
| quarter(e: Column): Column | Extracts the quarter as an integer from a given date/timestamp/string |
| month(e: Column): Column | Extracts the month as an integer from a given date/timestamp/string |

| Timestamp Function Syntax | Timestamp Function Description |
|---|---|
| current_timestamp(): Column | Returns the current timestamp as a timestamp column |
| hour(e: Column): Column | Extracts the hours as an integer from a given date/timestamp/string |
| minute(e: Column): Column | Extracts the minutes as an integer from a given date/timestamp/string |
| second(e: Column): Column | Extracts the seconds as an integer from a given date/timestamp/string |
| to_timestamp(s: Column): Column | Converts to a timestamp by casting rules to `TimestampType` |
| to_timestamp(s: Column, fmt: String): Column | Converts a time string with the given pattern to a timestamp |

| Date & Time Window Function Syntax | Date & Time Window Function Description |
|---|---|
| window(timeColumn: Column, windowDuration: String, slideDuration: String, startTime: String): Column | Bucketizes rows into one or more time windows given a timestamp-specifying column. Window starts are inclusive, while window ends are exclusive. Windows can support microsecond precision, but durations in the order of months are not supported |
| window(timeColumn: Column, windowDuration: String, slideDuration: String): Column | Bucketizes rows into one or more time windows given a timestamp-specifying column. Window starts are inclusive, while window ends are exclusive. Windows can support microsecond precision, but durations in the order of months are not supported. The windows start at 1970-01-01 00:00:00 UTC |
| window(timeColumn: Column, windowDuration: String): Column | Generates tumbling time windows given a timestamp-specifying column. Window starts are inclusive, while window ends are exclusive. Windows can support microsecond precision, but durations in the order of months are not supported. The windows start at 1970-01-01 00:00:00 UTC |
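The sketch below exercises a few date, timestamp, and window functions; the event timestamps, column names, and the 15-minute window duration are assumptions.

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

val events = Seq(
  ("2024-01-15 10:05:00", 3),
  ("2024-01-15 10:20:00", 5)
).toDF("ts", "clicks")

events.select(
  current_date().alias("today"),
  to_date(col("ts")).alias("event_date"),
  year(col("ts")).alias("yr"),
  hour(col("ts")).alias("hr")
).show()

// Tumbling 15-minute windows over the timestamp column
events
  .groupBy(window(to_timestamp(col("ts")), "15 minutes"))
  .sum("clicks")
  .show(truncate = false)
```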
Spark SQL Collection Functions
Collection functions involve arrays and maps.
| Array Function Syntax | Array Function Description |
|---|---|
| array_contains(column: Column, value: Any) | Checks whether a value is present in an array column; returns true if the value is present, false if it is not, and null if the array itself is null |
| array_distinct(e: Column) | Returns the distinct values of the array, with duplicates removed |
| array_except(col1: Column, col2: Column) | Returns the elements that are in the col1 array but not in the col2 array |
| array_intersect(col1: Column, col2: Column) | Returns the elements that are present in both the col1 and col2 arrays |
| array_max(e: Column) | Returns the maximum value in an array |
| array_min(e: Column) | Returns the minimum value in an array |
| array_remove(column: Column, element: Any) | Returns the array after removing all occurrences of the given element |
| array_repeat(e: Column, count: Int) | Creates an array containing the first argument repeated the number of times given by the second argument |

| Map Function Syntax | Map Function Description |
|---|---|
| map | Creates a new map column |
| map_keys | Returns an array containing the keys of the map |
| map_values | Returns an array containing the values of the map |
| map_concat | Merges the maps passed as arguments |
| map_from_entries | Returns a map created from the given array of StructType entries |
| map_entries | Returns an array of all StructType entries in the given map |
| transform_keys(expr: Column, f: (Column, Column) => Column) | Applies the function to every key-value pair and returns a map with the transformed keys |
| transform_values(expr: Column, f: (Column, Column) => Column) | Applies the function to every key-value pair and returns a map with the transformed values |
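A brief, hedged example of the array and map functions above; the sample array and map values are assumptions.

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  (Seq(1, 2, 2, 3), Map("a" -> 1, "b" -> 2))
).toDF("nums", "kv")

df.select(
  array_contains(col("nums"), 2).alias("has_2"),
  array_distinct(col("nums")).alias("distinct_nums"),
  array_max(col("nums")).alias("max_num"),
  map_keys(col("kv")).alias("keys"),
  map_values(col("kv")).alias("values")
).show(truncate = false)
```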
Spark SQL Math Functions
Mentioned below are subsets of mathematical and statistical functions:
| Math Trigonometric Function Syntax | Math Trigonometric Function Description |
|---|---|
| asin(e: Column): Column<br>asin(columnName: String): Column | Returns the arc sine (inverse sine) of the input argument, same as the java.lang.Math.asin() function |
| acos(e: Column): Column<br>acos(columnName: String): Column | Returns the arc cosine (inverse cosine) of the input argument, same as the java.lang.Math.acos() function |
| atan(e: Column): Column<br>atan(columnName: String): Column | Returns the arc tangent (inverse tangent) of the input argument, same as the java.lang.Math.atan() function |
| sin(e: Column): Column<br>sin(columnName: String): Column | Returns the sine of the angle, same as the java.lang.Math.sin() function |
| sinh(e: Column): Column<br>sinh(columnName: String): Column | Returns the hyperbolic sine of the given value, same as the java.lang.Math.sinh() function |
| cos(e: Column): Column<br>cos(columnName: String): Column | Returns the cosine of the angle, same as the java.lang.Math.cos() function |
| cosh(e: Column): Column<br>cosh(columnName: String): Column | Returns the hyperbolic cosine of the angle, same as the java.lang.Math.cosh() function |
| tan(e: Column): Column<br>tan(columnName: String): Column | Returns the tangent of the given value, same as the java.lang.Math.tan() function |

| Log Math Function Syntax | Log Math Function Description |
|---|---|
| log(columnName: String): Column | Computes the natural logarithm of the given column |
| log(base: Double, a: Column): Column<br>log(base: Double, columnName: String): Column | Returns the logarithm of the second argument with the first argument as the base |
| log10(e: Column): Column<br>log10(columnName: String): Column | Computes the base-10 logarithm of the given value |
| log1p(e: Column): Column<br>log1p(columnName: String): Column | Computes the natural logarithm of the given value plus one |
| log2(expr: Column): Column<br>log2(columnName: String): Column | Computes the base-2 logarithm of the given column |
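A minimal sketch of the trigonometric and logarithmic functions; the input angles are assumptions.

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

val angles = Seq(0.0, math.Pi / 6, math.Pi / 2).toDF("x")

angles.select(
  sin(col("x")).alias("sin_x"),
  cos(col("x")).alias("cos_x"),
  atan(col("x")).alias("atan_x"),
  log1p(col("x")).alias("ln_1_plus_x"),
  log10(col("x") + 1).alias("log10_x_plus_1")
).show()
```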
Spark SQL Aggregate Functions
| Aggregate Function Syntax | Aggregate Function Description |
|---|---|
| approx_count_distinct(e: Column) | Returns the approximate number of distinct items in a group |
| approx_count_distinct(e: Column, rsd: Double) | Returns the approximate number of distinct items in a group, with rsd as the maximum estimation error allowed |
| avg(e: Column) | Returns the average of the values in the input column |
| collect_list(e: Column) | Returns all values from the input column, with duplicates |
| collect_set(e: Column) | Returns all values from the input column, with duplicate values eliminated |
| corr(column1: Column, column2: Column) | Returns the Pearson correlation coefficient of the two columns |
| count(e: Column) | Returns the count of elements in a column |
| countDistinct(expr: Column, exprs: Column*) | Returns the count of distinct elements in the columns |
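A hedged sketch of the aggregate functions grouped by a key column; the sales data is an assumption.

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

val sales = Seq(
  ("east", 100), ("east", 250), ("west", 250), ("west", 400)
).toDF("region", "amount")

sales.groupBy(col("region")).agg(
  count(col("amount")).alias("n"),
  avg(col("amount")).alias("avg_amount"),
  collect_set(col("amount")).alias("distinct_amounts"),
  approx_count_distinct(col("amount")).alias("approx_distinct")
).show(truncate = false)
```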
Spark SQL Window Functions
| Window Function Syntax | Window Function Description |
|---|---|
| row_number(): Column | Returns a sequential number starting from 1 within a window partition |
| rank(): Column | Returns the rank of rows within a window partition, with gaps |
| percent_rank(): Column | Returns the percentile rank of rows within a window partition |
| dense_rank(): Column | Returns the rank of rows within a window partition without any gaps (whereas rank() leaves gaps) |
| ntile(n: Int): Column | Returns the ntile id in a window partition |
| cume_dist(): Column | Returns the cumulative distribution of values within a window partition |
| lag(e: Column, offset: Int): Column<br>lag(columnName: String, offset: Int): Column<br>lag(columnName: String, offset: Int, defaultValue: Any): Column | Returns the value that is `offset` rows before the current row, and `null` if there are fewer than `offset` rows before the current row |
| lead(e: Column, offset: Int): Column<br>lead(columnName: String, offset: Int): Column<br>lead(columnName: String, offset: Int, defaultValue: Any): Column | Returns the value that is `offset` rows after the current row, and `null` if there are fewer than `offset` rows after the current row |
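The sketch below applies a few of these functions over a window partitioned by region and ordered by score; the sample rows are assumptions.

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val scores = Seq(
  ("east", "alice", 90), ("east", "bob", 75),
  ("west", "carol", 85), ("west", "dan", 85)
).toDF("region", "name", "score")

// Partition by region and order by score, highest first
val w = Window.partitionBy("region").orderBy(col("score").desc)

scores.select(
  col("region"), col("name"), col("score"),
  row_number().over(w).alias("row_num"),
  rank().over(w).alias("rnk"),
  dense_rank().over(w).alias("dense_rnk"),
  lag(col("score"), 1).over(w).alias("prev_score")
).show()
```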
Advantages of Spark SQL
The following are the various advantages of using Spark SQL:
- It helps in easy data querying. SQL queries can be mixed with Spark programs to query structured data as a distributed dataset (RDD), and, thanks to this tight integration, SQL queries can be run alongside analytic algorithms.
- Another important advantage of Spark SQL is that the loading and querying can be done for data from different sources. Hence, the data access is unified.
- It offers standard connectivity as Spark SQL can be connected through JDBC or ODBC.
- It can be used for faster processing of Hive tables.
- Another important offering of Spark SQL is that it can run unmodified Hive queries on existing warehouses as it allows easy compatibility with existing Hive data and queries.
Disadvantages of Spark SQL
The following are the disadvantages of Spark SQL:
- Creating or reading tables containing union fields is not possible with Spark SQL.
- It does not report an error when a value inserted into a varchar column exceeds the column's size limit.
- It does not support Hive transactions.
- It also does not support the Char type (fixed-length strings). Hence, reading or creating a table with such fields is not possible.
Sorting Functions
| Sort Function Syntax | Sort Function Description |
|---|---|
| asc(columnName: String): Column | Specifies the ascending order of the sorting column on a DataFrame or Dataset |
| asc_nulls_first(columnName: String): Column | Similar to the asc function, but null values are returned before non-null values |
| asc_nulls_last(columnName: String): Column | Similar to the asc function, but non-null values are returned before null values |
| desc(columnName: String): Column | Specifies the descending order of the sorting column on a DataFrame or Dataset |
| desc_nulls_first(columnName: String): Column | Similar to the desc function, but null values are returned before non-null values |
| desc_nulls_last(columnName: String): Column | Similar to the desc function, but non-null values are returned before null values |
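A brief, hedged example of null-aware sorting; the sample rows (with a missing age) are assumptions.

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

val people = Seq(
  (Some(34), "alice"), (None, "bob"), (Some(29), "carol")
).toDF("age", "name")

// Ascending by age, with null ages listed first
people.orderBy(asc_nulls_first("age")).show()

// Descending by age, with null ages listed last
people.orderBy(desc_nulls_last("age")).show()
```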
Adding a Schema to RDDs Programmatically
We can add a schema to an existing RDD programmatically. This is done by defining the schema explicitly as a StructType, converting the RDD's records into Row objects, and applying the schema with createDataFrame().
In the example below, a text file is loaded into an RDD, and then a schema is applied to that RDD.
The file employees.txt contains the employee records.
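A minimal sketch, assuming each line of employees.txt holds comma-separated id, name, and age fields (the file path and field layout are assumptions):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Read the text file into an RDD of lines (the path is an assumption)
val lines = spark.sparkContext.textFile("employees.txt")

// Define the schema explicitly
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))

// Convert each comma-separated line into a Row that matches the schema
val rowRDD = lines.map(_.split(","))
  .map(a => Row(a(0).trim.toInt, a(1).trim, a(2).trim.toInt))

// Apply the schema to the RDD to obtain a DataFrame
val employees = spark.createDataFrame(rowRDD, schema)
employees.printSchema()
employees.show()
```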
Caching Tables In-memory
Here, we register the DataFrame built from the text file as a temporary view and cache it. Cached tables are stored in memory, so queries against them run much faster than queries that re-read the data.
Caching is mainly done for faster execution.
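A minimal sketch, assuming the employees DataFrame from the previous example has been built; the view name is an assumption.

```scala
// Register the DataFrame as a temporary view and cache it eagerly
employees.createOrReplaceTempView("employees")
spark.sql("CACHE TABLE employees")

// Subsequent queries are served from the in-memory cache
spark.sql("SELECT name, age FROM employees WHERE age > 30").show()

// Release the cached data when it is no longer needed
spark.sql("UNCACHE TABLE employees")
```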