diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 6a4db8c5ae..19534b8b15 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -72,6 +72,7 @@ namespace llarp _lastTick = llarp::time_now_ms(); m_NextExploreAt = Clock_t::now(); m_Pump = _loop->make_waker([this]() { PumpLL(); }); + m_Work = _loop->make_waker([this]() { submit_work(); }); } Router::~Router() @@ -79,6 +80,15 @@ namespace llarp llarp_dht_context_free(_dht); } + void + Router::submit_work() + { + m_lmq->job([work = std::move(m_WorkJobs)]() { + for (const auto& job : work) + job(); + }); + } + void Router::PumpLL() { @@ -1631,7 +1641,10 @@ namespace llarp void Router::QueueWork(std::function func) { - m_lmq->job(std::move(func)); + _loop->call([this, func = std::move(func)]() mutable { + m_WorkJobs.push_back(std::move(func)); + m_Work->Trigger(); + }); } void diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp index 16efb1fccc..bdbcbd8c91 100644 --- a/llarp/router/router.hpp +++ b/llarp/router/router.hpp @@ -78,6 +78,12 @@ namespace llarp path::BuildLimiter m_PathBuildLimiter; std::shared_ptr m_Pump; + std::shared_ptr m_Work; + std::vector> m_WorkJobs; + + /// submits cpu heavy work from last event loop tick cycle to worker threads. + void + submit_work(); path::BuildLimiter& pathBuildLimiter() override @@ -196,9 +202,11 @@ namespace llarp return _vpnPlatform.get(); } + /// queue functionally pure cpu heavy work to be done in another thread. void QueueWork(std::function func) override; + /// queue disk io bound work to be done in the disk io thread. void QueueDiskIO(std::function func) override;