From eb98a6cc7260b508fde31bfe89297879b3e1bf94 Mon Sep 17 00:00:00 2001 From: abhay Date: Sun, 10 May 2020 20:59:25 -0500 Subject: [PATCH] add streaming examples --- .../streaming-activities-checkpoint.ipynb | 343 ++++++++++++++++++ .../streaming-wordcount-checkpoint.ipynb | 132 +++++++ streaming/streaming-activities.ipynb | 343 ++++++++++++++++++ streaming/streaming-wordcount.ipynb | 132 +++++++ 4 files changed, 950 insertions(+) create mode 100644 streaming/.ipynb_checkpoints/streaming-activities-checkpoint.ipynb create mode 100644 streaming/.ipynb_checkpoints/streaming-wordcount-checkpoint.ipynb create mode 100644 streaming/streaming-activities.ipynb create mode 100644 streaming/streaming-wordcount.ipynb diff --git a/streaming/.ipynb_checkpoints/streaming-activities-checkpoint.ipynb b/streaming/.ipynb_checkpoints/streaming-activities-checkpoint.ipynb new file mode 100644 index 0000000..c5235a0 --- /dev/null +++ b/streaming/.ipynb_checkpoints/streaming-activities-checkpoint.ipynb @@ -0,0 +1,343 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + "
\n", + "

SparkSession - hive

\n", + " \n", + "
\n", + "

SparkContext

\n", + "\n", + "

Spark UI

\n", + "\n", + "
\n", + "
Version
\n", + "
v2.4.3
\n", + "
Master
\n", + "
local[*]
\n", + "
AppName
\n", + "
PySparkShell
\n", + "
\n", + "
\n", + " \n", + "
\n", + " " + ], + "text/plain": [ + "" + ] + }, + "execution_count": 1, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "spark" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "static = spark.read.json(\"../data/activity-data/\")\n", + "dataSchema = static.schema" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "root\n", + " |-- Arrival_Time: long (nullable = true)\n", + " |-- Creation_Time: long (nullable = true)\n", + " |-- Device: string (nullable = true)\n", + " |-- Index: long (nullable = true)\n", + " |-- Model: string (nullable = true)\n", + " |-- User: string (nullable = true)\n", + " |-- gt: string (nullable = true)\n", + " |-- x: double (nullable = true)\n", + " |-- y: double (nullable = true)\n", + " |-- z: double (nullable = true)\n", + "\n" + ] + } + ], + "source": [ + "static.printSchema()" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-------------+-------------------+--------+-----+------+----+-----+-------------+------------+------------+\n", + "| Arrival_Time| Creation_Time| Device|Index| Model|User| gt| x| y| z|\n", + "+-------------+-------------------+--------+-----+------+----+-----+-------------+------------+------------+\n", + "|1424686735090|1424686733090638193|nexus4_1| 18|nexus4| g|stand| 3.356934E-4|-5.645752E-4|-0.018814087|\n", + "|1424686735292|1424688581345918092|nexus4_2| 66|nexus4| g|stand| -0.005722046| 0.029083252| 0.005569458|\n", + "|1424686735500|1424686733498505625|nexus4_1| 99|nexus4| g|stand| 0.0078125|-0.017654419| 0.010025024|\n", + "|1424686735691|1424688581745026978|nexus4_2| 145|nexus4| g|stand| -3.814697E-4| 0.0184021|-0.013656616|\n", + "|1424686735890|1424688581945252808|nexus4_2| 185|nexus4| g|stand| -3.814697E-4|-0.031799316| -0.00831604|\n", + "|1424686736094|1424686734097840342|nexus4_1| 218|nexus4| g|stand| -7.324219E-4|-0.013381958| 0.01109314|\n", + "|1424686736294|1424688582347932252|nexus4_2| 265|nexus4| g|stand| -0.005722046| 0.015197754| 0.022659302|\n", + "|1424686736495|1424688582549592408|nexus4_2| 305|nexus4| g|stand| -3.814697E-4|0.0087890625|0.0034332275|\n", + "|1424686736697|1424688582750703248|nexus4_2| 345|nexus4| g|stand| 0.002822876|-0.008300781|-0.015792847|\n", + "|1424686736898|1424688582952241334|nexus4_2| 385|nexus4| g|stand| 6.866455E-4|-0.008300781| 0.004501343|\n", + "|1424686737100|1424686735109928643|nexus4_1| 418|nexus4| g|stand| 0.003540039|-0.010177612|-0.026290894|\n", + "|1424686737300|1424688583355164918|nexus4_2| 465|nexus4| g|stand| 0.002822876|0.0045166016|-0.014724731|\n", + "|1424686737505|1424686735512935017|nexus4_1| 498|nexus4| g|stand| 0.0024719238|-0.010177612|-0.017745972|\n", + "|1424686737707|1424686735709254597|nexus4_1| 537|nexus4| g|stand|-0.0028686523|-0.003768921| 0.020706177|\n", + "|1424686737908|1424686735915675495|nexus4_1| 578|nexus4| g|stand|-0.0028686523| 0.026138306| 0.007888794|\n", + "|1424686738109|1424688584160372793|nexus4_2| 625|nexus4| g|stand| -3.814697E-4| 2.441406E-4| 0.033340454|\n", + "|1424686738326|1424688584381747305|nexus4_2| 661|nexus4| g|stand| 0.0017547607| 0.019470215|-0.011520386|\n", + "|1424686738529|1424686736534938191|nexus4_1| 701|nexus4| g|stand| 0.0024719238|-0.033676147|0.0068206787|\n", + "|1424686738744|1424688584799723655|nexus4_2| 744|nexus4| g|stand| -3.814697E-4|-0.002960205|-0.027542114|\n", + "|1424686738935|1424686736943477009|nexus4_1| 782|nexus4| g|stand| -0.009277344|-0.009109497| -0.0690155|\n", + "+-------------+-------------------+--------+-----+------+----+-----+-------------+------------+------------+\n", + "only showing top 20 rows\n", + "\n" + ] + } + ], + "source": [ + "static.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Read one file at a time.. " + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "streaming = spark.readStream.schema(dataSchema)\\\n", + " .option(\"maxFilesPerTrigger\", 1)\\\n", + " .json(\"../data/activity-data\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Agrregation on streaming dataframe" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "activityCounts = streaming.groupBy(\"gt\").count()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Start stremaing query. Write output to memory. OuputMode : `append`, `complete`, `update`" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "activityQuery = activityCounts.writeStream.queryName(\"activity_counts\")\\\n", + " .format(\"memory\")\\\n", + " .outputMode(\"complete\")\\\n", + " .start()" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[]" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "spark.streams.active" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Show output from in memory table" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+----------+------+\n", + "| gt| count|\n", + "+----------+------+\n", + "| stairsup|104570|\n", + "| sit|123093|\n", + "| stand|113850|\n", + "| walk|132549|\n", + "| bike|107962|\n", + "|stairsdown| 93637|\n", + "| null|104463|\n", + "+----------+------+\n", + "\n", + "+----------+------+\n", + "| gt| count|\n", + "+----------+------+\n", + "| stairsup|104570|\n", + "| sit|123093|\n", + "| stand|113850|\n", + "| walk|132549|\n", + "| bike|107962|\n", + "|stairsdown| 93637|\n", + "| null|104463|\n", + "+----------+------+\n", + "\n", + "+----------+------+\n", + "| gt| count|\n", + "+----------+------+\n", + "| stairsup|115029|\n", + "| sit|135401|\n", + "| stand|125235|\n", + "| walk|145805|\n", + "| bike|118759|\n", + "|stairsdown|102996|\n", + "| null|114910|\n", + "+----------+------+\n", + "\n", + "+----------+------+\n", + "| gt| count|\n", + "+----------+------+\n", + "| stairsup|125490|\n", + "| sit|147709|\n", + "| stand|136619|\n", + "| walk|159061|\n", + "| bike|129556|\n", + "|stairsdown|112356|\n", + "| null|125355|\n", + "+----------+------+\n", + "\n", + "+----------+------+\n", + "| gt| count|\n", + "+----------+------+\n", + "| stairsup|135951|\n", + "| sit|160017|\n", + "| stand|148003|\n", + "| walk|172317|\n", + "| bike|140353|\n", + "|stairsdown|121717|\n", + "| null|135801|\n", + "+----------+------+\n", + "\n" + ] + } + ], + "source": [ + "from time import sleep\n", + "for x in range(5):\n", + " spark.sql(\"SELECT * FROM activity_counts\").show()\n", + " sleep(1)" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [], + "source": [ + "activityQuery.stop()" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[]" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "spark.streams.active" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/streaming/.ipynb_checkpoints/streaming-wordcount-checkpoint.ipynb b/streaming/.ipynb_checkpoints/streaming-wordcount-checkpoint.ipynb new file mode 100644 index 0000000..ec6461c --- /dev/null +++ b/streaming/.ipynb_checkpoints/streaming-wordcount-checkpoint.ipynb @@ -0,0 +1,132 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.sql import SparkSession\n", + "from pyspark.sql.functions import explode\n", + "from pyspark.sql.functions import split" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "spark = SparkSession \\\n", + " .builder \\\n", + " .appName(\"StructuredNetworkWordCount\") \\\n", + " .getOrCreate()" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "# Create DataFrame representing the stream of input lines from connection to localhost:9999\n", + "lines = spark \\\n", + " .readStream \\\n", + " .format(\"socket\") \\\n", + " .option(\"host\", \"localhost\") \\\n", + " .option(\"port\", 9999) \\\n", + " .load()" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "# Split the lines into words\n", + "words = lines.select(\n", + " explode(\n", + " split(lines.value, \" \")\n", + " ).alias(\"word\")\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "# Generate running word count\n", + "wordCounts = words.groupBy(\"word\").count()" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "# Start running the query that prints the running counts to the console\n", + "query = wordCounts \\\n", + " .writeStream \\\n", + " .outputMode(\"complete\") \\\n", + " .format(\"console\") \\\n", + " .start()\n", + "\n", + "#query.awaitTermination()" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "#nc -lk 9999 " + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "#Submit Job\n", + "#unset PYSPARK_DRIVER_PYTHON\n", + "#unset PYSPARK_DRIVER_PYTHON_OPTS\n", + "#\n", + "#$SPARK_HOME/bin/spark-submit $SPARK_HOME/examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/streaming/streaming-activities.ipynb b/streaming/streaming-activities.ipynb new file mode 100644 index 0000000..d154885 --- /dev/null +++ b/streaming/streaming-activities.ipynb @@ -0,0 +1,343 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + "
\n", + "

SparkSession - hive

\n", + " \n", + "
\n", + "

SparkContext

\n", + "\n", + "

Spark UI

\n", + "\n", + "
\n", + "
Version
\n", + "
v2.4.3
\n", + "
Master
\n", + "
local[*]
\n", + "
AppName
\n", + "
PySparkShell
\n", + "
\n", + "
\n", + " \n", + "
\n", + " " + ], + "text/plain": [ + "" + ] + }, + "execution_count": 1, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "spark" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "static = spark.read.json(\"../data/activity-data/\")\n", + "dataSchema = static.schema" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "root\n", + " |-- Arrival_Time: long (nullable = true)\n", + " |-- Creation_Time: long (nullable = true)\n", + " |-- Device: string (nullable = true)\n", + " |-- Index: long (nullable = true)\n", + " |-- Model: string (nullable = true)\n", + " |-- User: string (nullable = true)\n", + " |-- gt: string (nullable = true)\n", + " |-- x: double (nullable = true)\n", + " |-- y: double (nullable = true)\n", + " |-- z: double (nullable = true)\n", + "\n" + ] + } + ], + "source": [ + "static.printSchema()" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-------------+-------------------+--------+-----+------+----+-----+-------------+------------+------------+\n", + "| Arrival_Time| Creation_Time| Device|Index| Model|User| gt| x| y| z|\n", + "+-------------+-------------------+--------+-----+------+----+-----+-------------+------------+------------+\n", + "|1424686735090|1424686733090638193|nexus4_1| 18|nexus4| g|stand| 3.356934E-4|-5.645752E-4|-0.018814087|\n", + "|1424686735292|1424688581345918092|nexus4_2| 66|nexus4| g|stand| -0.005722046| 0.029083252| 0.005569458|\n", + "|1424686735500|1424686733498505625|nexus4_1| 99|nexus4| g|stand| 0.0078125|-0.017654419| 0.010025024|\n", + "|1424686735691|1424688581745026978|nexus4_2| 145|nexus4| g|stand| -3.814697E-4| 0.0184021|-0.013656616|\n", + "|1424686735890|1424688581945252808|nexus4_2| 185|nexus4| g|stand| -3.814697E-4|-0.031799316| -0.00831604|\n", + "|1424686736094|1424686734097840342|nexus4_1| 218|nexus4| g|stand| -7.324219E-4|-0.013381958| 0.01109314|\n", + "|1424686736294|1424688582347932252|nexus4_2| 265|nexus4| g|stand| -0.005722046| 0.015197754| 0.022659302|\n", + "|1424686736495|1424688582549592408|nexus4_2| 305|nexus4| g|stand| -3.814697E-4|0.0087890625|0.0034332275|\n", + "|1424686736697|1424688582750703248|nexus4_2| 345|nexus4| g|stand| 0.002822876|-0.008300781|-0.015792847|\n", + "|1424686736898|1424688582952241334|nexus4_2| 385|nexus4| g|stand| 6.866455E-4|-0.008300781| 0.004501343|\n", + "|1424686737100|1424686735109928643|nexus4_1| 418|nexus4| g|stand| 0.003540039|-0.010177612|-0.026290894|\n", + "|1424686737300|1424688583355164918|nexus4_2| 465|nexus4| g|stand| 0.002822876|0.0045166016|-0.014724731|\n", + "|1424686737505|1424686735512935017|nexus4_1| 498|nexus4| g|stand| 0.0024719238|-0.010177612|-0.017745972|\n", + "|1424686737707|1424686735709254597|nexus4_1| 537|nexus4| g|stand|-0.0028686523|-0.003768921| 0.020706177|\n", + "|1424686737908|1424686735915675495|nexus4_1| 578|nexus4| g|stand|-0.0028686523| 0.026138306| 0.007888794|\n", + "|1424686738109|1424688584160372793|nexus4_2| 625|nexus4| g|stand| -3.814697E-4| 2.441406E-4| 0.033340454|\n", + "|1424686738326|1424688584381747305|nexus4_2| 661|nexus4| g|stand| 0.0017547607| 0.019470215|-0.011520386|\n", + "|1424686738529|1424686736534938191|nexus4_1| 701|nexus4| g|stand| 0.0024719238|-0.033676147|0.0068206787|\n", + "|1424686738744|1424688584799723655|nexus4_2| 744|nexus4| g|stand| -3.814697E-4|-0.002960205|-0.027542114|\n", + "|1424686738935|1424686736943477009|nexus4_1| 782|nexus4| g|stand| -0.009277344|-0.009109497| -0.0690155|\n", + "+-------------+-------------------+--------+-----+------+----+-----+-------------+------------+------------+\n", + "only showing top 20 rows\n", + "\n" + ] + } + ], + "source": [ + "static.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Read one file at a time.. " + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [], + "source": [ + "streaming = spark.readStream.schema(dataSchema)\\\n", + " .option(\"maxFilesPerTrigger\", 1)\\\n", + " .json(\"../data/activity-data\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Agrregation on streaming dataframe" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [], + "source": [ + "activityCounts = streaming.groupBy(\"gt\").count()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Start stremaing query. Write output to memory. OuputMode : `append`, `complete`, `update`" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [], + "source": [ + "activityQuery = activityCounts.writeStream.queryName(\"activity_counts\")\\\n", + " .format(\"memory\")\\\n", + " .outputMode(\"complete\")\\\n", + " .start()" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[]" + ] + }, + "execution_count": 18, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "spark.streams.active" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Show output from in memory table" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+----------+-----+\n", + "| gt|count|\n", + "+----------+-----+\n", + "| stairsup|62741|\n", + "| sit|73860|\n", + "| stand|68311|\n", + "| walk|79526|\n", + "| bike|64775|\n", + "|stairsdown|56183|\n", + "| null|62678|\n", + "+----------+-----+\n", + "\n", + "+----------+-----+\n", + "| gt|count|\n", + "+----------+-----+\n", + "| stairsup|73196|\n", + "| sit|86169|\n", + "| stand|79694|\n", + "| walk|92781|\n", + "| bike|75572|\n", + "|stairsdown|65546|\n", + "| null|73127|\n", + "+----------+-----+\n", + "\n", + "+----------+------+\n", + "| gt| count|\n", + "+----------+------+\n", + "| stairsup| 83653|\n", + "| sit| 98477|\n", + "| stand| 91079|\n", + "| walk|106036|\n", + "| bike| 86367|\n", + "|stairsdown| 74914|\n", + "| null| 83573|\n", + "+----------+------+\n", + "\n", + "+----------+------+\n", + "| gt| count|\n", + "+----------+------+\n", + "| stairsup| 94108|\n", + "| sit|110785|\n", + "| stand|102466|\n", + "| walk|119292|\n", + "| bike| 97164|\n", + "|stairsdown| 84277|\n", + "| null| 94019|\n", + "+----------+------+\n", + "\n", + "+----------+------+\n", + "| gt| count|\n", + "+----------+------+\n", + "| stairsup|104570|\n", + "| sit|123093|\n", + "| stand|113850|\n", + "| walk|132549|\n", + "| bike|107962|\n", + "|stairsdown| 93637|\n", + "| null|104463|\n", + "+----------+------+\n", + "\n" + ] + } + ], + "source": [ + "from time import sleep\n", + "for x in range(5):\n", + " spark.sql(\"SELECT * FROM activity_counts\").show()\n", + " sleep(1)" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": {}, + "outputs": [], + "source": [ + "activityQuery.stop()" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[]" + ] + }, + "execution_count": 21, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "spark.streams.active" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/streaming/streaming-wordcount.ipynb b/streaming/streaming-wordcount.ipynb new file mode 100644 index 0000000..ec6461c --- /dev/null +++ b/streaming/streaming-wordcount.ipynb @@ -0,0 +1,132 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.sql import SparkSession\n", + "from pyspark.sql.functions import explode\n", + "from pyspark.sql.functions import split" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "spark = SparkSession \\\n", + " .builder \\\n", + " .appName(\"StructuredNetworkWordCount\") \\\n", + " .getOrCreate()" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "# Create DataFrame representing the stream of input lines from connection to localhost:9999\n", + "lines = spark \\\n", + " .readStream \\\n", + " .format(\"socket\") \\\n", + " .option(\"host\", \"localhost\") \\\n", + " .option(\"port\", 9999) \\\n", + " .load()" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "# Split the lines into words\n", + "words = lines.select(\n", + " explode(\n", + " split(lines.value, \" \")\n", + " ).alias(\"word\")\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "# Generate running word count\n", + "wordCounts = words.groupBy(\"word\").count()" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "# Start running the query that prints the running counts to the console\n", + "query = wordCounts \\\n", + " .writeStream \\\n", + " .outputMode(\"complete\") \\\n", + " .format(\"console\") \\\n", + " .start()\n", + "\n", + "#query.awaitTermination()" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "#nc -lk 9999 " + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "#Submit Job\n", + "#unset PYSPARK_DRIVER_PYTHON\n", + "#unset PYSPARK_DRIVER_PYTHON_OPTS\n", + "#\n", + "#$SPARK_HOME/bin/spark-submit $SPARK_HOME/examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}