-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathinflux.c
288 lines (222 loc) · 7.98 KB
/
influx.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <curl/curl.h>
#include "corrspec.h"
#include "influx.h"
#define MAXLENGTH 16
//the returned data
//double influx_return;
influxStruct *influxReturn=NULL;
// Scan `source` for every occurrence of `keyword` and, when the keyword is
// immediately followed by a double-quoted string, store that string in
// influxReturn->name[i], where i is the zero-based index of the occurrence.
// Occurrences that are not followed by a quote (or whose closing quote is
// missing) still consume an index but leave name[i] untouched.
//
// source:  NUL-terminated text to scan
// keyword: NUL-terminated marker, e.g. "name\":"
//
// Each stored name is truncated to 31 characters and zero-padded to 32 bytes,
// so every influxReturn->name[i] buffer must be at least 32 bytes long.
// NOTE(review): the number of allocated name slots is not known here, so the
// caller must provide at least as many slots as there are keyword occurrences.
void extract_all_keyword_values(const char *source, const char *keyword) {
    enum { NAME_WIDTH = 32 };

    if (source == NULL || keyword == NULL) {
        return;
    }
    if (influxReturn == NULL || influxReturn->name == NULL) {
        return;
    }

    const size_t keyword_len = strlen(keyword);
    if (keyword_len == 0) {
        // An empty keyword matches everywhere without advancing the cursor.
        return;
    }

    int slot = 0;
    const char *cursor = source;

    // Loop until no more keywords are found
    while ((cursor = strstr(cursor, keyword)) != NULL) {
        // Step past the keyword itself
        cursor += keyword_len;

        if (*cursor == '"') {
            cursor++; // first character of the quoted value
            const char *closing = strchr(cursor, '"');
            if (closing != NULL) {
                size_t length = (size_t)(closing - cursor);
                // Truncate instead of overrunning the destination
                if (length >= NAME_WIDTH) {
                    length = NAME_WIDTH - 1;
                }
                char *dest = influxReturn->name[slot];
                if (dest != NULL) {
                    memset(dest, '\0', NAME_WIDTH);
                    memcpy(dest, cursor, length);
                }
            }
        }
        slot++;
    }
}
// Count how many times `needle` occurs in `haystack`.
// Overlapping matches are counted (the search resumes one character after
// the start of each match), so countString("aaaa", "aa") is 3.
// Returns 0 when either argument is NULL or when `needle` is empty.
int countString(const char *haystack, const char *needle){
    if (haystack == NULL || needle == NULL || *needle == '\0') {
        return 0;
    }

    int count = 0;
    // A match of a non-empty needle never starts at the terminator, so
    // hit + 1 always stays inside the string.
    for (const char *hit = strstr(haystack, needle);
         hit != NULL;
         hit = strstr(hit + 1, needle)) {
        count++;
    }
    return count;
}
// Copy into `result` the text that lies between the end of the nth
// (`occurrence`, 1-based) match of `start_str` in `source` and the next
// `end_char` after it. `result` is set to the empty string when the nth
// match or the terminating character cannot be found. With an occurrence
// of zero or less the copy starts at the beginning of `source`.
// `result` must be large enough for the extracted text plus a terminator;
// no size is passed in, so the caller is responsible for that.
void extract_substring(char *source, char *start_str, char end_char, int occurrence, char *result) {
    char *cursor = source;
    int remaining = occurrence;

    // Walk forward one match at a time until the requested one is consumed
    while (remaining > 0) {
        char *match = strstr(cursor, start_str);
        if (!match) {
            *result = '\0';
            return;
        }
        cursor = match + strlen(start_str);
        remaining--;
    }

    char *stop = strchr(cursor, end_char);
    if (!stop) {
        *result = '\0';
        return;
    }

    size_t span = (size_t)(stop - cursor);
    memcpy(result, cursor, span);
    result[span] = '\0';
}
// Callback function to handle the response from the InfluxDB server
size_t write_callback(char *contents, size_t size, size_t nmemb, void *userp) {
//DEBUG
//printf("%s\n", contents);
size_t realsize = size * nmemb;
// Parser column indicies
int time_indx = 0; // column containing time
int scan_indx = 0; // column containing scanID
int text_indx = 0; // column containing any text values (TARGET NAME)
int name_indx = 0; // column containing scanID
int pos = 0; //token count
char *token;
char ID[64]="";
char data[64]="";
// vars to extract number of columns substring
char *start_str = "\"columns\":\[";
char *start_str2 = "\"values\":\[\[";
char end_char = ']';
char result[256];
int nmeas = 0; // num measurements
// ACS3_DEV4_VQlo, ACS3_DEV4_VQhi, ACS3_DEV4_VIlo, ACS3_DEV4_VIhi would be meas=4
// udpPointing would be meas=1
// HK_TEMP11 would be meas=1
int ncols = 0; // num influx columns
// "time", "DEC", "RA", "scanID"
// "time", "temp"
// "time", "scanID", "volt"
//get the number of measurements
nmeas = countString(contents, "name");
//get the number of columns
extract_substring(contents, start_str, end_char, 1, result);
ncols = countString(result, ",") + 1;
// find the dynamically reported time and scanID columns
token = strtok(result, ",");
while (token != NULL ){
memset(ID, '\0', sizeof(ID));
strncpy(ID, token+1, strlen(token)-2);
if(!strcmp(ID, "time"))
time_indx = pos;
if(!strcmp(ID, "scanID"))
scan_indx = pos;
if(!strcmp(ID, "TARGET"))
text_indx = pos;
token = strtok(NULL, ",");
pos++;
}
// dynamically allocate structure
influxReturn = malloc(sizeof(*influxReturn));
//influxReturn->length = nmeas * (ncols-2); //edit: explicitly use length, reuse length var
influxReturn->length = nmeas ; //edit: explicitly use length, reuse length var
influxReturn->value = (float *)malloc(nmeas*(ncols-2) * sizeof(float));
influxReturn->name = (char **)malloc(nmeas*(ncols-2) * sizeof(char *));
for(int i=0; i<nmeas*(ncols-2); i++)
influxReturn->name[i] = (char *)malloc(64*sizeof(char));
// load names into allocated space for names
extract_all_keyword_values(contents, "name\":");
// Set all struct values to 0, just in case we don't get any returns from influxDB
//memset(&influxReturn, 0, sizeof(influxReturn));
//for (int i=0; i<influxReturn->length; i++){ //edit: explicitly use length, reuse length var
for (int i=0; i<(nmeas * (ncols-2)); i++){
influxReturn->value[i] = 0.;
}
// Parse the InfluxDB return string
int data_indx=0;
for(int i=0; i<nmeas; i++){
extract_substring(contents, start_str2, end_char, i+1, result);
pos=0;
token = strtok(result, ",");
while (token != NULL ){
if(pos == time_indx){
memset(ID, '\0', sizeof(ID));
strncpy(ID, token+1, strlen(token)-2);
strcpy(influxReturn->time, ID);
//DEBUG
//printf("time string = %s\n", ID);
}
else if(pos == scan_indx){
memset(ID, '\0', sizeof(ID));
strncpy(ID, token+1, strlen(token)-2);
influxReturn->scanID = atoi(ID);
//DEBUG
//printf("scanID string = %s\n", ID);
}
else if(pos == text_indx){
memset(ID, '\0', sizeof(ID));
strncpy(ID, token+1, strlen(token)-2);
strcpy(influxReturn->text, ID);
//DEBUG
//printf("text string = %s\n", ID);
}
else{
memset(data, '\0', sizeof(data));
strncpy(data, token, strlen(token));
influxReturn->value[data_indx] = atof(data);
//DEBUG
//printf("data string %d = %s\n", data_indx, data);
data_indx++;
}
pos++;
token = strtok(NULL, ",");
}
}
//DEBUG
//printf("\n");
return realsize;
}
// Release an influxStruct: its value array, its name pointer array and the
// structure itself. Passing NULL is a no-op.
// Note that the parameter shadows the file-level `influxReturn`, so only the
// structure handed in is touched.
// NOTE(review): the individual name[i] buffers are not freed here because the
// number of allocated name slots is not recorded in a way this function can
// rely on; they are leaked unless the caller releases them first.
void freeinfluxStruct(influxStruct *influxReturn) {
    if (influxReturn == NULL) {
        return;
    }
    free(influxReturn->value);
    free(influxReturn->name);
    free(influxReturn);
}
// worker for the Influx DB query
// Takes: curl handle (may be NULL) and the POST body holding the query
// Operates: posts `query` to INFLUXDB_URL; the response is handed to
//           write_callback( ), which fills the global influxReturn
// Returns: the structure produced by the callback, or NULL when no response
//          data was received (NULL handle, failed transfer, empty reply)
// The handle is cleaned up and curl_global_cleanup() is called before
// returning, so the handle must not be reused afterwards.
influxStruct* influxWorker(CURL *curl, char *query)
{
    // Forget any result of an earlier query so a stale (possibly already
    // freed) pointer is never returned when the callback does not run.
    influxReturn = NULL;

    if (curl) {
        // Set the InfluxDB URL
        curl_easy_setopt(curl, CURLOPT_URL, INFLUXDB_URL);
        // Set the POST data (query)
        curl_easy_setopt(curl, CURLOPT_POSTFIELDS, query);
        // Set the callback function to handle the response
        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_callback);

        // Make the HTTP POST request
        CURLcode res = curl_easy_perform(curl);
        if (res != CURLE_OK) {
            fprintf(stderr, "influxWorker: curl_easy_perform() failed: %s\n",
                    curl_easy_strerror(res));
        }
    }

    // Cleanup Influx DB
    curl_easy_cleanup(curl);
    curl_global_cleanup();

    return influxReturn; //return the struct
}
// Initialization function to connect to the Influx DB.
// Returns: The curl handle to pass to the handler function
CURL *init_influx()
{
CURL *curl;
// Initialize libcurl
curl_global_init(CURL_GLOBAL_DEFAULT);
// Create a CURL handle
curl = curl_easy_init();
return curl;
}