Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revamp JobQueue into JobExecutor and introduce NativeAsyncJob #4118

Merged
merged 7 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

62 changes: 40 additions & 22 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ mod helper;
use boa_engine::{
builtins::promise::PromiseState,
context::ContextBuilder,
job::{FutureJob, JobQueue, NativeJob},
job::{Job, JobExecutor, NativeAsyncJob, PromiseJob},
module::{Module, SimpleModuleLoader},
optimizer::OptimizerOptions,
script::Script,
vm::flowgraph::{Direction, Graph},
Context, JsError, Source,
Context, JsError, JsResult, Source,
};
use boa_parser::source::ReadChar;
use clap::{Parser, ValueEnum, ValueHint};
Expand Down Expand Up @@ -292,7 +292,7 @@ fn evaluate_file(
);

let promise = module.load_link_evaluate(context);
context.run_jobs();
context.run_jobs().map_err(|err| err.into_erased(context))?;
let result = promise.state();

return match result {
Expand All @@ -308,9 +308,9 @@ fn evaluate_file(
Ok(v) => println!("{}", v.display()),
Err(v) => eprintln!("Uncaught {v}"),
}
context.run_jobs();

Ok(())
context
.run_jobs()
.map_err(|err| err.into_erased(context).into())
}

fn evaluate_files(args: &Opt, context: &mut Context, loader: &SimpleModuleLoader) {
Expand All @@ -336,10 +336,10 @@ fn main() -> Result<()> {

let args = Opt::parse();

let queue = Rc::new(Jobs::default());
let executor = Rc::new(Executor::default());
let loader = Rc::new(SimpleModuleLoader::new(&args.root).map_err(|e| eyre!(e.to_string()))?);
let mut context = ContextBuilder::new()
.job_queue(queue)
.job_executor(executor)
.module_loader(loader.clone())
.build()
.map_err(|e| eyre!(e.to_string()))?;
Expand Down Expand Up @@ -425,7 +425,9 @@ fn main() -> Result<()> {
eprintln!("{}: {}", "Uncaught".red(), v.to_string().red());
}
}
context.run_jobs();
if let Err(err) = context.run_jobs() {
eprintln!("{err}");
};
}
}

Expand Down Expand Up @@ -453,29 +455,45 @@ fn add_runtime(context: &mut Context) {
}

#[derive(Default)]
struct Jobs(RefCell<VecDeque<NativeJob>>);
struct Executor {
promise_jobs: RefCell<VecDeque<PromiseJob>>,
async_jobs: RefCell<VecDeque<NativeAsyncJob>>,
}

impl JobQueue for Jobs {
fn enqueue_promise_job(&self, job: NativeJob, _: &mut Context) {
self.0.borrow_mut().push_back(job);
impl JobExecutor for Executor {
fn enqueue_job(&self, job: Job, _: &mut Context) {
match job {
Job::PromiseJob(job) => self.promise_jobs.borrow_mut().push_back(job),
Job::AsyncJob(job) => self.async_jobs.borrow_mut().push_back(job),
job => eprintln!("unsupported job type {job:?}"),
}
}

fn run_jobs(&self, context: &mut Context) {
fn run_jobs(&self, context: &mut Context) -> JsResult<()> {
loop {
let jobs = std::mem::take(&mut *self.0.borrow_mut());
if jobs.is_empty() {
return;
if self.promise_jobs.borrow().is_empty() && self.async_jobs.borrow().is_empty() {
return Ok(());
}

let jobs = std::mem::take(&mut *self.promise_jobs.borrow_mut());
for job in jobs {
if let Err(e) = job.call(context) {
eprintln!("Uncaught {e}");
}
}
}
}

fn enqueue_future_job(&self, future: FutureJob, _: &mut Context) {
let job = pollster::block_on(future);
self.0.borrow_mut().push_back(job);
let async_jobs = std::mem::take(&mut *self.async_jobs.borrow_mut());
for async_job in async_jobs {
if let Err(err) = pollster::block_on(async_job.call(&RefCell::new(context))) {
eprintln!("Uncaught {err}");
}
let jobs = std::mem::take(&mut *self.promise_jobs.borrow_mut());
for job in jobs {
if let Err(e) = job.call(context) {
eprintln!("Uncaught {e}");
}
}
}
}
}
}
22 changes: 12 additions & 10 deletions core/engine/src/builtins/promise/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
builtins::{Array, BuiltInObject},
context::intrinsics::{Intrinsics, StandardConstructor, StandardConstructors},
error::JsNativeError,
job::{JobCallback, NativeJob},
job::{JobCallback, PromiseJob},
js_string,
native_function::NativeFunction,
object::{
Expand Down Expand Up @@ -1888,8 +1888,8 @@ impl Promise {

// c. Perform HostEnqueuePromiseJob(fulfillJob.[[Job]], fulfillJob.[[Realm]]).
context
.job_queue()
.enqueue_promise_job(fulfill_job, context);
.job_executor()
.enqueue_job(fulfill_job.into(), context);
}

// 11. Else,
Expand All @@ -1909,7 +1909,9 @@ impl Promise {
let reject_job = new_promise_reaction_job(reject_reaction, reason.clone(), context);

// e. Perform HostEnqueuePromiseJob(rejectJob.[[Job]], rejectJob.[[Realm]]).
context.job_queue().enqueue_promise_job(reject_job, context);
context
.job_executor()
.enqueue_job(reject_job.into(), context);

// 12. Set promise.[[PromiseIsHandled]] to true.
promise
Expand Down Expand Up @@ -1985,7 +1987,7 @@ impl Promise {
let job = new_promise_reaction_job(reaction, argument.clone(), context);

// b. Perform HostEnqueuePromiseJob(job.[[Job]], job.[[Realm]]).
context.job_queue().enqueue_promise_job(job, context);
context.job_executor().enqueue_job(job.into(), context);
}
// 2. Return unused.
}
Expand Down Expand Up @@ -2178,7 +2180,7 @@ impl Promise {
);

// 15. Perform HostEnqueuePromiseJob(job.[[Job]], job.[[Realm]]).
context.job_queue().enqueue_promise_job(job, context);
context.job_executor().enqueue_job(job.into(), context);

// 16. Return undefined.
Ok(JsValue::undefined())
Expand Down Expand Up @@ -2239,7 +2241,7 @@ fn new_promise_reaction_job(
mut reaction: ReactionRecord,
argument: JsValue,
context: &mut Context,
) -> NativeJob {
) -> PromiseJob {
// Inverting order since `job` captures `reaction` by value.

// 2. Let handlerRealm be null.
Expand Down Expand Up @@ -2320,7 +2322,7 @@ fn new_promise_reaction_job(
};

// 4. Return the Record { [[Job]]: job, [[Realm]]: handlerRealm }.
NativeJob::with_realm(job, realm, context)
PromiseJob::with_realm(job, realm, context)
}

/// More information:
Expand All @@ -2332,7 +2334,7 @@ fn new_promise_resolve_thenable_job(
thenable: JsValue,
then: JobCallback,
context: &mut Context,
) -> NativeJob {
) -> PromiseJob {
// Inverting order since `job` captures variables by value.

// 2. Let getThenRealmResult be Completion(GetFunctionRealm(then.[[Callback]])).
Expand Down Expand Up @@ -2374,5 +2376,5 @@ fn new_promise_resolve_thenable_job(
};

// 6. Return the Record { [[Job]]: job, [[Realm]]: thenRealm }.
NativeJob::with_realm(job, realm, context)
PromiseJob::with_realm(job, realm, context)
}
3 changes: 1 addition & 2 deletions core/engine/src/builtins/promise/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ fn promise() {
count += 1;
"#}),
TestAction::assert_eq("count", 2),
#[allow(clippy::redundant_closure_for_method_calls)]
TestAction::inspect_context(|ctx| ctx.run_jobs()),
TestAction::inspect_context(|ctx| ctx.run_jobs().unwrap()),
TestAction::assert_eq("count", 3),
]);
}
66 changes: 38 additions & 28 deletions core/engine/src/context/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! The ECMAScript context.

use std::cell::RefCell;
use std::{cell::Cell, path::Path, rc::Rc};

use boa_ast::StatementList;
Expand All @@ -13,11 +14,12 @@ use intrinsics::Intrinsics;
#[cfg(feature = "temporal")]
use temporal_rs::tzdb::FsTzdbProvider;

use crate::job::Job;
use crate::vm::RuntimeLimits;
use crate::{
builtins,
class::{Class, ClassBuilder},
job::{JobQueue, NativeJob, SimpleJobQueue},
job::{JobExecutor, SimpleJobExecutor},
js_string,
module::{IdleModuleLoader, ModuleLoader, SimpleModuleLoader},
native_function::NativeFunction,
Expand Down Expand Up @@ -111,7 +113,7 @@ pub struct Context {

host_hooks: &'static dyn HostHooks,

job_queue: Rc<dyn JobQueue>,
job_executor: Rc<dyn JobExecutor>,

module_loader: Rc<dyn ModuleLoader>,

Expand All @@ -133,7 +135,7 @@ impl std::fmt::Debug for Context {
.field("interner", &self.interner)
.field("vm", &self.vm)
.field("strict", &self.strict)
.field("promise_job_queue", &"JobQueue")
.field("job_executor", &"JobExecutor")
.field("hooks", &"HostHooks")
.field("module_loader", &"ModuleLoader")
.field("optimizer_options", &self.optimizer_options);
Expand Down Expand Up @@ -186,7 +188,7 @@ impl Context {
/// ```
///
/// Note that this won't run any scheduled promise jobs; you need to call [`Context::run_jobs`]
/// on the context or [`JobQueue::run_jobs`] on the provided queue to run them.
/// on the context or [`JobExecutor::run_jobs`] on the provided queue to run them.
#[allow(clippy::unit_arg, dropping_copy_types)]
pub fn eval<R: ReadChar>(&mut self, src: Source<'_, R>) -> JsResult<JsValue> {
let main_timer = Profiler::global().start_event("Script evaluation", "Main");
Expand Down Expand Up @@ -467,30 +469,35 @@ impl Context {
self.strict = strict;
}

/// Enqueues a [`NativeJob`] on the [`JobQueue`].
/// Enqueues a [`Job`] on the [`JobExecutor`].
#[inline]
pub fn enqueue_job(&mut self, job: NativeJob) {
self.job_queue().enqueue_promise_job(job, self);
pub fn enqueue_job(&mut self, job: Job) {
self.job_executor().enqueue_job(job, self);
}

/// Runs all the jobs in the job queue.
/// Runs all the jobs with the provided job executor.
#[inline]
pub fn run_jobs(&mut self) {
self.job_queue().run_jobs(self);
pub fn run_jobs(&mut self) -> JsResult<()> {
let result = self.job_executor().run_jobs(self);
self.clear_kept_objects();
result
}

/// Asynchronously runs all the jobs in the job queue.
/// Asynchronously runs all the jobs with the provided job executor.
///
/// # Note
///
/// Concurrent job execution cannot be guaranteed by the engine, since this depends on the
/// specific handling of each [`JobQueue`]. If you want to execute jobs concurrently, you must
/// provide a custom implementor of `JobQueue` to the context.
/// specific handling of each [`JobExecutor`]. If you want to execute jobs concurrently, you must
/// provide a custom implementatin of `JobExecutor` to the context.
#[allow(clippy::future_not_send)]
pub async fn run_jobs_async(&mut self) {
self.job_queue().run_jobs_async(self).await;
pub async fn run_jobs_async(&mut self) -> JsResult<()> {
let result = self
.job_executor()
.run_jobs_async(&RefCell::new(self))
.await;
self.clear_kept_objects();
result
}

/// Abstract operation [`ClearKeptObjects`][clear].
Expand Down Expand Up @@ -546,11 +553,11 @@ impl Context {
self.host_hooks
}

/// Gets the job queue.
/// Gets the job executor.
#[inline]
#[must_use]
pub fn job_queue(&self) -> Rc<dyn JobQueue> {
self.job_queue.clone()
pub fn job_executor(&self) -> Rc<dyn JobExecutor> {
self.job_executor.clone()
}

/// Gets the module loader.
Expand Down Expand Up @@ -881,7 +888,7 @@ impl Context {
pub struct ContextBuilder {
interner: Option<Interner>,
host_hooks: Option<&'static dyn HostHooks>,
job_queue: Option<Rc<dyn JobQueue>>,
job_executor: Option<Rc<dyn JobExecutor>>,
module_loader: Option<Rc<dyn ModuleLoader>>,
can_block: bool,
#[cfg(feature = "intl")]
Expand All @@ -893,7 +900,7 @@ pub struct ContextBuilder {
impl std::fmt::Debug for ContextBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
#[derive(Clone, Copy, Debug)]
struct JobQueue;
struct JobExecutor;
#[derive(Clone, Copy, Debug)]
struct HostHooks;
#[derive(Clone, Copy, Debug)]
Expand All @@ -903,7 +910,10 @@ impl std::fmt::Debug for ContextBuilder {

out.field("interner", &self.interner)
.field("host_hooks", &self.host_hooks.as_ref().map(|_| HostHooks))
.field("job_queue", &self.job_queue.as_ref().map(|_| JobQueue))
.field(
"job_executor",
&self.job_executor.as_ref().map(|_| JobExecutor),
)
.field(
"module_loader",
&self.module_loader.as_ref().map(|_| ModuleLoader),
Expand Down Expand Up @@ -1016,10 +1026,10 @@ impl ContextBuilder {
self
}

/// Initializes the [`JobQueue`] for the context.
/// Initializes the [`JobExecutor`] for the context.
#[must_use]
pub fn job_queue<Q: JobQueue + 'static>(mut self, job_queue: Rc<Q>) -> Self {
self.job_queue = Some(job_queue);
pub fn job_executor<Q: JobExecutor + 'static>(mut self, job_executor: Rc<Q>) -> Self {
self.job_executor = Some(job_executor);
self
}

Expand Down Expand Up @@ -1090,9 +1100,9 @@ impl ContextBuilder {
Rc::new(IdleModuleLoader)
};

let job_queue = self
.job_queue
.unwrap_or_else(|| Rc::new(SimpleJobQueue::new()));
let job_executor = self
.job_executor
.unwrap_or_else(|| Rc::new(SimpleJobExecutor::new()));

let mut context = Context {
interner: self.interner.unwrap_or_default(),
Expand All @@ -1119,7 +1129,7 @@ impl ContextBuilder {
instructions_remaining: self.instructions_remaining,
kept_alive: Vec::new(),
host_hooks,
job_queue,
job_executor,
module_loader,
optimizer_options: OptimizerOptions::OPTIMIZE_ALL,
root_shape,
Expand Down
Loading
Loading