-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsales_producer.py
84 lines (65 loc) · 2.04 KB
/
sales_producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import argparse
import atexit
import json
import logging
import random
import time
import sys
from confluent_kafka import Producer
logging.basicConfig(
format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=logging.INFO,
handlers=[
logging.FileHandler("sales_producer.log"),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger()
SELLERS = ['LNK', 'OMA', 'KC', 'DEN']
class ProducerCallback:
def __init__(self, record, log_success=False):
self.record = record
self.log_success = log_success
def __call__(self, err, msg):
if err:
logger.error('Error producing record {}'.format(self.record))
elif self.log_success:
logger.info('Produced {} to topic {} partition {} offset {}'.format(
self.record,
msg.topic(),
msg.partition(),
msg.offset()
))
def main(args):
logger.info('Starting sales producer')
conf = {
'bootstrap.servers': args.bootstrap_server,
'linger.ms': 200,
'client.id': 'sales-1',
'partitioner': 'murmur2_random'
}
producer = Producer(conf)
atexit.register(lambda p: p.flush(), producer)
i = 1
while True:
is_tenth = i % 10 == 0
sales = {
'seller_id': random.choice(SELLERS),
'amount_usd': random.randrange(100, 1000),
'sale_ts': int(time.time() * 1000)
}
producer.produce(topic=args.topic,
value=json.dumps(sales),
on_delivery=ProducerCallback(sales, log_success=is_tenth))
if is_tenth:
producer.poll(1)
time.sleep(5)
i = 0 # no need to let i grow unnecessarily large
i += 1
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--bootstrap-server', default='localhost:9092')
parser.add_argument('--topic', default='sales-usd')
args = parser.parse_args()
main(args)