11) Database_Apriori.py
from kafka import KafkaConsumer
import json
from collections import defaultdict
from itertools import combinations
from pymongo import MongoClient

# Initialize the MongoDB client and connect to the database/collection
# that will hold the mined frequent itemsets
client = MongoClient('localhost', 27017)
db = client['assignment']
collection = db['frequent_itemsets']
def generate_candidates(prev_freq_itemsets, k):
    """Self-join the previous level's frequent itemsets into size-k candidates."""
    candidates = set()
    for itemset1 in prev_freq_itemsets:
        for itemset2 in prev_freq_itemsets:
            # Keep only unions that grow the itemset to exactly size k
            if len(itemset1.union(itemset2)) == k:
                candidates.add(itemset1.union(itemset2))
    return candidates
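
# Quick illustration (hypothetical items, not part of the pipeline):
#   generate_candidates({frozenset({'A'}), frozenset({'B'})}, 2)
#   -> {frozenset({'A', 'B'})}
# Joining an itemset with itself is harmless here: the union keeps size 1
# and fails the len(...) == k test.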
def prune_itemsets(candidates, prev_freq_itemsets, k):
    """Apply the Apriori property: every (k-1)-subset of a frequent
    itemset must itself be frequent."""
    pruned_itemsets = set()
    for candidate in candidates:
        subsets = combinations(candidate, k - 1)
        is_valid = True
        for subset in subsets:
            # One infrequent subset is enough to discard the candidate
            if frozenset(subset) not in prev_freq_itemsets:
                is_valid = False
                break
        if is_valid:
            pruned_itemsets.add(candidate)
    return pruned_itemsets
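
# Illustration with hypothetical items: if {A, B} and {A, C} are frequent but
# {B, C} is not, the candidate {A, B, C} is pruned, because its subset {B, C}
# cannot appear in more transactions than {A, B, C} itself.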
# Count, for each candidate itemset, how many transactions in the window contain it
def calculate_support(data, itemsets):
    support_count = defaultdict(int)
    for transaction in data:
        for itemset in itemsets:
            if itemset.issubset(transaction):
                support_count[itemset] += 1
    return support_count
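
# For instance (hypothetical data): with transactions [{'A': 1, 'B': 2}, {'A': 3}]
# and candidates [frozenset({'A'}), frozenset({'B'})], this returns support 2 for
# {'A'} and 1 for {'B'} -- transactions are dicts, so issubset tests their keys.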
# Level-wise Apriori: start from 1-itemsets and grow until no candidates survive
def generate_frequent_itemsets(data, min_support):
    # Use a set so each 1-itemset appears once, even if an item recurs in the data
    itemsets = {frozenset([item]) for transaction in data for item in transaction}
    freq_itemsets = []
    k = 1
    while itemsets:
        # Calculate the support count for each candidate at the current level
        support_count = calculate_support(data, itemsets)
        # Keep only itemsets that meet the minimum support threshold
        current_level = [itemset for itemset, support in support_count.items()
                         if support >= min_support]
        freq_itemsets.extend(current_level)
        # Join this level's survivors into (k+1)-candidates, then prune any
        # candidate that contains an infrequent k-subset
        candidates = generate_candidates(set(current_level), k + 1)
        itemsets = prune_itemsets(candidates, set(current_level), k + 1)
        k += 1
    return freq_itemsets
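
# Sanity check on a hypothetical window with min_support=2:
#   data = [{'milk': 1, 'bread': 1}, {'milk': 1}, {'milk': 1, 'bread': 1}]
#   generate_frequent_itemsets(data, 2)
# Transactions contribute their keys, so {'milk'} (support 3), {'bread'}
# (support 2), and {'milk', 'bread'} (support 2) are all returned as frequent.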
# Persist frequent itemsets to MongoDB: map each itemset's column names back to
# the latest message's values, skipping URL- and HTML-like strings
def insert_frequent_itemsets(frequent_itemsets, data):
    for itemset in frequent_itemsets:
        items = [str(data[column]) for column in itemset if column in data]
        if items:
            filtered_items = [item for item in items
                              if not (item.startswith('http') or '<' in item or '>' in item)]
            if filtered_items:
                collection.insert_one({"itemset": filtered_items[0]})
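
# For example (hypothetical message): if {'product'} is frequent and the latest
# message has data['product'] == 'soap', the stored document is {"itemset": "soap"}.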
def process_message(message, window_size, min_support):
    global transaction_window
    data = message.value
    # Maintain a sliding window of the most recent window_size transactions
    transaction_window.append(data)
    if len(transaction_window) > window_size:
        del transaction_window[0]
    # Re-mine the current window and persist the results for this message
    frequent_itemsets = generate_frequent_itemsets(transaction_window, min_support)
    insert_frequent_itemsets(frequent_itemsets, data)
# Streaming configuration
topic_name = 'assignment'
window_size = 100
min_support = 2
transaction_window = []

consumer = KafkaConsumer(topic_name, bootstrap_servers='localhost:9092',
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')))

# Consume messages indefinitely, re-mining the window after each one
for message in consumer:
    process_message(message, window_size, min_support)
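
# To exercise this consumer end to end, a minimal producer sketch (assuming the
# same local broker and 'assignment' topic; the record fields are hypothetical):
#
#   from kafka import KafkaProducer
#   import json
#   producer = KafkaProducer(bootstrap_servers='localhost:9092',
#                            value_serializer=lambda v: json.dumps(v).encode('utf-8'))
#   producer.send('assignment', {'milk': 1, 'bread': 1})
#   producer.flush()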