Skip to content

Commit

Permalink
add streaming examples
Browse files Browse the repository at this point in the history
  • Loading branch information
abhayibm committed May 11, 2020
1 parent ed5a4df commit eb98a6c
Show file tree
Hide file tree
Showing 4 changed files with 950 additions and 0 deletions.
343 changes: 343 additions & 0 deletions streaming/.ipynb_checkpoints/streaming-activities-checkpoint.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,343 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
" <div>\n",
" <p><b>SparkSession - hive</b></p>\n",
" \n",
" <div>\n",
" <p><b>SparkContext</b></p>\n",
"\n",
" <p><a href=\"http://abhays-mbp-2:4040\">Spark UI</a></p>\n",
"\n",
" <dl>\n",
" <dt>Version</dt>\n",
" <dd><code>v2.4.3</code></dd>\n",
" <dt>Master</dt>\n",
" <dd><code>local[*]</code></dd>\n",
" <dt>AppName</dt>\n",
" <dd><code>PySparkShell</code></dd>\n",
" </dl>\n",
" </div>\n",
" \n",
" </div>\n",
" "
],
"text/plain": [
"<pyspark.sql.session.SparkSession at 0x11d8e2f60>"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"spark"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"static = spark.read.json(\"../data/activity-data/\")\n",
"dataSchema = static.schema"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- Arrival_Time: long (nullable = true)\n",
" |-- Creation_Time: long (nullable = true)\n",
" |-- Device: string (nullable = true)\n",
" |-- Index: long (nullable = true)\n",
" |-- Model: string (nullable = true)\n",
" |-- User: string (nullable = true)\n",
" |-- gt: string (nullable = true)\n",
" |-- x: double (nullable = true)\n",
" |-- y: double (nullable = true)\n",
" |-- z: double (nullable = true)\n",
"\n"
]
}
],
"source": [
"static.printSchema()"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-------------+-------------------+--------+-----+------+----+-----+-------------+------------+------------+\n",
"| Arrival_Time| Creation_Time| Device|Index| Model|User| gt| x| y| z|\n",
"+-------------+-------------------+--------+-----+------+----+-----+-------------+------------+------------+\n",
"|1424686735090|1424686733090638193|nexus4_1| 18|nexus4| g|stand| 3.356934E-4|-5.645752E-4|-0.018814087|\n",
"|1424686735292|1424688581345918092|nexus4_2| 66|nexus4| g|stand| -0.005722046| 0.029083252| 0.005569458|\n",
"|1424686735500|1424686733498505625|nexus4_1| 99|nexus4| g|stand| 0.0078125|-0.017654419| 0.010025024|\n",
"|1424686735691|1424688581745026978|nexus4_2| 145|nexus4| g|stand| -3.814697E-4| 0.0184021|-0.013656616|\n",
"|1424686735890|1424688581945252808|nexus4_2| 185|nexus4| g|stand| -3.814697E-4|-0.031799316| -0.00831604|\n",
"|1424686736094|1424686734097840342|nexus4_1| 218|nexus4| g|stand| -7.324219E-4|-0.013381958| 0.01109314|\n",
"|1424686736294|1424688582347932252|nexus4_2| 265|nexus4| g|stand| -0.005722046| 0.015197754| 0.022659302|\n",
"|1424686736495|1424688582549592408|nexus4_2| 305|nexus4| g|stand| -3.814697E-4|0.0087890625|0.0034332275|\n",
"|1424686736697|1424688582750703248|nexus4_2| 345|nexus4| g|stand| 0.002822876|-0.008300781|-0.015792847|\n",
"|1424686736898|1424688582952241334|nexus4_2| 385|nexus4| g|stand| 6.866455E-4|-0.008300781| 0.004501343|\n",
"|1424686737100|1424686735109928643|nexus4_1| 418|nexus4| g|stand| 0.003540039|-0.010177612|-0.026290894|\n",
"|1424686737300|1424688583355164918|nexus4_2| 465|nexus4| g|stand| 0.002822876|0.0045166016|-0.014724731|\n",
"|1424686737505|1424686735512935017|nexus4_1| 498|nexus4| g|stand| 0.0024719238|-0.010177612|-0.017745972|\n",
"|1424686737707|1424686735709254597|nexus4_1| 537|nexus4| g|stand|-0.0028686523|-0.003768921| 0.020706177|\n",
"|1424686737908|1424686735915675495|nexus4_1| 578|nexus4| g|stand|-0.0028686523| 0.026138306| 0.007888794|\n",
"|1424686738109|1424688584160372793|nexus4_2| 625|nexus4| g|stand| -3.814697E-4| 2.441406E-4| 0.033340454|\n",
"|1424686738326|1424688584381747305|nexus4_2| 661|nexus4| g|stand| 0.0017547607| 0.019470215|-0.011520386|\n",
"|1424686738529|1424686736534938191|nexus4_1| 701|nexus4| g|stand| 0.0024719238|-0.033676147|0.0068206787|\n",
"|1424686738744|1424688584799723655|nexus4_2| 744|nexus4| g|stand| -3.814697E-4|-0.002960205|-0.027542114|\n",
"|1424686738935|1424686736943477009|nexus4_1| 782|nexus4| g|stand| -0.009277344|-0.009109497| -0.0690155|\n",
"+-------------+-------------------+--------+-----+------+----+-----+-------------+------------+------------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"static.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Read one file at a time.. "
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"streaming = spark.readStream.schema(dataSchema)\\\n",
" .option(\"maxFilesPerTrigger\", 1)\\\n",
" .json(\"../data/activity-data\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Agrregation on streaming dataframe"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"activityCounts = streaming.groupBy(\"gt\").count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Start stremaing query. Write output to memory. OuputMode : `append`, `complete`, `update`"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"activityQuery = activityCounts.writeStream.queryName(\"activity_counts\")\\\n",
" .format(\"memory\")\\\n",
" .outputMode(\"complete\")\\\n",
" .start()"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[<pyspark.sql.streaming.StreamingQuery at 0x11d8ec6a0>]"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"spark.streams.active"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Show output from in memory table"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------+------+\n",
"| gt| count|\n",
"+----------+------+\n",
"| stairsup|104570|\n",
"| sit|123093|\n",
"| stand|113850|\n",
"| walk|132549|\n",
"| bike|107962|\n",
"|stairsdown| 93637|\n",
"| null|104463|\n",
"+----------+------+\n",
"\n",
"+----------+------+\n",
"| gt| count|\n",
"+----------+------+\n",
"| stairsup|104570|\n",
"| sit|123093|\n",
"| stand|113850|\n",
"| walk|132549|\n",
"| bike|107962|\n",
"|stairsdown| 93637|\n",
"| null|104463|\n",
"+----------+------+\n",
"\n",
"+----------+------+\n",
"| gt| count|\n",
"+----------+------+\n",
"| stairsup|115029|\n",
"| sit|135401|\n",
"| stand|125235|\n",
"| walk|145805|\n",
"| bike|118759|\n",
"|stairsdown|102996|\n",
"| null|114910|\n",
"+----------+------+\n",
"\n",
"+----------+------+\n",
"| gt| count|\n",
"+----------+------+\n",
"| stairsup|125490|\n",
"| sit|147709|\n",
"| stand|136619|\n",
"| walk|159061|\n",
"| bike|129556|\n",
"|stairsdown|112356|\n",
"| null|125355|\n",
"+----------+------+\n",
"\n",
"+----------+------+\n",
"| gt| count|\n",
"+----------+------+\n",
"| stairsup|135951|\n",
"| sit|160017|\n",
"| stand|148003|\n",
"| walk|172317|\n",
"| bike|140353|\n",
"|stairsdown|121717|\n",
"| null|135801|\n",
"+----------+------+\n",
"\n"
]
}
],
"source": [
"from time import sleep\n",
"for x in range(5):\n",
" spark.sql(\"SELECT * FROM activity_counts\").show()\n",
" sleep(1)"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"activityQuery.stop()"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[]"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"spark.streams.active"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Loading

0 comments on commit eb98a6c

Please sign in to comment.