Skip to content

Commit

Permalink
Fix issues with cache invalidation related to parallel queries. (#6292)
Browse files Browse the repository at this point in the history
### What
- Resolves: #6279

### Issue 1: data invalidation before data use

This failure mode happens when:
  - Thread 1: produces results and returns them
  - Thread 2: invalidates results
  - Thread 1: uses the new results, which point to a non-overlapping range,
    and finds nothing of interest.

I worked around this by cloning the data and orphaning the Inner when we
execute an invalidation. This feels a bit heavy-handed for the cases
where we don't need it. Maybe there's an alternative way to keep the
structure locked for read until the results have been consumed?

### Issue 2: not considering pending data

The `time_range` logic wasn't factoring in the possibility of pending
data (promises that have not been resolved yet). This meant we could have
pending gaps in queries that had not yet been resolved, but wouldn't count
them as gaps. Subsequently, as the promises were resolved, we would still
end up with gaps in the data.

`time_range()` has now been replaced with `pending_time_range()` to do
the proper book-keeping.

### Checklist
* [x] I have read and agree to [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [x] I have tested the web demo (if applicable):
* Using examples from latest `main` build:
[rerun.io/viewer](https://rerun.io/viewer/pr/6292?manifest_url=https://app.rerun.io/version/main/examples_manifest.json)
* Using full set of examples from `nightly` build:
[rerun.io/viewer](https://rerun.io/viewer/pr/6292?manifest_url=https://app.rerun.io/version/nightly/examples_manifest.json)
* [x] The PR title and labels are set such as to maximize their
usefulness for the next release's CHANGELOG
* [x] If applicable, add a new check to the [release
checklist](https://github.com/rerun-io/rerun/blob/main/tests/python/release_checklist)!

- [PR Build Summary](https://build.rerun.io/pr/6292)
- [Recent benchmark results](https://build.rerun.io/graphs/crates.html)
- [Wasm size tracking](https://build.rerun.io/graphs/sizes.html)

To run all checks from `main`, comment on the PR with `@rerun-bot
full-check`.

---------

Co-authored-by: Clement Rey <[email protected]>
  • Loading branch information
jleibs and teh-cmc authored May 13, 2024
1 parent f0856be commit 95b7910
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 109 deletions.
2 changes: 1 addition & 1 deletion crates/re_query/src/cache_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl Caches {
(
key.clone(),
(
cache.time_range(),
cache.pending_time_range(),
CachedComponentStats {
total_indices: cache.indices.len() as _,
total_instances: cache.num_instances(),
Expand Down
16 changes: 15 additions & 1 deletion crates/re_query/src/flat_vec_deque.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ pub trait ErasedFlatVecDeque: std::any::Any {

fn into_any(self: Box<Self>) -> Box<dyn std::any::Any>;

/// Dynamically dispatches to [`FlatVecDeque::clone`].
///
/// This is prefixed with `dyn_` to avoid method dispatch ambiguities that are very hard to
/// avoid even with explicit syntax and that silently lead to infinite recursions.
fn dyn_clone(&self) -> Box<dyn ErasedFlatVecDeque + Send + Sync>;

/// Dynamically dispatches to [`FlatVecDeque::num_entries`].
///
/// This is prefixed with `dyn_` to avoid method dispatch ambiguities that are very hard to
Expand Down Expand Up @@ -53,7 +59,10 @@ pub trait ErasedFlatVecDeque: std::any::Any {
fn dyn_total_size_bytes(&self) -> u64;
}

impl<T: SizeBytes + 'static> ErasedFlatVecDeque for FlatVecDeque<T> {
impl<T: SizeBytes + 'static> ErasedFlatVecDeque for FlatVecDeque<T>
where
T: Send + Sync + Clone,
{
#[inline]
fn as_any(&self) -> &dyn std::any::Any {
self
Expand All @@ -64,6 +73,11 @@ impl<T: SizeBytes + 'static> ErasedFlatVecDeque for FlatVecDeque<T> {
self
}

// Clones the concrete `FlatVecDeque<T>` and returns the copy boxed as a type-erased
// `ErasedFlatVecDeque` trait object.
#[inline]
fn dyn_clone(&self) -> Box<dyn ErasedFlatVecDeque + Send + Sync> {
// The explicit `(*self)` presumably makes sure `clone` is invoked on the deque itself rather
// than on the `&Self` reference.
Box::new((*self).clone())
}

#[inline]
fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
self
Expand Down
88 changes: 17 additions & 71 deletions crates/re_query/src/range/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl Caches {
// and coarsely invalidates the whole cache in that case, to avoid the kind of bugs
// showcased in <https://github.com/rerun-io/rerun/issues/5686>.
{
let time_range = cache.per_data_time.read_recursive().time_range();
let time_range = cache.per_data_time.read_recursive().pending_time_range();
if let Some(time_range) = time_range {
{
let hole_start = time_range.max();
Expand Down Expand Up @@ -290,7 +290,12 @@ impl RangeCache {
return;
};

per_data_time.write().truncate_at_time(pending_invalidation);
// Invalidating data is tricky. Our results object may have been cloned and shared already.
// We can't just invalidate the data in-place without guaranteeing the post-invalidation query
// will return the same results as the pending pre-invalidation queries.
let mut new_inner = (*per_data_time.read()).clone();
new_inner.truncate_at_time(pending_invalidation);
per_data_time.inner = Arc::new(RwLock::new(new_inner));
}
}

Expand Down Expand Up @@ -326,14 +331,6 @@ impl RangeComponentResultsInner {
pub fn compute_front_query(&self, query: &RangeQuery) -> Option<RangeQuery> {
let mut reduced_query = query.clone();

// If nothing has been cached already, then we just want to query everything.
if self.indices.is_empty()
&& self.promises_front.is_empty()
&& self.promises_back.is_empty()
{
return Some(reduced_query);
}

// If the cache contains static data, then there's no point in querying anything else since
// static data overrides everything anyway.
if self
Expand All @@ -354,29 +351,14 @@ impl RangeComponentResultsInner {
// We check the back promises too just because I'm feeling overly cautious.
// See `Concurrency edge-case` section below.

let pending_front_min = self
.promises_front
.first()
.map_or(TimeInt::MAX.as_i64(), |((t, _), _)| {
t.as_i64().saturating_sub(1)
});
let pending_back_min = self
.promises_back
.first()
.map_or(TimeInt::MAX.as_i64(), |((t, _), _)| {
t.as_i64().saturating_sub(1)
});
let pending_min = i64::min(pending_front_min, pending_back_min);

if let Some(time_range) = self.time_range() {
let time_range_min = i64::min(time_range.min().as_i64().saturating_sub(1), pending_min);
if let Some(time_range) = self.pending_time_range() {
let time_range_min = time_range.min().as_i64().saturating_sub(1);
reduced_query
.range
.set_max(i64::min(reduced_query.range.max().as_i64(), time_range_min));
} else {
reduced_query
.range
.set_max(i64::min(reduced_query.range.max().as_i64(), pending_min));
// If nothing has been cached already, then we just want to query everything.
return Some(reduced_query);
}

if reduced_query.range.max() < reduced_query.range.min() {
Expand All @@ -396,15 +378,6 @@ impl RangeComponentResultsInner {
) -> Option<RangeQuery> {
let mut reduced_query = query.clone();

// If nothing has been cached already, then the front query is already going to take care
// of everything.
if self.indices.is_empty()
&& self.promises_front.is_empty()
&& self.promises_back.is_empty()
{
return None;
}

// If the cache contains static data, then there's no point in querying anything else since
// static data overrides everything anyway.
if self
Expand All @@ -415,49 +388,22 @@ impl RangeComponentResultsInner {
return None;
}

// Otherwise, query for what's missing on the back-side of the cache, while making sure to
// Otherwise, query for what's missing on the back-side of the cache, while making sure to
// take pending promises into account!
//
// Keep in mind: it is not possible for the cache to contain only part of a given
// timestamp. All entries for a given timestamp are loaded and invalidated atomically,
// whether it's promises or already resolved entries.

// # Concurrency edge-case
//
// We need to make sure to check for both front _and_ back promises here.
// If two or more threads are querying for the same entity path concurrently, it is possible
// that the first thread queues a bunch of promises to the front queue, but then relinquishes
// control to the second thread before ever resolving those promises.
//
// If that happens, the second thread would end up queuing the same exact promises in the
// back queue, yielding duplicated data.
// In most cases, duplicated data isn't noticeable (except for a performance drop), but if
// the duplication is only partial this can sometimes lead to visual glitches, depending on
// the visualizer used.

let pending_back_max = self
.promises_back
.last()
.map_or(TimeInt::MIN.as_i64(), |((t, _), _)| {
t.as_i64().saturating_add(1)
});
let pending_front_max = self
.promises_front
.last()
.map_or(TimeInt::MIN.as_i64(), |((t, _), _)| {
t.as_i64().saturating_add(1)
});
let pending_max = i64::max(pending_back_max, pending_front_max);

if let Some(time_range) = self.time_range() {
let time_range_max = i64::max(time_range.max().as_i64().saturating_add(1), pending_max);
if let Some(time_range) = self.pending_time_range() {
let time_range_max = time_range.max().as_i64().saturating_add(1);
reduced_query
.range
.set_min(i64::max(reduced_query.range.min().as_i64(), time_range_max));
} else {
reduced_query
.range
.set_min(i64::max(reduced_query.range.min().as_i64(), pending_max));
// If nothing has been cached already, then the front query is already going to take care
// of everything.
return None;
}

// Back query should never overlap with the front query.
Expand Down
41 changes: 32 additions & 9 deletions crates/re_query/src/range/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,20 @@ pub struct RangeComponentResultsInner {
pub(crate) cached_dense: Option<Box<dyn ErasedFlatVecDeque + Send + Sync>>,
}

// Hand-written, field-by-field `Clone`.
//
// `cached_dense` holds a `Box<dyn ErasedFlatVecDeque + Send + Sync>`, and a boxed trait object
// has no `Clone` impl of its own, so that field is duplicated through the type-erased
// `dyn_clone()` instead. Every other field is copied with its own `clone()`.
impl Clone for RangeComponentResultsInner {
#[inline]
fn clone(&self) -> Self {
Self {
indices: self.indices.clone(),
promises_front: self.promises_front.clone(),
promises_back: self.promises_back.clone(),
front_status: self.front_status.clone(),
back_status: self.back_status.clone(),
// `None` stays `None`; `Some(dense)` is cloned via dynamic dispatch.
cached_dense: self.cached_dense.as_ref().map(|dense| dense.dyn_clone()),
}
}
}

impl SizeBytes for RangeComponentResultsInner {
#[inline]
fn heap_size_bytes(&self) -> u64 {
Expand Down Expand Up @@ -747,9 +761,10 @@ impl RangeComponentResultsInner {
}),
"back promises must always be sorted in ascending index order"
);
if let (Some(p_index), Some(i_index)) =
(promises_back.last().map(|(index, _)| index), indices.back())
{
if let (Some(p_index), Some(i_index)) = (
promises_back.first().map(|(index, _)| index),
indices.back(),
) {
assert!(
i_index < p_index,
"the leftmost back promise must have an index larger than the rightmost data index ({i_index:?} < {p_index:?})",
Expand All @@ -761,14 +776,22 @@ impl RangeComponentResultsInner {
}
}

/// Returns the time range covered by the cached data.
/// Returns the pending time range that will be covered by the cached data.
///
/// Reminder: [`TimeInt::STATIC`] is never included in [`ResolvedTimeRange`]s.
#[inline]
pub fn time_range(&self) -> Option<ResolvedTimeRange> {
let first_time = self.indices.front().map(|(t, _)| *t)?;
let last_time = self.indices.back().map(|(t, _)| *t)?;
Some(ResolvedTimeRange::new(first_time, last_time))
pub fn pending_time_range(&self) -> Option<ResolvedTimeRange> {
let pending_front_min = self.promises_front.first().map(|((t, _), _)| *t);
let pending_front_max = self.promises_front.last().map(|((t, _), _)| *t);
let pending_back_max = self.promises_back.last().map(|((t, _), _)| *t);

let first_time = self.indices.front().map(|(t, _)| *t);
let last_time = self.indices.back().map(|(t, _)| *t);

Some(ResolvedTimeRange::new(
pending_front_min.or(first_time)?,
pending_back_max.or(last_time).or(pending_front_max)?,
))
}

#[inline]
Expand All @@ -786,7 +809,7 @@ impl RangeComponentResultsInner {
pub fn truncate_at_time(&mut self, threshold: TimeInt) {
re_tracing::profile_function!();

let time_range = self.time_range();
let time_range = self.pending_time_range();

let Self {
indices,
Expand Down
Loading

0 comments on commit 95b7910

Please sign in to comment.