Writing Spark batches only in SQL

Apache Spark™ is a popular big data framework known for being faster than Hadoop MapReduce, easy to use, and fault-tolerant. Most Spark tutorials require readers to know Scala, Java, or Python as a base programming language. But, in my opinion, SQL is enough to write a Spark batch script.

In this article, I will show that you can write Spark batches only in SQL if your input data is available as a structured dataset. This means that you don’t need to learn Scala or Python, RDDs, or DataFrames if your job can be expressed in SQL. Moreover, the expressive power of SparkSQL may be greater than you think.

The SQL scripts that follow can be found at https://github.com/sanori/spark-sql-example, so you can test them yourself.

Spark batches

A typical Spark batch is a program that

  1. reads data from data sources,
  2. transforms and processes the data, and
  3. saves the result.

Most Spark tutorials use the Scala or Python (or R) programming language to write a Spark batch. For example, you might write the following Python script to count the lines of each of Shakespeare's plays, given the full text in Parquet format. (Some code is included for illustration purposes.)

from pyspark.sql import SparkSession
import pyspark.sql.functions as sf
import os.path

# Start Spark
spark = SparkSession \
    .builder \
    .appName("pyspark example") \
    .getOrCreate()

basedir = os.path.dirname(os.path.realpath(__file__))

# Read all of Shakespeare's plays
df = spark.read.parquet(os.path.join(basedir, "data/shakespeare.gz.parquet"))

# Print the schema to the console
df.printSchema()

# Calculate number of lines of each work (play)
result = df \
    .groupBy("play_name") \
    .agg(sf.count("line_id").alias("lines")) \
    .orderBy("lines", ascending=False)

# Print a part of the result to the console
result.show()

# Save the result as one file in JSON Lines format
result \
    .repartition(1) \
    .write \
    .json(os.path.join(basedir, "length_of_play"), mode="overwrite")

# Stop Spark
spark.stop()

The data-processing part of the above code is the groupBy/agg/orderBy chain that produces the result DataFrame. This part can be written using SQL as follows:

# Calculate number of lines of each work (play)
df.createOrReplaceTempView("shakespeare") # register df as shakespeare table
result = spark.sql("""
SELECT play_name, count(line_id) AS lines
FROM shakespeare
GROUP BY play_name
ORDER BY lines DESC
""")

As I learned more about Spark, I realized that most data-processing code can be replaced by SQL, and that most Spark batch scripts can be rewritten entirely in SQL. For example, the batch script above can be rewritten in SQL as follows:

-- Read all of Shakespeare's plays
CREATE TEMPORARY VIEW shakespeare
USING parquet
OPTIONS (path "data/shakespeare.gz.parquet");

-- Print the table schema and additional information to the console
DESCRIBE EXTENDED shakespeare;

-- Calculate number of lines of each work and print to the console
SELECT play_name, count(line_id) AS lines
FROM shakespeare
GROUP BY play_name
ORDER BY lines DESC
LIMIT 20;

-- Save the result as one file in JSON Lines format
DROP TABLE IF EXISTS lengthOfPlay; -- to overwrite, remove existing table
SET spark.sql.shuffle.partitions=1; -- to produce a single output file
CREATE TABLE lengthOfPlay
USING json
LOCATION "length_of_play"
AS SELECT play_name, count(line_id) AS lines
FROM shakespeare
GROUP BY play_name
ORDER BY lines DESC;

The above SQL script can be executed with spark-sql, which is included in the default Spark distribution. This may imply that the Spark creators consider SQL one of the main programming languages for Spark. In fact, most of the SQL references in this article come from the official Spark programming guide, Spark SQL, DataFrames and Datasets Guide. To see the SQL examples in that guide, click the “Sql” tab of each code sample.

Writing a SQL-only script

The most problematic part of writing a Spark script only in SQL is file operations. Most Spark tutorials suggest using Scala or Python to read a data file and/or save the result of data processing. But there are several ways to read and write data files in SparkSQL, as shown below.

The file paths in the following examples can also be HDFS URIs. That is, you can write SQL scripts for a cluster or distributed environment.
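
For example, a temporary view can point at a Parquet file in HDFS rather than on the local file system; the namenode host, port, and path below are hypothetical placeholders.

-- The HDFS host, port, and path are placeholders; adjust them to your cluster
CREATE TEMPORARY VIEW shakespeare
USING parquet
OPTIONS (path "hdfs://namenode:8020/data/shakespeare.gz.parquet");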

Read a data file

Read directly in the FROM clause

You can read a data file directly as a table using the SQL FROM clause.

SELECT DISTINCT play_name
FROM parquet.`data/shakespeare.gz.parquet`
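
The same syntax works for other built-in data sources, such as JSON or CSV, by changing the format prefix; the JSON Lines path below is hypothetical.

-- Hypothetical JSON Lines input read directly in the FROM clause
SELECT DISTINCT play_name
FROM json.`data/shakespeare.jsonl`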

Reference: http://spark.apache.org/docs/latest/sql-programming-guide.html#run-sql-on-files-directly

Register a file as VIEW or TABLE

You can use data definition language (DDL) statements such as CREATE VIEW or CREATE TABLE if you want to load data as a table or a view. I recommend using CREATE TEMPORARY VIEW if you don’t want to modify the original data, since a TABLE can be modified by INSERT INTO, TRUNCATE, etc.

CREATE TEMPORARY VIEW shakespeare
USING parquet
OPTIONS (path "data/shakespeare.gz.parquet")
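
Other formats can be registered the same way by changing the USING clause and adding data-source options; the CSV file and its contents below are hypothetical.

-- Hypothetical CSV input; header and inferSchema are CSV data-source options
CREATE TEMPORARY VIEW plays_csv
USING csv
OPTIONS (path "data/plays.csv", header "true", inferSchema "true")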

Reference:

Write a result to a file

In Spark tutorials with Scala or Python, the processed result is saved to a file using the write API of Dataset. This is intuitive for most programmers.

In SQL, all data are abstracted as tables. Therefore, instead of searching for a write API, you create a table to save the resulting data.

CREATE TABLE AS SELECT

To save a new dataset, you can use the CREATE TABLE ... AS SELECT statement.

CREATE TABLE lengthOfPlay
USING json
LOCATION "length_of_play"
AS SELECT play_name, count(line_id) AS lines
FROM shakespeare
GROUP BY play_name
ORDER BY lines DESC;

INSERT SELECT

If you want to append new data to an existing table or directory, you can use the INSERT INTO <table> SELECT statement.

INSERT INTO lengthOfPlay
SELECT play_name, count(line_id) AS lines
FROM shakespeare
GROUP BY play_name
ORDER BY lines DESC;

The table to which the data are appended must be created before you use INSERT. A typical table definition looks like this:

CREATE TABLE IF NOT EXISTS lengthOfPlay (
  play_name STRING,
  lines INT
)
USING json
LOCATION "length_of_play";

For more information, consult https://docs.databricks.com/spark/latest/spark-sql/language-manual/insert.html

You might also consider the INSERT OVERWRITE TABLE statement to create a new dataset. But, in my humble opinion, CREATE TABLE AS SELECT is better for creating a new dataset, since with INSERT the table must be created beforehand, and there is a chance of schema inconsistency between the CREATE TABLE and the INSERT SELECT.
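
For reference, a minimal sketch of that alternative, assuming the lengthOfPlay table has already been created as shown above:

-- Replace the existing contents of lengthOfPlay with a freshly computed result
INSERT OVERWRITE TABLE lengthOfPlay
SELECT play_name, count(line_id) AS lines
FROM shakespeare
GROUP BY play_name
ORDER BY lines DESC;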

Note that there is no UPDATE statement in SparkSQL, since all data in Spark are immutable, that is, unchangeable. You can create, append, or delete data, but you cannot change existing data.

Running a SQL-only script

spark-sql

If you have installed Spark, you can execute the SQL script file as follows:

$SPARK_HOME/bin/spark-sql -f <script.sql>

If you have added the spark executables to your PATH, spark-sql -f <script.sql> is enough.

Like spark-submit, spark-sql supports the --master, --conf, --executor-memory MEM, and --executor-cores NUM options, so you can tune Spark parameters for better performance.
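
For example, a run on a YARN cluster with tuned resources might look like the following; the resource values and script name are placeholders, not recommendations.

$SPARK_HOME/bin/spark-sql \
  --master yarn \
  --executor-memory 4G \
  --executor-cores 2 \
  --conf spark.sql.shuffle.partitions=8 \
  -f length_of_play.sql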

For more information on these options and their meanings, consult http://spark.apache.org/docs/latest/submitting-applications.html#launching-applications-with-spark-submit

Setting parameters with SET

You can set Spark parameters using the SET statement in SparkSQL instead of passing them as spark-sql options. For example, you can replace the --master option of spark-sql as follows.

SET spark.master="local[*]";
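
Other runtime SQL configurations can be changed the same way; for example, the shuffle-partition setting used earlier (the value here is arbitrary):

SET spark.sql.shuffle.partitions=8;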

For more configuration variables, consult http://spark.apache.org/docs/latest/configuration.html#available-properties

Reference: https://docs.databricks.com/spark/latest/spark-sql/language-manual/set.html

Difference from RDBMS

Although Spark supports SQL, including data definition language, Spark is NOT a relational DBMS. Spark has simply adopted SQL as a data-processing language.

All data in Spark are immutable. That is, you cannot update existing tables, rows, or columns; there is no UPDATE statement.
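
A minimal sketch of the usual workaround: instead of updating rows in place, derive a new table with CREATE TABLE ... AS SELECT. The table name, location, and filter below are hypothetical.

-- Hypothetical: build a filtered copy as a new table instead of updating in place
CREATE TABLE lengthOfPlayNonEmpty
USING json
LOCATION "length_of_play_non_empty"
AS SELECT play_name, lines
FROM lengthOfPlay
WHERE play_name IS NOT NULL;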

Motivation of this article

  • Increasing boilerplate in Spark batch scripts
    • Boilerplate code grows over time and becomes hard to manage.
    • Most Python/Scala batches are identical except for the SQL part that processes the data.
    • Shell scripts are also required to submit Spark jobs via spark-submit with parameters.
  • SparkSQL is similar to the Dataset API
    • Writing data-processing code with the Dataset API is very similar to writing a SQL script.
    • Most Spark batches can be expressed in SQL alone.
    • SparkSQL examples appear alongside the Dataset API examples in the official guide.
  • Dataset has been recommended over RDD since Spark 2
    • Dataset, the more general form of DataFrame, is backed by the query optimizer (Catalyst) and the efficient code generator and serializer (Tungsten).
    • Using Dataset instead of RDD is recommended for efficiency, especially when the developer is not a Spark expert.

Further Work

The expressive power of SparkSQL may be greater than we think.

I will write an article about these topics when I get time.

References