Skip to content

Commit

Permalink
gpu: Remove Arcs no longer necessary.
Browse files Browse the repository at this point in the history
`wgpu` types now implement `Clone` to the same effect.

The one special case is that the polling system I previously had relied
on having a `Weak<wgpu::Device>`. I replaced it with a new design where
an explicit guard object is required to keep polling going. As a minor
theoretical benefit, it now stops polling in cases where the `Device`
still exists and has activity but explicit polling is not wanted.
That never happens in current practice, though.
  • Loading branch information
kpreid committed Jan 16, 2025
1 parent 11f6a41 commit 46a38a2
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 96 deletions.
4 changes: 2 additions & 2 deletions all-is-cubes-gpu/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ all-is-cubes-mesh = { workspace = true, features = ["dynamic"] }
all-is-cubes-render = { workspace = true, features = ["raytracer"] }
bytemuck = { workspace = true, features = ["derive"] }
cfg-if = { workspace = true }
# Used to implement ensure_polled(). Also a `wgpu` dependency already.
# Used to implement start_polling(). Also a `wgpu` dependency already.
flume = { workspace = true }
futures-channel = { workspace = true, features = ["sink"] }
futures-core = { workspace = true }
Expand All @@ -69,7 +69,7 @@ half = { workspace = true, features = ["bytemuck"] }
itertools = { workspace = true }
log = { workspace = true }
num-traits = { workspace = true }
# Used to implement ensure_polled on non-Wasm targets, and in the `rerun` support.
# Used to implement start_polling() on non-Wasm targets, and in the `rerun` support.
pollster = { workspace = true }
rand = { workspace = true }
rand_xoshiro = { workspace = true }
Expand Down
10 changes: 4 additions & 6 deletions all-is-cubes-gpu/src/in_wgpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub fn device_descriptor(
#[derive(Debug)]
pub struct SurfaceRenderer<I: time::Instant> {
surface: wgpu::Surface<'static>,
device: Arc<wgpu::Device>,
device: wgpu::Device,
queue: wgpu::Queue,

everything: EverythingRenderer<I>,
Expand Down Expand Up @@ -134,8 +134,6 @@ impl<I: time::Instant> SurfaceRenderer<I> {
None,
);
let (device, queue) = request_device_future.await?;
#[cfg_attr(target_family = "wasm", expect(clippy::arc_with_non_send_sync))]
let device = Arc::new(device);

let viewport_source = cameras.viewport_source();
let everything = EverythingRenderer::new(
Expand All @@ -156,7 +154,7 @@ impl<I: time::Instant> SurfaceRenderer<I> {
}

/// Returns a clonable handle to the device this renderer owns.
pub fn device(&self) -> &Arc<wgpu::Device> {
pub fn device(&self) -> &wgpu::Device {
&self.device
}

Expand Down Expand Up @@ -271,7 +269,7 @@ impl<I: time::Instant> SurfaceRenderer<I> {
struct EverythingRenderer<I: time::Instant> {
executor: Arc<dyn Executor>,

device: Arc<wgpu::Device>,
device: wgpu::Device,

staging_belt: wgpu::util::StagingBelt,

Expand Down Expand Up @@ -369,7 +367,7 @@ impl<I: time::Instant> EverythingRenderer<I> {

pub fn new(
executor: Arc<dyn Executor>,
device: Arc<wgpu::Device>,
device: wgpu::Device,
cameras: StandardCameras,
surface_format: wgpu::TextureFormat,
adapter: &wgpu::Adapter,
Expand Down
17 changes: 8 additions & 9 deletions all-is-cubes-gpu/src/in_wgpu/headless.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@ use crate::in_wgpu::{self, init};
#[derive(Clone, Debug)]
pub struct Builder {
executor: Arc<dyn Executor>,
adapter: Arc<wgpu::Adapter>,
device: Arc<wgpu::Device>,
queue: Arc<wgpu::Queue>,
adapter: wgpu::Adapter,
device: wgpu::Device,
queue: wgpu::Queue,
}

impl Builder {
/// Create a [`Builder`] by obtaining a new [`wgpu::Device`] from the given adapter.
#[cfg_attr(target_family = "wasm", expect(clippy::arc_with_non_send_sync))]
pub async fn from_adapter(
label: &str,
adapter: wgpu::Adapter,
Expand All @@ -43,9 +42,9 @@ impl Builder {
)
.await?;
Ok(Self {
device: Arc::new(device),
queue: Arc::new(queue),
adapter: Arc::new(adapter),
device,
queue,
adapter,
executor: Arc::new(()),
})
}
Expand Down Expand Up @@ -100,8 +99,8 @@ pub struct Renderer {
/// Internals of [`Renderer`] to actually do the rendering.
#[derive(Debug)]
struct RendererImpl {
device: Arc<wgpu::Device>,
queue: Arc<wgpu::Queue>,
device: wgpu::Device,
queue: wgpu::Queue,
color_texture: wgpu::Texture,
everything: super::EverythingRenderer<AdaptedInstant>,
viewport_source: listen::DynSource<Viewport>,
Expand Down
12 changes: 6 additions & 6 deletions all-is-cubes-gpu/src/in_wgpu/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
use std::future::Future;
use std::io::Write as _;
use std::sync::Arc;

use all_is_cubes::euclid::Size3D;
use all_is_cubes_render::{camera, Flaws, Rendering};
Expand Down Expand Up @@ -147,7 +146,7 @@ fn shortened_adapter_info(info: &wgpu::AdapterInfo) -> String {
/// Panics if the pixel type or viewport size are incorrect.
#[doc(hidden)]
pub fn get_image_from_gpu(
device: &Arc<wgpu::Device>,
device: &wgpu::Device,
queue: &wgpu::Queue,
texture: &wgpu::Texture,
flaws: Flaws,
Expand Down Expand Up @@ -180,7 +179,7 @@ pub fn get_image_from_gpu(
/// Panics if the provided sizes are incorrect.
#[doc(hidden)]
pub fn get_texels_from_gpu<C>(
device: &Arc<wgpu::Device>,
device: &wgpu::Device,
queue: &wgpu::Queue,
texture: &wgpu::Texture,
components: usize,
Expand All @@ -193,7 +192,7 @@ where

let tc = TextureCopyParameters::from_texture(texture);
let temp_buffer = tc.copy_texture_to_new_buffer(device, queue, texture);
let buffer_mapped_future = map_really_async(device, &temp_buffer);
let buffer_mapped_future = map_really_async(device.clone(), &temp_buffer);

// Await the buffer being available and build the image.
async move {
Expand All @@ -211,16 +210,17 @@ where

#[doc(hidden)]
pub fn map_really_async(
device: &Arc<wgpu::Device>,
device: wgpu::Device,
buffer: &wgpu::Buffer,
) -> impl Future<Output = Result<(), wgpu::BufferAsyncError>> {
let (sender, receiver) = futures_channel::oneshot::channel();
buffer.slice(..).map_async(wgpu::MapMode::Read, |result| {
let _ = sender.send(result);
});
super::poll::ensure_polled(Arc::downgrade(device));
let poller = super::poll::start_polling(device);

async move {
let _poller = poller;
receiver
.await
.expect("map_async callback was dropped without call")
Expand Down
114 changes: 50 additions & 64 deletions all-is-cubes-gpu/src/in_wgpu/poll.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,37 @@
use std::collections::HashSet;
use std::sync::{OnceLock, Weak};
use std::collections::HashMap;
use std::sync::{Arc, OnceLock, Weak};

use cfg_if::cfg_if;
#[cfg(target_family = "wasm")]
use futures_util::StreamExt as _;

/// Start polling the given [`wgpu::Device`] until it is dropped or its queue is empty,
/// as reported by [`wgpu::Device::poll()`], to ensure that callbacks are invoked promptly.
///
/// This should be called immediately *after* each operation of interest, such as
/// [`wgpu::BufferView::map_async()`].
/// Start polling the given [`wgpu::Device`] in the background until the returned guard is dropped.
///
/// Calling this function again with the same device will not create redundant work.
/// It may block briefly if demand is high or if a callback function takes a long time
/// (which they should avoid).
pub(crate) fn ensure_polled(device: Weak<wgpu::Device>) {
pub(crate) fn start_polling(device: wgpu::Device) -> Poller {
let poller = Poller(Arc::new(device));

// (The wgpu docs say polling is automatic “on web” but they mean WebGPU, not WebGL,
// so we need to do this on all platforms.)
inner::ensure_polled(device)
inner::send_to_poller_task(Arc::downgrade(&poller.0));

poller
}

/// As long as this guard is not dropped,
/// the contained [`wgpu::Device`] will be polled periodically.
#[derive(Debug)]
pub(crate) struct Poller(Arc<wgpu::Device>);

#[cfg(not(target_family = "wasm"))]
mod inner {
use super::*;

static POLLER_CHANNEL: OnceLock<flume::Sender<Weak<wgpu::Device>>> = OnceLock::new();

pub(super) fn ensure_polled(device: Weak<wgpu::Device>) {
pub(super) fn send_to_poller_task(device: Weak<wgpu::Device>) {
POLLER_CHANNEL
.get_or_init(init_poller_task)
.send(device)
Expand Down Expand Up @@ -58,7 +63,7 @@ mod inner {
const { OnceLock::new() };
}

pub(super) fn ensure_polled(device: Weak<wgpu::Device>) {
pub(super) fn send_to_poller_task(device: Weak<wgpu::Device>) {
POLLER_CHANNEL.with(|ch| ch.get_or_init(init_poller_task).send(device).unwrap())
}

Expand All @@ -71,53 +76,46 @@ mod inner {
}
}

/// Polls all [`wgpu::Device`]s delivered to it on `POLLING_CHANNEL`.
/// Polls all [`wgpu::Device`]s delivered to it on `POLLING_CHANNEL`,
/// as long as each device has at least one weak handle to it.
/// To be run on a thread or async task as the platform permits.
#[expect(clippy::infinite_loop)]
async fn polling_task(rx: flume::Receiver<Weak<wgpu::Device>>) {
// 10 milliseconds is much better than acceptable latency for our applications, which are all
// non-realtime headless rendering.
let polling_interval = core::time::Duration::from_millis(10);

let mut to_poll: HashSet<WeakIdentityDevice> = HashSet::new();
let mut to_drop: Vec<WeakIdentityDevice> = Vec::new();
// We want to poll each device only once per polling interval, but there may be multiple
// requests to poll the same device. This table has devices as keys and requests as values.
let mut to_poll: HashMap<wgpu::Device, Vec<Weak<wgpu::Device>>> = HashMap::new();

#[cfg(target_family = "wasm")]
let mut tick_stream =
gloo_timers::future::IntervalStream::new(polling_interval.as_millis().try_into().unwrap())
.fuse();

loop {
// Prune dropped requests and their devices.
to_poll.retain(|_, requests| {
requests.retain(|request| Weak::upgrade(request).is_some());
!requests.is_empty()
});

// Poll all the devices to poll.
// eprintln!("poller: polling {}", to_poll.len());
for device_ref in to_poll.iter() {
match device_ref.0.upgrade() {
Some(device) => {
// Kludge: As of wgpu 0.18, using Maintain::Poll doesn't actually have any
// effect (doesn't cause the map callbacks to run) on WebGL on Firefox.
// So, for now, use Maintain::Wait (which also doesn't actually do any waiting
// (wgpu bug), but *does* trigger the callbacks).
// TODO: See if we can remove this in the next wgpu version.
let maintain = if cfg!(target_family = "wasm") {
wgpu::Maintain::Wait
} else {
wgpu::Maintain::Poll
};

let queue_empty = device.poll(maintain).is_queue_empty();
if queue_empty {
to_drop.push(device_ref.clone());
}
}
None => to_drop.push(device_ref.clone()),
}
}

// Remove no-longer-necessary entries.
// if !to_drop.is_empty() {
// eprintln!("poller: dropping {}", to_drop.len());
// }
for device_ref in to_drop.drain(..) {
to_poll.remove(&device_ref);
for device in to_poll.keys() {
// Kludge: As of wgpu 0.18, using Maintain::Poll doesn't actually have any
// effect (doesn't cause the map callbacks to run) on WebGL on Firefox.
// So, for now, use Maintain::Wait (which also doesn't actually do any waiting
// (wgpu bug), but *does* trigger the callbacks).
// TODO: See if we can remove this in the next wgpu version.
let maintain = if cfg!(target_family = "wasm") {
wgpu::Maintain::Wait
} else {
wgpu::Maintain::Poll
};

device.poll(maintain);
}

// While waiting for it to be time to poll again, check for incoming requests.
Expand Down Expand Up @@ -147,12 +145,17 @@ async fn polling_task(rx: flume::Receiver<Weak<wgpu::Device>>) {
};

match recv_result {
Ok(device) => {
// eprintln!("poller: got another device");
to_poll.insert(WeakIdentityDevice(device));
Ok(request) => {
if let Some(device) = request.upgrade() {
// eprintln!("poller: got request for {device:?}");
to_poll
.entry(wgpu::Device::clone(&*device))
.or_default()
.push(request);
}
}
Err(flume::RecvTimeoutError::Disconnected) => {
// This shouldn't happen because the sender is never dropped
// This shouldn't happen because the sender is static and never dropped
log::warn!("shouldn't happen: wgpu poller channel disconnected");
}
Err(flume::RecvTimeoutError::Timeout) => {
Expand All @@ -162,20 +165,3 @@ async fn polling_task(rx: flume::Receiver<Weak<wgpu::Device>>) {
}
}
}

/// Compare a `Weak<wgpu::Device>` by pointer identity.
#[derive(Clone, Debug)]
struct WeakIdentityDevice(Weak<wgpu::Device>);

impl PartialEq for WeakIdentityDevice {
fn eq(&self, other: &Self) -> bool {
Weak::ptr_eq(&self.0, &other.0)
}
}
impl Eq for WeakIdentityDevice {}
impl std::hash::Hash for WeakIdentityDevice {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
let ptr: *const wgpu::Device = self.0.as_ptr();
ptr.hash(state);
}
}
6 changes: 1 addition & 5 deletions all-is-cubes-gpu/src/in_wgpu/postprocess.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,13 @@
//! * screen-space effects such as bloom
//! * tone mapping
use std::sync::Arc;

use all_is_cubes::time;
use all_is_cubes_render::camera::{GraphicsOptions, ToneMappingOperator};
use all_is_cubes_render::Flaws;

use crate::in_wgpu::shaders::Shaders;

pub(crate) fn create_postprocess_bind_group_layout(
device: &Arc<wgpu::Device>,
) -> wgpu::BindGroupLayout {
pub(crate) fn create_postprocess_bind_group_layout(device: &wgpu::Device) -> wgpu::BindGroupLayout {
device.create_bind_group_layout(&wgpu::BindGroupLayoutDescriptor {
entries: &[
// Binding for info_text_texture
Expand Down
7 changes: 3 additions & 4 deletions all-is-cubes-gpu/src/in_wgpu/rerun_image.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Exports the rendered image to Rerun.
use std::num::NonZeroU64;
use std::sync::Arc;

use futures_core::future::BoxFuture;

Expand All @@ -14,7 +13,7 @@ use crate::in_wgpu::pipelines::Pipelines;
use crate::Memo;

pub(crate) struct RerunImageExport {
device: Arc<wgpu::Device>,
device: wgpu::Device,

destination: rg::Destination,
image_copy_future: Option<BoxFuture<'static, RerunImageCopyOutput>>,
Expand All @@ -37,7 +36,7 @@ struct Resources {
}

impl RerunImageExport {
pub fn new(device: Arc<wgpu::Device>) -> Self {
pub fn new(device: wgpu::Device) -> Self {
Self {
resources: Memo::new(),
device,
Expand Down Expand Up @@ -249,7 +248,7 @@ type RerunImageCopyOutput = (
);

fn perform_image_copy(
device: &Arc<wgpu::Device>,
device: &wgpu::Device,
queue: &wgpu::Queue,
srgb_scene_texture: &wgpu::Texture,
depth_texture: &wgpu::Texture,
Expand Down

0 comments on commit 46a38a2

Please sign in to comment.