diff --git a/llarp/iwp/session.cpp b/llarp/iwp/session.cpp index 75f2356a55..c8c7e7fa81 100644 --- a/llarp/iwp/session.cpp +++ b/llarp/iwp/session.cpp @@ -144,6 +144,8 @@ namespace llarp void Session::EncryptAndSend(ILinkSession::Packet_t data) { + if (m_State == State::Closed) + return; m_EncryptNext.emplace_back(std::move(data)); TriggerPump(); if (!IsEstablished()) @@ -179,12 +181,9 @@ namespace llarp return; auto close_msg = CreatePacket(Command::eCLOS, 0, 16, 16); m_Parent->UnmapAddr(m_RemoteAddr); - m_State = State::Closed; - if (m_SentClosed.test_and_set()) - return; EncryptAndSend(std::move(close_msg)); - LogInfo(m_Parent->PrintableName(), " closing connection to ", m_RemoteAddr); + m_State = State::Closed; } bool @@ -355,7 +354,7 @@ namespace llarp bool Session::TimedOut(llarp_time_t now) const { - if (m_State == State::Ready) + if (m_State == State::Ready || m_State == State::LinkIntro) { return now > m_LastRX && now - m_LastRX diff --git a/llarp/iwp/session.hpp b/llarp/iwp/session.hpp index 210a37a1ec..1e0e6b3c41 100644 --- a/llarp/iwp/session.hpp +++ b/llarp/iwp/session.hpp @@ -206,7 +206,6 @@ namespace llarp std::atomic_flag m_PlaintextEmpty; llarp::thread::Queue m_PlaintextRecv; - std::atomic_flag m_SentClosed; void EncryptWorker(CryptoQueue_t msgs); diff --git a/llarp/lokinet_shared.cpp b/llarp/lokinet_shared.cpp index e4e31fe940..970b0996d4 100644 --- a/llarp/lokinet_shared.cpp +++ b/llarp/lokinet_shared.cpp @@ -715,8 +715,7 @@ extern "C" return; } - auto on_open = [ctx, localAddr, remote, open_cb]( - bool success, void* user_data) { + auto on_open = [ctx, localAddr, remote, open_cb](bool success, void* user_data) { llarp::log::info( logcat, "Quic tunnel {}<->{}.", diff --git a/llarp/quic/endpoint.cpp b/llarp/quic/endpoint.cpp index f96563af96..b97db84bdf 100644 --- a/llarp/quic/endpoint.cpp +++ b/llarp/quic/endpoint.cpp @@ -298,7 +298,13 @@ namespace llarp::quic ngtcp2_pkt_info pi; auto written = ngtcp2_conn_write_connection_close( - conn, &conn.path.path, &pi, u8data(conn.conn_buffer), conn.conn_buffer.size(), &err, get_timestamp()); + conn, + &conn.path.path, + &pi, + u8data(conn.conn_buffer), + conn.conn_buffer.size(), + &err, + get_timestamp()); if (written <= 0) { log::warning( diff --git a/llarp/quic/tunnel.cpp b/llarp/quic/tunnel.cpp index 5fb9b6a27d..1a76f8f64d 100644 --- a/llarp/quic/tunnel.cpp +++ b/llarp/quic/tunnel.cpp @@ -135,7 +135,7 @@ namespace llarp::quic log::info(logcat, "EOF on connection to {}:{}", c.peer().ip, c.peer().port); if (auto stream = c.data()) { - stream->set_eof(); // CloseEvent will send graceful shutdown to other end + stream->set_eof(); // CloseEvent will send graceful shutdown to other end } c.close(); }); diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index c1d8e85959..19b3b9ccc5 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -72,6 +72,7 @@ namespace llarp _lastTick = llarp::time_now_ms(); m_NextExploreAt = Clock_t::now(); m_Pump = _loop->make_waker([this]() { PumpLL(); }); + m_Work = _loop->make_waker([this]() { submit_work(); }); } Router::~Router() @@ -79,6 +80,15 @@ namespace llarp llarp_dht_context_free(_dht); } + void + Router::submit_work() + { + m_lmq->job([work = std::move(m_WorkJobs)]() { + for (const auto& job : work) + job(); + }); + } + void Router::PumpLL() { @@ -482,8 +492,8 @@ namespace llarp LogError("RC is invalid, not saving"); return false; } - if (m_isServiceNode) - _nodedb->Put(_rc); + if (IsServiceNode()) + _nodedb->Put(rc()); QueueDiskIO([&]() { HandleSaveRC(); }); return true; } @@ -1631,7 +1641,10 @@ namespace llarp void Router::QueueWork(std::function func) { - m_lmq->job(std::move(func)); + _loop->call([this, func = std::move(func)]() mutable { + m_WorkJobs.push_back(std::move(func)); + m_Work->Trigger(); + }); } void diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp index 16efb1fccc..bdbcbd8c91 100644 --- a/llarp/router/router.hpp +++ b/llarp/router/router.hpp @@ -78,6 +78,12 @@ namespace llarp path::BuildLimiter m_PathBuildLimiter; std::shared_ptr m_Pump; + std::shared_ptr m_Work; + std::vector> m_WorkJobs; + + /// submits cpu heavy work from last event loop tick cycle to worker threads. + void + submit_work(); path::BuildLimiter& pathBuildLimiter() override @@ -196,9 +202,11 @@ namespace llarp return _vpnPlatform.get(); } + /// queue functionally pure cpu heavy work to be done in another thread. void QueueWork(std::function func) override; + /// queue disk io bound work to be done in the disk io thread. void QueueDiskIO(std::function func) override;