all: Encapsulate manipulation of SharedProofOfIndexing
This brings a small amount of sanity to the code that uses it.
lutter committed Mar 7, 2025
1 parent 2374ddf commit 8687ada
Showing 7 changed files with 81 additions and 87 deletions.
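
Before the per-file hunks, a minimal, self-contained sketch of the pattern this commit introduces; it is not code from the commit itself. `SharedPoi`, `Poi`, and the string events are invented stand-ins, with `Rc<RefCell<_>>` standing in for the real `Arc<AtomicRefCell<ProofOfIndexing>>`. The point is that the `Option` ("PoI disabled") and the interior mutability become private, so call sites stop unwrapping and borrowing them by hand:

use std::cell::RefCell;
use std::rc::Rc;

// Stand-in for the real ProofOfIndexing, which hashes PoI events per block.
struct Poi {
    events: Vec<String>,
}

// Mirrors the new SharedProofOfIndexing: the Option (None = PoI disabled)
// and the interior mutability are private fields, so callers can no longer
// unwrap or borrow them directly.
#[derive(Clone)]
struct SharedPoi {
    poi: Option<Rc<RefCell<Poi>>>,
}

impl SharedPoi {
    fn new() -> Self {
        SharedPoi {
            poi: Some(Rc::new(RefCell::new(Poi { events: Vec::new() }))),
        }
    }

    // Counterpart of SharedProofOfIndexing::ignored() for PoI-less contexts.
    fn ignored() -> Self {
        SharedPoi { poi: None }
    }

    // Counterpart of write_event(): silently a no-op when PoI is disabled.
    fn write_event(&self, event: &str) {
        if let Some(poi) = &self.poi {
            poi.borrow_mut().events.push(event.to_string());
        }
    }

    // Counterpart of into_inner(): reclaim sole ownership once a block is done.
    fn into_inner(self) -> Option<Poi> {
        self.poi
            .map(|poi| Rc::try_unwrap(poi).ok().expect("no other handles").into_inner())
    }
}

fn main() {
    let poi = SharedPoi::new();
    poi.write_event("SetEntity");
    assert_eq!(poi.into_inner().unwrap().events.len(), 1);

    // With PoI disabled, writes are dropped, as at the offchain-trigger
    // call site in runner.rs below.
    let ignored = SharedPoi::ignored();
    ignored.write_event("SetEntity");
    assert!(ignored.into_inner().is_none());
}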
19 changes: 2 additions & 17 deletions chain/substreams/src/trigger.rs
@@ -15,7 +15,6 @@ use graph::{
     substreams::Modules,
 };
 use graph_runtime_wasm::module::ToAscPtr;
-use lazy_static::__Deref;
 use std::{collections::BTreeSet, sync::Arc};
 
 use crate::{Block, Chain, NoopDataSourceTemplate, ParsedChanges};
@@ -179,18 +178,6 @@ impl blockchain::TriggersAdapter<Chain> for TriggersAdapter {
     }
 }
 
-fn write_poi_event(
-    proof_of_indexing: &SharedProofOfIndexing,
-    poi_event: &ProofOfIndexingEvent,
-    causality_region: &str,
-    logger: &Logger,
-) {
-    if let Some(proof_of_indexing) = proof_of_indexing {
-        let mut proof_of_indexing = proof_of_indexing.deref().borrow_mut();
-        proof_of_indexing.write(logger, causality_region, poi_event);
-    }
-}
-
 pub struct TriggerProcessor {
     pub locator: DeploymentLocator,
 }
@@ -226,8 +213,7 @@ where
                     return Err(MappingError::Unknown(anyhow!("Detected UNSET entity operation, either a server error or there's a new type of operation and we're running an outdated protobuf")));
                 }
                 ParsedChanges::Upsert { key, entity } => {
-                    write_poi_event(
-                        proof_of_indexing,
+                    proof_of_indexing.write_event(
                         &ProofOfIndexingEvent::SetEntity {
                             entity_type: key.entity_type.typename(),
                             id: &key.entity_id.to_string(),
@@ -249,8 +235,7 @@ where
                     let id = entity_key.entity_id.clone();
                     state.entity_cache.remove(entity_key);
 
-                    write_poi_event(
-                        proof_of_indexing,
+                    proof_of_indexing.write_event(
                         &ProofOfIndexingEvent::RemoveEntity {
                             entity_type: entity_type.typename(),
                             id: &id.to_string(),
20 changes: 6 additions & 14 deletions core/src/subgraph/context/mod.rs
@@ -126,11 +126,7 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
     ) -> Result<BlockState, MappingError> {
         let error_count = state.deterministic_errors.len();
 
-        if let Some(proof_of_indexing) = proof_of_indexing {
-            proof_of_indexing
-                .borrow_mut()
-                .start_handler(causality_region);
-        }
+        proof_of_indexing.start_handler(causality_region);
 
         let start = Instant::now();
 
@@ -156,16 +152,12 @@
         let elapsed = start.elapsed().as_secs_f64();
         subgraph_metrics.observe_trigger_processing_duration(elapsed);
 
-        if let Some(proof_of_indexing) = proof_of_indexing {
-            if state.deterministic_errors.len() != error_count {
-                assert!(state.deterministic_errors.len() == error_count + 1);
+        if state.deterministic_errors.len() != error_count {
+            assert!(state.deterministic_errors.len() == error_count + 1);
 
-                // If a deterministic error has happened, write a new
-                // ProofOfIndexingEvent::DeterministicError to the SharedProofOfIndexing.
-                proof_of_indexing
-                    .borrow_mut()
-                    .write_deterministic_error(logger, causality_region);
-            }
+            // If a deterministic error has happened, write a new
+            // ProofOfIndexingEvent::DeterministicError to the SharedProofOfIndexing.
+            proof_of_indexing.write_deterministic_error(logger, causality_region);
         }
 
         Ok(state)
21 changes: 7 additions & 14 deletions core/src/subgraph/runner.rs
@@ -3,7 +3,6 @@ use crate::subgraph::error::BlockProcessingError;
 use crate::subgraph::inputs::IndexingInputs;
 use crate::subgraph::state::IndexingState;
 use crate::subgraph::stream::new_block_stream;
-use atomic_refcell::AtomicRefCell;
 use graph::blockchain::block_stream::{
     BlockStreamError, BlockStreamEvent, BlockWithTriggers, FirehoseCursor,
 };
@@ -367,10 +366,8 @@
         debug!(logger, "Start processing block";
             "triggers" => triggers.len());
 
-        let proof_of_indexing = Some(Arc::new(AtomicRefCell::new(ProofOfIndexing::new(
-            block_ptr.number,
-            self.inputs.poi_version,
-        ))));
+        let proof_of_indexing =
+            SharedProofOfIndexing::new(block_ptr.number, self.inputs.poi_version);
 
         // Causality region for onchain triggers.
         let causality_region = PoICausalityRegion::from_network(&self.inputs.network);
@@ -629,8 +626,7 @@
             return Err(BlockProcessingError::Canceled);
         }
 
-        if let Some(proof_of_indexing) = proof_of_indexing {
-            let proof_of_indexing = Arc::try_unwrap(proof_of_indexing).unwrap().into_inner();
+        if let Some(proof_of_indexing) = proof_of_indexing.into_inner() {
             update_proof_of_indexing(
                 proof_of_indexing,
                 block.timestamp(),
@@ -1156,7 +1152,7 @@ where
 
         // PoI ignores offchain events.
         // See also: poi-ignores-offchain
-        let proof_of_indexing = None;
+        let proof_of_indexing = SharedProofOfIndexing::ignored();
         let causality_region = "";
 
         let trigger = TriggerData::Offchain(trigger);
@@ -1314,10 +1310,8 @@ where
             .deployment_head
             .set(block_ptr.number as f64);
 
-        let proof_of_indexing = Some(Arc::new(AtomicRefCell::new(ProofOfIndexing::new(
-            block_ptr.number,
-            self.inputs.poi_version,
-        ))));
+        let proof_of_indexing =
+            SharedProofOfIndexing::new(block_ptr.number, self.inputs.poi_version);
 
         // Causality region for onchain triggers.
         let causality_region = PoICausalityRegion::from_network(&self.inputs.network);
@@ -1372,8 +1366,7 @@
             return Err(BlockProcessingError::Canceled.into());
         }
 
-        if let Some(proof_of_indexing) = proof_of_indexing {
-            let proof_of_indexing = Arc::try_unwrap(proof_of_indexing).unwrap().into_inner();
+        if let Some(proof_of_indexing) = proof_of_indexing.into_inner() {
             update_proof_of_indexing(
                 proof_of_indexing,
                 block_time,
20 changes: 6 additions & 14 deletions core/src/subgraph/trigger_processor.rs
@@ -39,11 +39,7 @@ where
             return Ok(state);
         }
 
-        if let Some(proof_of_indexing) = proof_of_indexing {
-            proof_of_indexing
-                .borrow_mut()
-                .start_handler(causality_region);
-        }
+        proof_of_indexing.start_handler(causality_region);
 
         for HostedTrigger {
             host,
@@ -73,16 +69,12 @@
             }
         }
 
-        if let Some(proof_of_indexing) = proof_of_indexing {
-            if state.deterministic_errors.len() != error_count {
-                assert!(state.deterministic_errors.len() == error_count + 1);
+        if state.deterministic_errors.len() != error_count {
+            assert!(state.deterministic_errors.len() == error_count + 1);
 
-                // If a deterministic error has happened, write a new
-                // ProofOfIndexingEvent::DeterministicError to the SharedProofOfIndexing.
-                proof_of_indexing
-                    .borrow_mut()
-                    .write_deterministic_error(logger, causality_region);
-            }
+            // If a deterministic error has happened, write a new
+            // ProofOfIndexingEvent::DeterministicError to the SharedProofOfIndexing.
+            proof_of_indexing.write_deterministic_error(logger, causality_region);
         }
 
         Ok(state)
66 changes: 56 additions & 10 deletions graph/src/components/subgraph/proof_of_indexing/mod.rs
@@ -3,11 +3,15 @@ mod online;
 mod reference;
 
 pub use event::ProofOfIndexingEvent;
+use graph_derive::CheapClone;
 pub use online::{ProofOfIndexing, ProofOfIndexingFinisher};
 pub use reference::PoICausalityRegion;
 
 use atomic_refcell::AtomicRefCell;
-use std::sync::Arc;
+use slog::Logger;
+use std::{ops::Deref, sync::Arc};
+
+use crate::prelude::BlockNumber;
 
 #[derive(Copy, Clone, Debug)]
 pub enum ProofOfIndexingVersion {
@@ -22,15 +26,57 @@ pub enum ProofOfIndexingVersion {
 /// intentionally disallowed - PoI requires sequential access to the hash
 /// function within a given causality region even if ownership is shared across
 /// multiple mapping contexts.
-///
-/// The Option<_> is because not all subgraphs support PoI until re-deployed.
-/// Eventually this can be removed.
-///
-/// This is not a great place to define this type, since the ProofOfIndexing
-/// shouldn't "know" these details about wasmtime and subgraph re-deployments,
-/// but the APIs that would make use of this are in graph/components so this
-/// lives here for lack of a better choice.
-pub type SharedProofOfIndexing = Option<Arc<AtomicRefCell<ProofOfIndexing>>>;
+#[derive(Clone, CheapClone)]
+pub struct SharedProofOfIndexing {
+    poi: Option<Arc<AtomicRefCell<ProofOfIndexing>>>,
+}
+
+impl SharedProofOfIndexing {
+    pub fn new(block: BlockNumber, version: ProofOfIndexingVersion) -> Self {
+        SharedProofOfIndexing {
+            poi: Some(Arc::new(AtomicRefCell::new(ProofOfIndexing::new(
+                block, version,
+            )))),
+        }
+    }
+
+    pub fn ignored() -> Self {
+        SharedProofOfIndexing { poi: None }
+    }
+
+    pub fn write_event(
+        &self,
+        poi_event: &ProofOfIndexingEvent,
+        causality_region: &str,
+        logger: &Logger,
+    ) {
+        if let Some(poi) = &self.poi {
+            let mut poi = poi.deref().borrow_mut();
+            poi.write(logger, causality_region, poi_event);
+        }
+    }
+
+    pub fn start_handler(&self, causality_region: &str) {
+        if let Some(poi) = &self.poi {
+            let mut poi = poi.deref().borrow_mut();
+            poi.start_handler(causality_region);
+        }
+    }
+
+    pub fn write_deterministic_error(&self, logger: &Logger, causality_region: &str) {
+        if let Some(proof_of_indexing) = &self.poi {
+            proof_of_indexing
+                .deref()
+                .borrow_mut()
+                .write_deterministic_error(logger, causality_region);
+        }
+    }
+
+    pub fn into_inner(self) -> Option<ProofOfIndexing> {
+        self.poi
+            .map(|poi| Arc::try_unwrap(poi).unwrap().into_inner())
+    }
+}
 
 #[cfg(test)]
 mod tests {
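
Taken together, these methods give each block's PoI a small lifecycle. The sketch below is a hypothetical condensation of the call sites in runner.rs and trigger_processor.rs: the function and its argument plumbing are invented, while the `SharedProofOfIndexing` calls match the hunks in this commit and the `graph::components::subgraph` import path follows the one used in runtime/test/src/common.rs; the other re-exports are assumed.

use graph::components::subgraph::{
    ProofOfIndexing, ProofOfIndexingVersion, SharedProofOfIndexing,
};
use graph::prelude::{BlockNumber, Logger};

// Hypothetical condensation of the runner.rs / trigger_processor.rs flow.
fn poi_lifecycle_sketch(
    block_number: BlockNumber,
    version: ProofOfIndexingVersion,
    causality_region: &str,
    logger: &Logger,
) -> Option<ProofOfIndexing> {
    // runner.rs: one shared PoI per block; offchain triggers use
    // SharedProofOfIndexing::ignored() instead.
    let proof_of_indexing = SharedProofOfIndexing::new(block_number, version);

    // context/mod.rs and trigger_processor.rs: no `if let Some(...)`
    // unwrapping at the call sites any more.
    proof_of_indexing.start_handler(causality_region);

    // ... run handlers; only if a deterministic error occurred:
    proof_of_indexing.write_deterministic_error(logger, causality_region);

    // runner.rs: into_inner() replaces the manual Arc::try_unwrap().
    proof_of_indexing.into_inner()
}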
3 changes: 2 additions & 1 deletion runtime/test/src/common.rs
@@ -1,6 +1,7 @@
 use ethabi::Contract;
 use graph::blockchain::BlockTime;
 use graph::components::store::DeploymentLocator;
+use graph::components::subgraph::SharedProofOfIndexing;
 use graph::data::subgraph::*;
 use graph::data_source;
 use graph::data_source::common::MappingABI;
@@ -127,7 +128,7 @@ pub fn mock_context(
             .unwrap(),
             Default::default(),
         ),
-        proof_of_indexing: None,
+        proof_of_indexing: SharedProofOfIndexing::ignored(),
         host_fns: Arc::new(Vec::new()),
         debug_fork: None,
         mapping_logger: Logger::root(slog::Discard, o!()),
19 changes: 2 additions & 17 deletions runtime/wasm/src/host_exports.rs
@@ -1,5 +1,4 @@
 use std::collections::HashMap;
-use std::ops::Deref;
 use std::str::FromStr;
 use std::time::{Duration, Instant};
 
@@ -33,18 +32,6 @@ use crate::{error::DeterminismLevel, module::IntoTrap};
 
 use super::module::WasmInstanceData;
 
-fn write_poi_event(
-    proof_of_indexing: &SharedProofOfIndexing,
-    poi_event: &ProofOfIndexingEvent,
-    causality_region: &str,
-    logger: &Logger,
-) {
-    if let Some(proof_of_indexing) = proof_of_indexing {
-        let mut proof_of_indexing = proof_of_indexing.deref().borrow_mut();
-        proof_of_indexing.write(logger, causality_region, poi_event);
-    }
-}
-
 impl IntoTrap for HostExportError {
     fn determinism_level(&self) -> DeterminismLevel {
         match self {
@@ -336,8 +323,7 @@ impl HostExports {
             .map_err(|e| HostExportError::Deterministic(anyhow!(e)))?;
 
         let poi_section = stopwatch.start_section("host_export_store_set__proof_of_indexing");
-        write_poi_event(
-            proof_of_indexing,
+        proof_of_indexing.write_event(
             &ProofOfIndexingEvent::SetEntity {
                 entity_type: &key.entity_type.typename(),
                 id: &key.entity_id.to_string(),
@@ -369,8 +355,7 @@
         entity_id: String,
         gas: &GasCounter,
     ) -> Result<(), HostExportError> {
-        write_poi_event(
-            proof_of_indexing,
+        proof_of_indexing.write_event(
             &ProofOfIndexingEvent::RemoveEntity {
                 entity_type: &entity_type,
                 id: &entity_id,
