From 2123e0b91755461352beecdbeefe083a401ab044 Mon Sep 17 00:00:00 2001
From: Mark Riddoch <mark@dianomic.com>
Date: Wed, 11 Sep 2019 10:26:09 +0100
Subject: [PATCH] FOGL-3204 Fix delays in sending data from south service and
 storage layer (#1699)

* FOGL-3204 Add instrumentation

* FOGL-3204 Added instrumentation to the storage service and updated logic
of service registry queue

* FOGL-3204 Additional instrumentation and calculate time =put f south
south to preserve latentcy request

* Fix issue with enpty queue

* FOGL-3204 Make reporting of queue times switchable

* FOGL-3204 Make REGISTRY_SLEEP_TIME a #define
---
 C/services/south/ingest.cpp                   | 39 +++++++++++++++++--
 C/services/storage/include/storage_registry.h |  4 +-
 C/services/storage/storage_registry.cpp       | 31 +++++++++++++--
 3 files changed, 67 insertions(+), 7 deletions(-)

diff --git a/C/services/south/ingest.cpp b/C/services/south/ingest.cpp
index d549357d05..288f6e5708 100644
--- a/C/services/south/ingest.cpp
+++ b/C/services/south/ingest.cpp
@@ -289,8 +289,25 @@ void Ingest::waitForQueue()
 {
 	mutex mtx;
 	unique_lock<mutex> lck(mtx);
-	if (m_running)
-		m_cv.wait_for(lck,chrono::milliseconds(m_timeout));
+	if (m_running && m_queue->size() < m_queueSizeThreshold)
+	{
+		// Work out how long to wait based on age of oldest queued reading
+		long timeout = m_timeout;
+		if (!m_queue->empty())
+		{
+			Reading *reading = (*m_queue)[0];
+			struct timeval tm, now;
+			reading->getUserTimestamp(&tm);
+			gettimeofday(&now, NULL);
+			long ageMS = (now.tv_sec - tm.tv_sec) * 1000 +
+				(now.tv_usec - tm.tv_usec) / 1000;
+			timeout = m_timeout - ageMS;
+		}
+		if (timeout > 0)
+		{
+			m_cv.wait_for(lck,chrono::milliseconds(timeout));
+		}
+	}
 }
 
 /**
@@ -377,6 +394,22 @@ vector<Reading *>* newQ = new vector<Reading *>();
 		}
 		++statsEntriesCurrQueue[reading->getAssetName()];
 	}
+
+	/*
+	 * Check the first reading in the list to see if we are meeting the
+	 * latency configuration we have been set
+	 */
+	const vector<Reading *>::const_iterator itr = m_data->cbegin();
+	if (itr != m_data->cend())
+	{
+		const Reading *firstReading = *itr;
+		time_t now = time(0);
+		unsigned long latency = now - firstReading->getUserTimestamp();
+		if (latency > m_timeout / 1000)	// m_timeout is in milliseconds
+		{
+			m_logger->warn("Current send latency of %d seconds exceeds requested maximum latency of %d seconds", latency, m_timeout);
+		}
+	}
 		
 	/**
 	 * 'm_data' vector is ready to be sent to storage service.
@@ -396,7 +429,7 @@ vector<Reading *>* newQ = new vector<Reading *>();
 		m_logger->error("Failed to write readings to storage layer, buffering");
 		lock_guard<mutex> guard(m_qMutex);
 
-		// BUffer current data in m_data
+		// Buffer current data in m_data
 		m_queue->insert(m_queue->begin(),
 				m_data->begin(),
 				m_data->end());
diff --git a/C/services/storage/include/storage_registry.h b/C/services/storage/include/storage_registry.h
index e9707566f9..fa3b9ddb6a 100644
--- a/C/services/storage/include/storage_registry.h
+++ b/C/services/storage/include/storage_registry.h
@@ -27,8 +27,10 @@ class StorageRegistry {
 		void		processPayload(char *payload);
 		void		sendPayload(const std::string& url, char *payload);
 		void		filterPayload(const std::string& url, char *payload, const std::string& asset);
+		typedef 	std::pair<time_t, char *> Item;
 		REGISTRY			m_registrations;
-		std::queue<char *>		m_queue;
+		std::queue<StorageRegistry::Item>
+						m_queue;
 		std::mutex			m_qMutex;
 		std::thread			*m_thread;
 		std::condition_variable		m_cv;
diff --git a/C/services/storage/storage_registry.cpp b/C/services/storage/storage_registry.cpp
index d1650674e7..71fdadfe6b 100644
--- a/C/services/storage/storage_registry.cpp
+++ b/C/services/storage/storage_registry.cpp
@@ -17,6 +17,13 @@
 #include "logger.h"
 #include "strings.h"
 #include "client_http.hpp"
+#include <chrono>
+
+#define CHECK_QTIMES	0	// Turn on to check length of time data is queued
+#define QTIME_THRESHOLD 3	// Threshold to report long queue times
+
+#define REGISTRY_SLEEP_TIME	5	// Time to sleep in the register process thread
+					// between checks for chutdown
 
 using namespace std;
 using namespace rapidjson;
@@ -78,8 +85,10 @@ StorageRegistry::process(const string& payload)
 		char *data = NULL;
 		if ((data = strdup(payload.c_str())) != NULL)
 		{
+			time_t now = time(0);
+			Item item = make_pair(now, data);
 			lock_guard<mutex> guard(m_qMutex);
-			m_queue.push(data);
+			m_queue.push(item);
 			m_cv.notify_all();
 		}
 	}
@@ -132,14 +141,30 @@ StorageRegistry::run()
 	while (m_running)
 	{
 		char *data = NULL;
+		time_t qTime;
 		{
 			unique_lock<mutex> mlock(m_cvMutex);
-			m_cv.wait(mlock);
-			data = m_queue.front();
+			while (m_queue.size() == 0)
+			{
+				m_cv.wait_for(mlock, std::chrono::seconds(REGISTRY_SLEEP_TIME));
+				if (!m_running)
+				{
+					return;
+				}
+			}
+			Item item = m_queue.front();
 			m_queue.pop();
+			data = item.second;
+			qTime = item.first;
 		}
 		if (data)
 		{
+#if CHECK_QTIMES
+			if (time(0) - qTime > QTIME_THRESHOLD)
+			{
+				Logger::getLogger()->error("Data has been queued for %d seconds to be sent to registered party", (time(0) - qTime));
+			}
+#endif
 			processPayload(data);
 			free(data);
 		}