Distributed SQL Query using SparkSQL, HDFS and Sqoop

Spark SQL: A brief introduction

Spark SQL is a component of Spark framework. It allows to manipulate big unstructured data file and extract useful information using SQL. It introduces a new data abstraction called DataFrames allowing the analysis of structured and semi-structured data. Spark SQL provides API in Scala,Python and Java in order to manipulate DataFrames. It also provides the support for SQL language, a command line interface and an ODBC/JDBC server. The example described in this post shows how to write a simple Spark application in order to execute an SQL query using Spark.

Import MySQL data into HDFS

In this paragraph I show how import a MySQL database in hadoop using sqoop in the following paragraph I use this data loaded in HDFS in order to execute an SQL query.

I’m using the “world” database that can be downloaded from this [link] (https://dev.mysql.com/doc/index-other.html). It contains data about cities and countries around the world and the languages spoken in each country.

example entity relationship diagram

I import all tables of world database in hdfs using as output format a text file separated by tab character. The following command imports all the table in the hdfs directory /user/cloudera/world

[cloudera@quickstart ~]$ sqoop import-all-tables --connect jdbc:mysql://localhost/world --username root -P --warehouse-dir  /user/cloudera/world  --fields-terminated-by '\t'

As you can observe watching the following command Sqoop has created a sub directory for each MySQL table and it has divided table data in different files with the same prefix “part-m-“.

[cloudera@quickstart ~]$ hadoop fs -ls /user/cloudera/world
Found 3 items
drwxr-xr-x   - cloudera cloudera          0 2016-02-29 21:54 /user/cloudera/world/City
drwxr-xr-x   - cloudera cloudera          0 2016-02-29 21:55 /user/cloudera/world/Country
drwxr-xr-x   - cloudera cloudera          0 2016-02-29 21:55 /user/cloudera/world/CountryLanguage

[cloudera@quickstart ~]$ hadoop fs -ls /user/cloudera/world/City
Found 5 items
-rw-r--r--   1 cloudera cloudera          0 2016-02-29 21:54 /user/cloudera/world/City/_SUCCESS
-rw-r--r--   1 cloudera cloudera      37088 2016-02-29 21:54 /user/cloudera/world/City/part-m-00000
-rw-r--r--   1 cloudera cloudera      35361 2016-02-29 21:54 /user/cloudera/world/City/part-m-00001
-rw-r--r--   1 cloudera cloudera      35884 2016-02-29 21:54 /user/cloudera/world/City/part-m-00002
-rw-r--r--   1 cloudera cloudera      36148 2016-02-29 21:54 /user/cloudera/world/City/part-m-00003

[cloudera@quickstart ~]$ hadoop fs -cat  /user/cloudera/world/City/part-m-00000|head
1       Kabul   AFG     Kabol   1780000
2       Qandahar        AFG     Qandahar        237500
3       Herat   AFG     Herat   186800
4       Mazar-e-Sharif  AFG     Balkh   127800
5       Amsterdam       NLD     Noord-Holland   731200
6       Rotterdam       NLD     Zuid-Holland    593321
7       Haag    NLD     Zuid-Holland    440900
8       Utrecht NLD     Utrecht 234323
9       Eindhoven       NLD     Noord-Brabant   201843
10      Tilburg NLD     Noord-Brabant   193238

Spark SQL application

This paragraph describes the simple application that I wrote in order to execute the SQL Query using Spark SQL on the HDFS data imported in the last paragraph of this post.

The SQL query performs a join between all the three table of the database and it allows to extract the top ten country names and their capitals ordered by life expectancy where more than 50% of people speaks English.

First of all, i created the function “loadCSV” in order to load the data from HDFS in my application. This function accepts three parameters: the HDFS location, the name of the table used in Spark SQL and a lamba function mapping each field of HDFS text files in a specific type (string,integer,float) of a table. It parse each line of the text file in the HDFS location, split them by line and parse each line in order to extract the content of each field; at the end this function register the data on a Spark temporary table.

Later I execute the query described above and save the result on a parquet file.

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext("local", "Simple SparqSQL Join")

sqlContext = SQLContext(sc)

def loadCSV(path,tableName,function):
    csv = sc.textFile(path)
    columns = csv.map(lambda l: l.split("\t"))
    table = columns.map(function)
    schema=sqlContext.createDataFrame(table)
    schema.registerTempTable(tableName)

loadCSV("/user/giovanni/world/City/*","City",
    lambda p:
        Row(
            id=int(p[0]),
            name=p[1],
            countrycode=p[2],
            district=p[3],
            population= int(p[4])
        )
    )
loadCSV("/user/giovanni/world/CountryLanguage/*","CountryLanguage",
    lambda p:
        Row(countrycode=p[0],
            language=p[1],
            isofficial=p[2],
            percentage=float(p[3])
        )
    )
loadCSV("/user/giovanni/world/Country/*","Country",
    lambda p:
        Row(code=p[0],
            name=p[1],
            continent=p[2],
            region=p[3],
            surfacearea=None if p[4]=='null' else float(p[4]),
            indepyear=None if p[5]=='null' else int(p[5]),
            population=int(p[6]),
            lifeexpectancy=None if p[7]=='null' else float(p[7]),
            gnp=float(p[8]),
            gnpold=None if p[9]=='null' else float(p[9]),
            localname=p[10],
            governmentform=p[11],
            headofstate=p[12],
            capital=None if p[13]=='null' else int(p[13]),
            code2=p[14]
        )
    )

outputResult = sqlContext.sql(
"""SELECT  Country.name as CountryName,
           Country.lifeexpectancy as CountryLifeExpectancy,
           City.name as CapitalName,
           CountryLanguage.language
    FROM
           Country JOIN City on Country.capital = City.id
    JOIN
           CountryLanguage ON CountryLanguage.countrycode = Country.code
    WHERE  CountryLanguage.language ="English"
           AND CountryLanguage.percentage > 50
    ORDER BY CountryLifeExpectancy desc limit 10""")

outputResult.save("sparkSQLResult")

Execute the application and view the result

To run the application in my Hadoop cluster I simply wrote the source code described in the previous paragraph in the file spark_sql.py and I run it using the command spark-submit

[cloudera@quickstart ~]$ spark-submit /home/cloudera/workspace/pyspark_examples/spark_sql.py

The next snippet use parquet tools in order to view in a human readable format the result stored in the parquet file.

[cloudera@quickstart ~]$ hadoop fs -ls /user/cloudera/sparkSQLResult
Found 4 items
-rw-r--r--   1 cloudera cloudera          0 2016-02-29 22:07 /user/cloudera/sparkSQLResult/_SUCCESS
-rw-r--r--   1 cloudera cloudera        496 2016-02-29 22:07 /user/cloudera/sparkSQLResult/_common_metadata
-rw-r--r--   1 cloudera cloudera        866 2016-02-29 22:07 /user/cloudera/sparkSQLResult/_metadata
-rw-r--r--   1 cloudera cloudera       1371 2016-02-29 22:07 /user/cloudera/sparkSQLResult/part-r-00001.parquet

[root@quickstart ~]# hadoop parquet.tools.Main cat /user/cloudera/sparkSQLResult
CountryName = Australia
CountryLifeExpectancy = 79.8
CapitalName = Canberra
language = English

CountryName = Canada
CountryLifeExpectancy = 79.4
CapitalName = Ottawa
language = English

CountryName = Gibraltar
CountryLifeExpectancy = 79.0
CapitalName = Gibraltar
language = English

CountryName = Virgin Islands, U.S.
CountryLifeExpectancy = 78.1
CapitalName = Charlotte Amalie
language = English

CountryName = New Zealand
CountryLifeExpectancy = 77.8
CapitalName = Wellington
language = English

CountryName = United Kingdom
CountryLifeExpectancy = 77.7
CapitalName = London
language = English

CountryName = United States
CountryLifeExpectancy = 77.1
CapitalName = Washington
language = English

CountryName = Bermuda
CountryLifeExpectancy = 76.9
CapitalName = Hamilton
language = English

CountryName = Ireland
CountryLifeExpectancy = 76.8
CapitalName = Dublin
language = English

CountryName = Belize
CountryLifeExpectancy = 70.9
CapitalName = Belmopan
language = English

Updated: