
Commit

Added SQLite db read and JOIN
tirthajyoti committed Jul 13, 2019
1 parent 7ec4a63 commit b338bb8
Showing 4 changed files with 305 additions and 9 deletions.
Binary file added Data/chinook.db
Binary file added Data/chinook.zip
Binary file added Data/sqlite_latest.jar
314 changes: 305 additions & 9 deletions Dataframe_SQL_query.ipynb
@@ -91,7 +91,7 @@
},
{
"cell_type": "code",
"execution_count": 11,
"execution_count": 4,
"metadata": {},
"outputs": [
{
@@ -160,7 +160,7 @@
},
{
"cell_type": "code",
"execution_count": 8,
"execution_count": 7,
"metadata": {},
"outputs": [
{
@@ -169,7 +169,7 @@
"['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']"
]
},
"execution_count": 8,
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
@@ -180,7 +180,7 @@
},
{
"cell_type": "code",
"execution_count": 9,
"execution_count": 8,
"metadata": {},
"outputs": [
{
@@ -214,7 +214,7 @@
},
{
"cell_type": "code",
"execution_count": 22,
"execution_count": 9,
"metadata": {},
"outputs": [
{
@@ -243,7 +243,7 @@
},
{
"cell_type": "code",
"execution_count": 36,
"execution_count": 10,
"metadata": {},
"outputs": [
{
@@ -267,12 +267,13 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"### Read a file (and create dataframe) by directly running a `spark.sql` method on the file"
"### Read a file (and create dataframe) by directly running a `spark.sql` method on the file\n",
"Notice the syntax of `csv.<path->filename.csv>` inside the SQL query"
]
},
{
"cell_type": "code",
"execution_count": 34,
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
@@ -281,7 +282,7 @@
},
{
"cell_type": "code",
"execution_count": 35,
"execution_count": 12,
"metadata": {},
"outputs": [
{
@@ -312,6 +313,301 @@
"source": [
"df_sales.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Read tables from a local SQLite database file using `JDBC` connection"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### First `cd` to the directory where all the PySpark jar files are kept. Then download the SQLite jar file from the [given URL](https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc)"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Warning: Binary output can mess up your terminal. Use \"--output -\" to tell \n",
"Warning: curl to output it to your terminal anyway, or consider \"--output \n",
"Warning: <FILE>\" to save to a file.\n"
]
}
],
"source": [
"!cd /home/tirtha/Spark/spark-2.3.1-bin-hadoop2.7/jars/\n",
"!curl https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.28.0/sqlite-jdbc-3.28.0.jar"
]
},
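The same download can also be scripted from Python's standard library. A minimal sketch, assuming the Spark `jars/` directory shown above (the actual download line is left commented out):

```python
from pathlib import Path
from urllib.parse import urlparse
from urllib.request import urlretrieve

JAR_URL = ("https://repo1.maven.org/maven2/org/xerial/"
           "sqlite-jdbc/3.28.0/sqlite-jdbc-3.28.0.jar")

def jar_destination(url: str, jars_dir: str) -> Path:
    # Save the jar under Spark's jars/ directory, keeping the upstream file name
    return Path(jars_dir) / Path(urlparse(url).path).name

dest = jar_destination(JAR_URL, "/home/tirtha/Spark/spark-2.3.1-bin-hadoop2.7/jars")
# urlretrieve(JAR_URL, dest)  # uncomment to actually download the jar
```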
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Define driver, path to local .db file, and append that path to `jdbc:sqlite` to construct the `url`"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"driver = \"org.sqlite.JDBC\"\n",
"path = \"Data/chinook.db\"\n",
"url = \"jdbc:sqlite:\" + path"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Define `tablename` to be read"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": [
"tablename = \"albums\""
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [],
"source": [
"df_albums = spark1.read.format(\"jdbc\").option(\"url\", url).option(\"dbtable\", tablename).option(\"driver\", driver).load()"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-------+--------------------+--------+\n",
"|AlbumId| Title|ArtistId|\n",
"+-------+--------------------+--------+\n",
"| 1|For Those About T...| 1|\n",
"| 2| Balls to the Wall| 2|\n",
"| 3| Restless and Wild| 2|\n",
"| 4| Let There Be Rock| 1|\n",
"| 5| Big Ones| 3|\n",
"| 6| Jagged Little Pill| 4|\n",
"| 7| Facelift| 5|\n",
"| 8| Warner 25 Anos| 6|\n",
"| 9|Plays Metallica B...| 7|\n",
"| 10| Audioslave| 8|\n",
"| 11| Out Of Exile| 8|\n",
"| 12| BackBeat Soundtrack| 9|\n",
"| 13|The Best Of Billy...| 10|\n",
"| 14|Alcohol Fueled Br...| 11|\n",
"| 15|Alcohol Fueled Br...| 11|\n",
"| 16| Black Sabbath| 12|\n",
"| 17|Black Sabbath Vol...| 12|\n",
"| 18| Body Count| 13|\n",
"| 19| Chemical Wedding| 14|\n",
"| 20|The Best Of Buddy...| 15|\n",
"+-------+--------------------+--------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"df_albums.show()"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- AlbumId: integer (nullable = true)\n",
" |-- Title: string (nullable = true)\n",
" |-- ArtistId: integer (nullable = true)\n",
"\n"
]
}
],
"source": [
"df_albums.printSchema()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Don't forget to create temp view to use later"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [],
"source": [
"df_albums.createTempView('albums')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Read another table - `artists`"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [],
"source": [
"tablename = \"artists\""
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [],
"source": [
"df_artists = spark1.read.format(\"jdbc\").option(\"url\", url).option(\"dbtable\", tablename).option(\"driver\", driver).load()"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------+--------------------+\n",
"|ArtistId| Name|\n",
"+--------+--------------------+\n",
"| 1| AC/DC|\n",
"| 2| Accept|\n",
"| 3| Aerosmith|\n",
"| 4| Alanis Morissette|\n",
"| 5| Alice In Chains|\n",
"| 6|Antônio Carlos Jobim|\n",
"| 7| Apocalyptica|\n",
"| 8| Audioslave|\n",
"| 9| BackBeat|\n",
"| 10| Billy Cobham|\n",
"| 11| Black Label Society|\n",
"| 12| Black Sabbath|\n",
"| 13| Body Count|\n",
"| 14| Bruce Dickinson|\n",
"| 15| Buddy Guy|\n",
"| 16| Caetano Veloso|\n",
"| 17| Chico Buarque|\n",
"| 18|Chico Science & N...|\n",
"| 19| Cidade Negra|\n",
"| 20| Cláudio Zoli|\n",
"+--------+--------------------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"df_artists.show()"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [],
"source": [
"df_artists.createTempView('artists')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Test if SQL query is working fine"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------+---------+\n",
"|ArtistId| Name|\n",
"+--------+---------+\n",
"| 1| AC/DC|\n",
"| 2| Accept|\n",
"| 3|Aerosmith|\n",
"| 9| BackBeat|\n",
"| 15|Buddy Guy|\n",
"| 26| Azymuth|\n",
"| 36| O Rappa|\n",
"| 37| Ed Motta|\n",
"| 46|Jorge Ben|\n",
"| 50|Metallica|\n",
"+--------+---------+\n",
"\n"
]
}
],
"source": [
"spark1.sql(\"SELECT * FROM artists WHERE length(Name)<10 LIMIT 10\").show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Join the `albums` and `artists` tables in a single dataframe using SQL query (and order by `ArtistId`)"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [],
"source": [
"df_combined = spark1.sql(\"SELECT * FROM artists LEFT JOIN albums ON artists.ArtistId=albums.ArtistId order by artists.ArtistId\")"
]
},
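As a cross-check of the LEFT JOIN above, the same query can be run with Python's stdlib `sqlite3` against an in-memory database seeded with a few stand-in rows in the chinook schema (the rows below are illustrative, not the full dataset):

```python
import sqlite3

# In-memory stand-in for Data/chinook.db with a handful of illustrative rows
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE artists (ArtistId INTEGER PRIMARY KEY, Name TEXT);
    CREATE TABLE albums  (AlbumId INTEGER PRIMARY KEY, Title TEXT,
                          ArtistId INTEGER REFERENCES artists(ArtistId));
    INSERT INTO artists VALUES (1, 'AC/DC'), (2, 'Accept'), (3, 'Aerosmith');
    INSERT INTO albums  VALUES (1, 'For Those About To Rock We Salute You', 1),
                               (4, 'Let There Be Rock', 1),
                               (2, 'Balls to the Wall', 2);
""")

# Same shape as the Spark SQL query above; a LEFT JOIN keeps artists
# that have no matching album row (their Title comes back as NULL/None)
rows = conn.execute("""
    SELECT artists.ArtistId, artists.Name, albums.Title
    FROM artists LEFT JOIN albums ON artists.ArtistId = albums.ArtistId
    ORDER BY artists.ArtistId
""").fetchall()

for r in rows:
    print(r)
```

Aerosmith has no album row in this toy data, so it still appears in the result with a `None` title, which is the behavior the Spark `LEFT JOIN` above relies on.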
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df_combined.show()"
]
}
],
"metadata": {
