forked from waggle-sensor/beehive-server
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconfig.py
executable file
·179 lines (118 loc) · 5.8 KB
/
config.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
#!/usr/bin/env python
import sys, os, StringIO, ConfigParser, logging, pika, ssl, re
import time, datetime
# Path of the beehive-server configuration file.
CONFIG_FILE = "/etc/waggle/beehive-server.cfg"

# Log file path and record layout. LOG_FILENAME is only defined here; the
# handler configured below writes to stdout.
LOG_FILENAME = "/var/log/waggle/communicator/beehive-server.log"
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - line=%(lineno)d - %(message)s'
loglevel = logging.DEBUG

# Logger for this module.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# One stdout handler that formats records with LOG_FORMAT.
formatter = logging.Formatter(LOG_FORMAT)
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(formatter)

# Discard any handlers already attached to the root logger and install the
# stdout handler in their place.
root_logger = logging.getLogger()
root_logger.setLevel(loglevel)
root_logger.handlers = []
root_logger.addHandler(handler)

# log rotate will be activated in main method
def read_file(str):
    """Return the stripped contents of a file, or "" if it is not a regular file.

    Args:
        str: Path of the file to read. The name shadows the builtin; it is
            kept unchanged so existing callers are not affected.

    Returns:
        The file's text with leading and trailing whitespace removed, or an
        empty string when ``os.path.isfile`` is false for the path.

    Errors raised by ``open()`` or ``read()`` propagate to the caller.
    """
    if not os.path.isfile(str):
        return ""
    # The with-body always returns, so no fallback return is needed after it.
    with open(str, 'r') as file_:
        return file_.read().strip()
def read_value(key, defaultval):
    """Look up ``key`` in the "root" section of the module-level ``my_config``.

    Returns ``defaultval`` when the option is missing or its value is empty
    (falsy); otherwise returns the configured value.
    """
    try:
        configured = my_config.get("root", key)
    except ConfigParser.NoOptionError:
        # A missing option is treated the same as an empty one.
        return defaultval
    return configured or defaultval
# A "[root]" section header is prepended to the file contents before parsing;
# read_value() looks every option up in that section.
ini_str = '[root]\n'
if os.path.isfile(CONFIG_FILE):
    # Use a context manager so the file handle is closed instead of leaked.
    with open(CONFIG_FILE, 'r') as _config_fp:
        ini_str = ini_str + _config_fp.read()

ini_fp = StringIO.StringIO(ini_str)
my_config = ConfigParser.RawConfigParser()
my_config.readfp(ini_fp)

# Host names fall back to the given defaults when the option is missing or empty.
RABBITMQ_HOST = read_value("rabbitmq-host", "rabbitmq")
logger.info("RABBITMQ_HOST: %s", RABBITMQ_HOST)

CASSANDRA_HOST = read_value("cassandra-host", "cassandra")
logger.info("CASSANDRA_HOST: %s", CASSANDRA_HOST)
### RabbitMQ ###
USE_SSL = True
RABBITMQ_PORT = 23181

# The beehive server needs client certificates for RabbitMQ.
CLIENT_KEY_FILE = "/usr/lib/waggle/SSL/beehive-server/key.pem"
CLIENT_CERT_FILE = "/usr/lib/waggle/SSL/beehive-server/cert.pem"
CA_ROOT_FILE = "/usr/lib/waggle/SSL/waggleca/cacert.pem"

# NOTE(review): the user name and password are hard-coded here - confirm this
# is intended.
pika_credentials = pika.PlainCredentials('server', 'waggle')

# Connection parameters: TLS with the client key/cert above, and the peer
# certificate is required (ssl.CERT_REQUIRED) against CA_ROOT_FILE.
pika_params = pika.ConnectionParameters(
    host=RABBITMQ_HOST,
    credentials=pika_credentials,
    virtual_host='/',
    port=RABBITMQ_PORT,
    ssl=USE_SSL,
    ssl_options={
        "ca_certs": CA_ROOT_FILE,
        'certfile': CLIENT_CERT_FILE,
        'keyfile': CLIENT_KEY_FILE,
        'cert_reqs': ssl.CERT_REQUIRED,
    },
)
### Cassandra ###
# Note: Cassandra tables are created in Server.py

# Statement that creates the "waggle" keyspace if it does not exist yet.
keyspace_cql = (
    "CREATE KEYSPACE IF NOT EXISTS waggle "
    "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '2'} "
    "AND durable_writes = true;"
)

# Statement that creates the waggle.plugin user-defined type. The '#'
# annotations inside the literal are removed by the substitution below.
type_plugin_sql = '''CREATE TYPE IF NOT EXISTS waggle.plugin (
name ascii,
version int,
instance ascii # optional, should be "default" if not specified otherwise
);'''
# Strip the '#' annotations, then collapse the statement onto a single line.
type_plugin_sql = re.sub('[ ]*#.*', '', type_plugin_sql).replace('\n', ' ').replace('\r', '')
# Statement that creates the waggle.nodes table. The '#' annotations inside
# the literal are removed by the substitution below.
nodes_cql = '''CREATE TABLE IF NOT EXISTS waggle.nodes (
node_id ascii,
timestamp timestamp,
queue ascii, # provided by the NC registration
plugins_currently set<frozen <plugin>>, # provided by either Server (filter messages) of plugin registration
plugins_all set<frozen <plugin>>, # provided by either Server (filter messages) of plugin registration
reverse_port int, # provided by the NC registration
name ascii, # descriptive name provided by the NC
parent ascii, # guest nodes sends registration message (NC modifies message and adds itself as parent)
children list<ascii>, # guest nodes sends registration message
PRIMARY KEY (node_id)
);'''
# Strip the '#' annotations, then collapse the statement onto a single line.
nodes_cql = re.sub('[ ]*#.*', '', nodes_cql).replace('\n', ' ').replace('\r', '')
# Statement that creates the waggle.registration_log table.
# The "event" column is either "register" or "deregister"; a deregister event
# can have empty values everywhere.
# Line breaks are collapsed so the statement ends up on a single line.
registration_log_cql = '''CREATE TABLE IF NOT EXISTS waggle.registration_log (
node_id ascii,
timestamp timestamp,
event ascii,
queue ascii,
plugins list <frozen <plugin>>,
reverse_port int,
name ascii,
parent ascii,
children list<ascii>,
PRIMARY KEY (node_id, timestamp, event)
);'''.replace('\n', ' ').replace('\r', '')
# Statement that creates the waggle.sensor_data table, partitioned by
# (node_id, date). The '#' annotation inside the literal is removed by the
# substitution below.
sensor_data_cql = '''CREATE TABLE IF NOT EXISTS waggle.sensor_data (
node_id ascii,
date ascii,
plugin_id ascii,
plugin_version int,
plugin_instance ascii,
timestamp timestamp,
sensor ascii,
sensor_meta ascii, # pointer to data description (syntax, semantics and other metadata (accuracy, configuration))
data list<ascii>,
PRIMARY KEY ((node_id, date), plugin_id, plugin_version, plugin_instance, timestamp, sensor)
);'''
# Strip the '#' annotation, then collapse the statement onto a single line.
sensor_data_cql = re.sub('[ ]*#.*', '', sensor_data_cql).replace('\n', ' ').replace('\r', '')
def unix_time(dt):
    """Return the number of seconds between the Unix epoch and ``dt``.

    Args:
        dt: A ``datetime.datetime``. Naive values are taken as-is and compared
            against a naive 1970-01-01 00:00:00 epoch; timezone-aware values
            are converted to UTC first.

    Returns:
        The difference in seconds as a float (negative for instants before
        the epoch).

    Raises:
        TypeError: If ``dt`` cannot be subtracted from a ``datetime``.
    """
    epoch = datetime.datetime(1970, 1, 1)
    # Subtracting a naive epoch from an aware datetime raises TypeError, so
    # aware values are normalised to naive UTC before the subtraction.
    offset = dt.utcoffset() if isinstance(dt, datetime.datetime) else None
    if offset is not None:
        dt = dt.replace(tzinfo=None) - offset
    delta = dt - epoch
    return delta.total_seconds()
def unix_time_millis(dt):
    """Return ``unix_time(dt)`` scaled to milliseconds and truncated to a long."""
    seconds = unix_time(dt)
    return long(seconds * 1000.0)