Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Revamp JobQueue into JobExecutor
Browse files Browse the repository at this point in the history
jedel1043 committed Jan 11, 2025
1 parent a4df06b commit f2abc2a
Showing 9 changed files with 373 additions and 308 deletions.
28 changes: 14 additions & 14 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@ mod helper;
use boa_engine::{
builtins::promise::PromiseState,
context::ContextBuilder,
job::{JobQueue, NativeAsyncJob, NativeJob},
job::{Job, JobExecutor, NativeAsyncJob, PromiseJob},
module::{Module, SimpleModuleLoader},
optimizer::OptimizerOptions,
script::Script,
@@ -336,10 +336,10 @@ fn main() -> Result<()> {

let args = Opt::parse();

let queue = Rc::new(Jobs::default());
let executor = Rc::new(Executor::default());
let loader = Rc::new(SimpleModuleLoader::new(&args.root).map_err(|e| eyre!(e.to_string()))?);
let mut context = ContextBuilder::new()
.job_queue(queue)
.job_executor(executor)
.module_loader(loader.clone())
.build()
.map_err(|e| eyre!(e.to_string()))?;
@@ -453,31 +453,31 @@ fn add_runtime(context: &mut Context) {
}

#[derive(Default)]
struct Jobs {
jobs: RefCell<VecDeque<NativeJob>>,
struct Executor {
promise_jobs: RefCell<VecDeque<PromiseJob>>,
async_jobs: RefCell<VecDeque<NativeAsyncJob>>,
}

impl JobQueue for Jobs {
fn enqueue_job(&self, job: NativeJob, _: &mut Context) {
self.jobs.borrow_mut().push_back(job);
}

fn enqueue_async_job(&self, async_job: NativeAsyncJob, _: &mut Context) {
self.async_jobs.borrow_mut().push_back(async_job);
impl JobExecutor for Executor {
fn enqueue_job(&self, job: Job, _: &mut Context) {
match job {
Job::PromiseJob(job) => self.promise_jobs.borrow_mut().push_back(job),
Job::AsyncJob(job) => self.async_jobs.borrow_mut().push_back(job),
job => eprintln!("unsupported job type {job:?}"),
}
}

fn run_jobs(&self, context: &mut Context) {
loop {
if self.jobs.borrow().is_empty() && self.async_jobs.borrow().is_empty() {
if self.promise_jobs.borrow().is_empty() && self.async_jobs.borrow().is_empty() {
return;
}
let async_jobs = std::mem::take(&mut *self.async_jobs.borrow_mut());
for async_job in async_jobs {
if let Err(err) = pollster::block_on(async_job.call(&RefCell::new(context))) {
eprintln!("Uncaught {err}");
}
let jobs = std::mem::take(&mut *self.jobs.borrow_mut());
let jobs = std::mem::take(&mut *self.promise_jobs.borrow_mut());
for job in jobs {
if let Err(e) = job.call(context) {
eprintln!("Uncaught {e}");
22 changes: 13 additions & 9 deletions core/engine/src/builtins/promise/mod.rs
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ use crate::{
builtins::{Array, BuiltInObject},
context::intrinsics::{Intrinsics, StandardConstructor, StandardConstructors},
error::JsNativeError,
job::{JobCallback, NativeJob},
job::{JobCallback, PromiseJob},
js_string,
native_function::NativeFunction,
object::{
@@ -1887,7 +1887,9 @@ impl Promise {
new_promise_reaction_job(fulfill_reaction, value.clone(), context);

// c. Perform HostEnqueuePromiseJob(fulfillJob.[[Job]], fulfillJob.[[Realm]]).
context.job_queue().enqueue_job(fulfill_job, context);
context
.job_executor()
.enqueue_job(fulfill_job.into(), context);
}

// 11. Else,
@@ -1907,7 +1909,9 @@ impl Promise {
let reject_job = new_promise_reaction_job(reject_reaction, reason.clone(), context);

// e. Perform HostEnqueuePromiseJob(rejectJob.[[Job]], rejectJob.[[Realm]]).
context.job_queue().enqueue_job(reject_job, context);
context
.job_executor()
.enqueue_job(reject_job.into(), context);

// 12. Set promise.[[PromiseIsHandled]] to true.
promise
@@ -1983,7 +1987,7 @@ impl Promise {
let job = new_promise_reaction_job(reaction, argument.clone(), context);

// b. Perform HostEnqueuePromiseJob(job.[[Job]], job.[[Realm]]).
context.job_queue().enqueue_job(job, context);
context.job_executor().enqueue_job(job.into(), context);
}
// 2. Return unused.
}
@@ -2176,7 +2180,7 @@ impl Promise {
);

// 15. Perform HostEnqueuePromiseJob(job.[[Job]], job.[[Realm]]).
context.job_queue().enqueue_job(job, context);
context.job_executor().enqueue_job(job.into(), context);

// 16. Return undefined.
Ok(JsValue::undefined())
@@ -2237,7 +2241,7 @@ fn new_promise_reaction_job(
mut reaction: ReactionRecord,
argument: JsValue,
context: &mut Context,
) -> NativeJob {
) -> PromiseJob {
// Inverting order since `job` captures `reaction` by value.

// 2. Let handlerRealm be null.
@@ -2318,7 +2322,7 @@ fn new_promise_reaction_job(
};

// 4. Return the Record { [[Job]]: job, [[Realm]]: handlerRealm }.
NativeJob::with_realm(job, realm, context)
PromiseJob::with_realm(job, realm, context)
}

/// More information:
@@ -2330,7 +2334,7 @@ fn new_promise_resolve_thenable_job(
thenable: JsValue,
then: JobCallback,
context: &mut Context,
) -> NativeJob {
) -> PromiseJob {
// Inverting order since `job` captures variables by value.

// 2. Let getThenRealmResult be Completion(GetFunctionRealm(then.[[Callback]])).
@@ -2372,5 +2376,5 @@ fn new_promise_resolve_thenable_job(
};

// 6. Return the Record { [[Job]]: job, [[Realm]]: thenRealm }.
NativeJob::with_realm(job, realm, context)
PromiseJob::with_realm(job, realm, context)
}
65 changes: 32 additions & 33 deletions core/engine/src/context/mod.rs
Original file line number Diff line number Diff line change
@@ -14,12 +14,12 @@ use intrinsics::Intrinsics;
#[cfg(feature = "temporal")]
use temporal_rs::tzdb::FsTzdbProvider;

use crate::job::NativeAsyncJob;
use crate::job::Job;
use crate::vm::RuntimeLimits;
use crate::{
builtins,
class::{Class, ClassBuilder},
job::{JobQueue, NativeJob, SimpleJobQueue},
job::{JobExecutor, SimpleJobExecutor},
js_string,
module::{IdleModuleLoader, ModuleLoader, SimpleModuleLoader},
native_function::NativeFunction,
@@ -113,7 +113,7 @@ pub struct Context {

host_hooks: &'static dyn HostHooks,

job_queue: Rc<dyn JobQueue>,
job_executor: Rc<dyn JobExecutor>,

module_loader: Rc<dyn ModuleLoader>,

@@ -135,7 +135,7 @@ impl std::fmt::Debug for Context {
.field("interner", &self.interner)
.field("vm", &self.vm)
.field("strict", &self.strict)
.field("promise_job_queue", &"JobQueue")
.field("job_executor", &"JobExecutor")
.field("hooks", &"HostHooks")
.field("module_loader", &"ModuleLoader")
.field("optimizer_options", &self.optimizer_options);
@@ -188,7 +188,7 @@ impl Context {
/// ```
///
/// Note that this won't run any scheduled promise jobs; you need to call [`Context::run_jobs`]
/// on the context or [`JobQueue::run_jobs`] on the provided queue to run them.
/// on the context or [`JobExecutor::run_jobs`] on the provided queue to run them.
#[allow(clippy::unit_arg, dropping_copy_types)]
pub fn eval<R: ReadChar>(&mut self, src: Source<'_, R>) -> JsResult<JsValue> {
let main_timer = Profiler::global().start_event("Script evaluation", "Main");
@@ -469,35 +469,31 @@ impl Context {
self.strict = strict;
}

/// Enqueues a [`NativeJob`] on the [`JobQueue`].
/// Enqueues a [`Job`] on the [`JobExecutor`].
#[inline]
pub fn enqueue_job(&mut self, job: NativeJob) {
self.job_queue().enqueue_job(job, self);
pub fn enqueue_job(&mut self, job: Job) {
self.job_executor().enqueue_job(job, self);
}

/// Enqueues a [`NativeAsyncJob`] on the [`JobQueue`].
#[inline]
pub fn enqueue_async_job(&mut self, job: NativeAsyncJob) {
self.job_queue().enqueue_async_job(job, self);
}

/// Runs all the jobs in the job queue.
/// Runs all the jobs with the provided job executor.
#[inline]
pub fn run_jobs(&mut self) {
self.job_queue().run_jobs(self);
self.job_executor().run_jobs(self);
self.clear_kept_objects();
}

/// Asynchronously runs all the jobs in the job queue.
/// Asynchronously runs all the jobs with the provided job executor.
///
/// # Note
///
/// Concurrent job execution cannot be guaranteed by the engine, since this depends on the
/// specific handling of each [`JobQueue`]. If you want to execute jobs concurrently, you must
/// provide a custom implementor of `JobQueue` to the context.
/// specific handling of each [`JobExecutor`]. If you want to execute jobs concurrently, you must
/// provide a custom implementatin of `JobExecutor` to the context.
#[allow(clippy::future_not_send)]
pub async fn run_jobs_async(&mut self) {
self.job_queue().run_jobs_async(&RefCell::new(self)).await;
self.job_executor()
.run_jobs_async(&RefCell::new(self))
.await;
self.clear_kept_objects();
}

@@ -554,11 +550,11 @@ impl Context {
self.host_hooks
}

/// Gets the job queue.
/// Gets the job executor.
#[inline]
#[must_use]
pub fn job_queue(&self) -> Rc<dyn JobQueue> {
self.job_queue.clone()
pub fn job_executor(&self) -> Rc<dyn JobExecutor> {
self.job_executor.clone()
}

/// Gets the module loader.
@@ -889,7 +885,7 @@ impl Context {
pub struct ContextBuilder {
interner: Option<Interner>,
host_hooks: Option<&'static dyn HostHooks>,
job_queue: Option<Rc<dyn JobQueue>>,
job_executor: Option<Rc<dyn JobExecutor>>,
module_loader: Option<Rc<dyn ModuleLoader>>,
can_block: bool,
#[cfg(feature = "intl")]
@@ -901,7 +897,7 @@ pub struct ContextBuilder {
impl std::fmt::Debug for ContextBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
#[derive(Clone, Copy, Debug)]
struct JobQueue;
struct JobExecutor;
#[derive(Clone, Copy, Debug)]
struct HostHooks;
#[derive(Clone, Copy, Debug)]
@@ -911,7 +907,10 @@ impl std::fmt::Debug for ContextBuilder {

out.field("interner", &self.interner)
.field("host_hooks", &self.host_hooks.as_ref().map(|_| HostHooks))
.field("job_queue", &self.job_queue.as_ref().map(|_| JobQueue))
.field(
"job_executor",
&self.job_executor.as_ref().map(|_| JobExecutor),
)
.field(
"module_loader",
&self.module_loader.as_ref().map(|_| ModuleLoader),
@@ -1024,10 +1023,10 @@ impl ContextBuilder {
self
}

/// Initializes the [`JobQueue`] for the context.
/// Initializes the [`JobExecutor`] for the context.
#[must_use]
pub fn job_queue<Q: JobQueue + 'static>(mut self, job_queue: Rc<Q>) -> Self {
self.job_queue = Some(job_queue);
pub fn job_executor<Q: JobExecutor + 'static>(mut self, job_executor: Rc<Q>) -> Self {
self.job_executor = Some(job_executor);
self
}

@@ -1098,9 +1097,9 @@ impl ContextBuilder {
Rc::new(IdleModuleLoader)
};

let job_queue = self
.job_queue
.unwrap_or_else(|| Rc::new(SimpleJobQueue::new()));
let job_executor = self
.job_executor
.unwrap_or_else(|| Rc::new(SimpleJobExecutor::new()));

let mut context = Context {
interner: self.interner.unwrap_or_default(),
@@ -1127,7 +1126,7 @@ impl ContextBuilder {
instructions_remaining: self.instructions_remaining,
kept_alive: Vec::new(),
host_hooks,
job_queue,
job_executor,
module_loader,
optimizer_options: OptimizerOptions::OPTIMIZE_ALL,
root_shape,
297 changes: 175 additions & 122 deletions core/engine/src/job.rs

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions core/engine/src/object/builtins/jspromise.rs
Original file line number Diff line number Diff line change
@@ -292,7 +292,7 @@ impl JsPromise {
{
let (promise, resolvers) = Self::new_pending(context);

context.job_queue().enqueue_async_job(
context.enqueue_job(
NativeAsyncJob::new(move |context| {
Box::pin(async move {
let result = future.await;
@@ -306,8 +306,8 @@ impl JsPromise {
}
}
})
}),
context,
})
.into(),
);

promise
@@ -1085,7 +1085,7 @@ impl JsPromise {

/// Run jobs until this promise is resolved or rejected. This could
/// result in an infinite loop if the promise is never resolved or
/// rejected (e.g. with a [`boa_engine::job::JobQueue`] that does
/// rejected (e.g. with a [`boa_engine::job::JobExecutor`] that does
/// not prioritize properly). If you need more control over how
/// the promise handles timing out, consider using
/// [`Context::run_jobs`] directly.
4 changes: 2 additions & 2 deletions core/engine/src/script.rs
Original file line number Diff line number Diff line change
@@ -160,9 +160,9 @@ impl Script {
/// Evaluates this script and returns its result.
///
/// Note that this won't run any scheduled promise jobs; you need to call [`Context::run_jobs`]
/// on the context or [`JobQueue::run_jobs`] on the provided queue to run them.
/// on the context or [`JobExecutor::run_jobs`] on the provided queue to run them.
///
/// [`JobQueue::run_jobs`]: crate::job::JobQueue::run_jobs
/// [`JobExecutor::run_jobs`]: crate::job::JobExecutor::run_jobs
pub fn evaluate(&self, context: &mut Context) -> JsResult<JsValue> {
let _timer = Profiler::global().start_event("Execution", "Main");

135 changes: 69 additions & 66 deletions examples/src/bin/module_fetch_async.rs
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ use std::{cell::RefCell, collections::VecDeque, future::Future, pin::Pin, rc::Rc

use boa_engine::{
builtins::promise::PromiseState,
job::{JobQueue, NativeAsyncJob, NativeJob},
job::{Job, JobExecutor, NativeAsyncJob, PromiseJob},
js_string,
module::ModuleLoader,
Context, JsNativeError, JsResult, JsString, JsValue, Module,
@@ -29,58 +29,61 @@ impl ModuleLoader for HttpModuleLoader {
let url = specifier.to_std_string_escaped();

// Just enqueue the future for now. We'll advance all the enqueued futures inside our custom
// `JobQueue`.
context.enqueue_async_job(NativeAsyncJob::with_realm(
move |context| {
Box::pin(async move {
// Adding some prints to show the non-deterministic nature of the async fetches.
// Try to run the example several times to see how sometimes the fetches start in order
// but finish in disorder.
println!("Fetching `{url}`...");

// This could also retry fetching in case there's an error while requesting the module.
let body: Result<_, isahc::Error> = async {
let mut response = Request::get(&url)
.redirect_policy(RedirectPolicy::Limit(5))
.body(())?
.send_async()
.await?;

Ok(response.text().await?)
}
.await;

println!("Finished fetching `{url}`");

let body = match body {
Ok(body) => body,
Err(err) => {
// On error we always call `finish_load` to notify the load promise about the
// error.
finish_load(
Err(JsNativeError::typ().with_message(err.to_string()).into()),
&mut context.borrow_mut(),
);

// Just returns anything to comply with `NativeAsyncJob::new`'s signature.
return Ok(JsValue::undefined());
// `JobExecutor`.
context.enqueue_job(
NativeAsyncJob::with_realm(
move |context| {
Box::pin(async move {
// Adding some prints to show the non-deterministic nature of the async fetches.
// Try to run the example several times to see how sometimes the fetches start in order
// but finish in disorder.
println!("Fetching `{url}`...");

// This could also retry fetching in case there's an error while requesting the module.
let body: Result<_, isahc::Error> = async {
let mut response = Request::get(&url)
.redirect_policy(RedirectPolicy::Limit(5))
.body(())?
.send_async()
.await?;

Ok(response.text().await?)
}
};

// Could also add a path if needed.
let source = Source::from_bytes(body.as_bytes());

let module = Module::parse(source, None, &mut context.borrow_mut());

// We don't do any error handling, `finish_load` takes care of that for us.
finish_load(module, &mut context.borrow_mut());

// Also needed to match `NativeAsyncJob::new`.
Ok(JsValue::undefined())
})
},
context.realm().clone(),
));
.await;

println!("Finished fetching `{url}`");

let body = match body {
Ok(body) => body,
Err(err) => {
// On error we always call `finish_load` to notify the load promise about the
// error.
finish_load(
Err(JsNativeError::typ().with_message(err.to_string()).into()),
&mut context.borrow_mut(),
);

// Just returns anything to comply with `NativeAsyncJob::new`'s signature.
return Ok(JsValue::undefined());
}
};

// Could also add a path if needed.
let source = Source::from_bytes(body.as_bytes());

let module = Module::parse(source, None, &mut context.borrow_mut());

// We don't do any error handling, `finish_load` takes care of that for us.
finish_load(module, &mut context.borrow_mut());

// Also needed to match `NativeAsyncJob::new`.
Ok(JsValue::undefined())
})
},
context.realm().clone(),
)
.into(),
);
}
}

@@ -108,7 +111,7 @@ fn main() -> JsResult<()> {
"#;

let context = &mut Context::builder()
.job_queue(Rc::new(Queue::new()))
.job_executor(Rc::new(Queue::new()))
// NEW: sets the context module loader to our custom loader
.module_loader(Rc::new(HttpModuleLoader))
.build()?;
@@ -169,20 +172,20 @@ fn main() -> JsResult<()> {
// Taken from the `smol_event_loop.rs` example.
/// An event queue using smol to drive futures to completion.
struct Queue {
async_jobs: RefCell<Vec<NativeAsyncJob>>,
jobs: RefCell<VecDeque<NativeJob>>,
async_jobs: RefCell<VecDeque<NativeAsyncJob>>,
promise_jobs: RefCell<VecDeque<PromiseJob>>,
}

impl Queue {
fn new() -> Self {
Self {
async_jobs: RefCell::default(),
jobs: RefCell::default(),
promise_jobs: RefCell::default(),
}
}

fn drain_jobs(&self, context: &mut Context) {
let jobs = std::mem::take(&mut *self.jobs.borrow_mut());
let jobs = std::mem::take(&mut *self.promise_jobs.borrow_mut());
for job in jobs {
if let Err(e) = job.call(context) {
eprintln!("Uncaught {e}");
@@ -191,13 +194,13 @@ impl Queue {
}
}

impl JobQueue for Queue {
fn enqueue_job(&self, job: NativeJob, _context: &mut Context) {
self.jobs.borrow_mut().push_back(job);
}

fn enqueue_async_job(&self, async_job: NativeAsyncJob, _context: &mut Context) {
self.async_jobs.borrow_mut().push(async_job);
impl JobExecutor for Queue {
fn enqueue_job(&self, job: Job, _context: &mut Context) {
match job {
Job::PromiseJob(job) => self.promise_jobs.borrow_mut().push_back(job),
Job::AsyncJob(job) => self.async_jobs.borrow_mut().push_back(job),
_ => panic!("unsupported job type"),
}
}

// While the sync flavor of `run_jobs` will block the current thread until all the jobs have finished...
@@ -216,7 +219,7 @@ impl JobQueue for Queue {
{
Box::pin(async move {
// Early return in case there were no jobs scheduled.
if self.jobs.borrow().is_empty() && self.async_jobs.borrow().is_empty() {
if self.promise_jobs.borrow().is_empty() && self.async_jobs.borrow().is_empty() {
return;
}
let mut group = FutureGroup::new();
@@ -225,7 +228,7 @@ impl JobQueue for Queue {
group.insert(job.call(context));
}

if self.jobs.borrow().is_empty() {
if self.promise_jobs.borrow().is_empty() {
let Some(result) = group.next().await else {
// Both queues are empty. We can exit.
return;
61 changes: 32 additions & 29 deletions examples/src/bin/smol_event_loop.rs
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ use std::{

use boa_engine::{
context::ContextBuilder,
job::{JobQueue, NativeAsyncJob, NativeJob},
job::{Job, JobExecutor, NativeAsyncJob, PromiseJob},
js_string,
native_function::NativeFunction,
property::Attribute,
@@ -35,19 +35,19 @@ fn main() {
/// An event queue using smol to drive futures to completion.
struct Queue {
async_jobs: RefCell<VecDeque<NativeAsyncJob>>,
jobs: RefCell<VecDeque<NativeJob>>,
promise_jobs: RefCell<VecDeque<PromiseJob>>,
}

impl Queue {
fn new() -> Self {
Self {
async_jobs: RefCell::default(),
jobs: RefCell::default(),
promise_jobs: RefCell::default(),
}
}

fn drain_jobs(&self, context: &mut Context) {
let jobs = std::mem::take(&mut *self.jobs.borrow_mut());
let jobs = std::mem::take(&mut *self.promise_jobs.borrow_mut());
for job in jobs {
if let Err(e) = job.call(context) {
eprintln!("Uncaught {e}");
@@ -56,13 +56,13 @@ impl Queue {
}
}

impl JobQueue for Queue {
fn enqueue_job(&self, job: NativeJob, _context: &mut Context) {
self.jobs.borrow_mut().push_back(job);
}

fn enqueue_async_job(&self, async_job: NativeAsyncJob, _context: &mut Context) {
self.async_jobs.borrow_mut().push_back(async_job);
impl JobExecutor for Queue {
fn enqueue_job(&self, job: Job, _context: &mut Context) {
match job {
Job::PromiseJob(job) => self.promise_jobs.borrow_mut().push_back(job),
Job::AsyncJob(job) => self.async_jobs.borrow_mut().push_back(job),
_ => panic!("unsupported job type"),
}
}

// While the sync flavor of `run_jobs` will block the current thread until all the jobs have finished...
@@ -81,7 +81,7 @@ impl JobQueue for Queue {
{
Box::pin(async move {
// Early return in case there were no jobs scheduled.
if self.jobs.borrow().is_empty() && self.async_jobs.borrow().is_empty() {
if self.promise_jobs.borrow().is_empty() && self.async_jobs.borrow().is_empty() {
return;
}
let mut group = FutureGroup::new();
@@ -90,7 +90,7 @@ impl JobQueue for Queue {
group.insert(job.call(context));
}

if self.jobs.borrow().is_empty() {
if self.promise_jobs.borrow().is_empty() {
let Some(result) = group.next().await else {
// Both queues are empty. We can exit.
return;
@@ -153,21 +153,24 @@ fn interval(this: &JsValue, args: &[JsValue], context: &mut Context) -> JsResult
let delay = args.get_or_undefined(1).to_u32(context)?;
let args = args.get(2..).unwrap_or_default().to_vec();

context.enqueue_async_job(NativeAsyncJob::with_realm(
move |context| {
Box::pin(async move {
let mut timer = smol::Timer::interval(Duration::from_millis(u64::from(delay)));
for _ in 0..10 {
timer.next().await;
if let Err(err) = function.call(&this, &args, &mut context.borrow_mut()) {
eprintln!("Uncaught {err}");
context.enqueue_job(
NativeAsyncJob::with_realm(
move |context| {
Box::pin(async move {
let mut timer = smol::Timer::interval(Duration::from_millis(u64::from(delay)));
for _ in 0..10 {
timer.next().await;
if let Err(err) = function.call(&this, &args, &mut context.borrow_mut()) {
eprintln!("Uncaught {err}");
}
}
}
Ok(JsValue::undefined())
})
},
context.realm().clone(),
));
Ok(JsValue::undefined())
})
},
context.realm().clone(),
)
.into(),
);

Ok(JsValue::undefined())
}
@@ -233,7 +236,7 @@ fn internally_async_event_loop() {
// Initialize the queue and the context
let queue = Queue::new();
let context = &mut ContextBuilder::new()
.job_queue(Rc::new(queue))
.job_executor(Rc::new(queue))
.build()
.unwrap();

@@ -262,7 +265,7 @@ fn externally_async_event_loop() {
// Initialize the queue and the context
let queue = Queue::new();
let context = &mut ContextBuilder::new()
.job_queue(Rc::new(queue))
.job_executor(Rc::new(queue))
.build()
.unwrap();

61 changes: 32 additions & 29 deletions examples/src/bin/tokio_event_loop.rs
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ use std::{

use boa_engine::{
context::ContextBuilder,
job::{JobQueue, NativeAsyncJob, NativeJob},
job::{Job, JobExecutor, NativeAsyncJob, PromiseJob},
js_string,
native_function::NativeFunction,
property::Attribute,
@@ -35,19 +35,19 @@ fn main() {
/// An event queue using tokio to drive futures to completion.
struct Queue {
async_jobs: RefCell<VecDeque<NativeAsyncJob>>,
jobs: RefCell<VecDeque<NativeJob>>,
promise_jobs: RefCell<VecDeque<PromiseJob>>,
}

impl Queue {
fn new() -> Self {
Self {
async_jobs: RefCell::default(),
jobs: RefCell::default(),
promise_jobs: RefCell::default(),
}
}

fn drain_jobs(&self, context: &mut Context) {
let jobs = std::mem::take(&mut *self.jobs.borrow_mut());
let jobs = std::mem::take(&mut *self.promise_jobs.borrow_mut());
for job in jobs {
if let Err(e) = job.call(context) {
eprintln!("Uncaught {e}");
@@ -56,13 +56,13 @@ impl Queue {
}
}

impl JobQueue for Queue {
fn enqueue_job(&self, job: NativeJob, _context: &mut Context) {
self.jobs.borrow_mut().push_back(job);
}

fn enqueue_async_job(&self, async_job: NativeAsyncJob, _context: &mut Context) {
self.async_jobs.borrow_mut().push_back(async_job);
impl JobExecutor for Queue {
fn enqueue_job(&self, job: Job, _context: &mut Context) {
match job {
Job::PromiseJob(job) => self.promise_jobs.borrow_mut().push_back(job),
Job::AsyncJob(job) => self.async_jobs.borrow_mut().push_back(job),
_ => panic!("unsupported job type"),
}
}

// While the sync flavor of `run_jobs` will block the current thread until all the jobs have finished...
@@ -86,7 +86,7 @@ impl JobQueue for Queue {
{
Box::pin(async move {
// Early return in case there were no jobs scheduled.
if self.jobs.borrow().is_empty() && self.async_jobs.borrow().is_empty() {
if self.promise_jobs.borrow().is_empty() && self.async_jobs.borrow().is_empty() {
return;
}
let mut group = FutureGroup::new();
@@ -95,7 +95,7 @@ impl JobQueue for Queue {
group.insert(job.call(context));
}

if self.jobs.borrow().is_empty() {
if self.promise_jobs.borrow().is_empty() {
let Some(result) = group.next().await else {
// Both queues are empty. We can exit.
return;
@@ -161,21 +161,24 @@ fn interval(this: &JsValue, args: &[JsValue], context: &mut Context) -> JsResult
let delay = args.get_or_undefined(1).to_u32(context)?;
let args = args.get(2..).unwrap_or_default().to_vec();

context.enqueue_async_job(NativeAsyncJob::with_realm(
move |context| {
Box::pin(async move {
let mut timer = time::interval(Duration::from_millis(u64::from(delay)));
for _ in 0..10 {
timer.tick().await;
if let Err(err) = function.call(&this, &args, &mut context.borrow_mut()) {
eprintln!("Uncaught {err}");
context.enqueue_job(
NativeAsyncJob::with_realm(
move |context| {
Box::pin(async move {
let mut timer = time::interval(Duration::from_millis(u64::from(delay)));
for _ in 0..10 {
timer.tick().await;
if let Err(err) = function.call(&this, &args, &mut context.borrow_mut()) {
eprintln!("Uncaught {err}");
}
}
}
Ok(JsValue::undefined())
})
},
context.realm().clone(),
));
Ok(JsValue::undefined())
})
},
context.realm().clone(),
)
.into(),
);

Ok(JsValue::undefined())
}
@@ -241,7 +244,7 @@ fn internally_async_event_loop() {
// Initialize the queue and the context
let queue = Queue::new();
let context = &mut ContextBuilder::new()
.job_queue(Rc::new(queue))
.job_executor(Rc::new(queue))
.build()
.unwrap();

@@ -268,7 +271,7 @@ async fn externally_async_event_loop() {
// Initialize the queue and the context
let queue = Queue::new();
let context = &mut ContextBuilder::new()
.job_queue(Rc::new(queue))
.job_executor(Rc::new(queue))
.build()
.unwrap();

0 comments on commit f2abc2a

Please sign in to comment.