
Spark SQL Tutorial


What is Spark SQL?

Spark SQL is one of the main components of the Apache Spark framework. It is mainly used for structured data processing. It provides various Application Programming Interfaces (APIs) in Python, Java, Scala, and R. Spark SQL integrates relational data processing with the functional programming API of Spark.

Spark SQL provides a programming abstraction called DataFrame and can also act as a distributed query engine (querying on different nodes of a cluster). It supports querying either with Hive Query Language (HiveQL) or with SQL.
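To make this concrete, here is a minimal, hedged sketch of mixing the DataFrame API with SQL in a spark-shell session (where a SparkSession named `spark` already exists); the people.json path refers to the sample file shipped with the Spark distribution and is used purely for illustration.

import org.apache.spark.sql.SparkSession

// In spark-shell a SparkSession called `spark` is created for you;
// in a standalone program it would be built like this.
val spark = SparkSession.builder().appName("SparkSQLIntro").getOrCreate()

// Load a sample JSON file into a DataFrame (path from the Spark examples folder).
val people = spark.read.json("examples/src/main/resources/people.json")

// Query the same data through the DataFrame API...
people.filter("age > 21").show()

// ...and through plain SQL, after registering a temporary view.
people.createOrReplaceTempView("people")
spark.sql("SELECT name, age FROM people WHERE age > 21").show()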


If you are familiar with Relational Database Management Systems (RDBMS) and their tools, you can think of Spark SQL as an extension of relational data processing. Spark SQL can process Big Data workloads that are difficult to handle on a traditional database system.

Why did Spark SQL come into the picture?

Before Spark SQL, there was Apache Hive, which was used for structured data processing. Apache Hive was originally developed to run on top of Apache Hadoop's MapReduce engine, and it had certain limitations as follows:

  • Hive deploys MapReduce algorithms for ad-hoc querying. We know that MapReduce algorithms lag in performance when it comes to medium-sized datasets.
  • During the execution of a workflow, if the processing fails, Hive does not have the capability to resume from the point where it failed.
  • Apache Hive does not support real-time data processing; it uses batch processing instead. It collects the data and processes it in bulk later.

Given these drawbacks of Hive, it is clear that there was scope for improvement, which is why Spark SQL came into the picture.

Components of Spark

The different components of Spark are mentioned in the illustration below:

[Figure: components of Spark – Spark Core, Spark SQL, Spark Streaming, MLlib, and GraphX]

Apache Spark Core

Spark Core is the underlying general execution engine that provides distributed task dispatching, scheduling, and basic I/O functionality. It offers in-memory computing and the ability to reference datasets held in external storage systems.

Spark SQL

Spark SQL is a Spark module that sits on top of Spark Core and is responsible for structured data processing. It introduced a data abstraction called SchemaRDD (later renamed DataFrame), which provides support for structured and semi-structured data.

Spark Streaming

Spark Streaming uses the fast scheduling capability of Spark Core to perform streaming analytics. It ingests data in mini-batches and performs Resilient Distributed Dataset (RDD) transformations on those mini-batches.

MLlib (Machine Learning Library)

MLlib is a Machine Learning (ML) library that aims to make practical machine learning scalable and easy. It is a distributed machine learning framework built on top of Spark, and, owing to Spark's distributed memory-based architecture, it runs considerably faster than disk-based implementations (according to benchmarks run by the MLlib developers against Alternating Least Squares (ALS) implementations). At a high level, MLlib provides tools such as ML algorithms, featurization, pipelines, and utilities.

GraphX

GraphX is a distributed framework for graph processing. It provides an API for expressing graph computation that can model user-defined graphs using the Pregel abstraction API, and it also provides an optimized runtime for this abstraction.


Features of Spark SQL

Let’s look at the aspects that make Spark SQL immensely popular in data processing.

  • Easy to Integrate: SQL queries can be mixed with Spark programs easily. Structured data can be queried inside Spark programs using either SQL or the DataFrame API, and this tight integration makes it easy to run SQL queries alongside analytic algorithms.
  • Compatibility with Hive: Hive queries can be executed in Spark SQL as they are.
  • Unified Data Access: Data can be loaded and queried from a variety of sources through a single interface, as illustrated in the sketch after this list.
  • Standard Connectivity: Spark SQL can be accessed through industry-standard JDBC (Java Database Connectivity) and ODBC (Open Database Connectivity) interfaces.
  • Performance and Scalability: Spark SQL incorporates a code generator, a cost-based optimizer, and columnar storage to keep queries fast, and it scales to thousands of nodes using the Spark engine, which also provides full mid-query fault tolerance.
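As a hedged illustration of the unified data access feature, the sketch below loads data from two different formats and joins them with one SQL query; the bucket path, file names, and column names are invented for illustration, and a spark-shell session (`spark`) is assumed.

// Load data from two different sources into DataFrames.
val users  = spark.read.parquet("s3a://<bucket_name>/users.parquet")                     // columnar file
val events = spark.read.option("header", "true").csv("s3a://<bucket_name>/events.csv")   // text file

// Register both as temporary views so they can be queried with SQL.
users.createOrReplaceTempView("users")
events.createOrReplaceTempView("events")

// The same SQL works regardless of where each table came from.
spark.sql("""
  SELECT u.name, COUNT(*) AS event_count
  FROM users u JOIN events e ON u.id = e.user_id
  GROUP BY u.name
""").show()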

Spark SQL Example

Consider a scenario where you wish to create and load two tables and then select rows from them. Let us use Spark SQL to implement this.

  • As the first step, copy the Hue sample_07.csv and sample_08.csv files to your object store, in a location that the Spark cluster can easily access.
  • Next, launch the spark-shell.
  • Once the launch is complete, create the Hive tables sample_07 and sample_08. Refer to the code below:
scala> spark.sql("CREATE EXTERNAL TABLE sample_07 (code string,description string,total_emp int,salary int) ROW FORMAT DELIMITED FIELDS TERMINATED BY 't' STORED AS TextFile LOCATION 's3a://<bucket_name>/s07/'")

 

scala> spark.sql("CREATE EXTERNAL TABLE sample_08 (code string,description string,total_emp int,salary int) ROW FORMAT DELIMITED FIELDS TERMINATED BY 't' STORED AS TextFile LOCATION 's3a://<bucket_name>/s08/'")
  • Now launch Beeline and show the Hive tables.
  • Load the CSV files into the tables, and create DataFrames from the contents of sample_07 and sample_08.
  • Finally, save a DataFrame as a table and show the tables. A hedged sketch of these DataFrame steps is given below.
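The following is a minimal sketch of the DataFrame steps above, assuming the spark-shell session and the sample_07/sample_08 tables created earlier; the salary filter, the join columns, and the sample_09 table name are illustrative choices rather than part of the original walkthrough.

// Create DataFrames from the Hive tables defined above.
val df_07 = spark.sql("SELECT * FROM sample_07")
val df_08 = spark.sql("SELECT * FROM sample_08")

// A simple query on one of the DataFrames.
df_07.filter(df_07("salary") > 150000).show()

// Join the two tables on the shared code column and save the result as a new Hive table.
val df_09 = df_07.join(df_08, df_07("code") === df_08("code"))
  .select(df_07("code"), df_07("description"))
df_09.write.saveAsTable("sample_09")

// The new table can then be inspected from Beeline with SHOW TABLES and SELECT queries.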

Spark SQL Functions

Spark SQL comes with several built-in standard functions (org.apache.spark.sql.functions) for working with DataFrames/Datasets and SQL queries. These functions return the org.apache.spark.sql.Column type.

Apart from the standard functions, there are also a few less commonly used functions. We will not cover those in this blog; however, you can still access them by passing an SQL expression string to the functions.expr() API.

Spark groups all standard library functions into the below-mentioned categories.
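For instance, the sketch below calls concat_ws once through its Scala wrapper and once through functions.expr() with an SQL expression string; the same expr() route is how you reach functions that have no dedicated wrapper. The tiny DataFrame is invented for illustration, and a spark-shell session is assumed.

import org.apache.spark.sql.functions._
import spark.implicits._   // already in scope inside spark-shell

val df = Seq(("Spark", "SQL"), ("Hello", "World")).toDF("a", "b")

// Calling a standard function through its Scala wrapper...
df.select(concat_ws(" ", col("a"), col("b")).as("joined")).show()

// ...and through an SQL expression string via expr().
df.select(expr("concat_ws(' ', a, b)").as("joined")).show()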

Spark SQL String Functions

In Spark SQL, string functions are grouped as “string_funcs”. Below is a list of only a few of the many such functions:

  • ascii(e: Column): Column – Computes the numeric value of the first character of a string column; the result is returned as an int column
  • instr(str: Column, substring: String): Column – Finds the position of the first occurrence of substring in the given string column; returns null if either argument is null
  • length(e: Column): Column – Gives the character length of a given string or the number of bytes in a binary string; the length includes trailing spaces and binary zeros
  • lower(e: Column): Column – Converts a string column to lowercase
  • trim(e: Column): Column – Trims the spaces from both ends of a string column
  • trim(e: Column, trimString: String): Column – Trims the specified characters from both ends of the given string column
  • upper(e: Column): Column – Converts a string column to uppercase
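A short, hedged usage sketch of a few of these string functions, with a made-up DataFrame and a spark-shell session assumed:

import org.apache.spark.sql.functions._
import spark.implicits._

val names = Seq("  Alice  ", "bob", "Charlie").toDF("name")

names.select(
  trim(col("name")).as("trimmed"),
  upper(col("name")).as("upper_case"),
  lower(col("name")).as("lower_case"),
  length(col("name")).as("len"),
  ascii(col("name")).as("first_char_code")
).show()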

Spark SQL Date and Time Functions

Date functions:

  • current_date(): Column – Returns the current date as a date column
  • to_date(e: Column): Column – Converts the column into `DateType` by casting rules to `DateType`
  • to_date(e: Column, fmt: String): Column – Converts the column into a `DateType` with the specified format
  • datediff(end: Column, start: Column): Column – Returns the number of days from `start` to `end`
  • year(e: Column): Column – Extracts the year as an integer from a given date/timestamp/string
  • quarter(e: Column): Column – Extracts the quarter as an integer from a given date/timestamp/string
  • month(e: Column): Column – Extracts the month as an integer from a given date/timestamp/string

Timestamp functions:

  • current_timestamp(): Column – Returns the current timestamp as a timestamp column
  • hour(e: Column): Column – Extracts the hours as an integer from a given date/timestamp/string
  • minute(e: Column): Column – Extracts the minutes as an integer from a given date/timestamp/string
  • second(e: Column): Column – Extracts the seconds as an integer from a given date/timestamp/string
  • to_timestamp(s: Column): Column – Converts to a timestamp by casting rules to `TimestampType`
  • to_timestamp(s: Column, fmt: String): Column – Converts a time string with the given pattern to a timestamp

Date and time window functions:

  • window(timeColumn: Column, windowDuration: String, slideDuration: String, startTime: String): Column – Bucketizes rows into one or more time windows given a timestamp-specifying column; window starts are inclusive, window ends are exclusive; microsecond precision is supported, but the order of months is not
  • window(timeColumn: Column, windowDuration: String, slideDuration: String): Column – Bucketizes rows into one or more time windows given a timestamp-specifying column; window starts are inclusive, window ends are exclusive; microsecond precision is supported, but the order of months is not; windows are aligned to 1970-01-01 00:00:00 UTC
  • window(timeColumn: Column, windowDuration: String): Column – Generates tumbling time windows given a timestamp-specifying column; window starts are inclusive, window ends are exclusive; microsecond precision is supported, but the order of months is not; windows are aligned to 1970-01-01 00:00:00 UTC
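A short, hedged sketch of a few date functions, using invented dates and assuming a spark-shell session:

import org.apache.spark.sql.functions._
import spark.implicits._

val dates = Seq("2024-01-15", "2024-06-30").toDF("d")

dates.select(
  current_date().as("today"),
  to_date(col("d")).as("as_date"),
  datediff(current_date(), to_date(col("d"))).as("days_ago"),
  year(to_date(col("d"))).as("year"),
  quarter(to_date(col("d"))).as("quarter"),
  month(to_date(col("d"))).as("month")
).show()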

Spark SQL Collection Functions

Collection functions involve arrays and maps.

Array functions:

  • array_contains(column: Column, value: Any) – Checks whether a value is present in an array column; returns true if the value is present, false if it is not, and null if the array itself is null
  • array_distinct(e: Column) – Returns the distinct values of the array, with duplicates removed
  • array_except(col1: Column, col2: Column) – Returns the elements that are in the col1 array but not in the col2 array
  • array_intersect(col1: Column, col2: Column) – Returns the elements that are present in both the col1 and col2 arrays
  • array_max(e: Column) – Returns the maximum value in an array
  • array_min(e: Column) – Returns the minimum value in an array
  • array_remove(column: Column, element: Any) – Returns an array with all occurrences of the given element removed
  • array_repeat(e: Column, count: Int) – Creates an array containing the first argument repeated the number of times given by the second argument

Map functions:

  • map – Creates a new map column
  • map_keys – Returns an array containing the keys of the map
  • map_values – Returns an array containing the values of the map
  • map_concat – Merges the maps given as arguments
  • map_from_entries – Returns a map built from the given array of StructType entries
  • map_entries – Returns an array of all StructType entries in the given map
  • transform_keys(expr: Column, f: (Column, Column) => Column) – Applies the function to every key-value pair and returns a map with the transformed keys
  • transform_values(expr: Column, f: (Column, Column) => Column) – Applies the function to every key-value pair and returns a map with the transformed values
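A short, hedged sketch of a few array and map functions, with an invented one-row DataFrame and a spark-shell session assumed:

import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  (Seq(1, 2, 2, 3), Map("a" -> 1, "b" -> 2))
).toDF("nums", "kv")

df.select(
  array_contains(col("nums"), 2).as("has_2"),
  array_distinct(col("nums")).as("distinct_nums"),
  array_max(col("nums")).as("max_num"),
  map_keys(col("kv")).as("keys"),
  map_values(col("kv")).as("values")
).show(false)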

Spark SQL Math Functions

Mentioned below is a subset of the available mathematical functions:

Trigonometric functions:

  • asin(e: Column): Column / asin(columnName: String): Column – Returns the arcsine or inverse sine of the input argument, same as the java.lang.Math.asin() function
  • acos(e: Column): Column / acos(columnName: String): Column – Returns the arccosine or inverse cosine of the input argument, same as the java.lang.Math.acos() function
  • atan(e: Column): Column / atan(columnName: String): Column – Returns the arctangent or inverse tangent of the input argument, same as the java.lang.Math.atan() function
  • sin(e: Column): Column / sin(columnName: String): Column – Returns the sine of the angle, same as the java.lang.Math.sin() function
  • sinh(e: Column): Column / sinh(columnName: String): Column – Returns the hyperbolic sine of the given value, same as the java.lang.Math.sinh() function
  • cos(e: Column): Column / cos(columnName: String): Column – Returns the cosine of the angle, same as the java.lang.Math.cos() function
  • cosh(e: Column): Column / cosh(columnName: String): Column – Returns the hyperbolic cosine of the angle, same as the java.lang.Math.cosh() function
  • tan(e: Column): Column / tan(columnName: String): Column – Returns the tangent of the given value, same as the java.lang.Math.tan() function

Logarithmic functions:

  • log(columnName: String): Column – Computes the natural logarithm of the given column
  • log(base: Double, a: Column): Column / log(base: Double, columnName: String): Column – Returns the logarithm of the second argument with the first argument as the base
  • log10(e: Column): Column / log10(columnName: String): Column – Computes the logarithm of the given value in base 10
  • log1p(e: Column): Column / log1p(columnName: String): Column – Computes the natural logarithm of the given value plus one
  • log2(expr: Column): Column / log2(columnName: String): Column – Computes the logarithm of the given column in base 2
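A short, hedged sketch of a few trigonometric and logarithmic functions, with invented values and a spark-shell session assumed:

import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(0.25, 0.5, 1.0).toDF("x")

df.select(
  col("x"),
  sin(col("x")).as("sin_x"),
  cos(col("x")).as("cos_x"),
  asin(col("x")).as("asin_x"),
  log1p(col("x")).as("log1p_x"),
  log(2.0, col("x")).as("log2_x")
).show()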

Spark SQL Aggregate Functions

  • approx_count_distinct(e: Column) – Returns the approximate number of distinct items in a group
  • approx_count_distinct(e: Column, rsd: Double) – Returns the approximate number of distinct items in a group, with rsd as the maximum allowed relative standard deviation
  • avg(e: Column) – Returns the average of the values in the input column
  • collect_list(e: Column) – Returns all values from the input column, keeping duplicates
  • collect_set(e: Column) – Returns all values from the input column, with duplicate values eliminated
  • corr(column1: Column, column2: Column) – Returns the Pearson correlation coefficient for two columns
  • count(e: Column) – Returns the count of elements in a column
  • countDistinct(expr: Column, exprs: Column*) – Returns the count of distinct elements in the columns
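A short, hedged sketch of a few aggregate functions applied per group, with invented sales data and a spark-shell session assumed:

import org.apache.spark.sql.functions._
import spark.implicits._

val sales = Seq(
  ("north", 100), ("north", 200),
  ("south", 100), ("south", 100)
).toDF("region", "amount")

sales.groupBy(col("region")).agg(
  avg(col("amount")).as("avg_amount"),
  count(col("amount")).as("row_count"),
  countDistinct(col("amount")).as("distinct_amounts"),
  collect_set(col("amount")).as("unique_amounts")
).show(false)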

Spark SQL Window Functions

  • row_number(): Column – Returns a sequential number starting from 1 within a window partition
  • rank(): Column – Returns the rank of rows within a window partition, with gaps
  • percent_rank(): Column – Returns the percentile rank of rows within a window partition
  • dense_rank(): Column – Returns the rank of rows within a window partition without any gaps, whereas rank() returns the rank with gaps
  • ntile(n: Int): Column – Returns the ntile id in a window partition
  • cume_dist(): Column – Returns the cumulative distribution of values within a window partition
  • lag(e: Column, offset: Int): Column / lag(columnName: String, offset: Int): Column / lag(columnName: String, offset: Int, defaultValue: Any): Column – Returns the value that is `offset` rows before the current row, and `null` if there are fewer than `offset` rows before the current row
  • lead(e: Column, offset: Int): Column / lead(columnName: String, offset: Int): Column / lead(columnName: String, offset: Int, defaultValue: Any): Column – Returns the value that is `offset` rows after the current row, and `null` if there are fewer than `offset` rows after the current row
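A short, hedged sketch of ranking and lag within a window partition, with invented salary data and a spark-shell session assumed:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val salaries = Seq(
  ("sales", "Ann", 5000), ("sales", "Bob", 4000),
  ("hr", "Cid", 3500), ("hr", "Dee", 3500)
).toDF("dept", "name", "salary")

// Window functions need a window specification: here, ordering within each department.
val byDept = Window.partitionBy("dept").orderBy(col("salary").desc)

salaries.select(
  col("dept"), col("name"), col("salary"),
  row_number().over(byDept).as("row_number"),
  rank().over(byDept).as("rank"),
  dense_rank().over(byDept).as("dense_rank"),
  lag(col("salary"), 1).over(byDept).as("prev_salary")
).show()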

Advantages of Spark SQL

The following are the various advantages of using Spark SQL:

  • It makes data querying easy: SQL queries can be mixed with Spark programs to query structured data as a distributed dataset (RDD), and, thanks to Spark SQL's tight integration, these queries can be run alongside analytic algorithms.
  • Another important advantage of Spark SQL is that the loading and querying can be done for data from different sources. Hence, the data access is unified.
  • It offers standard connectivity as Spark SQL can be connected through JDBC or ODBC.
  • It can be used for faster processing of Hive tables.
  • Another important offering of Spark SQL is that it can run unmodified Hive queries on existing warehouses as it allows easy compatibility with existing Hive data and queries.

Disadvantages of Spark SQL

The following are the disadvantages of Spark SQL:

  • Creating or reading tables containing union fields is not possible with Spark SQL.
  • It does not report an error when a varchar value is oversized.
  • It does not support Hive transactions.
  • It also does not support the Char type (fixed-length strings). Hence, reading or creating a table with such fields is not possible.

Sorting Functions

  • asc(columnName: String): Column – Specifies the ascending order of the sorting column on a DataFrame or Dataset
  • asc_nulls_first(columnName: String): Column – Similar to asc, but null values are returned before non-null values
  • asc_nulls_last(columnName: String): Column – Similar to asc, but non-null values are returned before null values
  • desc(columnName: String): Column – Specifies the descending order of the sorting column on a DataFrame or Dataset
  • desc_nulls_first(columnName: String): Column – Similar to desc, but null values are returned before non-null values
  • desc_nulls_last(columnName: String): Column – Similar to desc, but non-null values are returned before null values
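A short, hedged sketch of the null-aware sort helpers, with an invented nullable column and a spark-shell session assumed:

import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(Some(3), None, Some(1)).toDF("value")

// Ascending with nulls last, then descending with nulls first.
df.sort(asc_nulls_last("value")).show()
df.sort(desc_nulls_first("value")).show()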

Adding a Schema to RDDs Programmatically

We can add a schema to an existing RDD programmatically: we import the required types, define a schema, convert the RDD's records into Row objects, and apply the schema to create a DataFrame.

In the example below, a text file is loaded into an RDD, and a schema is then applied to it.

The file employees.txt has the following data:

[Figure: contents of employees.txt and the code for adding a schema programmatically]
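A hedged sketch of this approach, assuming each line of employees.txt contains a comma-separated name and age; the column names and the final query are illustrative.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Load the text file into an RDD (assuming lines like "name,age").
val employeesRDD = spark.sparkContext.textFile("employees.txt")

// Define the schema programmatically.
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))

// Convert each line to a Row and apply the schema.
val rowRDD = employeesRDD.map(_.split(",")).map(fields => Row(fields(0).trim, fields(1).trim.toInt))
val employeesDF = spark.createDataFrame(rowRDD, schema)

// The schema-backed DataFrame can now be queried with SQL.
employeesDF.createOrReplaceTempView("employees")
spark.sql("SELECT name FROM employees WHERE age > 30").show()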

Caching Tables In-memory

Here, we create a temporary view for the above text file and cache it. A cached table can be accessed much faster because its data is kept in memory, so queries against it return results very quickly.

Caching is mainly done for faster execution.

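A hedged sketch of caching, reusing the employees temporary view from the previous example; the queries are illustrative.

// Cache the table backing the temporary view (calling cache() on the DataFrame works too).
spark.catalog.cacheTable("employees")

// The first action materializes the cache; subsequent queries read from memory.
spark.sql("SELECT COUNT(*) FROM employees").show()
spark.sql("SELECT AVG(age) FROM employees").show()

// Release the cached data when it is no longer needed.
spark.catalog.uncacheTable("employees")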

