Skip to content

Commit

Permalink
Update to log's stable KV feature
Browse files Browse the repository at this point in the history
Requires updating to log v0.4.21 and std-logger v0.5.3.
  • Loading branch information
Thomasdezeeuw committed Mar 20, 2024
1 parent 1c7ee4a commit 51446b4
Show file tree
Hide file tree
Showing 14 changed files with 42 additions and 42 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ test = ["getrandom"]

[dependencies]
heph-inbox = { version = "0.2.3", path = "./inbox", default-features = false }
log = { version = "0.4.17", default-features = false, features = ["kv_unstable", "kv_unstable_std"] }
log = { version = "0.4.21", default-features = false, features = ["kv_std"] }

# Optional dependencies, enabled by features.
# Required by the `test` feature.
Expand All @@ -30,7 +30,7 @@ getrandom = { version = "0.2.2", default-features = false, features = ["
[dev-dependencies]
getrandom = { version = "0.2.2", default-features = false, features = ["std"] }
heph-rt = { version = "0.5.0", path = "./rt", default-features = false, features = ["test"] }
std-logger = { version = "0.5.0", default-features = false, features = ["log-panic", "nightly"] }
std-logger = { version = "0.5.3", default-features = false, features = ["log-panic", "nightly"] }

[[test]]
name = "examples"
Expand Down
2 changes: 1 addition & 1 deletion http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ log = { version = "0.4.17", default-features = false }
itoa = { version = "1.0.6", default-features = false }

[dev-dependencies]
std-logger = { version = "0.5.0", default-features = false, features = ["log-panic", "nightly"] }
std-logger = { version = "0.5.3", default-features = false, features = ["log-panic", "nightly"] }

[dev-dependencies.heph]
path = "../"
Expand Down
4 changes: 2 additions & 2 deletions rt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ test = ["heph/test"]
a10 = { version = "0.1.6", default-features = false, features = ["nightly"] }
heph = { version = "0.5.0", path = "../", default-features = false }
heph-inbox = { version = "0.2.3", path = "../inbox", default-features = false }
log = { version = "0.4.17", default-features = false, features = ["kv_unstable", "kv_unstable_std"] }
log = { version = "0.4.21", default-features = false, features = ["kv_std"] }
crossbeam-channel = { version = "0.5.0", default-features = false, features = ["std"] }
libc = { version = "0.2.96", default-features = false }
socket2 = { version = "0.5.2", default-features = false, features = ["all"] }

[dev-dependencies]
getrandom = { version = "0.2.2", default-features = false, features = ["std"] }
std-logger = { version = "0.5.0", default-features = false, features = ["nightly"] }
std-logger = { version = "0.5.3", default-features = false, features = ["nightly"] }

[[test]]
name = "functional"
Expand Down
2 changes: 1 addition & 1 deletion rt/examples/2_my_ip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ fn conn_supervisor(err: io::Error) -> SupervisorStrategy<TcpStream> {
/// as message type.
async fn conn_actor(_: actor::Context<!, ThreadLocal>, stream: TcpStream) -> io::Result<()> {
let address = stream.peer_addr()?;
info!(address = log::as_display!(address); "accepted connection");
info!(address:% = address; "accepted connection");

// This will allocate a new string which isn't the most efficient way to do
// this, but it's the easiest so we'll keep this for sake of example.
Expand Down
2 changes: 1 addition & 1 deletion rt/examples/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ where
RT: rt::Access + Clone,
{
let address = stream.peer_addr()?;
info!(address = log::as_display!(address); "accepted connection");
info!(address:% = address; "accepted connection");
let mut buffer = Vec::with_capacity(1024);

let err = loop {
Expand Down
26 changes: 13 additions & 13 deletions rt/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::{fmt, io, process};

use a10::signals::{ReceiveSignals, Signals};
use heph::actor_ref::{ActorGroup, Delivery};
use log::{as_debug, as_display, debug, error, info, trace};
use log::{debug, error, info, trace};

use crate::setup::{host_id, host_info, Uuid};
use crate::{self as rt, cpu_usage, shared, sync_worker, trace, worker, Signal};
Expand Down Expand Up @@ -228,14 +228,14 @@ impl Coordinator {
sending_pid = info.ssi_pid, sending_uid = info.ssi_uid; "received unexpected signal, not relaying");
continue;
};
debug!(signal = as_debug!(signal), signal_number = info.ssi_signo, signal_code = info.ssi_code,
debug!(signal:? = signal, signal_number = info.ssi_signo, signal_code = info.ssi_code,
sending_pid = info.ssi_pid, sending_uid = info.ssi_uid; "received process signal");

if let Signal::User2 = signal {
self.log_metrics();
}

trace!(signal = as_debug!(signal); "relaying process signal to worker threads");
trace!(signal:? = signal; "relaying process signal to worker threads");
for worker in &mut self.workers {
if let Err(err) = worker.send_signal(signal) {
// NOTE: if the worker is unable to receive a
Expand All @@ -248,13 +248,13 @@ impl Coordinator {
// (i.e. it has the reason why the worker thread
// stopped).
error!(
signal = as_debug!(signal), worker_id = worker.id();
signal:? = signal, worker_id = worker.id();
"failed to send process signal to worker: {err}",
);
}
}

trace!(signal = as_debug!(signal); "relaying process signal to actors");
trace!(signal:? = signal; "relaying process signal to actors");
self.signal_refs.remove_disconnected();
_ = self.signal_refs.try_send(signal, Delivery::ToAll);
}
Expand All @@ -281,26 +281,26 @@ impl Coordinator {
let trace_metrics = self.trace_log.as_ref().map(trace::CoordinatorLog::metrics);
info!(
target: "metrics",
heph_version = as_display!(concat!("v", env!("CARGO_PKG_VERSION"))),
heph_version = concat!("v", env!("CARGO_PKG_VERSION")),
host_os = self.host_os,
host_arch = ARCH,
host_name = self.host_name,
host_id = as_display!(self.host_id),
host_id:% = self.host_id,
app_name = self.app_name,
process_id = process::id(),
parent_process_id = parent_id(),
uptime = as_debug!(self.start.elapsed()),
uptime:? = self.start.elapsed(),
worker_threads = self.workers.len(),
sync_actors = self.sync_workers.len(),
shared_scheduler_ready = shared_metrics.scheduler_ready,
shared_scheduler_inactive = shared_metrics.scheduler_inactive,
shared_timers_total = shared_metrics.timers_total,
shared_timers_next = as_debug!(shared_metrics.timers_next),
process_signals = as_debug!(Signal::ALL),
shared_timers_next:? = shared_metrics.timers_next,
process_signals:? = Signal::ALL,
process_signal_receivers = self.signal_refs.len(),
cpu_time = as_debug!(cpu_usage(libc::CLOCK_THREAD_CPUTIME_ID)),
total_cpu_time = as_debug!(cpu_usage(libc::CLOCK_PROCESS_CPUTIME_ID)),
trace_file = as_debug!(trace_metrics.as_ref().map(|m| m.file)),
cpu_time:? = cpu_usage(libc::CLOCK_THREAD_CPUTIME_ID),
total_cpu_time:? = cpu_usage(libc::CLOCK_PROCESS_CPUTIME_ID),
trace_file:? = trace_metrics.as_ref().map(|m| m.file),
trace_counter = trace_metrics.map_or(0, |m| m.counter);
"coordinator metrics",
);
Expand Down
6 changes: 3 additions & 3 deletions rt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ use std::sync::Arc;
use std::task;
use std::time::{Duration, Instant};

use ::log::{as_debug, debug, warn};
use ::log::{debug, warn};
use heph::actor_ref::{ActorGroup, ActorRef};
use heph::supervisor::{Supervisor, SyncSupervisor};
use heph::{ActorFutureBuilder, NewActor, SyncActor};
Expand Down Expand Up @@ -607,13 +607,13 @@ impl RuntimeRef {

/// Add a timer.
pub(crate) fn add_timer(&self, deadline: Instant, waker: task::Waker) -> TimerToken {
::log::trace!(deadline = as_debug!(deadline); "adding timer");
::log::trace!(deadline:? = deadline; "adding timer");
self.internals.timers.borrow_mut().add(deadline, waker)
}

/// Remove a previously set timer.
pub(crate) fn remove_timer(&self, deadline: Instant, token: TimerToken) {
::log::trace!(deadline = as_debug!(deadline); "removing timer");
::log::trace!(deadline:? = deadline; "removing timer");
self.internals.timers.borrow_mut().remove(deadline, token);
}

Expand Down
8 changes: 4 additions & 4 deletions rt/src/local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::rc::Rc;
use std::sync::Arc;

use heph::actor_ref::{ActorGroup, Delivery, SendError};
use log::{as_debug, info, trace};
use log::{info, trace};

use crate::scheduler::Scheduler;
use crate::timers::Timers;
Expand Down Expand Up @@ -78,7 +78,7 @@ impl RuntimeInternals {
/// returns an error if no actors want to receive it.
pub(crate) fn relay_signal(&self, signal: Signal) {
let timing = trace::start(&*self.trace_log.borrow());
trace!(worker_id = self.id.get(), signal = as_debug!(signal); "received process signal");
trace!(worker_id = self.id.get(), signal:? = signal; "received process signal");

if let Signal::User2 = signal {
self.log_metrics();
Expand Down Expand Up @@ -115,9 +115,9 @@ impl RuntimeInternals {
scheduler_ready = scheduler.ready(),
scheduler_inactive = scheduler.inactive(),
timers_total = timers.len(),
timers_next = as_debug!(timers.next_timer()),
timers_next:? = timers.next_timer(),
process_signal_receivers = self.signal_receivers.borrow().len(),
cpu_time = as_debug!(cpu_usage(libc::CLOCK_THREAD_CPUTIME_ID)),
cpu_time:? = cpu_usage(libc::CLOCK_THREAD_CPUTIME_ID),
trace_counter = trace_metrics.map_or(0, |m| m.counter);
"worker metrics",
);
Expand Down
2 changes: 1 addition & 1 deletion rt/src/net/tcp/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ where
let listener = TcpListener::bind_setup(ctx.runtime_ref(), local, set_listener_options)
.await
.map_err(Error::Accept)?;
trace!(address = log::as_display!(local); "TCP server listening");
trace!(address:% = local; "TCP server listening");

let mut accept = listener.incoming();
let mut receive = ctx.receive_next();
Expand Down
4 changes: 2 additions & 2 deletions rt/src/process/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{fmt, ptr};

use heph::supervisor::Supervisor;
use heph::{ActorFuture, NewActor};
use log::{as_debug, error, trace};
use log::{error, trace};

use crate::panic_message;
use crate::spawn::options::Priority;
Expand Down Expand Up @@ -166,7 +166,7 @@ impl<P: Process + ?Sized> ProcessData<P> {
self.fair_runtime += fair_elapsed;

trace!(
pid = pid.0, name = name, elapsed = as_debug!(elapsed), result = as_debug!(result);
pid = pid.0, name = name, elapsed:? = elapsed, result:? = result;
"finished running process",
);
RunStats { elapsed, result }
Expand Down
4 changes: 2 additions & 2 deletions rt/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,11 +417,11 @@ pub(crate) fn set_cpu_affinity(worker_id: NonZeroUsize) -> Option<usize> {
let cpu_set = cpu_set(cpu);
match set_affinity(&cpu_set) {
Ok(()) => {
debug!(worker_id = log::as_display!(worker_id); "worker thread CPU affinity set to {cpu}");
debug!(worker_id = worker_id; "worker thread CPU affinity set to {cpu}");
Some(cpu)
}
Err(err) => {
warn!(worker_id = log::as_display!(worker_id); "failed to set CPU affinity on thread: {err}");
warn!(worker_id = worker_id; "failed to set CPU affinity on thread: {err}");
None
}
}
Expand Down
6 changes: 3 additions & 3 deletions rt/src/shared/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{io, task};
use heph::actor_ref::ActorRef;
use heph::supervisor::Supervisor;
use heph::{ActorFutureBuilder, NewActor};
use log::{as_debug, debug, trace};
use log::{debug, trace};

use crate::process::{FutureProcess, Process, ProcessId};
use crate::scheduler::shared::{ProcessData, Scheduler};
Expand Down Expand Up @@ -161,15 +161,15 @@ impl RuntimeInternals {
///
/// See [`Timers::add`].
pub(crate) fn add_timer(&self, deadline: Instant, waker: task::Waker) -> TimerToken {
trace!(deadline = as_debug!(deadline); "adding timer");
trace!(deadline:? = deadline; "adding timer");
self.timers.add(deadline, waker)
}

/// Remove a previously set timer.
///
/// See [`Timers::remove`].
pub(crate) fn remove_timer(&self, deadline: Instant, token: TimerToken) {
trace!(deadline = as_debug!(deadline); "removing timer");
trace!(deadline:? = deadline; "removing timer");
self.timers.remove(deadline, token);
}

Expand Down
8 changes: 4 additions & 4 deletions rt/src/systemd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::{env, io, process};

use heph::actor;
use heph::messages::Terminate;
use log::{as_debug, debug, warn};
use log::{debug, warn};

use crate::access::Access;
use crate::net::uds::datagram::{Connected, UnixDatagram};
Expand Down Expand Up @@ -136,7 +136,7 @@ impl Notify {
/// pass a human-readable error message. **Note that it must be limited to a
/// single line.**
pub async fn change_state(&self, state: State, status: Option<&str>) -> io::Result<()> {
debug!(state = log::as_debug!(state), status = log::as_debug!(status); "updating state with service manager");
debug!(state:? = state, status:? = status; "updating state with service manager");
let state_line = match state {
State::Ready => "READY=1\n",
State::Reloading => "RELOADING=1\n",
Expand Down Expand Up @@ -170,7 +170,7 @@ impl Notify {
/// If you also need to change the state of the application you can use
/// [`Notify::change_state`].
pub async fn change_status(&self, status: &str) -> io::Result<()> {
debug!(status = log::as_display!(status); "updating status with service manager");
debug!(status = status; "updating status with service manager");
let mut state_update = String::with_capacity(7 + status.len() + 1);
state_update.push_str("STATUS=");
state_update.push_str(status);
Expand Down Expand Up @@ -290,7 +290,7 @@ where
notify.change_state(State::Ready, None).await?;

if let Some(timeout) = notify.watchdog_timeout() {
debug!(timeout = as_debug!(timeout); "started via systemd with watchdog");
debug!(timeout:? = timeout; "started via systemd with watchdog");
let mut interval = Interval::every(ctx.runtime_ref().clone(), timeout);
loop {
match either(ctx.receive_next(), next(&mut interval)).await {
Expand Down
6 changes: 3 additions & 3 deletions rt/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::{fmt, io, task, thread};
use crossbeam_channel::Receiver;
use heph::actor::{self, actor_fn};
use heph::supervisor::NoSupervisor;
use log::{as_debug, debug, trace};
use log::{debug, trace};

use crate::error::StringError;
use crate::local::RuntimeInternals;
Expand Down Expand Up @@ -499,7 +499,7 @@ impl Worker {
self.internals.shared.try_poll_ring()?;

let timeout = self.determine_timeout();
trace!(worker_id = self.internals.id.get(), timeout = as_debug!(timeout); "polling for OS events");
trace!(worker_id = self.internals.id.get(), timeout:? = timeout; "polling for OS events");
self.internals.ring.borrow_mut().poll(timeout)?;

// Since we could have been polling our own ring for a long time we poll
Expand Down Expand Up @@ -640,7 +640,7 @@ async fn comm_actor(
) {
while let Some(msg) = receiver.recv().await {
let internals = &ctx.runtime_ref().internals;
trace!(worker_id = internals.id.get(), message = as_debug!(msg); "processing coordinator message");
trace!(worker_id = internals.id.get(), message:? = msg; "processing coordinator message");
let timing = trace::start(&*internals.trace_log.borrow());
match msg {
Control::Started => internals.start(),
Expand Down

0 comments on commit 51446b4

Please sign in to comment.