From 76882467621e5151205e0394d6da134ac684ebeb Mon Sep 17 00:00:00 2001 From: abhay Date: Sun, 10 May 2020 20:57:50 -0500 Subject: [PATCH] dataframe examples --- dataframes/joins.ipynb | 156 +++++++ dataframes/spark-ui.ipynb | 104 +++++ dataframes/top-queries.ipynb | 584 ++++++++++++++++++++++++++ dataframes/udf.ipynb | 715 ++++++++++++++++++++++++++++++++ dataframes/vectorized_udf.ipynb | 354 ++++++++++++++++ dataframes/window.ipynb | 347 ++++++++++++++++ 6 files changed, 2260 insertions(+) create mode 100644 dataframes/joins.ipynb create mode 100644 dataframes/spark-ui.ipynb create mode 100644 dataframes/top-queries.ipynb create mode 100644 dataframes/udf.ipynb create mode 100644 dataframes/vectorized_udf.ipynb create mode 100644 dataframes/window.ipynb diff --git a/dataframes/joins.ipynb b/dataframes/joins.ipynb new file mode 100644 index 0000000..1f1689c --- /dev/null +++ b/dataframes/joins.ipynb @@ -0,0 +1,156 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "# in Python\n", + "person = spark.createDataFrame([\n", + " (0, \"Bill Chambers\", 0, [100]),\n", + " (1, \"Matei Zaharia\", 1, [500, 250, 100]),\n", + " (2, \"Michael Armbrust\", 1, [250, 100])])\\\n", + " .toDF(\"id\", \"name\", \"graduate_program\", \"spark_status\")\n", + "graduateProgram = spark.createDataFrame([\n", + " (0, \"Masters\", \"School of Information\", \"UC Berkeley\"),\n", + " (2, \"Masters\", \"EECS\", \"UC Berkeley\"),\n", + " (1, \"Ph.D.\", \"EECS\", \"UC Berkeley\")])\\\n", + " .toDF(\"id\", \"degree\", \"department\", \"school\")\n", + "sparkStatus = spark.createDataFrame([\n", + " (500, \"Vice President\"),\n", + " (250, \"PMC Member\"),\n", + " (100, \"Contributor\")])\\\n", + " .toDF(\"id\", \"status\")" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+----------------+----------------+---------------+\n", + "| id| name|graduate_program| spark_status|\n", + "+---+----------------+----------------+---------------+\n", + "| 0| Bill Chambers| 0| [100]|\n", + "| 1| Matei Zaharia| 1|[500, 250, 100]|\n", + "| 2|Michael Armbrust| 1| [250, 100]|\n", + "+---+----------------+----------------+---------------+\n", + "\n" + ] + } + ], + "source": [ + "person.show()" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+-------+--------------------+-----------+\n", + "| id| degree| department| school|\n", + "+---+-------+--------------------+-----------+\n", + "| 0|Masters|School of Informa...|UC Berkeley|\n", + "| 2|Masters| EECS|UC Berkeley|\n", + "| 1| Ph.D.| EECS|UC Berkeley|\n", + "+---+-------+--------------------+-----------+\n", + "\n" + ] + } + ], + "source": [ + "graduateProgram.show()" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+--------------+\n", + "| id| status|\n", + "+---+--------------+\n", + "|500|Vice President|\n", + "|250| PMC Member|\n", + "|100| Contributor|\n", + "+---+--------------+\n", + "\n" + ] + } + ], + "source": [ + "sparkStatus.show()" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "person.createOrReplaceTempView(\"person\")\n", + "graduateProgram.createOrReplaceTempView(\"graduateProgram\")\n", + 
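"# temp views can be queried by name through spark.sql, as in the cells below\n", +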
"sparkStatus.createOrReplaceTempView(\"sparkStatus\")" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+----------------+----------------+---------------+---+-------+--------------------+-----------+\n", + "| id| name|graduate_program| spark_status| id| degree| department| school|\n", + "+---+----------------+----------------+---------------+---+-------+--------------------+-----------+\n", + "| 0| Bill Chambers| 0| [100]| 0|Masters|School of Informa...|UC Berkeley|\n", + "| 1| Matei Zaharia| 1|[500, 250, 100]| 1| Ph.D.| EECS|UC Berkeley|\n", + "| 2|Michael Armbrust| 1| [250, 100]| 1| Ph.D.| EECS|UC Berkeley|\n", + "+---+----------------+----------------+---------------+---+-------+--------------------+-----------+\n", + "\n" + ] + } + ], + "source": [ + "spark.sql(\"SELECT * FROM person JOIN graduateProgram ON person.graduate_program = graduateProgram.id\").show()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/dataframes/spark-ui.ipynb b/dataframes/spark-ui.ipynb new file mode 100644 index 0000000..1ba7ded --- /dev/null +++ b/dataframes/spark-ui.ipynb @@ -0,0 +1,104 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[Row(sum(id)=2500000000000)]" + ] + }, + "execution_count": 1, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# in Python\n", + "df1 = spark.range(2, 10000000, 2)\n", + "df2 = spark.range(2, 10000000, 4)\n", + "step1 = df1.repartition(5)\n", + "step12 = df2.repartition(6)\n", + "step2 = step1.selectExpr(\"id * 5 as id\")\n", + "step3 = step2.join(step12, [\"id\"])\n", + "step4 = step3.selectExpr(\"sum(id)\")\n", + "\n", + "step4.collect() # 2500000000000" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + "
\n",
+      "<div><b>SparkSession - hive</b></div>\n",
+      "<div><b>SparkContext</b> (Spark UI)</div>\n",
+      "<dl>\n",
+      "  <dt>Version</dt><dd><code>v2.4.3</code></dd>\n",
+      "  <dt>Master</dt><dd><code>local[*]</code></dd>\n",
+      "  <dt>AppName</dt><dd><code>PySparkShell</code></dd>\n",
+      "</dl>\n",
+      "
\n", + " " + ], + "text/plain": [ + "" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "spark" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/dataframes/top-queries.ipynb b/dataframes/top-queries.ipynb new file mode 100644 index 0000000..9462709 --- /dev/null +++ b/dataframes/top-queries.ipynb @@ -0,0 +1,584 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.sql.functions import *" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "spark = SparkSession \\\n", + " .builder \\\n", + " .appName(\"TopQueries\") \\\n", + " .getOrCreate()" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + "
\n",
+      "<div><b>SparkSession - hive</b></div>\n",
+      "<div><b>SparkContext</b> (Spark UI)</div>\n",
+      "<dl>\n",
+      "  <dt>Version</dt><dd><code>v2.4.3</code></dd>\n",
+      "  <dt>Master</dt><dd><code>local[*]</code></dd>\n",
+      "  <dt>AppName</dt><dd><code>PySparkShell</code></dd>\n",
+      "</dl>\n",
+      "
\n",
+      " "
+     ],
+     "text/plain": [
+      ""
+     ],
+     "execution_count": 3,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "spark"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "### Read data from the local filesystem"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 5,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "INPUT_PATH = '../data/w3data/search-click-logs/2019*.avro' # Input path with a glob pattern matching the avro files\n",
+    "OUTPUT_PATH = '../data/w3data/search-click-logs/output' # Output folder to store generated reports"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Read data from stores with a given format\n",
+    "\n",
+    "#### Many supported data sources: \"CSV\", \"JSON\", \"Parquet\", \"Avro\", \"JDBC/ODBC\", \"plain text\", \"HDFS\", \"Cassandra\", \"HBase\", \"MongoDB\""
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 6,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "metrics_df = spark.read.format(\"avro\").load(INPUT_PATH)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 7,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "root\n",
+      " |-- detectedDuplicate: boolean (nullable = true)\n",
+      " |-- detectedCorruption: boolean (nullable = true)\n",
+      " |-- firstInSession: boolean (nullable = true)\n",
+      " |-- timestamp: long (nullable = true)\n",
+      " |-- remoteHost: string (nullable = true)\n",
+      " |-- referer: string (nullable = true)\n",
+      " |-- location: string (nullable = true)\n",
+      " |-- viewportPixelWidth: integer (nullable = true)\n",
+      " |-- viewportPixelHeight: integer (nullable = true)\n",
+      " |-- screenPixelWidth: integer (nullable = true)\n",
+      " |-- screenPixelHeight: integer (nullable = true)\n",
+      " |-- partyId: string (nullable = true)\n",
+      " |-- sessionId: string (nullable = true)\n",
+      " |-- pageViewId: string (nullable = true)\n",
+      " |-- eventType: string (nullable = true)\n",
+      " |-- userAgentString: string (nullable = true)\n",
+      " |-- userAgentName: string (nullable = true)\n",
+      " |-- userAgentFamily: string (nullable = true)\n",
+      " |-- userAgentVendor: string (nullable = true)\n",
+      " |-- userAgentType: string (nullable = true)\n",
+      " |-- userAgentVersion: string (nullable = true)\n",
+      " |-- userAgentDeviceCategory: string (nullable = true)\n",
+      " |-- userAgentOsFamily: string (nullable = true)\n",
+      " |-- userAgentOsVersion: string (nullable = true)\n",
+      " |-- userAgentOsVendor: string (nullable = true)\n",
+      " |-- page: integer (nullable = true)\n",
+      " |-- title: string (nullable = true)\n",
+      " |-- rank: integer (nullable = true)\n",
+      " |-- url: string (nullable = true)\n",
+      " |-- resultType: string (nullable = true)\n",
+      " |-- ibmer: string (nullable = true)\n",
+      " |-- ibmEvType: string (nullable = true)\n",
+      " |-- ibmEvAction: string (nullable = true)\n",
+      " |-- ibmQuery: string (nullable = true)\n",
+      " |-- queryID: string (nullable = true)\n",
+      " |-- ibmSort: string (nullable = true)\n",
+      " |-- ibmCountries: array (nullable = true)\n",
+      " |    |-- element: string (containsNull = true)\n",
+      " |-- ibmLanguages: array (nullable = true)\n",
+      " |    |-- element: string (containsNull = true)\n",
+      " |-- ibmDate: string (nullable = true)\n",
+      " |-- categoryId: string (nullable = true)\n",
+      " |-- hkey: string (nullable = true)\n",
+      " |-- documentDate: string (nullable = true)\n",
+      " |-- dwellResultType: string (nullable = true)\n",
+ " |-- dwellTime: string (nullable = true)\n", + " |-- esqsOrSolr: string (nullable = true)\n", + " |-- qtime: string (nullable = true)\n", + " |-- organicResultsCount: integer (nullable = true)\n", + "\n" + ] + } + ], + "source": [ + "metrics_df.printSchema()" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+--------------+--------------------+--------------------+----------+\n", + "| ibmQuery| url| title| ibmEvType|\n", + "+--------------+--------------------+--------------------+----------+\n", + "| kyle freeman|https://w3.ibm.co...|FREEMAN, KYLE W (...|Dwell Time|\n", + "| what is BAIW|https://w3-connec...|Tech Support Appl...|Impression|\n", + "|IBM counseling|https://w3-connec...|LA Legal Counsels...|Impression|\n", + "| what is BAIW|https://w3-connec...|STATS (BAIW) Refe...|Impression|\n", + "|IBM counseling|https://d01db034....|MX-GS-0345 Rev. 1...|Impression|\n", + "| what is BAIW|https://w3-connec...|Creating user id ...|Impression|\n", + "| null| null| null| null|\n", + "| what is BAIW|https://w3.ibm.co...|CEDP BAIW *APPLIC...|Impression|\n", + "| John petri| null| null| Query|\n", + "| John petri|https://w3.ibm.co...|Petri, John E (John)|Impression|\n", + "| what is BAIW|https://w3.ibm.co...|CEDP BAIW ENTERCO...|Impression|\n", + "| recall email| null| null| Query|\n", + "| John petri|https://w3-connec...|Rochester Master ...|Impression|\n", + "| recall email|https://w3.ibm.co...|Recall an Email Y...|Impression|\n", + "| what is BAIW|https://w3.ibm.co...|baiw_build_read *...|Impression|\n", + "| John petri|https://w3-connec...|Watson Health Inn...|Impression|\n", + "| null| null| null| null|\n", + "| recall email|https://w3-connec...| Watson Marketing|Impression|\n", + "| recall email|https://w3.ibm.co...|Compare Email Cli...|Impression|\n", + "| John petri|https://w3-connec...| Instrumentation|Impression|\n", + "+--------------+--------------------+--------------------+----------+\n", + "only showing top 20 rows\n", + "\n" + ] + } + ], + "source": [ + "metrics_df.select(\"ibmQuery\", \"url\", \"title\", \"ibmEvType\").show()" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "### TOP Queries ###" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [], + "source": [ + "top_queries_df = metrics_df.select(\"ibmQuery\", \"ibmEvType\") \\\n", + " .where(col(\"ibmQuery\").isNotNull()) \\\n", + " .where (col(\"ibmEvType\") == \"Query\") \\\n", + " .groupBy(\"ibmQuery\") \\\n", + " .count() \\\n", + " .withColumnRenamed('count', 'total')\\\n", + " .orderBy(\"total\", ascending=False)" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+--------------------+-----+\n", + "| ibmQuery|total|\n", + "+--------------------+-----+\n", + "| workday| 11|\n", + "|Client executive ...| 11|\n", + "| checkpoint| 10|\n", + "| box| 9|\n", + "| buy@ibm| 8|\n", + "| service apartment| 7|\n", + "| itsc 300| 7|\n", + "| murali| 7|\n", + "|Trans Tree Corpor...| 6|\n", + "|tuition reimbursment| 6|\n", + "| bond| 5|\n", + "|red hat enterpris...| 5|\n", + "| pmr| 5|\n", + "| webex| 5|\n", + "|time off service ...| 5|\n", + "| field engineering| 5|\n", + "| bluepages| 5|\n", + "| retiree benefits| 5|\n", + "| issi| 5|\n", + "| Offering manager| 5|\n", + "+--------------------+-----+\n", + 
"only showing top 20 rows\n", + "\n" + ] + } + ], + "source": [ + "top_queries_df.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Spark SQL" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [], + "source": [ + "metrics_df.registerTempTable(\"metrics\")" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [], + "source": [ + "sql_df = spark.sql(\"SELECT ibmQuery, count(*) as total \"\n", + " \"FROM metrics \"\n", + " \"WHERE ibmQuery is NOT null AND ibmEvType == 'Query'\"\n", + " \"GROUP BY ibmQuery ORDER BY total desc \")" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+--------------------+-----+\n", + "| ibmQuery|total|\n", + "+--------------------+-----+\n", + "| workday| 11|\n", + "|Client executive ...| 11|\n", + "| checkpoint| 10|\n", + "| box| 9|\n", + "| buy@ibm| 8|\n", + "| murali| 7|\n", + "| service apartment| 7|\n", + "| itsc 300| 7|\n", + "|Trans Tree Corpor...| 6|\n", + "|tuition reimbursment| 6|\n", + "| bond| 5|\n", + "| webex| 5|\n", + "| career conversation| 5|\n", + "| Offering manager| 5|\n", + "| field engineering| 5|\n", + "|red hat enterpris...| 5|\n", + "| bluepages| 5|\n", + "| retiree benefits| 5|\n", + "|time off service ...| 5|\n", + "| issi| 5|\n", + "+--------------------+-----+\n", + "only showing top 20 rows\n", + "\n" + ] + } + ], + "source": [ + "sql_df.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Logical and Physical Plans" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "== Parsed Logical Plan ==\n", + "'Sort ['total DESC NULLS LAST], true\n", + "+- Project [ibmQuery#33, count#141L AS total#144L]\n", + " +- Aggregate [ibmQuery#33], [ibmQuery#33, count(1) AS count#141L]\n", + " +- Filter (ibmEvType#31 = Query)\n", + " +- Filter isnotnull(ibmQuery#33)\n", + " +- Project [ibmQuery#33, ibmEvType#31]\n", + " +- Relation[detectedDuplicate#0,detectedCorruption#1,firstInSession#2,timestamp#3L,remoteHost#4,referer#5,location#6,viewportPixelWidth#7,viewportPixelHeight#8,screenPixelWidth#9,screenPixelHeight#10,partyId#11,sessionId#12,pageViewId#13,eventType#14,userAgentString#15,userAgentName#16,userAgentFamily#17,userAgentVendor#18,userAgentType#19,userAgentVersion#20,userAgentDeviceCategory#21,userAgentOsFamily#22,userAgentOsVersion#23,... 23 more fields] avro\n", + "\n", + "== Analyzed Logical Plan ==\n", + "ibmQuery: string, total: bigint\n", + "Sort [total#144L DESC NULLS LAST], true\n", + "+- Project [ibmQuery#33, count#141L AS total#144L]\n", + " +- Aggregate [ibmQuery#33], [ibmQuery#33, count(1) AS count#141L]\n", + " +- Filter (ibmEvType#31 = Query)\n", + " +- Filter isnotnull(ibmQuery#33)\n", + " +- Project [ibmQuery#33, ibmEvType#31]\n", + " +- Relation[detectedDuplicate#0,detectedCorruption#1,firstInSession#2,timestamp#3L,remoteHost#4,referer#5,location#6,viewportPixelWidth#7,viewportPixelHeight#8,screenPixelWidth#9,screenPixelHeight#10,partyId#11,sessionId#12,pageViewId#13,eventType#14,userAgentString#15,userAgentName#16,userAgentFamily#17,userAgentVendor#18,userAgentType#19,userAgentVersion#20,userAgentDeviceCategory#21,userAgentOsFamily#22,userAgentOsVersion#23,... 
23 more fields] avro\n", + "\n", + "== Optimized Logical Plan ==\n", + "Sort [total#144L DESC NULLS LAST], true\n", + "+- Aggregate [ibmQuery#33], [ibmQuery#33, count(1) AS total#144L]\n", + " +- Project [ibmQuery#33]\n", + " +- Filter ((isnotnull(ibmEvType#31) && isnotnull(ibmQuery#33)) && (ibmEvType#31 = Query))\n", + " +- Relation[detectedDuplicate#0,detectedCorruption#1,firstInSession#2,timestamp#3L,remoteHost#4,referer#5,location#6,viewportPixelWidth#7,viewportPixelHeight#8,screenPixelWidth#9,screenPixelHeight#10,partyId#11,sessionId#12,pageViewId#13,eventType#14,userAgentString#15,userAgentName#16,userAgentFamily#17,userAgentVendor#18,userAgentType#19,userAgentVersion#20,userAgentDeviceCategory#21,userAgentOsFamily#22,userAgentOsVersion#23,... 23 more fields] avro\n", + "\n", + "== Physical Plan ==\n", + "*(3) Sort [total#144L DESC NULLS LAST], true, 0\n", + "+- Exchange rangepartitioning(total#144L DESC NULLS LAST, 200)\n", + " +- *(2) HashAggregate(keys=[ibmQuery#33], functions=[count(1)], output=[ibmQuery#33, total#144L])\n", + " +- Exchange hashpartitioning(ibmQuery#33, 200)\n", + " +- *(1) HashAggregate(keys=[ibmQuery#33], functions=[partial_count(1)], output=[ibmQuery#33, count#154L])\n", + " +- *(1) Project [ibmQuery#33]\n", + " +- *(1) Filter ((isnotnull(ibmEvType#31) && isnotnull(ibmQuery#33)) && (ibmEvType#31 = Query))\n", + " +- *(1) FileScan avro [ibmEvType#31,ibmQuery#33] Batched: false, Format: org.apache.spark.sql.avro.AvroFileFormat@2ea71000, Location: InMemoryFileIndex[file:/Users/abhay/learning/spark-tutorial/data/w3data/search-click-logs/2019082..., PartitionFilters: [], PushedFilters: [IsNotNull(ibmEvType), IsNotNull(ibmQuery), EqualTo(ibmEvType,Query)], ReadSchema: struct\n" + ] + } + ], + "source": [ + "top_queries_df.explain(extended=True)" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "== Parsed Logical Plan ==\n", + "'Sort ['total DESC NULLS LAST], true\n", + "+- 'Aggregate ['ibmQuery], ['ibmQuery, 'count(1) AS total#137]\n", + " +- 'Filter (isnotnull('ibmQuery) && ('ibmEvType = Query))\n", + " +- 'UnresolvedRelation `metrics`\n", + "\n", + "== Analyzed Logical Plan ==\n", + "ibmQuery: string, total: bigint\n", + "Sort [total#137L DESC NULLS LAST], true\n", + "+- Aggregate [ibmQuery#33], [ibmQuery#33, count(1) AS total#137L]\n", + " +- Filter (isnotnull(ibmQuery#33) && (ibmEvType#31 = Query))\n", + " +- SubqueryAlias `metrics`\n", + " +- Relation[detectedDuplicate#0,detectedCorruption#1,firstInSession#2,timestamp#3L,remoteHost#4,referer#5,location#6,viewportPixelWidth#7,viewportPixelHeight#8,screenPixelWidth#9,screenPixelHeight#10,partyId#11,sessionId#12,pageViewId#13,eventType#14,userAgentString#15,userAgentName#16,userAgentFamily#17,userAgentVendor#18,userAgentType#19,userAgentVersion#20,userAgentDeviceCategory#21,userAgentOsFamily#22,userAgentOsVersion#23,... 
23 more fields] avro\n", + "\n", + "== Optimized Logical Plan ==\n", + "Sort [total#137L DESC NULLS LAST], true\n", + "+- Aggregate [ibmQuery#33], [ibmQuery#33, count(1) AS total#137L]\n", + " +- Project [ibmQuery#33]\n", + " +- Filter ((isnotnull(ibmEvType#31) && isnotnull(ibmQuery#33)) && (ibmEvType#31 = Query))\n", + " +- Relation[detectedDuplicate#0,detectedCorruption#1,firstInSession#2,timestamp#3L,remoteHost#4,referer#5,location#6,viewportPixelWidth#7,viewportPixelHeight#8,screenPixelWidth#9,screenPixelHeight#10,partyId#11,sessionId#12,pageViewId#13,eventType#14,userAgentString#15,userAgentName#16,userAgentFamily#17,userAgentVendor#18,userAgentType#19,userAgentVersion#20,userAgentDeviceCategory#21,userAgentOsFamily#22,userAgentOsVersion#23,... 23 more fields] avro\n", + "\n", + "== Physical Plan ==\n", + "*(3) Sort [total#137L DESC NULLS LAST], true, 0\n", + "+- Exchange rangepartitioning(total#137L DESC NULLS LAST, 200)\n", + " +- *(2) HashAggregate(keys=[ibmQuery#33], functions=[count(1)], output=[ibmQuery#33, total#137L])\n", + " +- Exchange hashpartitioning(ibmQuery#33, 200)\n", + " +- *(1) HashAggregate(keys=[ibmQuery#33], functions=[partial_count(1)], output=[ibmQuery#33, count#149L])\n", + " +- *(1) Project [ibmQuery#33]\n", + " +- *(1) Filter ((isnotnull(ibmEvType#31) && isnotnull(ibmQuery#33)) && (ibmEvType#31 = Query))\n", + " +- *(1) FileScan avro [ibmEvType#31,ibmQuery#33] Batched: false, Format: org.apache.spark.sql.avro.AvroFileFormat@7a1b8084, Location: InMemoryFileIndex[file:/Users/abhay/learning/spark-tutorial/data/w3data/search-click-logs/2019082..., PartitionFilters: [], PushedFilters: [IsNotNull(ibmEvType), IsNotNull(ibmQuery), EqualTo(ibmEvType,Query)], ReadSchema: struct\n" + ] + } + ], + "source": [ + "sql_df.explain(extended=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Partitions, Repartitions, and coalesce" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "4" + ] + }, + "execution_count": 17, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "metrics_df.rdd.getNumPartitions()" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "7" + ] + }, + "execution_count": 18, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "top_queries_df.rdd.getNumPartitions()" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "50" + ] + }, + "execution_count": 19, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "top_queries_df.repartition(50).rdd.getNumPartitions()" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "2" + ] + }, + "execution_count": 20, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "top_queries_df.coalesce(2).rdd.getNumPartitions()" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [], + "source": [ + "top_queries_df.coalesce(1).write.format(\"csv\") \\\n", + " .option(\"header\", \"true\") \\\n", + " .mode(\"Overwrite\") \\\n", + " .option(\"ignoreLeadingWhiteSpace\", \"false\")\\\n", + " .option(\"ignoreTrailingWhiteSpace\", \"false\")\\\n", + " .save(OUTPUT_PATH+\"/top-queries\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": 
{}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/dataframes/udf.ipynb b/dataframes/udf.ipynb new file mode 100644 index 0000000..d1d25b5 --- /dev/null +++ b/dataframes/udf.ipynb @@ -0,0 +1,715 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.sql.functions import *\n", + "from pyspark.sql.types import *" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "spark = SparkSession \\\n", + " .builder \\\n", + " .appName(\"TopDomains\") \\\n", + " .getOrCreate()" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "df=spark.read.format(\"csv\").load(\"../data/w3data/nutch_URLs_0422/PRD_nutch_urls_20200422.txt\")" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "df1=spark.read.format(\"csv\").load(\"../data/w3data/nutch_URLs_0422/PRD_nutch1_urls_20200422.txt\")" + ] + }, + { + "cell_type": "code", + "execution_count": 37, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "root\n", + " |-- _c0: string (nullable = true)\n", + "\n" + ] + } + ], + "source": [ + "df.printSchema()" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "706251" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df.count()" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "112630" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df1.count()" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "df=df.select(col(\"_c0\").alias(\"url\"))" + ] + }, + { + "cell_type": "code", + "execution_count": 69, + "metadata": {}, + "outputs": [], + "source": [ + "df1=df1.select(col(\"_c0\").alias(\"url\"))" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "root\n", + " |-- url: string (nullable = true)\n", + "\n" + ] + } + ], + "source": [ + "df.printSchema()" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "from urllib.parse import urlparse\n", + "\n", + "@udf\n", + "def getHost(url):\n", + " #parsed_uri = urlparse('http://stackoverflow.com/questions/1234567/blah-blah-blah-blah' )\n", + " #result = '{uri.scheme}://{uri.netloc}/'.format(uri=parsed_uri)\n", + " return urlparse(url).hostname\n", + " #return result" + ] + }, + { + "cell_type": "code", + "execution_count": 47, + "metadata": {}, + "outputs": [], + "source": [ + "from urllib.parse import urlparse\n", + "import os.path\n", + "\n", + "@udf\n", + "def getPathOne(url):\n", + " #parsed_uri = 
urlparse('http://stackoverflow.com/questions/1234567/blah-blah-blah-blah' )\n", + " #result = '{uri.scheme}://{uri.netloc}/'.format(uri=parsed_uri)\n", + " #return urlparse(url).hostname\n", + " #return result\n", + " path = urlparse(url).path\n", + " paths = path.split('/')\n", + " if len(paths) > 1 :\n", + " return paths[1]" + ] + }, + { + "cell_type": "code", + "execution_count": 52, + "metadata": {}, + "outputs": [], + "source": [ + "from urllib.parse import urlparse\n", + "import os.path\n", + "\n", + "@udf\n", + "def getPathTwo(url):\n", + " #parsed_uri = urlparse('http://stackoverflow.com/questions/1234567/blah-blah-blah-blah' )\n", + " #result = '{uri.scheme}://{uri.netloc}/'.format(uri=parsed_uri)\n", + " #return urlparse(url).hostname\n", + " #return result\n", + " path = urlparse(url).path\n", + " paths = path.split('/')\n", + " if len(paths) > 2 :\n", + " return paths[2]" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [], + "source": [ + "@udf\n", + "def getHostId(url):\n", + " return url.split(':')[0] " + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [], + "source": [ + "@udf\n", + "def getProtocol(url):\n", + " paths = url.split(':')\n", + " if len(paths) > 1 :\n", + " return paths[1].split('/')[0] " + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [], + "source": [ + "@udf\n", + "def getPath1(url):\n", + " paths = url.split(':')\n", + " if len(paths) > 1 :\n", + " paths2 = paths[1].split('/')\n", + " if len(paths2) > 1:\n", + " return paths2[1]" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [], + "source": [ + "@udf\n", + "def getPath2(url):\n", + " paths = url.split(':')\n", + " if len(paths) > 1 :\n", + " paths2 = paths[1].split('/')\n", + " if len(paths2) > 2 :\n", + " return paths2[2]" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [], + "source": [ + "@udf\n", + "def getPath2(url):\n", + " paths = url.split(':')\n", + " if len(paths) > 1 :\n", + " paths2 = paths[1].split('/')\n", + " if len(paths2) > 2 :\n", + " return paths2[2]" + ] + }, + { + "cell_type": "code", + "execution_count": 95, + "metadata": {}, + "outputs": [], + "source": [ + "hostDf = df.withColumn('domain', getHost('url'))\n", + "hostDf = hostDf.withColumn('path1', getPathOne('url'))\n", + "hostDf = hostDf.withColumn('path2', getPathTwo('url'))" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [], + "source": [ + "#hostDf = hostDf.withColumn('protocol', getProtocol('url'))" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [], + "source": [ + "#hostDf = hostDf.withColumn('path1', getPath1('url'))" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [], + "source": [ + "#hostDf = hostDf.withColumn('path2', getPath2('url'))" + ] + }, + { + "cell_type": "code", + "execution_count": 96, + "metadata": {}, + "outputs": [], + "source": [ + "hostDf1 = df1.withColumn('domain', getHost('url'))\n", + "hostDf1 = hostDf1.withColumn('path1', getPathOne('url'))\n", + "hostDf1 = hostDf1.withColumn('path2', getPathTwo('url'))" + ] + }, + { + "cell_type": "code", + "execution_count": 97, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "root\n", + " |-- url: string (nullable = true)\n", + " |-- domain: string (nullable = true)\n", + 
" |-- path1: string (nullable = true)\n", + " |-- path2: string (nullable = true)\n", + "\n" + ] + } + ], + "source": [ + "hostDf.printSchema()" + ] + }, + { + "cell_type": "code", + "execution_count": 98, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "root\n", + " |-- url: string (nullable = true)\n", + " |-- domain: string (nullable = true)\n", + " |-- path1: string (nullable = true)\n", + " |-- path2: string (nullable = true)\n", + "\n" + ] + } + ], + "source": [ + "hostDf1.printSchema()" + ] + }, + { + "cell_type": "code", + "execution_count": 65, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+----------------------+\n", + "|count(DISTINCT domain)|\n", + "+----------------------+\n", + "| 40|\n", + "+----------------------+\n", + "\n" + ] + } + ], + "source": [ + "from pyspark.sql.functions import countDistinct\n", + "hostDf.select(countDistinct('domain')).show()\n" + ] + }, + { + "cell_type": "code", + "execution_count": 194, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+--------------+\n", + "| path1|\n", + "+--------------+\n", + "| null|\n", + "| |\n", + "| activities|\n", + "| appid|\n", + "| appid|\n", + "| impact|\n", + "| impact|\n", + "| impact|\n", + "| initiatives|\n", + "| initiatives|\n", + "|responsibility|\n", + "|responsibility|\n", + "|responsibility|\n", + "|responsibility|\n", + "| activities|\n", + "| activities|\n", + "| activities|\n", + "| activities|\n", + "| activities|\n", + "| activities|\n", + "+--------------+\n", + "only showing top 20 rows\n", + "\n" + ] + } + ], + "source": [ + "hostDf.select('path1').show()" + ] + }, + { + "cell_type": "code", + "execution_count": 108, + "metadata": {}, + "outputs": [], + "source": [ + "hostnames = hostDf.select(\"domain\", \"path1\") \\\n", + " .groupBy(\"domain\", \"path1\")\\\n", + " .count()\\\n", + " .withColumnRenamed(\"count\",\"nutch2count\")\\\n", + " .orderBy(\"nutch2count\", ascending=False)#\\\n", + " #.show(100, False)" + ] + }, + { + "cell_type": "code", + "execution_count": 109, + "metadata": {}, + "outputs": [], + "source": [ + "hostnames1 = hostDf1.select(\"domain\", \"path1\") \\\n", + " .groupBy(\"domain\",\"path1\")\\\n", + " .count()\\\n", + " .withColumnRenamed(\"count\",\"nutch1count\")\\\n", + " .orderBy(\"count\", ascending=False)\n", + " #.show(100, False)" + ] + }, + { + "cell_type": "code", + "execution_count": 110, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "root\n", + " |-- domain: string (nullable = true)\n", + " |-- path1: string (nullable = true)\n", + " |-- nutch2count: long (nullable = false)\n", + "\n" + ] + } + ], + "source": [ + "hostnames.printSchema()" + ] + }, + { + "cell_type": "code", + "execution_count": 113, + "metadata": {}, + "outputs": [], + "source": [ + "nutch1v2 = hostnames1.join(hostnames, on=['domain', 'path1'], how='outer')\\\n", + " .orderBy(\"nutch2count\", ascending=False)" + ] + }, + { + "cell_type": "code", + "execution_count": 114, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----------------------------------+-----------------------+-----------+-----------+\n", + "|domain |path1 |nutch1count|nutch2count|\n", + "+-----------------------------------+-----------------------+-----------+-----------+\n", + "|www.ibm.com |blogs |40370 |228736 |\n", + "|developer.ibm.com |answers |null |160757 |\n", + 
"|procure.sby1.ibm.com |whk |1906 |86669 |\n", + "|w3-03.ibm.com |services |361 |78741 |\n", + "|d01db034.pok.ibm.com |q_dir |37139 |73962 |\n", + "|w3-01.ibm.com |hr |1157 |10137 |\n", + "|productpages.w3ibm.mybluemix.net |ProductPages |204 |6511 |\n", + "|incentivesworkplace.atlanta.ibm.com|node |1 |5223 |\n", + "|secure.video.ibm.com |channel |1490 |3651 |\n", + "|developer.ibm.com |tv |null |3343 |\n", + "|developer.ibm.com |profiles |null |3086 |\n", + "|developer.ibm.com |recipes |null |3036 |\n", + "|incentivesworkplace.atlanta.ibm.com|sites |null |2407 |\n", + "|www.ibm.com |cloud |2 |2369 |\n", + "|developer.ibm.com |code |null |1957 |\n", + "|w3-03.ibm.com |globalization |null |1808 |\n", + "|developer.ibm.com |integration |null |1465 |\n", + "|developer.ibm.com |dwblog |null |1270 |\n", + "|developer.ibm.com |kr |null |1269 |\n", + "|developer.ibm.com |apiconnect |null |1251 |\n", + "|bluepedia.w3ibm.mybluemix.net |index.php |6604 |1151 |\n", + "|developer.ibm.com |hadoop |null |1036 |\n", + "|developer.ibm.com |customer-engagement |null |1031 |\n", + "|developer.ibm.com |wasdev |null |980 |\n", + "|developer.ibm.com |mainframe |null |931 |\n", + "|developer.ibm.com |urbancode |null |901 |\n", + "|developer.ibm.com |opentech |null |818 |\n", + "|w3.ibm.com |developer |400 |784 |\n", + "|developer.ibm.com |predictiveanalytics |null |763 |\n", + "|developer.ibm.com |apm |null |722 |\n", + "|blueprint.sl.bluecloud.ibm.com |b_dir |3 |602 |\n", + "|developer.ibm.com |clouddataservices |null |597 |\n", + "|developer.ibm.com |messaging |null |585 |\n", + "|developer.ibm.com |events |null |583 |\n", + "|w3.ibm.com |thinkacademy |1 |567 |\n", + "|developer.ibm.com |bpm |null |563 |\n", + "|developer.ibm.com |storage |null |560 |\n", + "|developer.ibm.com |tutorials |null |557 |\n", + "|developer.ibm.com |linuxonpower |null |542 |\n", + "|developer.ibm.com |streamsdev |null |536 |\n", + "|developer.ibm.com |patterns |null |491 |\n", + "|developer.ibm.com |cics |null |475 |\n", + "|w3.itso.ibm.com |itsoapps |550 |461 |\n", + "|developer.ibm.com |articles |null |456 |\n", + "|incentivesworkplace.atlanta.ibm.com|video |null |453 |\n", + "|developer.ibm.com |node |null |452 |\n", + "|developer.ibm.com |digexp |null |450 |\n", + "|ibmdev.webmaster.ibm.com |v18 |420 |439 |\n", + "|developer.ibm.com |testing |null |426 |\n", + "|developer.ibm.com |blogs |null |414 |\n", + "|developer.ibm.com |swift |null |403 |\n", + "|w3.ibm.com |hr |9690 |391 |\n", + "|prodpcbhrfaq03.w3-969.ibm.com |index.php |null |386 |\n", + "|developer.ibm.com |cn |null |349 |\n", + "|developer.ibm.com |videos |null |276 |\n", + "|w3-01.ibm.com |services |299 |270 |\n", + "|w3-03.ibm.com |support |1214 |264 |\n", + "|developer.ibm.com |jp |null |245 |\n", + "|developer.ibm.com |identitydev |null |239 |\n", + "|developer.ibm.com |iotplatform |null |236 |\n", + "|developer.ibm.com |docloud |null |216 |\n", + "|www.research.ibm.com |artificial-intelligence|750 |200 |\n", + "|learn.atlanta.ibm.com |la |528 |192 |\n", + "|incentivesworkplace.atlanta.ibm.com|taxonomy |null |184 |\n", + "|developer.ibm.com |powervc |null |163 |\n", + "|w3.itso.ibm.com |abstracts |359 |161 |\n", + "|developer.ibm.com |odm |null |160 |\n", + "|developer.ibm.com |api |null |153 |\n", + "|www.ibm.com |developerworks |null |150 |\n", + "|w3.ibm.com |transform |73 |144 |\n", + "|prodpcrhrfaq01.w3-969.ibm.com |systemstorage |null |132 |\n", + "|developer.ibm.com |announcements |null |130 |\n", + "|developer.ibm.com |cloudautomation |null |128 |\n", + 
"|developer.ibm.com |itom |null |128 |\n", + "|developer.ibm.com |in |null |127 |\n", + "|developer.ibm.com |series |null |123 |\n", + "|developer.ibm.com |open |null |123 |\n", + "|developer.ibm.com |watsonhealth |null |121 |\n", + "|prodpcrhrfaq01.w3-969.ibm.com |powersystems |null |112 |\n", + "|developer.ibm.com |data |null |110 |\n", + "|cedp-kc-doc.w3ibm.mybluemix.net |CEDP |439 |107 |\n", + "|prodpcbhrfaq03.w3-969.ibm.com |tooling-project |null |105 |\n", + "|developer.ibm.com |exchanges |null |101 |\n", + "|w3-01.ibm.com |chq |169 |96 |\n", + "|developer.ibm.com |qradar |null |94 |\n", + "|developer.ibm.com |javasdk |null |88 |\n", + "|w3.ibm.com |tools |215 |87 |\n", + "|developer.ibm.com |sso |null |84 |\n", + "|developer.ibm.com |components |null |84 |\n", + "|prodpcbhrfaq03.w3-969.ibm.com |user |null |79 |\n", + "|w3.itso.ibm.com |redbooks |null |77 |\n", + "|incentivesworkplace.atlanta.ibm.com|asca |null |75 |\n", + "|developer.ibm.com |technologies |null |75 |\n", + "|prodpcbhrfaq03.w3-969.ibm.com |taxonomy |null |59 |\n", + "|gbs-qms.w3ibm.mybluemix.net |template |160 |57 |\n", + "|incentivesworkplace.atlanta.ibm.com|plan-cycle |null |56 |\n", + "|developer.ibm.com |watson |null |54 |\n", + "|developer.ibm.com |courses |null |50 |\n", + "|incentivesworkplace.atlanta.ibm.com|incremental-incentives |null |50 |\n", + "|developer.ibm.com |code-and-response |null |47 |\n", + "+-----------------------------------+-----------------------+-----------+-----------+\n", + "only showing top 100 rows\n", + "\n" + ] + } + ], + "source": [ + "nutch1v2.show(100, False)" + ] + }, + { + "cell_type": "code", + "execution_count": 104, + "metadata": {}, + "outputs": [ + { + "ename": "AnalysisException", + "evalue": "'USING column `path1` cannot be resolved on the left side of the join. 
The left-side columns: [domain, nutch1count];'", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mPy4JJavaError\u001b[0m Traceback (most recent call last)", + "\u001b[0;32m/usr/local/Cellar/apache-spark/2.4.3/libexec/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 62\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 63\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 64\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mpy4j\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mprotocol\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPy4JJavaError\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m/usr/local/Cellar/apache-spark/2.4.3/libexec/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m 327\u001b[0m \u001b[0;34m\"An error occurred while calling {0}{1}{2}.\\n\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 328\u001b[0;31m format(target_id, \".\", name), value)\n\u001b[0m\u001b[1;32m 329\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;31mPy4JJavaError\u001b[0m: An error occurred while calling o840.join.\n: org.apache.spark.sql.AnalysisException: USING column `path1` cannot be resolved on the left side of the join. 
The left-side columns: [domain, nutch1count];\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$98$$anonfun$apply$71.apply(Analyzer.scala:2337)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$98$$anonfun$apply$71.apply(Analyzer.scala:2337)\n\tat scala.Option.getOrElse(Option.scala:121)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$98.apply(Analyzer.scala:2336)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$98.apply(Analyzer.scala:2335)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)\n\tat scala.collection.Iterator$class.foreach(Iterator.scala:891)\n\tat scala.collection.AbstractIterator.foreach(Iterator.scala:1334)\n\tat scala.collection.IterableLike$class.foreach(IterableLike.scala:72)\n\tat scala.collection.AbstractIterable.foreach(Iterable.scala:54)\n\tat scala.collection.TraversableLike$class.map(TraversableLike.scala:234)\n\tat scala.collection.AbstractTraversable.map(Traversable.scala:104)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$commonNaturalJoinProcessing(Analyzer.scala:2335)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$$anonfun$apply$34.applyOrElse(Analyzer.scala:2225)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$$anonfun$apply$34.applyOrElse(Analyzer.scala:2222)\n\tat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$anonfun$resolveOperatorsUp$1$$anonfun$apply$1.apply(AnalysisHelper.scala:90)\n\tat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$anonfun$resolveOperatorsUp$1$$anonfun$apply$1.apply(AnalysisHelper.scala:90)\n\tat org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)\n\tat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$anonfun$resolveOperatorsUp$1.apply(AnalysisHelper.scala:89)\n\tat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$anonfun$resolveOperatorsUp$1.apply(AnalysisHelper.scala:86)\n\tat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:194)\n\tat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.resolveOperatorsUp(AnalysisHelper.scala:86)\n\tat org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:29)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$.apply(Analyzer.scala:2222)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$.apply(Analyzer.scala:2221)\n\tat org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)\n\tat org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)\n\tat scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)\n\tat scala.collection.immutable.List.foldLeft(List.scala:84)\n\tat org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)\n\tat org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)\n\tat scala.collection.immutable.List.foreach(List.scala:392)\n\tat org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)\n\tat 
org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:127)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:121)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:106)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105)\n\tat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)\n\tat org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)\n\tat org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)\n\tat org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)\n\tat org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:78)\n\tat org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3406)\n\tat org.apache.spark.sql.Dataset.join(Dataset.scala:938)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:238)\n\tat java.lang.Thread.run(Thread.java:748)\n", + "\nDuring handling of the above exception, another exception occurred:\n", + "\u001b[0;31mAnalysisException\u001b[0m Traceback (most recent call last)", + "\u001b[0;32m\u001b[0m in \u001b[0;36m\u001b[0;34m\u001b[0m\n\u001b[0;32m----> 1\u001b[0;31m \u001b[0mnutch1v2_2\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mhostnames1\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mjoin\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mhostnames\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mhow\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34m'outer'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mon\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m'domain'\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m'path1'\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;31m\\\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 2\u001b[0m \u001b[0;34m.\u001b[0m\u001b[0morderBy\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"nutch2count\"\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mascending\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mFalse\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m/usr/local/Cellar/apache-spark/2.4.3/libexec/python/pyspark/sql/dataframe.py\u001b[0m in \u001b[0;36mjoin\u001b[0;34m(self, other, on, how)\u001b[0m\n\u001b[1;32m 1047\u001b[0m \u001b[0mon\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jseq\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1048\u001b[0m \u001b[0;32massert\u001b[0m \u001b[0misinstance\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mhow\u001b[0m\u001b[0;34m,\u001b[0m 
\u001b[0mbasestring\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m\"how should be basestring\"\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1049\u001b[0;31m \u001b[0mjdf\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jdf\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mjoin\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mother\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jdf\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mon\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mhow\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1050\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mDataFrame\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mjdf\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msql_ctx\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1051\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m/usr/local/Cellar/apache-spark/2.4.3/libexec/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1255\u001b[0m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1256\u001b[0m return_value = get_return_value(\n\u001b[0;32m-> 1257\u001b[0;31m answer, self.gateway_client, self.target_id, self.name)\n\u001b[0m\u001b[1;32m 1258\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1259\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m/usr/local/Cellar/apache-spark/2.4.3/libexec/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 67\u001b[0m e.java_exception.getStackTrace()))\n\u001b[1;32m 68\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0ms\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstartswith\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'org.apache.spark.sql.AnalysisException: '\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 69\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0mAnalysisException\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0ms\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msplit\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m': '\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;36m1\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mstackTrace\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 70\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0ms\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstartswith\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'org.apache.spark.sql.catalyst.analysis'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 71\u001b[0m \u001b[0;32mraise\u001b[0m \u001b[0mAnalysisException\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0ms\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msplit\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m': '\u001b[0m\u001b[0;34m,\u001b[0m 
\u001b[0;36m1\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mstackTrace\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;31mAnalysisException\u001b[0m: 'USING column `path1` cannot be resolved on the left side of the join. The left-side columns: [domain, nutch1count];'" + ] + } + ], + "source": [ + "nutch1v2_2 = hostnames1.join(hostnames, how='outer', on=['domain','path1'])\\\n", + " .orderBy(\"nutch2count\", ascending=False)" + ] + }, + { + "cell_type": "code", + "execution_count": 204, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----+-------+\n", + "|path1| count|\n", + "+-----+-------+\n", + "|blogs|3178314|\n", + "| ibm| 2279|\n", + "|cloud| 1674|\n", + "|demos| 18|\n", + "+-----+-------+\n", + "\n" + ] + } + ], + "source": [ + "hostnames = hostDf.select(\"domain\", \"path1\", \"path2\") \\\n", + " .where(col(\"domain\") == \"com.ibm.www\") \\\n", + " .groupBy(\"path1\")\\\n", + " .count()\\\n", + " .orderBy(\"count\", ascending=False)\\\n", + " .show()" + ] + }, + { + "cell_type": "code", + "execution_count": 193, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----+-----+\n", + "|path1|count|\n", + "+-----+-----+\n", + "+-----+-----+\n", + "\n" + ] + } + ], + "source": [ + "hostnames.show(200, False)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/dataframes/vectorized_udf.ipynb b/dataframes/vectorized_udf.ipynb new file mode 100644 index 0000000..3b4da56 --- /dev/null +++ b/dataframes/vectorized_udf.ipynb @@ -0,0 +1,354 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "\n", + "from pyspark.sql.functions import col, pandas_udf, PandasUDFType\n", + "from pyspark.sql.types import LongType\n", + "from pyspark.sql import Window" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "spark.conf.set(\"spark.sql.execution.arrow.enabled\", \"true\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Scalar" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "\n", + "from pyspark.sql.functions import col, pandas_udf, PandasUDFType\n", + "from pyspark.sql.types import LongType" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "# Declare the function and create the UDF\n", + "# take pandas.Series as inputs and return a pandas.Series\n", + "@pandas_udf(returnType=LongType())\n", + "def multiply(a, b):\n", + " return a * b" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "x = pd.Series([1, 2, 3])" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + 
"outputs": [], + "source": [ + "# Create a Spark DataFrame, 'spark' is an existing SparkSession\n", + "df = spark.createDataFrame(pd.DataFrame(x, columns=[\"x\"]))" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+\n", + "| x|\n", + "+---+\n", + "| 1|\n", + "| 2|\n", + "| 3|\n", + "+---+\n", + "\n" + ] + } + ], + "source": [ + "df.show()" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+--------------+\n", + "|multiply(x, x)|\n", + "+--------------+\n", + "| 1|\n", + "| 4|\n", + "| 9|\n", + "+--------------+\n", + "\n" + ] + } + ], + "source": [ + "# Execute function as a Spark vectorized UDF\n", + "df.select(multiply(col(\"x\"), col(\"x\"))).show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Grouped MAP" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [], + "source": [ + "df = spark.createDataFrame(\n", + " [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],\n", + " (\"id\", \"v\"))" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+----+\n", + "| id| v|\n", + "+---+----+\n", + "| 1| 1.0|\n", + "| 1| 2.0|\n", + "| 2| 3.0|\n", + "| 2| 5.0|\n", + "| 2|10.0|\n", + "+---+----+\n", + "\n" + ] + } + ], + "source": [ + "df.show()" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "# The input and output of the function are both pandas.DataFrame\n", + "@pandas_udf(\"id long, v double\", PandasUDFType.GROUPED_MAP)\n", + "def subtract_mean(pdf):\n", + " # pdf is a pandas.DataFrame\n", + " v = pdf.v\n", + " return pdf.assign(v=v - v.mean())" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+----+\n", + "| id| v|\n", + "+---+----+\n", + "| 1|-0.5|\n", + "| 1| 0.5|\n", + "| 2|-3.0|\n", + "| 2|-1.0|\n", + "| 2| 4.0|\n", + "+---+----+\n", + "\n" + ] + } + ], + "source": [ + "df.groupby(\"id\").apply(subtract_mean).show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Grouped Aggregate" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "df = spark.createDataFrame(\n", + " [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],\n", + " (\"id\", \"v\"))" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+----+\n", + "| id| v|\n", + "+---+----+\n", + "| 1| 1.0|\n", + "| 1| 2.0|\n", + "| 2| 3.0|\n", + "| 2| 5.0|\n", + "| 2|10.0|\n", + "+---+----+\n", + "\n" + ] + } + ], + "source": [ + "df.show()" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "# Aggregation from one or more pandas.Series to a scalar\n", + "@pandas_udf(\"double\", PandasUDFType.GROUPED_AGG)\n", + "def mean_udf(v):\n", + " return v.mean()" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+-----------+\n", + "| id|mean_udf(v)|\n", + "+---+-----------+\n", + "| 1| 1.5|\n", + "| 2| 
6.0|\n", + "+---+-----------+\n", + "\n" + ] + } + ], + "source": [ + "df.groupby(\"id\").agg(mean_udf(df['v'])).show()" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "w = Window \\\n", + " .partitionBy('id') \\\n", + " .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+----+------+\n", + "| id| v|mean_v|\n", + "+---+----+------+\n", + "| 1| 1.0| 1.5|\n", + "| 1| 2.0| 1.5|\n", + "| 2| 3.0| 6.0|\n", + "| 2| 5.0| 6.0|\n", + "| 2|10.0| 6.0|\n", + "+---+----+------+\n", + "\n" + ] + } + ], + "source": [ + "df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/dataframes/window.ipynb b/dataframes/window.ipynb new file mode 100644 index 0000000..54b737e --- /dev/null +++ b/dataframes/window.ipynb @@ -0,0 +1,347 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + "
\n", + "

SparkSession - hive

\n", + " \n", + "
\n", + "

SparkContext

\n", + "\n", + "

Spark UI

\n", + "\n", + "
\n", + "
Version
\n", + "
v2.4.3
\n", + "
Master
\n", + "
local[*]
\n", + "
AppName
\n", + "
PySparkShell
\n", + "
\n", + "
\n", + " \n", + "
\n", + " " + ], + "text/plain": [ + "" + ] + }, + "execution_count": 1, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "spark" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "import sys\n", + "from pyspark.sql.window import Window\n", + "from pyspark.sql.functions import *\n" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "revenue_df = spark.read.format(\"csv\")\\\n", + " .option(\"header\", \"true\")\\\n", + " .option(\"inferSchema\", \"true\")\\\n", + " .load(\"../data/product-revenue/*.csv\")" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "root\n", + " |-- product: string (nullable = true)\n", + " |-- category: string (nullable = true)\n", + " |-- revenue: integer (nullable = true)\n", + "\n" + ] + } + ], + "source": [ + "revenue_df.printSchema()" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+----------+----------+-------+\n", + "| product| category|revenue|\n", + "+----------+----------+-------+\n", + "| Thin|Cell Phone| 6000|\n", + "| Normal| Tablet| 1500|\n", + "| Mini| Tablet| 5500|\n", + "|Ultra thin|Cell Phone| 5000|\n", + "| Very thin|Cell Phone| 6000|\n", + "| Big| Tablet| 2500|\n", + "| Bendable|Cell Phone| 3000|\n", + "| Foldable|Cell Phone| 3000|\n", + "| Pro| Tablet| 4500|\n", + "| Pro2| Tablet| 6500|\n", + "+----------+----------+-------+\n", + "\n" + ] + } + ], + "source": [ + "revenue_df.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Group Aggregate Functions" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+----------+------------+---------------------+\n", + "| category|sum(revenue)|collect_list(product)|\n", + "+----------+------------+---------------------+\n", + "| Tablet| 20500| [Normal, Mini, Bi...|\n", + "|Cell Phone| 23000| [Thin, Ultra thin...|\n", + "+----------+------------+---------------------+\n", + "\n" + ] + } + ], + "source": [ + "revenue_df.groupBy(\"category\")\\\n", + " .agg(sum(\"revenue\"), collect_list(\"product\"))\\\n", + " .show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Window Aggregate Functions" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### What is the difference between the revenue of each product and the revenue of the best-selling product in the same category of that product?" 
+ ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "windowSpec = Window \\\n", + " .partitionBy(revenue_df['category']) \\\n", + " .orderBy(revenue_df['revenue'].desc()) \\\n", + " .rangeBetween(-sys.maxsize, sys.maxsize)" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "revenue_difference = \\\n", + " (max(revenue_df['revenue']).over(windowSpec) - revenue_df['revenue'])" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+----------+----------+-------+------------------+\n", + "| product| category|revenue|revenue_difference|\n", + "+----------+----------+-------+------------------+\n", + "| Pro2| Tablet| 6500| 0.0|\n", + "| Mini| Tablet| 5500| 1000.0|\n", + "| Pro| Tablet| 4500| 2000.0|\n", + "| Big| Tablet| 2500| 4000.0|\n", + "| Normal| Tablet| 1500| 5000.0|\n", + "| Thin|Cell Phone| 6000| 0.0|\n", + "| Very thin|Cell Phone| 6000| 0.0|\n", + "|Ultra thin|Cell Phone| 5000| 1000.0|\n", + "| Bendable|Cell Phone| 3000| 3000.0|\n", + "| Foldable|Cell Phone| 3000| 3000.0|\n", + "+----------+----------+-------+------------------+\n", + "\n" + ] + } + ], + "source": [ + "revenue_df.select(\n", + " revenue_df['product'],\n", + " revenue_df['category'],\n", + " revenue_df['revenue'],\n", + " revenue_difference.alias(\"revenue_difference\")).show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Rollup" + ] + }, + { + "cell_type": "code", + "execution_count": 31, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+----------+----------+------------+-------------+\n", + "| category| product|sum(revenue)|grouping_id()|\n", + "+----------+----------+------------+-------------+\n", + "| Tablet| null| 20500| 1|\n", + "|Cell Phone| Very thin| 6000| 0|\n", + "| Tablet| Mini| 5500| 0|\n", + "| null| null| 43500| 3|\n", + "| Tablet| Pro| 4500| 0|\n", + "|Cell Phone| null| 23000| 1|\n", + "|Cell Phone| Thin| 6000| 0|\n", + "|Cell Phone| Bendable| 3000| 0|\n", + "|Cell Phone| Foldable| 3000| 0|\n", + "|Cell Phone|Ultra thin| 5000| 0|\n", + "| Tablet| Big| 2500| 0|\n", + "| Tablet| Normal| 1500| 0|\n", + "| Tablet| Pro2| 6500| 0|\n", + "+----------+----------+------------+-------------+\n", + "\n" + ] + } + ], + "source": [ + "revenue_df.rollup( \"category\", \"product\")\\\n", + " .agg(sum(\"revenue\"), grouping_id()).show(100)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Cube" + ] + }, + { + "cell_type": "code", + "execution_count": 30, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+----------+----------+------------+-------------+\n", + "| category| product|sum(revenue)|grouping_id()|\n", + "+----------+----------+------------+-------------+\n", + "| null| Bendable| 3000| 2|\n", + "| null| Very thin| 6000| 2|\n", + "| null| Normal| 1500| 2|\n", + "| Tablet| null| 20500| 1|\n", + "| null| Mini| 5500| 2|\n", + "|Cell Phone| Very thin| 6000| 0|\n", + "| null| Pro2| 6500| 2|\n", + "| Tablet| Mini| 5500| 0|\n", + "| null| null| 43500| 3|\n", + "| Tablet| Pro| 4500| 0|\n", + "| null| Foldable| 3000| 2|\n", + "| null| Pro| 4500| 2|\n", + "| null| Thin| 6000| 2|\n", + "|Cell Phone| null| 23000| 1|\n", + "|Cell Phone| Thin| 6000| 0|\n", + "|Cell Phone| Bendable| 3000| 0|\n", + "|Cell Phone| Foldable| 3000| 0|\n", + "|Cell 
Phone|Ultra thin| 5000| 0|\n", + "| Tablet| Big| 2500| 0|\n", + "| Tablet| Normal| 1500| 0|\n", + "| null| Big| 2500| 2|\n", + "| Tablet| Pro2| 6500| 0|\n", + "| null|Ultra thin| 5000| 2|\n", + "+----------+----------+------------+-------------+\n", + "\n" + ] + } + ], + "source": [ + "revenue_df.cube(\"category\", \"product\")\\\n", + " .agg(sum(\"revenue\"), grouping_id()).show(100)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}