-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMessageGetter.cpp
145 lines (133 loc) · 3.92 KB
/
MessageGetter.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#include "MessageGetter.h"
// Default constructor. The body is intentionally empty: nothing is
// initialised here. The `global` pointer is only assigned later, in Init(),
// which is also what starts the FileRead worker thread.
MessageGetter::MessageGetter()
{
}
/**
 * Stores the shared Globals pointer and starts the FileRead worker thread.
 *
 * The thread receives `this` as its argument. Its handle is not kept, so the
 * thread is detached right after creation; otherwise it could never be joined
 * and its resources would stay allocated after it finishes.
 *
 * Failures are reported on stdout with the pthread error text; the function
 * then returns without a running worker.
 *
 * @param _global  shared state handed to the worker through this object.
 */
void MessageGetter::Init(Globals* _global)
{
    global = _global;

    /* initialize an attribute to the default value */
    pthread_attr_t tthr;
    int ret = pthread_attr_init(&tthr);
    if (ret != 0)
    {
        printf("\nFileRead Thread can't be created :[%s]", strerror(ret));
        return;
    }

    pthread_t thrid;
    ret = pthread_create(&thrid, &tthr, MessageGetter::FileRead, this);

    // The attribute object is only needed while creating the thread.
    pthread_attr_destroy(&tthr);

    if (ret != 0)
    {
        // pthread functions return the error number directly; on failure the
        // contents of thrid are undefined and must not be inspected.
        printf("\nFileRead Thread can't be created :[%s]", strerror(ret));
        return;
    }

    // Nobody keeps thrid, so nobody can join: let the thread clean up itself.
    pthread_detach(thrid);
}
/**
 * Case-insensitive "natural order" comparison of two C strings: runs of
 * decimal digits are compared by numeric value, everything else character by
 * character after tolower().
 *
 * Ordering rules:
 *   - a null pointer sorts before any non-null string; two nulls are equal;
 *   - a string that is a prefix of the other sorts first (so "" is smallest);
 *   - where one string has a digit and the other a non-digit, the digit
 *     sorts first;
 *   - digit runs are compared by value with no size limit (no strtol, hence
 *     no overflow/saturation on very long runs);
 *   - equal values with different widths: the run with more leading zeros
 *     sorts first ("007" before "7").
 *
 * @param void_a  first NUL-terminated string (may be null)
 * @param void_b  second NUL-terminated string (may be null)
 * @return negative if a sorts before b, zero if equivalent, positive if
 *         a sorts after b. Only the sign is meaningful.
 */
int MessageGetter::strcasecmp_withNumbers(const void* void_a, const void* void_b)
{
    // Work on unsigned char: isdigit()/tolower() are undefined for negative
    // arguments, which plain char can produce for bytes >= 0x80.
    const unsigned char* a = static_cast<const unsigned char*>(void_a);
    const unsigned char* b = static_cast<const unsigned char*>(void_b);
    if (!a || !b)
    { // if one doesn't exist, other wins by default
        return a ? 1 : b ? -1 : 0;
    }

    while (*a && *b)
    {
        const bool digitA = isdigit(*a) != 0;
        const bool digitB = isdigit(*b) != 0;

        if (digitA != digitB)
        { // just one is a number: numbers always come first
            return digitA ? -1 : 1;
        }

        if (!digitA)
        { // both non-numeric: plain case-insensitive comparison
            const int lowerA = tolower(*a);
            const int lowerB = tolower(*b);
            if (lowerA != lowerB)
                return lowerA - lowerB;
            ++a;
            ++b;
            continue;
        }

        // Both positions start a digit run: find where each run ends.
        const unsigned char* endA = a;
        while (isdigit(*endA))
            ++endA;
        const unsigned char* endB = b;
        while (isdigit(*endB))
            ++endB;

        // Skip leading zeros so that only significant digits decide the value.
        const unsigned char* sigA = a;
        while (sigA != endA && *sigA == '0')
            ++sigA;
        const unsigned char* sigB = b;
        while (sigB != endB && *sigB == '0')
            ++sigB;

        // More significant digits means a larger number.
        if ((endA - sigA) != (endB - sigB))
            return (endA - sigA) < (endB - sigB) ? -1 : 1;

        // Same number of significant digits: first differing digit decides.
        for (; sigA != endA; ++sigA, ++sigB)
        {
            if (*sigA != *sigB)
                return *sigA < *sigB ? -1 : 1;
        }

        // Equal values. If you wish 7 == 007, remove this width tie-break.
        if ((endA - a) != (endB - b))
            return (endA - a) > (endB - b) ? -1 : 1; // set 007 before 7

        a = endA;
        b = endB;
    }

    // At least one string is exhausted: the shorter one sorts first.
    return *a ? 1 : *b ? -1 : 0;
}
// "Less than" predicate for sorting strings: d1 goes before d2 exactly when
// strcasecmp_withNumbers() reports a negative result for (d1, d2).
bool MessageGetter::MyDataSortPredicate(const string& d1, const string& d2)
{
    const char* const lhs = d1.c_str();
    const char* const rhs = d2.c_str();
    const int order = strcasecmp_withNumbers(lhs, rhs);
    return order < 0;
}
void* MessageGetter::FileRead(void* arg)
{
while (true)
{
MessageGetter* pMessageGetter = (MessageGetter*)(arg);
cout << "File Reading Starts";
struct stat sb;
char* linkname;
ssize_t r;
std::string path = "/home/nio/projects/MessageProcessor/bin/x64/Debug/MessageFiles";
//std::string path = "/home/computer/Project/Pcap/udp_pcap";
DIR* dir;
struct dirent* ent;
vector<string>dirlist;
if ((dir = opendir(path.c_str())) != NULL)
{
/* print all the files and directories within directory */
while ((ent = readdir(dir)) != NULL)
{
if (strcmp(ent->d_name, ".") != 0 && strcmp(ent->d_name, "..") != 0)
{
dirlist.push_back(string(ent->d_name));
}
}
closedir(dir);
std::sort(dirlist.begin(), dirlist.end(), MyDataSortPredicate);
for (int i = 0; i < dirlist.size(); i++)
{
string slist = path + "/" + dirlist[i];
cout << "Current processing file " << slist.c_str();
//Files Read Thread
FILE* inf;
struct Message inp;
inf = fopen(slist.c_str(), "rb");
if (inf == NULL) {
fprintf(stderr, "\nError to open the file\n");
exit(1);
}
if (strcmp(string("Out.txt").c_str(), dirlist[i].c_str()) == 0)//Ignore out file
{
break;
}
fseek(inf, 0, SEEK_SET);
while (!feof(inf))//
{
int readcount = fread(&inp, Globals::MessageLen, 1, inf);
printf("SeqNo:", inp.SeqNo);
pMessageGetter->global->MesgCount++;
if (pMessageGetter->global->MesgCount > pMessageGetter->MesgRate)//MessageRate Per second
{
//Insert in to hold queue
pMessageGetter->global->HoldQueue.push(inp);
}
else
{
//Process message
//Inserts in to queue
pMessageGetter->global->ReadQueue.push(inp);
}
//printf("roll_no = %d name = %s\n", inp.roll_no, inp.name);
}
fclose(inf);
cout << "Finish processing file " << slist.c_str();
}
cout << "All files processed successfully";
}
else
{
/* could not open directory */
perror("MessageProcess FileRead::");
// return EXIT_FAILURE;
}
sleep(5);
break;
}
}