Skip to content

Commit

Permalink
Refactors dbcore and limits the maximum amount of concurrent async qu…
Browse files Browse the repository at this point in the history
…eries to a variable amount (#59676)

Refactors dbcore to work off a subsystem if executed async and limits the maximum amount of concurrent async queries to 25.

This has been tested locally against a MySQL Docker image: there were no crashes (as long as it was not run with debug extools), and data was recorded correctly.
Why It's Good For The Game

This may or may not resolve the Terry crashes. However, each query creates a new thread that takes up 2 MB, preventing the game from using that memory. This can lead to out-of-memory (OOM) crashes if threads stack up, e.g. due to poor connectivity. This change solves that issue.

Maintainer note: this did not actually resolve the crashes, but it has value anyway. The crashes were instead sidestepped by finding out that Large Address Awareness works.


cl
refactor: Refactors dbcore.dm to possibly resolve the crashes that happen on Terry.
/cl
  • Loading branch information
Watermelon914 authored Jun 21, 2021
1 parent 5d44d53 commit f711779
Show file tree
Hide file tree
Showing 9 changed files with 239 additions and 35 deletions.
6 changes: 6 additions & 0 deletions code/__DEFINES/database.dm
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/// When a query has been queued up for execution/is being executed
#define DB_QUERY_STARTED 0
/// When a query is finished executing
#define DB_QUERY_FINISHED 1
/// When there was a problem with the execution of a query.
#define DB_QUERY_BROKEN 2
1 change: 1 addition & 0 deletions code/__DEFINES/subsystems.dm
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@
#define FIRE_PRIORITY_VIS 10
#define FIRE_PRIORITY_AMBIENCE 10
#define FIRE_PRIORITY_GARBAGE 15
#define FIRE_PRIORITY_DATABASE 16
#define FIRE_PRIORITY_WET_FLOORS 20
#define FIRE_PRIORITY_AIR 20
#define FIRE_PRIORITY_NPC 20
Expand Down
3 changes: 3 additions & 0 deletions code/controllers/configuration/entries/dbconfig.dm
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,6 @@
config_entry_value = 50
min_val = 1

/// Maximum number of async queries SSdbcore keeps active at once (default 25, minimum 1).
/// Copied into SSdbcore.max_concurrent_queries alongside the connection-pool setup.
/datum/config_entry/number/max_concurrent_queries
config_entry_value = 25
min_val = 1
229 changes: 196 additions & 33 deletions code/controllers/subsystem/dbcore.dm
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
SUBSYSTEM_DEF(dbcore)
name = "Database"
flags = SS_BACKGROUND
wait = 1 MINUTES
flags = SS_TICKER
wait = 10 // Not seconds because we're running on SS_TICKER
runlevels = RUNLEVEL_INIT|RUNLEVEL_LOBBY|RUNLEVELS_DEFAULT
init_order = INIT_ORDER_DBCORE
priority = FIRE_PRIORITY_DATABASE

var/failed_connection_timeout = 0

var/schema_mismatch = 0
Expand All @@ -11,7 +14,29 @@ SUBSYSTEM_DEF(dbcore)
var/failed_connections = 0

var/last_error
var/list/active_queries = list()

/// Cap on length(queries_active): fire() stops starting standby queries once this many are active.
/// Overwritten from the max_concurrent_queries config entry; 25 is only the fallback.
var/max_concurrent_queries = 25

/// Count of db_query datums created since the last reset_tracking() call (incremented in db_query/New). Read by SStime_track.
var/all_queries_num = 0
/// Count of queries started by create_active_query() since the last reset_tracking() call. Read by SStime_track.
var/queries_active_num = 0
/// Count of queue_query() calls since the last reset_tracking() call. Read by SStime_track.
var/queries_standby_num = 0

/// Every db_query that currently exists: added in db_query/New, removed in db_query/Destroy.
var/list/all_queries = list()
/// Snapshot of all_queries taken at the start of a non-resumed fire(), walked for the 5 minute stale-query check.
var/list/processing_queries

/// Queries that have been handed to the database driver and are being polled for a result.
var/list/datum/db_query/queries_active = list()
/// Slice of queries_standby picked during fire() to be started in this run; null until it has been computed.
var/list/datum/db_query/queries_new
/// Queries added by queue_query() that are waiting for a free slot. A plain list, not an associative one.
var/list/datum/db_query/queries_standby = list()
/// Snapshot of queries_active taken at the start of a non-resumed fire(); entries are popped as each one is polled.
var/list/datum/db_query/queries_current

var/connection // Arbitrary handle returned from rust_g.

Expand All @@ -26,8 +51,30 @@ SUBSYSTEM_DEF(dbcore)

return ..()

/datum/controller/subsystem/dbcore/fire()
for(var/I in active_queries)
/// Builds this subsystem's stat entry text: existing queries (P), active queries and standby queries.
/datum/controller/subsystem/dbcore/stat_entry(msg)
	msg = "P:[length(all_queries)]|Active:[length(queries_active)]|Standby:[length(queries_standby)]"
	return ..()

/// Zeroes the three per-interval query counters. SStime_track calls this after logging them.
/datum/controller/subsystem/dbcore/proc/reset_tracking()
	queries_standby_num = 0
	queries_active_num = 0
	all_queries_num = 0

/datum/controller/subsystem/dbcore/fire(resumed = FALSE)
if(!IsConnected())
return

if(!resumed)
queries_new = null
if(!length(queries_active) && !length(queries_standby) && !length(all_queries))
processing_queries = null
queries_current = null
return
queries_current = queries_active.Copy()
processing_queries = all_queries.Copy()

for(var/I in processing_queries)
var/datum/db_query/Q = I
if(world.time - Q.last_activity_time > (5 MINUTES))
message_admins("Found undeleted query, please check the server logs and notify coders.")
Expand All @@ -36,12 +83,77 @@ SUBSYSTEM_DEF(dbcore)
if(MC_TICK_CHECK)
return

// First handle the already running queries
while(length(queries_current))
var/datum/db_query/query = popleft(queries_current)
if(!process_query(query))
queries_active -= query
if(MC_TICK_CHECK)
return

// Then strap on extra new queries as possible
if(isnull(queries_new))
if(!length(queries_standby))
return
queries_new = queries_standby.Copy(1, min(length(queries_standby), max_concurrent_queries) + 1)

while(length(queries_new) && length(queries_active) < max_concurrent_queries)
var/datum/db_query/query = popleft(queries_new)
queries_standby.Remove(query)
create_active_query(query)
if(MC_TICK_CHECK)
return

/// Starts a queued query on the database driver and registers it as active.
/// Returns the query, or FALSE when invoked through an admin proc call.
/datum/controller/subsystem/dbcore/proc/create_active_query(datum/db_query/query)
	PRIVATE_PROC(TRUE)
	SHOULD_NOT_SLEEP(TRUE)
	if(IsAdminAdvancedProcCall())
		return FALSE
	. = query
	run_query(query)
	queries_active += query
	queries_active_num++

/// Polls one active query.
/// Returns TRUE while the query should stay in the active set, FALSE once it is finished,
/// deleted, or the call came from an admin proc call.
/datum/controller/subsystem/dbcore/proc/process_query(datum/db_query/query)
	PRIVATE_PROC(TRUE)
	SHOULD_NOT_SLEEP(TRUE)
	if(IsAdminAdvancedProcCall() || QDELETED(query))
		return FALSE
	// query.process() only returns a truthy value once a result has been stored.
	if(!query.process(wait))
		return TRUE
	queries_active -= query
	return FALSE

/// Starts the query immediately, bypassing the standby queue, and sleeps until its result is stored.
/// Returns the query, or nothing when invoked through an admin proc call.
/datum/controller/subsystem/dbcore/proc/run_query_sync(datum/db_query/query)
	if(IsAdminAdvancedProcCall())
		return
	. = query
	run_query(query)
	UNTIL(query.process())

/// Submits the query to rust-g as an async job and records the returned job id on the query.
/datum/controller/subsystem/dbcore/proc/run_query(datum/db_query/query)
	if(IsAdminAdvancedProcCall())
		return
	var/encoded_arguments = json_encode(query.arguments)
	query.job_id = rustg_sql_query_async(connection, query.sql, encoded_arguments)

/// Places the query on the standby list (no duplicates) so that fire() can start it later.
/datum/controller/subsystem/dbcore/proc/queue_query(datum/db_query/query)
	if(IsAdminAdvancedProcCall())
		return
	queries_standby |= query
	queries_standby_num++

/// Copies the rust-g connection handle from the global SSdbcore onto this instance.
/// NOTE(review): presumably SSdbcore still refers to the instance being replaced at this point - confirm.
/datum/controller/subsystem/dbcore/Recover()
connection = SSdbcore.connection

/datum/controller/subsystem/dbcore/Shutdown()
//This is as close as we can get to the true round end before Disconnect() without changing where it's called, defeating the reason this is a subsystem
if(SSdbcore.Connect())
for(var/datum/db_query/query in queries_current)
run_query(query)

var/datum/db_query/query_round_shutdown = SSdbcore.NewQuery(
"UPDATE [format_table_name("round")] SET shutdown_datetime = Now(), end_state = :end_state WHERE id = :round_id",
list("end_state" = SSticker.end_state, "round_id" = GLOB.round_id)
Expand All @@ -53,11 +165,34 @@ SUBSYSTEM_DEF(dbcore)

//nu
/// Hides the connection handle and every query list from view-variables reads.
/// All other variables defer to the parent implementation.
/datum/controller/subsystem/dbcore/can_vv_get(var_name)
	if(var_name == NAMEOF(src, connection))
		return FALSE
	if(var_name == NAMEOF(src, all_queries))
		return FALSE
	// processing_queries is a copy of all_queries, so it exposes the same datums.
	if(var_name == NAMEOF(src, processing_queries))
		return FALSE
	if(var_name == NAMEOF(src, queries_active))
		return FALSE
	if(var_name == NAMEOF(src, queries_new))
		return FALSE
	if(var_name == NAMEOF(src, queries_standby))
		return FALSE
	// queries_current is a copy of queries_active; the original checked queries_active twice instead.
	if(var_name == NAMEOF(src, queries_current))
		return FALSE

	return ..()

/// Blocks view-variables edits to the connection handle and every query list.
/// All other variables defer to the parent implementation.
/datum/controller/subsystem/dbcore/vv_edit_var(var_name, var_value)
	if(var_name == NAMEOF(src, connection))
		return FALSE
	if(var_name == NAMEOF(src, all_queries))
		return FALSE
	// processing_queries is a copy of all_queries, so it exposes the same datums.
	if(var_name == NAMEOF(src, processing_queries))
		return FALSE
	if(var_name == NAMEOF(src, queries_active))
		return FALSE
	if(var_name == NAMEOF(src, queries_new))
		return FALSE
	if(var_name == NAMEOF(src, queries_standby))
		return FALSE
	// queries_current is a copy of queries_active; the original checked queries_active twice instead.
	if(var_name == NAMEOF(src, queries_current))
		return FALSE
	return ..()

/datum/controller/subsystem/dbcore/proc/Connect()
Expand All @@ -82,6 +217,8 @@ SUBSYSTEM_DEF(dbcore)
var/timeout = max(CONFIG_GET(number/async_query_timeout), CONFIG_GET(number/blocking_query_timeout))
var/thread_limit = CONFIG_GET(number/bsql_thread_limit)

max_concurrent_queries = CONFIG_GET(number/max_concurrent_queries)

var/result = json_decode(rustg_sql_connect_pool(json_encode(list(
"host" = address,
"port" = port,
Expand Down Expand Up @@ -196,7 +333,7 @@ SUBSYSTEM_DEF(dbcore)

for (var/thing in querys)
var/datum/db_query/query = thing
UNTIL(!query.in_progress)
query.sync()
if (qdel)
qdel(query)

Expand Down Expand Up @@ -280,8 +417,14 @@ Delayed insert mode was removed in mysql 7 and only works with MyISAM type table
var/sql
var/arguments

var/datum/callback/success_callback
var/datum/callback/fail_callback

// Status information
var/in_progress
/// Current status of the query.
var/status
/// Job ID of the query passed by rustg.
var/job_id
var/last_error
var/last_activity
var/last_activity_time
Expand All @@ -295,7 +438,8 @@ Delayed insert mode was removed in mysql 7 and only works with MyISAM type table
var/list/item //list of data values populated by NextRow()

/datum/db_query/New(connection, sql, arguments)
SSdbcore.active_queries[src] = TRUE
SSdbcore.all_queries += src
SSdbcore.all_queries_num++
Activity("Created")
item = list()

Expand All @@ -305,7 +449,9 @@ Delayed insert mode was removed in mysql 7 and only works with MyISAM type table

/// Closes the query and removes it from SSdbcore's all_queries, queries_standby and queries_active lists.
/datum/db_query/Destroy()
Close()
// NOTE(review): active_queries looks like the pre-refactor list name (superseded by all_queries) - confirm whether this line is stale diff context.
SSdbcore.active_queries -= src
SSdbcore.all_queries -= src
SSdbcore.queries_standby -= src
SSdbcore.queries_active -= src
return ..()

/datum/db_query/CanProcCall(proc_name)
Expand All @@ -323,7 +469,7 @@ Delayed insert mode was removed in mysql 7 and only works with MyISAM type table

/datum/db_query/proc/Execute(async = TRUE, log_error = TRUE)
Activity("Execute")
if(in_progress)
if(status == DB_QUERY_STARTED)
CRASH("Attempted to start a new query while waiting on the old one")

if(!SSdbcore.IsConnected())
Expand All @@ -334,7 +480,18 @@ Delayed insert mode was removed in mysql 7 and only works with MyISAM type table
if(!async)
start_time = REALTIMEOFDAY
Close()
. = run_query(async)
status = DB_QUERY_STARTED
if(async)
if(!Master.current_runlevel || Master.processing == 0)
SSdbcore.run_query_sync(src)
else
SSdbcore.queue_query(src)
sync()
else
var/job_result_str = rustg_sql_query_blocking(connection, sql, json_encode(arguments))
store_data(json_decode(job_result_str))

. = (status != DB_QUERY_BROKEN)
var/timed_out = !. && findtext(last_error, "Operation timed out")
if(!. && log_error)
log_sql("[last_error] | Query used: [sql] | Arguments: [json_encode(arguments)]")
Expand All @@ -345,34 +502,40 @@ Delayed insert mode was removed in mysql 7 and only works with MyISAM type table
log_query_debug("Query used: [sql]")
slow_query_check()

/datum/db_query/proc/run_query(async)
var/job_result_str
/// Blocks the calling proc, yielding with stoplag(), until the query reaches a final status
/// (DB_QUERY_FINISHED or DB_QUERY_BROKEN).
/datum/db_query/proc/sync()
	while(TRUE)
		if(status >= DB_QUERY_FINISHED)
			return
		stoplag()

if (async)
var/job_id = rustg_sql_query_async(connection, sql, json_encode(arguments))
in_progress = TRUE
UNTIL((job_result_str = rustg_sql_check_query(job_id)) != RUSTG_JOB_NO_RESULTS_YET)
in_progress = FALSE
/// Polls rust-g for the result of this query's async job.
/// Returns TRUE once a final result (success or failure) has been recorded on the query.
/// Returns nothing while the job is still pending, or if the query had already reached a final status.
/datum/db_query/process(delta_time)
	if(status >= DB_QUERY_FINISHED)
		return

	status = DB_QUERY_STARTED
	var/job_result = rustg_sql_check_query(job_id)
	if(job_result == RUSTG_JOB_NO_RESULTS_YET)
		return

	// A failed job yields a plain sentinel string rather than JSON. Passing it to json_decode()
	// would runtime and leave the query stuck in DB_QUERY_STARTED, so mark it broken here.
	if(job_result == RUSTG_JOB_ERROR)
		last_error = job_result
		status = DB_QUERY_BROKEN
		return TRUE

	store_data(json_decode(job_result))
	return TRUE

/// Records a decoded rust-g query result on this query and moves it to a final status.
/// result is the decoded JSON object. Its "status" key selects the branch:
/// "ok" stores rows/affected/last_insert_id and marks the query finished,
/// "err" and "offline" store an error and mark it broken.
/datum/db_query/proc/store_data(result)
	var/result_status = result["status"]
	switch(result_status)
		if("ok")
			rows = result["rows"]
			affected = result["affected"]
			last_insert_id = result["last_insert_id"]
			status = DB_QUERY_FINISHED
			return
		if("err")
			last_error = result["data"]
			status = DB_QUERY_BROKEN
			return
		if("offline")
			last_error = "CONNECTION OFFLINE"
			status = DB_QUERY_BROKEN
			return
		else
			// Without a final status sync() would wait forever and the blocking path
			// would report success, so treat anything unrecognised as a failure.
			last_error = "Unrecognised query result status: [result_status]"
			status = DB_QUERY_BROKEN
			return


/datum/db_query/proc/slow_query_check()
message_admins("HEY! A database query timed out. Did the server just hang? <a href='?_src_=holder;[HrefToken()];slowquery=yes'>\[YES\]</a>|<a href='?_src_=holder;[HrefToken()];slowquery=no'>\[NO\]</a>")
Expand Down
12 changes: 10 additions & 2 deletions code/controllers/subsystem/time_track.dm
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ SUBSYSTEM_DEF(time_track)
"air_hotspot_count",
"air_network_count",
"air_delta_count",
"air_superconductive_count"
"air_superconductive_count",
"all_queries",
"queries_active",
"queries_standby"
#ifdef SENDMAPS_PROFILE
) + sendmaps_shorthands
#else
Expand Down Expand Up @@ -142,14 +145,19 @@ SUBSYSTEM_DEF(time_track)
length(SSair.hotspots),
length(SSair.networks),
length(SSair.high_pressure_delta),
length(SSair.active_super_conductivity)
length(SSair.active_super_conductivity),
SSdbcore.all_queries_num,
SSdbcore.queries_active_num,
SSdbcore.queries_standby_num
#ifdef SENDMAPS_PROFILE
) + send_maps_values
#else
)
#endif
)

SSdbcore.reset_tracking()

#ifdef SENDMAPS_PROFILE
/datum/controller/subsystem/time_track/proc/scream_maptick_data()
var/current_profile_data = world.Profile(PROFILE_REFRESH, type = "sendmaps", format="json")
Expand Down
Loading

0 comments on commit f711779

Please sign in to comment.