## Client Generation with Protocol Buffers

gRPC is a high-performance, open-source universal RPC (Remote Procedure Call) framework that uses protocol buffers as its interface definition language. One of its powerful features is the automatic generation of client and server code from .proto files, which define the service methods and message types. This process simplifies the development of gRPC services and clients, making it easier to build distributed applications and microservices. Here's how gRPC allows client (and server) code generation based on .proto files:

## 1. Retrieve Service and Messages in .proto File

For 8.4 the .proto file can be retrieved here:

curl -O

## 2. Install Required Tools
To generate Python code, you need the Protocol Buffer compiler (protoc) and the Python gRPC plugin. If you haven't installed these, you can do so as follows:

Install protoc from the [official releases]( page or via a package manager for your system.

apt install -y protobuf-compiler

Install the Python gRPC tools using pip:

pip install grpcio-tools

## 3. Generate Python gRPC Code

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. gateway.proto

After running this command, you should see two new files in your directory:

- ``: Contains the generated request and response classes.
- ``: Contains the generated client and server classes.

### 4. Implement OAuth Interceptor

The interceptor is required to seamlessly inject authentication tokens into all outgoing gRPC requests, ensuring secure communication with the Zeebe broker without manually adding tokens to each call. It works by intercepting each call, obtaining a fresh OAuth token if necessary, and appending it to the request's metadata as an Authorization header.

Example Implementation [here](

### 5. Write Zeebe Client
Example Implementation [here](

<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="" xmlns:bpmndi="" xmlns:dc="" xmlns:zeebe="" xmlns:di="" xmlns:modeler="" id="Definitions_0uvm6ju" targetNamespace="" exporter="Camunda Modeler" exporterVersion="5.16.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.3.0">
<bpmn:process id="example" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:sequenceFlow id="Flow_1ch9458" sourceRef="StartEvent_1" targetRef="Activity_1bv9kj9" />
<bpmn:endEvent id="Event_155fi0m">
<bpmn:sequenceFlow id="Flow_0c6v5vz" sourceRef="Activity_1bv9kj9" targetRef="Event_155fi0m" />
<bpmn:serviceTask id="Activity_1bv9kj9" name="example">
<zeebe:taskDefinition type="dummy" />
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="ExceptionHandling">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="99" width="36" height="36" />
<bpmndi:BPMNShape id="Event_155fi0m_di" bpmnElement="Event_155fi0m">
<dc:Bounds x="432" y="99" width="36" height="36" />
<bpmndi:BPMNShape id="Activity_0jj7g48_di" bpmnElement="Activity_1bv9kj9">
<dc:Bounds x="270" y="77" width="100" height="80" />
<bpmndi:BPMNLabel />
<bpmndi:BPMNEdge id="Flow_1ch9458_di" bpmnElement="Flow_1ch9458">
<di:waypoint x="215" y="117" />
<di:waypoint x="270" y="117" />
<bpmndi:BPMNEdge id="Flow_0c6v5vz_di" bpmnElement="Flow_0c6v5vz">
<di:waypoint x="370" y="117" />
<di:waypoint x="432" y="117" />
from collections import namedtuple
from grpc import UnaryUnaryClientInterceptor, StreamStreamClientInterceptor, UnaryStreamClientInterceptor, StreamUnaryClientInterceptor
import requests
import time

# Maintain the namedtuple definition for ClientCallDetails
ClientCallDetails = namedtuple('ClientCallDetails', ('method', 'timeout', 'metadata', 'credentials', 'wait_for_ready', 'compression'))

class OAuthInterceptor(UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor, StreamUnaryClientInterceptor, StreamStreamClientInterceptor):
def __init__(self, token_url, client_id, client_secret, audience):
self.token_url = token_url
self.client_id = client_id
self.client_secret = client_secret
self.audience = audience
self.token = None
self.token_expiry = None

def get_access_token(self):
"""Fetch the access token using client credentials."""
if self.token and self.token_expiry > time.time():
return self.token

print("Retrieving new token...")
payload = {
'grant_type': 'client_credentials',
'client_id': self.client_id,
'client_secret': self.client_secret,
'audience': self.audience
response =, data=payload)
response_data = response.json()
self.token = response_data['access_token']
self.token_expiry = time.time() + response_data['expires_in'] - 60 # 60 seconds leeway

return self.token

def update_metadata(self, client_call_details, token):
metadata = [('authorization', f'Bearer {token}')]
if client_call_details.metadata is not None:
# Return a new ClientCallDetails instance with updated metadata
return ClientCallDetails(

def intercept_call(self, continuation, client_call_details, request_or_iterator):
token = self.get_access_token()
new_call_details = self.update_metadata(client_call_details, token)
return continuation(new_call_details, request_or_iterator)

# Implement the intercept method for each call type using intercept_call
def intercept_unary_unary(self, continuation, client_call_details, request):
return self.intercept_call(continuation, client_call_details, request)

def intercept_unary_stream(self, continuation, client_call_details, request):
return self.intercept_call(continuation, client_call_details, request)

def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
return self.intercept_call(continuation, client_call_details, request_iterator)

def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
return self.intercept_call(continuation, client_call_details, request_iterator)
import grpc
import gateway_pb2
import gateway_pb2_grpc
from oauthinterceptor import OAuthInterceptor

def run():

# OAuth Interceptor Configuration
token_url = "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token"
client_id = "zeebe"
client_secret = "NFp6GKwftJ"
audience = "zeebe-api"

# Create an instance of the OAuthInterceptor
oauth_interceptor = OAuthInterceptor(token_url, client_id, client_secret, audience)

# Add interceptor to the channel
intercept_channel = grpc.intercept_channel(
grpc.insecure_channel('localhost:26500'), oauth_interceptor)

# Now use the intercepted channel to create stubs
stub = gateway_pb2_grpc.GatewayStub(intercept_channel)

topologyResponse = stub.Topology(gateway_pb2.TopologyRequest())

tenantIds = ['custom'] # tenantIds
fileName = "example.bpmn"
with open(fileName, 'rb') as file:
bpmn_content =
# Deploy Diagram
for tenantId in tenantIds:
resource = gateway_pb2.Resource(name=fileName, content=bpmn_content)
deployResult = stub.DeployResource(gateway_pb2.DeployResourceRequest(tenantId=tenantId , resources=[resource]))

# Start Instances
for tenantId in tenantIds:
for x in range(20):
stub.CreateProcessInstance(gateway_pb2.CreateProcessInstanceRequest(tenantId = tenantId, bpmnProcessId="example", version=-1 ))

# Job worker logic (Activate and complete jobs)
job_type = 'dummy'
worker = 'python-worker'
timeout = 10000 # in milliseconds
request = gateway_pb2.ActivateJobsRequest(type=job_type, worker=worker, timeout=timeout, maxJobsToActivate=maxJobsToActivate, tenantIds=tenantIds)

while True:
for activate_response in stub.ActivateJobs(request):
for job in
print(f"Activated job {job.key}")
# Process the job here

# Complete the job
complete_request = gateway_pb2.CompleteJobRequest(jobKey=job.key, variables='{}')
print(f"Completed job {job.key}")
print("Looking for jobs again...")

if __name__ == '__main__':

