Commit ccbfff6 (parent: 50b1764): 1 changed file with 355 additions and 0 deletions.
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Partitioning\n",
"\n",
"When an RDD is created, you can specify the number of partitions.\n",
"<br>The default is the number of workers defined when you set up the `SparkContext`."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from pyspark import SparkContext"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Creating `SparkContext` with 2 workers"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"sc = SparkContext(master=\"local[2]\")"
]
},
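{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick check (a minimal sketch, assuming the `local[2]` context created above): `sc.defaultParallelism` reports the default number of partitions that `parallelize` will use."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Default parallelism drives the default partition count of sc.parallelize\n",
"# (here it should be 2, matching local[2])\n",
"print(sc.defaultParallelism)"
]
},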
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"A = sc.parallelize(range(1000000))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Use `getNumPartitions` to retrieve the number of partitions created"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2\n"
]
}
],
"source": [
"print(A.getNumPartitions())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### We can repartition _A_ into any number of partitions we want"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"D = A.repartition(10)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"10\n"
]
}
],
"source": [
"print(D.getNumPartitions())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### We can also set the number of partitions when creating the RDD, with the `numSlices` argument"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"A = sc.parallelize(range(1000000), numSlices=8)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"8\n"
]
}
],
"source": [
"print(A.getNumPartitions())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Why are partitions important?\n",
"\n",
"* They define the unit the executor works on, as the sketch below illustrates\n",
"* You should have at least as many partitions as the number of worker nodes\n",
"* Smaller partitions may allow more parallelization"
]
},
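{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal sketch of the first point, using the 8-partition `A` created above: `mapPartitions` calls its function once per partition, so the function runs exactly as many times as there are partitions."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def summarize(partition):\n",
"    # runs once per partition; 'partition' iterates over that partition's elements\n",
"    elements = list(partition)\n",
"    yield (len(elements), min(elements), max(elements))\n",
"\n",
"# one summary tuple per partition -> 8 tuples for the 8-partition A\n",
"print(A.mapPartitions(summarize).collect())"
]
},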
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Repartitioning for Load Balancing"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Suppose we start with 10 partitions, all with exactly the same number of elements"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[100000, 100000, 100000, 100000, 100000, 100000, 100000, 100000, 100000, 100000]\n"
]
}
],
"source": [
"A = sc.parallelize(range(1000000)).map(lambda x: (x, x)).partitionBy(10)\n",
"print(A.glom().map(len).collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Suppose we want to use **`filter()`** to select some of the elements in A.<br>\n",
"Some partitions might have more elements remaining than others."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[100000, 0, 0, 0, 0, 100000, 0, 0, 0, 0]\n"
]
}
],
"source": [
"# select the 20% of the entries divisible by 5 --\n",
"# a bad filter for this partitioning, since all the survivors\n",
"# sit in the partitions for keys 0 and 5 (mod 10)\n",
"B = A.filter(lambda pair: pair[0] % 5 == 0)\n",
"# count the elements remaining in each partition\n",
"print(B.glom().map(len).collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Future operations on B will use only two workers.<br>\n",
"The other workers will do nothing, because their partitions are empty."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### To fix the situation we need to repartition the unbalanced RDD.<br>One way to do that is to repartition by a new key, using the method `partitionBy()`\n",
"\n",
"* The method **`.partitionBy(k)`** expects to get a **`(key,value)`** RDD where the keys are integers.\n",
"* It partitions the RDD into **`k`** partitions.\n",
"* The element **`(key,value)`** is placed into partition no. **`key % k`**, as demonstrated in the small sketch below."
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000]\n"
]
}
],
"source": [
"# use the integer key x//10, so element x lands in partition (x//10) % 10\n",
"C = B.map(lambda pair: (pair[1] // 10, pair[1])).partitionBy(10)\n",
"print(C.glom().map(len).collect())"
]
},
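{
"cell_type": "markdown",
"metadata": {},
"source": [
"A small sketch of the `key % k` rule on toy data (the toy RDD here is assumed for illustration): with small integer keys and the default hash partitioner, the pair `(i, v)` lands in partition `i % 4`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# toy (key, value) pairs with integer keys 0..7, split into 4 partitions\n",
"toy = sc.parallelize([(i, str(i)) for i in range(8)]).partitionBy(4)\n",
"# partition p should hold exactly the pairs whose key % 4 == p\n",
"print(toy.glom().collect())"
]
},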
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note how **`C`** consists of only the 200,000 elements that survived the filter in the unbalanced **`B`**, but redistributes them into equal partitions of 20,000 elements each."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Another approach is random partitioning, using **`repartition(k)`**\n",
"* An **advantage** of random partitioning is that it does not require defining a key.\n",
"* A **disadvantage** of random partitioning is that you have no control over the partitioning, i.e. which elements go to which partition (see the sketch below)."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000]\n"
]
}
],
"source": [
"C = B.repartition(10)\n",
"print(C.glom().map(len).collect())"
]
},
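{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal sketch of that disadvantage: peeking at the first few keys in each partition of the repartitioned `C` shows elements placed with no key-based pattern (the exact keys you see may vary between runs)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# first three keys in each partition -- no relation between key and partition\n",
"print(C.glom().map(lambda part: [key for key, _ in part[:3]]).collect())"
]
},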
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## `glom()`\n",
"* In general, Spark does not allow a worker to refer to specific elements of the RDD.\n",
"* This keeps the language clean, but can be a major limitation.\n",
"\n",
"#### `glom()` transforms each partition into a list of its elements.<br>It creates an RDD of lists, one list per partition.<br>Workers can refer to elements of a partition by index, but the RDD itself is still immutable.\n",
"\n",
"* Consider **the command used above to count the number of elements in each partition**: `print(C.glom().map(len).collect())`\n",
"* We used `glom()` to turn each partition into a list.\n",
"* We used `len` on each list to get its length - the size of the partition.\n",
"* We `collect`ed the results to print them out."
]
},
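{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal sketch of index access (reusing the unbalanced `B` from above): take the key of the first element of each partition, or `None` where the partition is empty."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# part is one glommed partition; part[0] is its first (key, value) pair\n",
"print(B.glom().map(lambda part: part[0][0] if len(part) > 0 else None).collect())"
]
},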
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### A more elaborate example\n",
"There are many things that you can do using `glom()`.\n",
"<br>\n",
"For example, suppose that for each of the unbalanced partitions we made from `A` into `B` we want the first element, the number of elements, and the sum of the gaps between consecutive elements. If the partition has fewer than two elements we just return `None`."
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[(0, 100000, 999990), None, None, None, None, (5, 100000, 999990), None, None, None, None]\n"
]
}
],
"source": [
"def getPartitionInfo(G):\n",
"    # G is one glommed partition: a list of (key, value) pairs\n",
"    d = 0\n",
"    if len(G) > 1:\n",
"        for i in range(len(G) - 1):\n",
"            d += abs(G[i + 1][1] - G[i][1])  # gap between consecutive values\n",
"        return (G[0][0], len(G), d)  # (first key, partition size, sum of gaps)\n",
"    else:\n",
"        return None\n",
"\n",
"output = B.glom().map(getPartitionInfo).collect()\n",
"print(output)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.6"
}
},
"nbformat": 4,
"nbformat_minor": 2
}