A Publisher-Subscriber based ML application that predicts fashion-mnist images through Apache Kafka or Google PubSub
Language: Python
Requirements: Docker, Git
Everything else:
The following containers are pulled automatically in Step 2 below:
- confluentinc/cp-kafka (Kafka cluster)
- confluentinc/cp-zookeeper (dependency of the above image)
- sathyarrr/fashion-mnist-pubsub (the actual app)
The app image is built on top of the Ubuntu image, with:
- the GooglePubSub emulator installed
- the Google Cloud SDK, Python, and Java installed
- the TensorFlow model server installed
- all required Python packages installed
Step 1: Clone the repository. Delete the .gitkeep files under the workdir folders; they exist only to commit empty directories to git.
Step 2: Open a terminal at the root of the repo.
Download the required containers and get a shell inside the app's container:
docker compose run my-app
The working directory at this stage is /my-app/app.
Start the TensorFlow model server in the background:
nohup tensorflow_model_server \
--rest_api_port=8501 \
--model_name=fashion-model \
--model_base_path="/my-app/fashion-models" &
Start the GooglePubSub emulator in the background:
nohup gcloud beta emulators pubsub start --project=meow &
Initialize the brokers:
python3 -u initialize_broker.py
Start the Publisher in the background:
nohup python3 -u publisher.py \
--input '/my-app/workdir/input-images' \
--published '/my-app/workdir/published-images' &
Start the Subscriber in the background:
nohup python3 -u subscriber.py \
--output '/my-app/workdir/predicted-images' \
--url 'http://localhost:8501/v1/models/fashion-model:predict' &
At this stage, jobs -l should report four running jobs.
Step 3: Use the tools in Part 1 below to provide inputs.
By default, the GooglePubSub broker is configured to run. It can be changed to ApacheKafka with the ML_APP_BROKER environment variable (Ref: How it Works?). Kill the running publisher and subscriber (kill %[Job ID]) before updating the environment variable, then restart them.
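For example, to switch to ApacheKafka, assuming the publisher and subscriber are jobs 3 and 4 (check jobs -l; the job IDs on your machine may differ):
kill %3 %4
export ML_APP_BROKER='apache-kafka'
Then re-run the nohup publisher.py and subscriber.py commands from Step 2.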
TensorFlow-model-server: used to serve ML models in production
- Multiple models are easily managed
- Multiple versions of a model are managed as well
- Supports gRPC and REST APIs
- Intuitive Endpoint: POST http://host:port/v1/models/${MODEL_NAME}[/versions/${VERSION}|/labels/${LABEL}]:predict
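For instance, a prediction request against the running server looks roughly like the sketch below. The input shape (a 28x28x1 grayscale image scaled to [0, 1]) is an assumption about the served CNN; the exact shape depends on trainer.py.

```python
# Minimal sketch of a REST call to the model server; the input shape
# (28x28x1, scaled to [0, 1]) is an assumption about the served CNN.
import numpy as np
import requests

image = np.zeros((28, 28, 1))  # placeholder fashion-mnist image

resp = requests.post(
    "http://localhost:8501/v1/models/fashion-model:predict",
    json={"instances": [image.tolist()]},
)
scores = resp.json()["predictions"][0]  # one score per class
print(int(np.argmax(scores)))           # predicted class index
```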
app/trainer.py
- Generates a new fashion-mnist model, based on a CNN, for the given data
- The trained model can be exported after inspecting its accuracy
- If the export path is the tf-model-server's root, the model is served immediately (see the note after the usage below)
Usage:
python3 trainer.py \
--images '/my-app/train-data/train-images-idx3-ubyte.gz' \
--labels '/my-app/train-data/train-labels-idx1-ubyte.gz' \
--export '/my-app/fashion-models'
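The "served immediately" behaviour follows from TensorFlow Serving's convention of watching the model base path for numbered version directories. A minimal sketch of that export step follows; the layers here are placeholders, not trainer.py's actual architecture.

```python
# Sketch of the export convention only; trainer.py's real CNN differs.
import tensorflow as tf

model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(32, 3, activation="relu", input_shape=(28, 28, 1)),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(10, activation="softmax"),
])
model.compile(optimizer="adam", loss="sparse_categorical_crossentropy")
# ... train on the fashion-mnist data ...

# Saving under a numbered subdirectory of the model base path makes
# tensorflow_model_server pick the model up as a new version.
tf.saved_model.save(model, "/my-app/fashion-models/1")
```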
app/ubyte_to_png.py
- Converts the fashion-mnist dataset (idx3-ubyte.gz) into actual images (.png)
- The number of images to convert is specified by start and end indices (the actual dataset has >10k images); a decoding sketch follows the usage below
Usage:
python3 ubyte_to_png.py \
--images '/my-app/test-data/t10k-images-idx3-ubyte.gz' \
--labels '/my-app/test-data/t10k-labels-idx1-ubyte.gz' \
--output '/my-app/workdir/input-images' \
--start 0 \
--end 4
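For reference, the idx3-ubyte format is simple enough to decode by hand: a 16-byte header (magic number, image count, rows, cols; 4 bytes each) followed by one byte per pixel. A rough sketch of the conversion; ubyte_to_png.py's actual implementation may differ.

```python
# Rough sketch of decoding idx3-ubyte.gz; not the script's actual code.
import gzip
import numpy as np
from PIL import Image

def read_images(path, start, end):
    with gzip.open(path, "rb") as f:
        f.read(16)  # skip header: magic, count, rows, cols
        pixels = np.frombuffer(f.read(), dtype=np.uint8)
    return pixels.reshape(-1, 28, 28)[start:end]

for i, img in enumerate(read_images(
        "/my-app/test-data/t10k-images-idx3-ubyte.gz", 0, 4)):
    Image.fromarray(img).save(f"{i}.png")
```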
Brokers for the Publisher-Subscriber model:
(Both are on-premise applications and don't interact with the Cloud)
- GooglePubSub and ApacheKafka inherit a common abstraction (see the sketch after this list)
- The two are interchanged seamlessly at the application layer
- GooglePubSub speaks to its emulator while ApacheKafka speaks to its cluster (Docker) internally
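A rough sketch of what that shared abstraction could look like; the class and method names here are illustrative assumptions, not the repo's actual identifiers.

```python
# Illustrative shape of the broker abstraction; names are assumptions.
import os
from abc import ABC, abstractmethod
from typing import Callable

class Broker(ABC):
    @abstractmethod
    def publish(self, topic: str, data: bytes) -> None: ...

    @abstractmethod
    def subscribe(self, topic: str, callback: Callable[[bytes], None]) -> None: ...

class GooglePubSubBroker(Broker):
    def publish(self, topic, data):
        pass  # would publish via google.cloud.pubsub_v1 to the emulator

    def subscribe(self, topic, callback):
        pass  # would pull from the emulator and invoke callback

class ApacheKafkaBroker(Broker):
    def publish(self, topic, data):
        pass  # would produce to the Kafka cluster running in Docker

    def subscribe(self, topic, callback):
        pass  # would consume from the Kafka cluster and invoke callback

def get_broker() -> Broker:
    # ML_APP_BROKER picks the implementation without touching app code.
    name = os.environ.get("ML_APP_BROKER", "google-pubsub")
    return ApacheKafkaBroker() if name == "apache-kafka" else GooglePubSubBroker()
```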
The model from Part 1 and the library from Part 2 are used to build a Machine Learning application.
initialize_broker.py
- Initializes both GooglePubSub and ApacheKafka
- Creates the required topic (input-topic) in both broker systems
Usage: python3 -u initialize_broker.py
The environment variable ML_APP_BROKER selects which of the two brokers the running application uses.
Possible values: google-pubsub or apache-kafka
publisher.py
- Keeps watching the workdir/input-images directory for new images
- When an image is available, its data is published to input-topic as a numpy array
- Once published, the images are moved to the workdir/published-images directory (a sketch of this loop follows below)
Usage:
nohup python3 -u publisher.py \
--input '/my-app/workdir/input-images' \
--published '/my-app/workdir/published-images' &
(ubyte_to_png.py is used to fill workdir/input-images with new images)
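Conceptually, the publisher is a simple poll-publish-move loop. A hypothetical sketch, where the polling mechanism and helper names are assumptions:

```python
# Hypothetical sketch of the publisher's loop; not the script's code.
import os
import shutil
import time

import numpy as np
from PIL import Image

def watch_and_publish(broker, input_dir, published_dir, topic="input-topic"):
    while True:  # runs until killed, like the real publisher
        for name in sorted(os.listdir(input_dir)):
            src = os.path.join(input_dir, name)
            data = np.asarray(Image.open(src))      # png -> numpy array
            broker.publish(topic, data.tobytes())   # publish raw bytes
            shutil.move(src, os.path.join(published_dir, name))
        time.sleep(1)  # poll for newly arrived images
```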
subscriber.py
- When a new message is available on input-topic, it is consumed by the subscriber
- The data (numpy array) in the message is sent to the TensorFlow model server for prediction
- The image is reconstructed from the numpy array and stored in the workdir/predicted-images directory
- The predicted class of the data is appended to the name of the image (a sketch of this flow follows below)
(e.g., 0.png -> 0-Shirt.png)
Usage:
nohup python3 -u subscriber.py \
--output '/my-app/workdir/predicted-images' \
--url 'http://localhost:8501/v1/models/fashion-model:predict' &
The Publisher and Subscriber run indefinitely in the background, doing their jobs.
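A hypothetical sketch of the subscriber's per-message handling; the pixel scaling and filename logic are assumptions, and the label names follow the standard fashion-mnist classes:

```python
# Hypothetical sketch of handling one subscribed message.
import json
import os

import numpy as np
import requests
from PIL import Image

# Standard fashion-mnist class names (slashes/spaces adjusted here to be
# filename-safe; the actual script may format them differently).
CLASSES = ["T-shirt-top", "Trouser", "Pullover", "Dress", "Coat",
           "Sandal", "Shirt", "Sneaker", "Bag", "Ankle-boot"]

def handle_message(data: bytes, name: str, url: str, output_dir: str):
    image = np.frombuffer(data, dtype=np.uint8).reshape(28, 28)
    body = {"instances": [(image / 255.0).reshape(28, 28, 1).tolist()]}
    scores = requests.post(url, data=json.dumps(body)).json()["predictions"][0]
    label = CLASSES[int(np.argmax(scores))]
    stem, ext = os.path.splitext(name)  # e.g. 0.png -> 0-Shirt.png
    Image.fromarray(image).save(os.path.join(output_dir, f"{stem}-{label}{ext}"))
```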
broker_tester.py
Contains unit test cases to validate the Brokers.
- Almost all Python utilities accept --help for details about their arguments
- On restart, existing topics in the Google emulator are deleted
- Apache Kafka retains all created topics unless they are explicitly deleted by creating a new Docker container
- Duplicate inputs are not handled; their results are simply overwritten
- If data is subscribed but not successfully predicted against the tf-model-server, it is lost