Here we present a step-by-step guide to building a batch pipeline in Python using Apache Beam.
The "main" branch has the complete code for the batch pipeline.
The "workshop" branch has the code with gaps that are to be completed live in the workshop.
Check out the "workshop" branch and challenge yourself!
git checkout workshop
This is a hands-on workshop, so please come ready with Python 3.10 installed:
- use the official Python downloads
- use pyenv
- follow the instructions for Google Cloud
- ... or whatever you prefer
This is a hands-on workshop, so please also come ready with a development environment:
- VSCode
- PyCharm
- GCP cloud shell
- GCP cloud shell editor
- GCP workstation
- ... or whatever you prefer!
With python pointing at Python 3.10, run the following.
Create a virtual environment:
python -m venv venv
Activate the virtual environment:
source venv/bin/activate
While it is activated, your python and pip commands will point to the virtual environment, so any changes or installed dependencies are self-contained.
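If you want to confirm the activation worked, Python itself can tell you: inside a virtual environment, sys.prefix points at the venv directory while sys.base_prefix still points at the base interpreter. A quick check (a sketch, not part of the workshop code):

```python
import sys

# True when running inside a virtual environment, False otherwise.
in_venv = sys.prefix != sys.base_prefix
print(f"virtual environment active: {in_venv}")
```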
Execute the following from the root of this repo to initialize the pipeline code.
First, update pip before installing dependencies. This is always a good idea.
pip install -U pip
Next, install the project as a local package. This installs all dependencies as well.
pip install -e .
Run this Python one-liner to check that your Python and Apache Beam installations work:
python -c "import apache_beam as beam; import sys; print(f'beam version is = {beam.__version__}'); print(f'python version is = {sys.version}')"
The output should be similar to:
beam version is = 2.50.0
python version is = 3.10.0 (default, Apr 28 2023, 17:16:10) [Clang 14.0.3 (clang-1403.0.22.14.1)]
To run all the tests:
python -m unittest -v
To run a single test, choose from the commands below.
For task 1
python -m unittest test.test_transforms.TestExtractSpeech.test_task_1_extract_speech
For task 2
python -m unittest test.test_transforms.TestSpeechToWords.test_task_2_speech_to_words
For task 3
python -m unittest test.test_transforms.TestSanitizeWords.test_task_3_sanitize_words
For task 4
python -m unittest test.test_transforms.TestCountWordFrequency.test_task_4_count_word_frequency
For task 5
python -m unittest test.test_transforms.TopWords.test_task_5_top_words
For task 6
python -m unittest test.test_transforms.Prettify.test_task_6_prettify
For task 7
python -m unittest test.test_transforms.FrequentWords.test_task_7_frequent_words
It is possible to run the pipeline on your machine using the DirectRunner.
Run it on a sample of the script of Harry Potter & the Philosopher's Stone:
python main.py \
--runner DirectRunner \
--save_main_session \
--setup_file ./setup.py \
--input_filename=data/input/harry_potter_philosopher_stone_script_sample.csv \
--output_filename=data/output/harry_potter_philosopher_stone_script_sample_results.txt
Run it on the entire script of Harry Potter & the Philosopher's Stone:
python main.py \
--runner DirectRunner \
--save_main_session \
--setup_file ./setup.py \
--input_filename=data/input/harry_potter_philosopher_stone_script.csv \
--output_filename=data/output/harry_potter_philosopher_stone_script_results.txt
Here is a description of what each of these flags means:

| Flag | Description |
|---|---|
| runner | Apache Beam execution engine or "runner", e.g. DirectRunner or DataflowRunner |
| streaming | Omitted here, so the pipeline executes in batch mode rather than streaming mode |
| save_main_session | Makes global imports available to all workers |
| setup_file | Handles multiple file dependencies |
| input_filename | Custom. The input file to the pipeline. |
| output_filename | Custom. The output file of the pipeline. |
This is based on a workshop created by Israel Herraiz which can be found here
This uses a dataset from Kaggle which can be found here