From d44f2858f360b3f03db8f5c9ccb96b45f20c68dd Mon Sep 17 00:00:00 2001 From: Albin Antony Date: Fri, 30 Aug 2024 23:45:06 +0530 Subject: [PATCH] s3select: process jsonl Solve broken line issue for jsonl chunks. Signed-off-by: Albin Antony --- include/s3select.h | 75 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/include/s3select.h b/include/s3select.h index 311783c4..c7a3737d 100644 --- a/include/s3select.h +++ b/include/s3select.h @@ -3084,6 +3084,7 @@ class json_object : public base_s3object size_t m_row_count; bool star_operation_ind; bool m_init_json_processor_ind; + std::string m_incomplete_line_buffer; public: @@ -3272,6 +3273,80 @@ class json_object : public base_s3object return status; } + int run_s3select_on_stream_jsonl(std::string& result, const char* json_stream, size_t stream_length, size_t obj_size, bool json_format = false) +{ + int status = 0; + m_processed_bytes += stream_length; + set_sql_result(result); + + if (JsonHandler.is_fatal_initialization()) + { + throw base_s3select_exception(JsonHandler.m_fatal_initialization_description, base_s3select_exception::s3select_exp_en_t::FATAL); + } + + if (!stream_length || !json_stream) + { + if (!m_incomplete_line_buffer.empty()) + { + status = JsonHandler.process_json_buffer(m_incomplete_line_buffer.data(), m_incomplete_line_buffer.size(), true); + if (status < 0) + { + throw base_s3select_exception("Error processing incomplete JSON object at end of stream", base_s3select_exception::s3select_exp_en_t::FATAL); + } + m_incomplete_line_buffer.clear(); + } + + m_end_of_stream = true; + sql_execution_on_row_cb(); + return 0; + } + + std::string combined_data = m_incomplete_line_buffer + std::string(json_stream, stream_length); + size_t start = 0; + size_t end = 0; + size_t data_length = combined_data.size(); + + while (end < data_length) + { + if (combined_data[end] == '\n') + { + std::string line = combined_data.substr(start, end - start); + + if (!line.empty()) + { + try + { + status = JsonHandler.process_json_buffer(line.data(), line.size()); + } + catch (std::exception &e) + { + std::string error_description = "Exception while processing JSON: " + std::string(e.what()); + throw base_s3select_exception(error_description, base_s3select_exception::s3select_exp_en_t::FATAL); + } + + if (status < 0) + { + throw base_s3select_exception("Failure processing JSON line", base_s3select_exception::s3select_exp_en_t::FATAL); + } + } + + start = end + 1; + } + end++; + } + + if (start < data_length) + { + m_incomplete_line_buffer = combined_data.substr(start); + } + else + { + m_incomplete_line_buffer.clear(); + } + + return status; +} + void set_json_query(s3select* s3_query, csv_definitions csv) { m_csv_defintion = csv;