-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkafka_topic_creator.py
79 lines (69 loc) · 3.12 KB
/
kafka_topic_creator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
from google.oauth2 import service_account
from getfilelistpy import getfilelist
from kafka.admin import KafkaAdminClient, NewTopic
from utility import space_remover
from kafka import errors
import os
from dotenv import load_dotenv
# Load environment variables from the .env file at import time, so the
# os.getenv() lookups in this module can see them.
load_dotenv()  # load all env variables from .env file
class KafkaTopicCreator:
    """Create Kafka topics named after files listed in a Google Drive folder."""

    def __init__(self, kafka_admin_client: "KafkaAdminClient"):
        """Store the Kafka admin client used to create topics.

        Args:
            kafka_admin_client: Kafka admin client object (configured with its
                bootstrap servers) on which ``create_topics`` is called.
        """
        self.kafka_admin_client = kafka_admin_client

    def get_list_of_sheets(self, dir_id: str) -> list:
        """Return names of the files in a Drive folder that contain "sheet".

        The path of the service-account key file is read from the
        ``SERVICE_FILE_NAME_JSON`` environment variable.

        Args:
            dir_id: Id of the Google Drive directory to list.

        Returns:
            The file names whose lower-cased name contains ``"sheet"``. An
            empty list is returned when the listing fails for any reason (the
            error is printed), so callers can always iterate the result.
        """
        try:
            scopes = ['https://www.googleapis.com/auth/drive']
            service_account_file = os.getenv('SERVICE_FILE_NAME_JSON')
            credentials = service_account.Credentials.from_service_account_file(
                service_account_file, scopes=scopes
            )
            resource = {
                "service_account": credentials,
                "id": dir_id,
                "fields": "files(name)",
            }
            res = getfilelist.GetFileList(resource)
            # Only the first "fileList" entry is read; presumably that is the
            # requested folder itself -- TODO confirm against getfilelistpy.
            files = res["fileList"][0]["files"]
            # `files` is a list of dicts, one per file in the directory.
            return [
                each["name"] for each in files if "sheet" in each["name"].lower()
            ]
        except Exception as e:
            # Best-effort: report the problem and hand back an empty result
            # instead of an implicit None, which would break the `-> list`
            # contract for callers.
            print("Process stopped as ", e)
            return []

    def create_topics(self, sheet_names: list):
        """Create one Kafka topic per given name.

        Every topic is requested with 1 partition and a replication factor
        of 1. Failures are printed rather than raised.

        Args:
            sheet_names: Topic names to create, as strings. When empty, no
                request is sent to Kafka.
        """
        if not sheet_names:
            # Nothing to create; skip the pointless admin request.
            return
        try:
            topic_list = [
                NewTopic(name=sheet, num_partitions=1, replication_factor=1)
                for sheet in sheet_names
            ]
            self.kafka_admin_client.create_topics(
                new_topics=topic_list, validate_only=False
            )
        except errors.TopicAlreadyExistsError as e:
            print(e)
        except Exception as e:
            print("The program stopped as ", e)
if __name__ == "__main__":
    # Script entry point: list the sheet files in the configured Drive folder
    # and create a Kafka topic for every name that is not a topic yet.
    dir_id = os.getenv("DIR_ID")  # google drive directory id
    kafka_admin_client = KafkaAdminClient(
        bootstrap_servers=os.getenv("BOOTSTRAP_SERVERS"),
        client_id=os.getenv("CLIENT_ID"),
    )
    try:
        kafka_topic_creator = KafkaTopicCreator(kafka_admin_client)
        # Fall back to an empty list if the Drive listing produced nothing
        # usable (e.g. it failed), so the steps below never receive None.
        sheet_names = kafka_topic_creator.get_list_of_sheets(dir_id) or []
        sheet_names = space_remover(sheet_names)
        existing_topics = kafka_admin_client.list_topics()
        # Only names that are not topics yet need to be created.
        new_sheets = set(sheet_names) - set(existing_topics)
        if new_sheets:
            # create_topics is declared to take a list, so pass one.
            kafka_topic_creator.create_topics(list(new_sheets))
            # NOTE: create_topics reports failures by printing, so this
            # message is emitted whether or not the creation succeeded.
            print("print created topics for ", new_sheets)
        else:
            print("no new sheets")
    finally:
        # Release the admin client's broker connections.
        kafka_admin_client.close()