forked from dm4p385/iot-smartcities
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparking_manager.py
87 lines (81 loc) · 3.61 KB
/
parking_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
from azure.digitaltwins.core import DigitalTwinsClient
from azure.identity import DefaultAzureCredential
import random
# Endpoint of the Azure Digital Twins (ADT) instance this script talks to.
url = "https://IoT-1.api.wcus.digitaltwins.azure.net"

# DefaultAzureCredential supports several authentication mechanisms and picks
# the appropriate credential type based on the environment it is running in,
# trying each one in order until it finds a credential that works.
# For environment-based authentication it expects these three variables:
# - AZURE_TENANT_ID: The tenant ID in Azure Active Directory
# - AZURE_CLIENT_ID: The application (client) ID registered in the AAD tenant
# - AZURE_CLIENT_SECRET: The client secret for the registered application
credential = DefaultAzureCredential()
service_client = DigitalTwinsClient(url, credential)

# State shared with the later stages of the script.
number_of_cars = []  # receives one outgoing-relationship count per road
spotStatus = ['Reserved', 'Occupied', 'Vacant']  # candidate "status" values for a new twin

# Select every twin in the instance; the reset stage iterates this result.
query_expression = 'SELECT * FROM digitaltwins'
query_result = service_client.query_twins(query_expression)
print('DigitalTwins:')

# Diagnostic value: drawn and printed here, not otherwise used in this stage.
random_number = random.randint(1, 20)
print("Random Number:", random_number)
# Reset stage: walk every twin returned by the query, remove all of its
# outgoing relationships, and delete the twin itself when its ID contains
# "Vehicle".
for twin in query_result:
    twin_id = twin["$dtId"]
    relationships = service_client.list_relationships(twin_id)
    print(twin_id)
    for relationship in relationships:
        print(relationship)
        service_client.delete_relationship(twin_id, relationship["$relationshipId"])

    is_vehicle = "Vehicle" in twin_id
    print(is_vehicle)
    if is_vehicle:
        # Azure Digital Twins only deletes a twin that has no relationships
        # left, incoming or outgoing. Relationships that point AT this twin
        # are owned by their source twin, so they are not covered by the loop
        # above and may still exist if the source twin comes later in the
        # query result. Remove them explicitly before deleting the twin.
        for incoming in service_client.list_incoming_relationships(twin_id):
            print(incoming)
            service_client.delete_relationship(
                incoming.source_id,
                incoming.relationship_id,
            )
        service_client.delete_digital_twin(twin_id)
# Roads a newly created twin can be attached to. Defined ahead of the loop
# because the list does not change per iteration and is read again by the
# road-counting stage that follows.
roads = ['RoadA', 'RoadB', 'RoadC']

# Creation stage: upsert each parking-spot twin (range(1, 2) yields only
# i == 1) with a random status, then link a randomly chosen road to it.
for i in range(1, 2):
    digital_twin_id = f'ParkingSpot{i}'
    temporary_twin = {
        "$metadata": {
            # NOTE(review): the twin is named as a parking spot but uses the
            # vehicle model ID -- confirm this is the intended model.
            "$model": "dtmi:com:adt:dtsample:vehicle;1",
        },
        "id": digital_twin_id,
        "name": f"Spot {i}",
        "status": random.choice(spotStatus),
    }
    created_twin = service_client.upsert_digital_twin(digital_twin_id, temporary_twin)
    print(f'Created Digital Twin {i}:')
    print(created_twin)

    random_road = random.choice(roads)
    print("Random Road:", random_road)
    myRelationship = {
        "$relationshipId": f"RoadVehicle {i}",
        "$sourceId": random_road,
        "$relationshipName": "is_on",
        # Point at the twin that was just upserted. The previous target,
        # f"Vehicle{i}", is never created by this script, and the reset stage
        # deletes every twin whose ID contains "Vehicle", so the relationship
        # had no existing twin to attach to.
        # NOTE(review): relationship ID/name still use the vehicle wording --
        # confirm they match the model's relationship definition.
        "$targetId": digital_twin_id,
    }
    print(myRelationship)
    service_client.upsert_relationship(
        myRelationship["$sourceId"],
        myRelationship["$relationshipId"],
        myRelationship,
    )
# Counting stage: record how many outgoing relationships each road has, in
# the same order as the `roads` list.
for road_id in roads:
    outgoing = list(service_client.list_relationships(road_id))
    print("Number of Relationships:", len(outgoing))
    number_of_cars.append(len(outgoing))

# Decision stage: the road with the highest count gets the green light.
# Positions in `number_of_cars` are turned into road letters (0 -> 'A',
# 1 -> 'B', ...).
busiest_count = max(number_of_cars)
busiest_positions = [
    position
    for position, count in enumerate(number_of_cars)
    if count == busiest_count
]
if len(busiest_positions) > 1:
    # Several roads tie for the highest count: pick one of them at random.
    green_position = random.choice(busiest_positions)
else:
    green_position = number_of_cars.index(busiest_count)
print(f"Traffic light for road {chr(ord('A') + green_position)} has turned green")
# for
# if key <= 6:
# relationships = service_client.list_relationships(twin['$dtId'])
# for relationship in relationships:
# print(relationship)
# x = service_client.delete_relationship(relationship['$sourceId'], relationship['$relationshipId'])
# service_client.delete_digital_twin(twin['$dtId'])
# service_client.delete_digital_twin("RoadSeg1")
# service_client.delete_digital_twin("Road2")
# service_client.delete_digital_twin("RoadSeg2")
# service_client.delete_digital_twin("vehicle1")