Skip to content

Commit

Permalink
........S. [ZBX-24549] improved Elasticsearch performance by querying…
Browse files Browse the repository at this point in the history
… in time intervals and not calling index refresh; added slow query log; fixed queries not working with 8.16.1
  • Loading branch information
sokurenko authored and Yur11 committed Jan 10, 2025
1 parent 9a43439 commit d57dcce
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 33 deletions.
1 change: 1 addition & 0 deletions ChangeLog.d/bugfix/ZBX-24549
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
........S. [ZBX-24549] improved Elasticsearch performance by querying in time intervals and not calling index refresh; added slow query log; fixed queries not working with 8.16.1 (vso)
4 changes: 2 additions & 2 deletions include/zbx_dbversion_constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@

#define ZBX_ELASTIC_MIN_VERSION 70000
#define ZBX_ELASTIC_MIN_VERSION_STR "7.x"
#define ZBX_ELASTIC_MAX_VERSION 79999
#define ZBX_ELASTIC_MAX_VERSION_STR ZBX_ELASTIC_MIN_VERSION_STR
#define ZBX_ELASTIC_MAX_VERSION 89999
#define ZBX_ELASTIC_MAX_VERSION_STR "8.x"

#define ZBX_TIMESCALE_MIN_VERSION 20001
#define ZBX_TIMESCALE_MIN_VERSION_STR "2.0.1"
Expand Down
2 changes: 1 addition & 1 deletion include/zbxhistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ void zbx_history_value2variant(const zbx_history_value_t *value, unsigned char v
#define zbx_history_record_vector_create(vector) zbx_vector_history_record_create(vector)

int zbx_history_init(const char *config_history_storage_url, const char *config_history_storage_opts,
char **error);
int config_log_slow_queries, char **error);
void zbx_history_destroy(void);

typedef struct
Expand Down
6 changes: 3 additions & 3 deletions src/libs/zbxcachevalue/valuecache.c
Original file line number Diff line number Diff line change
Expand Up @@ -2046,7 +2046,7 @@ static int vch_item_cache_values_by_time_and_count(zbx_vc_item_t **item, int ran
return SUCCEED;

/* find if the cache should be updated to cover the required count */
if (NULL != (*item)->head)
if (0 != (*item)->db_cached_from && NULL != (*item)->head)
{
zbx_vc_chunk_t *chunk;
int index;
Expand All @@ -2065,7 +2065,7 @@ static int vch_item_cache_values_by_time_and_count(zbx_vc_item_t **item, int ran
return SUCCEED;

/* get the end timestamp to which (including) the values should be cached */
if (NULL != (*item)->head)
if (0 != (*item)->db_cached_from && NULL != (*item)->head)
range_end = (*item)->tail->slots[(*item)->tail->first_value].timestamp.sec - 1;
else
range_end = ZBX_JAN_2038;
Expand Down Expand Up @@ -2549,7 +2549,7 @@ int zbx_vc_add_values(zbx_vector_dc_history_ptr_t *history, int *ret_flush, int
}

/* cache new values only after the item history database status is known */
if (NULL != item && (ZBX_ITEM_STATUS_CACHED_ALL == item->status || 0 != item->db_cached_from))
if (NULL != item)
{
zbx_history_record_t record = {h->ts, h->value};
zbx_vc_chunk_t *head = item->head;
Expand Down
7 changes: 5 additions & 2 deletions src/libs/zbxhistory/history.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ zbx_history_iface_t history_ifaces[ITEM_VALUE_TYPE_BIN + 1];
* *
************************************************************************************/
int zbx_history_init(const char *config_history_storage_url, const char *config_history_storage_opts,
char **error)
int config_log_slow_queries, char **error)
{
/* TODO: support per value type specific configuration */

Expand All @@ -64,8 +64,11 @@ int zbx_history_init(const char *config_history_storage_url, const char *config_
return FAIL;
}

if (FAIL == zbx_history_elastic_init(&history_ifaces[i], i, config_history_storage_url, error))
if (FAIL == zbx_history_elastic_init(&history_ifaces[i], i, config_history_storage_url,
config_log_slow_queries, error))
{
return FAIL;
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/libs/zbxhistory/history.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,15 @@ struct zbx_history_iface
zbx_history_add_values_func_t add_values;
zbx_history_get_values_func_t get_values;
zbx_history_flush_func_t flush;
int config_log_slow_queries;
};

/* SQL hist */
void zbx_history_sql_init(zbx_history_iface_t *hist, unsigned char value_type);

/* elastic hist */
int zbx_history_elastic_init(zbx_history_iface_t *hist, unsigned char value_type,
const char *config_history_storage_url, char **error);
const char *config_history_storage_url, int config_log_slow_queries, char **error);
void zbx_elastic_version_extract(struct zbx_json *json, int *result, int config_allow_unsupported_db_versions,
const char *config_history_storage_url);
zbx_uint32_t zbx_elastic_version_get(void);
Expand Down
180 changes: 160 additions & 20 deletions src/libs/zbxhistory/history_elastic.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
#include "zbxtime.h"
#include "zbxalgo.h"
#include "zbxdb.h"
#include "zbxjson.h"
#include "zbxstr.h"
#include "zbxnum.h"
#include "zbxtime.h"
#include "zbxvariant.h"
#include "zbxcurl.h"
#include "zbxjson.h"
#include "zbxcacheconfig.h"

#define ZBX_HISTORY_STORAGE_DOWN 10000 /* Timeout in milliseconds */

Expand Down Expand Up @@ -626,7 +628,7 @@ static void elastic_destroy(zbx_history_iface_t *hist)
* Parameters: hist - [IN] the history storage interface *
* itemid - [IN] the itemid *
* start - [IN] the period start timestamp *
* count - [IN] the number of values to read *
* count - [IN/OUT] the number of values to read *
* end - [IN] the period end timestamp *
* values - [OUT] the item history data values *
* *
Expand All @@ -637,19 +639,32 @@ static void elastic_destroy(zbx_history_iface_t *hist)
* all values from the specified interval if count is zero. *
* *
************************************************************************************/
static int elastic_get_values(zbx_history_iface_t *hist, zbx_uint64_t itemid, int start, int count, int end,
zbx_vector_history_record_t *values)
static int elastic_get_values_for_period(zbx_history_iface_t *hist, zbx_uint64_t itemid, time_t start, int *count,
time_t end, zbx_vector_history_record_t *values)
{
zbx_elastic_data_t *data = hist->data.elastic_data;
size_t url_alloc = 0, url_offset = 0, id_alloc = 0, scroll_alloc = 0, scroll_offset = 0;
int total, empty, ret;
int empty, ret;
CURLcode err;
struct zbx_json query;
struct curl_slist *curl_headers = NULL;
char *scroll_id = NULL, *scroll_query = NULL, errbuf[CURL_ERROR_SIZE], *error = NULL;
CURLoption opt;
double sec = 0;

zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_DEBUG))
{
char start_str[32], end_str[32];

strftime(start_str, sizeof(start_str), "%Y-%m-%d %H:%M:%S", localtime(&start));
strftime(end_str, sizeof(end_str), "%Y-%m-%d %H:%M:%S", localtime(&end));

zabbix_log(LOG_LEVEL_DEBUG, "In %s() window:(%s, %s] age: %s count:%d", __func__, start_str, end_str,
zbx_age2str(end - start), *count);
}

if (0 != hist->config_log_slow_queries)
sec = zbx_time();

ret = FAIL;

Expand All @@ -660,15 +675,16 @@ static int elastic_get_values(zbx_history_iface_t *hist, zbx_uint64_t itemid, in
return FAIL;
}

url_offset = 0;
zbx_snprintf_alloc(&data->post_url, &url_alloc, &url_offset, "%s/%s*/_search?scroll=10s", data->base_url,
value_type_str[hist->value_type]);

/* prepare the json query for elasticsearch, apply ranges if needed */
zbx_json_init(&query, ZBX_JSON_ALLOCATE);

if (0 < count)
if (0 < *count)
{
zbx_json_adduint64(&query, "size", count);
zbx_json_adduint64(&query, "size", *count);
zbx_json_addarray(&query, "sort");
zbx_json_addobject(&query, NULL);
zbx_json_addobject(&query, "clock");
Expand All @@ -692,6 +708,7 @@ static int elastic_get_values(zbx_history_iface_t *hist, zbx_uint64_t itemid, in
zbx_json_addobject(&query, "range");
zbx_json_addobject(&query, "clock");

zbx_json_addstring(&query, "format", "epoch_second", ZBX_JSON_TYPE_STRING);
if (0 < start)
zbx_json_adduint64(&query, "gt", start);

Expand Down Expand Up @@ -748,8 +765,6 @@ static int elastic_get_values(zbx_history_iface_t *hist, zbx_uint64_t itemid, in
goto out;
}

total = (0 == count ? -1 : count);

/* For processing the records, we need to keep track of the total requested and if the response from the */
/* elasticsearch server is empty. For this we use two variables, empty and total. If the result is empty or */
/* the total reach zero, we terminate the scrolling query and return what we currently have. */
Expand Down Expand Up @@ -791,13 +806,15 @@ static int elastic_get_values(zbx_history_iface_t *hist, zbx_uint64_t itemid, in

zbx_vector_history_record_append_ptr(values, &hr);

if (-1 != total)
--total;

if (0 == total)
if (0 != *count)
{
empty = 1;
break;
(*count)--;

if (0 == *count)
{
empty = 1;
break;
}
}
}

Expand Down Expand Up @@ -859,6 +876,13 @@ static int elastic_get_values(zbx_history_iface_t *hist, zbx_uint64_t itemid, in

curl_slist_free_all(curl_headers);

if (0 != hist->config_log_slow_queries)
{
sec = zbx_time() - sec;
if (sec > (double)hist->config_log_slow_queries / 1000.0)
zabbix_log(LOG_LEVEL_WARNING, "slow query: " ZBX_FS_DBL " sec, \"%s\"", sec, query.buffer);
}

zbx_json_free(&query);

zbx_free(scroll_id);
Expand All @@ -867,11 +891,125 @@ static int elastic_get_values(zbx_history_iface_t *hist, zbx_uint64_t itemid, in

zbx_vector_history_record_sort(values, (zbx_compare_func_t)zbx_history_record_compare_desc_func);

zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
zabbix_log(LOG_LEVEL_DEBUG, "End of %s() values:%d", __func__, values->values_num);

return ret;
}


/************************************************************************************
* *
* Purpose: gets period window *
* *
* Parameters: periods - [IN] history storage periods *
* num - [IN] count of history storage periods *
* step - [IN] period step *
* clock_from - [IN/OUT] period start timestamp *
* clock_to - [IN] period end timestamp (including) *
* clock_to_shift - [OUT] next period end timestmap *
* *
* Return value: period - current period *
* FAIL - otherwise *
* *
* Comments: This function gets window in increments in order to touch as *
* less partitions as possible *
* *
************************************************************************************/
static int period_iter_next(const int *periods, int num, int *step, time_t *clock_from, time_t clock_to,
time_t *clock_to_shift)
{
int period = periods[*step];

if (-1 == period)
return period;

if (0 > (*clock_from = clock_to - period))
{
*clock_from = clock_to;

*step = num - 1;

return period;
}

*clock_to_shift = clock_to - period;
(*step)++;

return period;
}

/************************************************************************************
* *
* Purpose: gets item history data from history storage *
* *
* Parameters: hist - [IN] the history storage interface *
* itemid - [IN] the itemid *
* count - [IN] the number of values to read *
* clock_to - [IN] the period end timestamp (including) *
* values - [OUT] the item history data values *
* *
* Return value: SUCCEED - the history data were read successfully *
* FAIL - otherwise *
* *
* Comments: This function reads <count> values and moves window in increments *
* in order to touch as less partitions as possible *
* *
************************************************************************************/
static int elastic_read_values_by_count(zbx_history_iface_t *hist, zbx_uint64_t itemid,
int count, time_t clock_to, zbx_vector_history_record_t *values)
{
const int periods[] = {SEC_PER_HOUR, 12 * SEC_PER_HOUR, SEC_PER_DAY, SEC_PER_DAY, SEC_PER_WEEK,
SEC_PER_MONTH, 0, -1};
int step = 0, ret = FAIL;
time_t clock_from, clock_to_shift;

while (-1 != period_iter_next(periods, ARRSIZE(periods), &step, &clock_from, clock_to, &clock_to_shift) &&
1 < count)
{
if (clock_from == clock_to)
clock_from = 0;

zbx_recalc_time_period(&clock_from, ZBX_RECALC_TIME_PERIOD_HISTORY);

if (clock_from > clock_to)
return SUCCEED;

if (FAIL == (ret = elastic_get_values_for_period(hist, itemid, clock_from, &count, clock_to, values)))
break;

clock_to = clock_to_shift;
}

return ret;
}

/************************************************************************************
* *
* Purpose: gets item history data from history storage *
* *
* Parameters: hist - [IN] the history storage interface *
* itemid - [IN] the itemid *
* start - [IN] the period start timestamp *
* count - [IN/OUT] the number of values to read *
* end - [IN] the period end timestamp *
* values - [OUT] the item history data values *
* *
* Return value: SUCCEED - the history data were read successfully *
* FAIL - otherwise *
* *
* Comments: This function reads <count> values from ]<start>,<end>] interval or *
* all values from the specified interval if count is zero. *
* *
************************************************************************************/
static int elastic_get_values(zbx_history_iface_t *hist, zbx_uint64_t itemid, int start, int count, int end,
zbx_vector_history_record_t *values)
{
if (0 == count || 0 != start)
return elastic_get_values_for_period(hist, itemid, start, &count, end, values);

return elastic_read_values_by_count(hist, itemid, count, end, values);
}

/************************************************************************************
* *
* Purpose: sends history data to the storage *
Expand Down Expand Up @@ -946,7 +1084,7 @@ static int elastic_add_values(zbx_history_iface_t *hist, const zbx_vector_dc_his

if (num > 0)
{
data->post_url = zbx_dsprintf(NULL, "%s/_bulk?refresh=true", data->base_url);
data->post_url = zbx_dsprintf(NULL, "%s/_bulk", data->base_url);
elastic_writer_add_iface(hist);
}

Expand Down Expand Up @@ -989,7 +1127,7 @@ static int elastic_flush(zbx_history_iface_t *hist)
* *
************************************************************************************/
int zbx_history_elastic_init(zbx_history_iface_t *hist, unsigned char value_type,
const char *config_history_storage_url, char **error)
const char *config_history_storage_url, int config_log_slow_queries, char **error)
{
zbx_elastic_data_t *data;

Expand Down Expand Up @@ -1017,6 +1155,7 @@ int zbx_history_elastic_init(zbx_history_iface_t *hist, unsigned char value_type
hist->flush = elastic_flush;
hist->get_values = elastic_get_values;
hist->requires_trends = 0;
hist->config_log_slow_queries = config_log_slow_queries;

return SUCCEED;
}
Expand Down Expand Up @@ -1193,11 +1332,12 @@ zbx_uint32_t zbx_elastic_version_get(void)
}
#else
int zbx_history_elastic_init(zbx_history_iface_t *hist, unsigned char value_type,
const char *config_history_storage_url, char **error)
const char *config_history_storage_url, int config_log_slow_queries, char **error)
{
ZBX_UNUSED(hist);
ZBX_UNUSED(value_type);
ZBX_UNUSED(config_history_storage_url);
ZBX_UNUSED(config_log_slow_queries);

*error = zbx_strdup(*error, "Zabbix must be compiled with cURL library for Elasticsearch history backend");

Expand Down
3 changes: 2 additions & 1 deletion src/zabbix_server/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2314,7 +2314,8 @@ int MAIN_ZABBIX_ENTRY(int flags)
exit(EXIT_FAILURE);
}

if (SUCCEED != zbx_history_init(config_history_storage_url, config_history_storage_opts, &error))
if (SUCCEED != zbx_history_init(config_history_storage_url, config_history_storage_opts,
zbx_db_config->log_slow_queries, &error))
{
zabbix_log(LOG_LEVEL_CRIT, "cannot initialize history storage: %s", error);
zbx_free(error);
Expand Down
Loading

0 comments on commit d57dcce

Please sign in to comment.