Skip to content

Commit

Permalink
feat: add snapshot seqs field to query context (#5477)
Browse files Browse the repository at this point in the history
* TODO: snapshot read

* feat: RegionEngine get last seq

* feat: query context snapshot

* chore: use new proto

* feat: get_region_seqs in region engine

* chore: typo

* chore: toml

* feat: make snapshots modifiable

* feat: add hint for snapshot read

* chore: some typo

* refactor: remove hint as not used

* fix: use committed seqs

* refactor: remove sequences variant on RegionRequest

* refactor: per review

* chore: rebase solve conflict

* refactor: rm unused key

* chore: per review

* chore: per review
  • Loading branch information
discord9 authored Feb 14, 2025
1 parent 0d19e8f commit 1e6d2fb
Show file tree
Hide file tree
Showing 16 changed files with 159 additions and 18 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "683e9d10ae7f3dfb8aaabd89082fc600c17e3795" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "e2fd89fce1fe9ea0c36c85bcf447ce4bb4a84af3" }
hex = "0.4"
http = "1"
humantime = "2.1"
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/rpc/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1239,6 +1239,7 @@ impl From<QueryContext> for PbQueryContext {
timezone,
extensions,
channel: channel as u32,
snapshot_seqs: None,
}
}
}
Expand Down
21 changes: 15 additions & 6 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,12 @@ impl RegionServer {
self.inner.handle_request(region_id, request).await
}

async fn table_provider(&self, region_id: RegionId) -> Result<Arc<dyn TableProvider>> {
/// Returns a table provider for the region. Will set snapshot sequence if available in the context.
async fn table_provider(
&self,
region_id: RegionId,
ctx: Option<&session::context::QueryContext>,
) -> Result<Arc<dyn TableProvider>> {
let status = self
.inner
.region_map
Expand All @@ -173,7 +178,7 @@ impl RegionServer {

self.inner
.table_provider_factory
.create(region_id, status.into_engine())
.create(region_id, status.into_engine(), ctx)
.await
.context(ExecuteLogicalPlanSnafu)
}
Expand All @@ -188,16 +193,17 @@ impl RegionServer {
} else {
None
};
let region_id = RegionId::from_u64(request.region_id);
let provider = self.table_provider(region_id).await?;
let catalog_list = Arc::new(DummyCatalogList::with_table_provider(provider));

let query_ctx: QueryContextRef = request
.header
.as_ref()
.map(|h| Arc::new(h.into()))
.unwrap_or_else(|| Arc::new(QueryContextBuilder::default().build()));

let region_id = RegionId::from_u64(request.region_id);
let provider = self.table_provider(region_id, Some(&query_ctx)).await?;
let catalog_list = Arc::new(DummyCatalogList::with_table_provider(provider));

let decoder = self
.inner
.query_engine
Expand Down Expand Up @@ -226,7 +232,10 @@ impl RegionServer {
} else {
None
};
let provider = self.table_provider(request.region_id).await?;

let ctx: Option<session::context::QueryContext> = request.header.as_ref().map(|h| h.into());

let provider = self.table_provider(request.region_id, ctx.as_ref()).await?;

struct RegionDataSourceInjector {
source: Arc<dyn TableSource>,
Expand Down
6 changes: 5 additions & 1 deletion src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use store_api::region_engine::{
SettableRegionRoleState,
};
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use table::TableRef;
use tokio::sync::mpsc::{Receiver, Sender};

Expand Down Expand Up @@ -218,6 +218,10 @@ impl RegionEngine for MockRegionEngine {
unimplemented!()
}

/// Not supported by the mock engine: calling this panics via `unimplemented!()`.
async fn get_last_seq_num(&self, _: RegionId) -> Result<Option<SequenceNumber>, BoxedError> {
unimplemented!()
}

async fn stop(&self) -> Result<(), BoxedError> {
Ok(())
}
Expand Down
6 changes: 5 additions & 1 deletion src/file-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
RegionRequest,
};
use store_api::storage::{RegionId, ScanRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use tokio::sync::Mutex;

use crate::config::EngineConfig;
Expand Down Expand Up @@ -114,6 +114,10 @@ impl RegionEngine for FileRegionEngine {
None
}

/// Always returns `Ok(None)`: this engine reports no last sequence number,
/// regardless of which region is asked for (the region id is ignored).
async fn get_last_seq_num(&self, _: RegionId) -> Result<Option<SequenceNumber>, BoxedError> {
Ok(None)
}

fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
self.inner
.set_region_role(region_id, role)
Expand Down
12 changes: 11 additions & 1 deletion src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use store_api::region_engine::{
SettableRegionRoleState,
};
use store_api::region_request::{BatchRegionDdlRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};

use self::state::MetricEngineState;
use crate::config::EngineConfig;
Expand Down Expand Up @@ -235,6 +235,16 @@ impl RegionEngine for MetricEngine {
self.handle_query(region_id, request).await
}

/// Returns the last sequence number for `region_id`.
///
/// Delegates to the inner engine and wraps any error it returns in a [`BoxedError`].
async fn get_last_seq_num(
    &self,
    region_id: RegionId,
) -> Result<Option<SequenceNumber>, BoxedError> {
    let last_seq = self.inner.get_last_seq_num(region_id).await;
    last_seq.map_err(BoxedError::new)
}

/// Retrieves region's metadata.
async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
self.inner
Expand Down
15 changes: 14 additions & 1 deletion src/metric-engine/src/engine/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use snafu::{OptionExt, ResultExt};
use store_api::metadata::{RegionMetadataBuilder, RegionMetadataRef};
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
use store_api::region_engine::{RegionEngine, RegionScannerRef};
use store_api::storage::{RegionId, ScanRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};

use crate::engine::MetricEngineInner;
use crate::error::{
Expand Down Expand Up @@ -85,6 +85,19 @@ impl MetricEngineInner {
.context(MitoReadOperationSnafu)
}

/// Asks the mito engine for the last sequence number backing `region_id`.
///
/// A physical region id is forwarded unchanged. Any other id is first resolved to
/// its physical region, and the data region id derived from that is queried instead.
/// Errors from mito are wrapped with `MitoReadOperationSnafu`.
pub async fn get_last_seq_num(&self, region_id: RegionId) -> Result<Option<SequenceNumber>> {
    let mut target_region_id = region_id;
    if !self.is_physical_region(region_id) {
        let physical_region_id = self.get_physical_region_id(region_id).await?;
        target_region_id = utils::to_data_region_id(physical_region_id);
    }

    let last_seq = self.mito.get_last_seq_num(target_region_id).await;
    last_seq.context(MitoReadOperationSnafu)
}

pub async fn load_region_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
let is_reading_physical_region =
self.state.read().unwrap().exist_physical_region(region_id);
Expand Down
22 changes: 21 additions & 1 deletion src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ use store_api::region_engine::{
SetRegionRoleStateResponse, SettableRegionRoleState,
};
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use tokio::sync::{oneshot, Semaphore};

use crate::cache::CacheStrategy;
Expand Down Expand Up @@ -424,6 +424,17 @@ impl EngineInner {
receiver.await.context(RecvSnafu)?
}

/// Returns the committed sequence of the region's version control, wrapped in `Some`.
///
/// Fails with the `RegionNotFoundSnafu` context error when the workers cannot
/// provide the region for `region_id`.
fn get_last_seq_num(&self, region_id: RegionId) -> Result<Option<SequenceNumber>> {
    // Reading a region doesn't need to go through the region worker thread.
    let region = self
        .workers
        .get_region(region_id)
        .context(RegionNotFoundSnafu { region_id })?;

    Ok(Some(region.version_control.committed_sequence()))
}

/// Handles the scan `request` and returns a [ScanRegion].
fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
let query_start = Instant::now();
Expand Down Expand Up @@ -547,6 +558,15 @@ impl RegionEngine for MitoEngine {
.map_err(BoxedError::new)
}

/// Returns the last sequence number for `region_id`.
///
/// Forwards to the inner engine and wraps any error it returns in a [`BoxedError`].
async fn get_last_seq_num(
    &self,
    region_id: RegionId,
) -> Result<Option<SequenceNumber>, BoxedError> {
    let last_seq = self.inner.get_last_seq_num(region_id);
    last_seq.map_err(BoxedError::new)
}

/// Retrieve region's metadata.
async fn get_metadata(
&self,
Expand Down
5 changes: 5 additions & 0 deletions src/mito2/src/region/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ impl VersionControl {
data.last_entry_id = entry_id;
}

/// Returns the sequence number of the last committed data.
///
/// Takes the read lock on the version control data for the duration of the call.
pub(crate) fn committed_sequence(&self) -> SequenceNumber {
    let data = self.data.read().unwrap();
    data.committed_sequence
}

/// Freezes the mutable memtable if it is not empty.
pub(crate) fn freeze_mutable(&self) -> Result<()> {
let version = self.current().version;
Expand Down
17 changes: 16 additions & 1 deletion src/query/src/dummy_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ impl DummyTableProvider {
self.scan_request.lock().unwrap().series_row_selector = Some(selector);
}

/// Stores `sequence` in the provider's scan request, replacing any previous value.
pub fn with_sequence(&self, sequence: u64) {
    let mut scan_request = self.scan_request.lock().unwrap();
    scan_request.sequence = Some(sequence);
}

/// Gets the scan request of the provider.
#[cfg(test)]
pub fn scan_request(&self) -> ScanRequest {
Expand All @@ -249,6 +253,7 @@ impl TableProviderFactory for DummyTableProviderFactory {
&self,
region_id: RegionId,
engine: RegionEngineRef,
ctx: Option<&session::context::QueryContext>,
) -> Result<Arc<dyn TableProvider>> {
let metadata =
engine
Expand All @@ -258,11 +263,20 @@ impl TableProviderFactory for DummyTableProviderFactory {
engine: engine.name(),
region_id,
})?;

let scan_request = ctx
.and_then(|c| c.get_snapshot(region_id.as_u64()))
.map(|seq| ScanRequest {
sequence: Some(seq),
..Default::default()
})
.unwrap_or_default();

Ok(Arc::new(DummyTableProvider {
region_id,
engine,
metadata,
scan_request: Default::default(),
scan_request: Arc::new(Mutex::new(scan_request)),
}))
}
}
Expand All @@ -273,6 +287,7 @@ pub trait TableProviderFactory: Send + Sync {
&self,
region_id: RegionId,
engine: RegionEngineRef,
ctx: Option<&session::context::QueryContext>,
) -> Result<Arc<dyn TableProvider>>;
}

Expand Down
9 changes: 8 additions & 1 deletion src/query/src/optimizer/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use store_api::region_engine::{
SettableRegionRoleState,
};
use store_api::region_request::RegionRequest;
use store_api::storage::{ConcreteDataType, RegionId, ScanRequest};
use store_api::storage::{ConcreteDataType, RegionId, ScanRequest, SequenceNumber};

use crate::dummy_catalog::DummyTableProvider;

Expand Down Expand Up @@ -86,6 +86,13 @@ impl RegionEngine for MetaRegionEngine {
None
}

/// Always returns `Ok(None)`: this test engine reports no last sequence number,
/// and the region id is ignored.
async fn get_last_seq_num(
&self,
_region_id: RegionId,
) -> Result<Option<SequenceNumber>, BoxedError> {
Ok(None)
}

async fn stop(&self) -> Result<(), BoxedError> {
Ok(())
}
Expand Down
25 changes: 24 additions & 1 deletion src/session/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ const CURSOR_COUNT_WARNING_LIMIT: usize = 10;
#[builder(build_fn(skip))]
pub struct QueryContext {
current_catalog: String,
/// Mapping of RegionId to SequenceNumber for snapshot reads: a read should only
/// contain data that was committed before (and including) the given sequence number.
/// This field will only be filled if extensions contains a pair of "snapshot_read" and "true".
snapshot_seqs: Arc<RwLock<HashMap<u64, u64>>>,
// we use Arc<RwLock>> for modifiable fields
#[builder(default)]
mutable_session_data: Arc<RwLock<MutableInner>>,
Expand Down Expand Up @@ -116,7 +120,10 @@ impl From<&RegionRequestHeader> for QueryContext {
.current_schema(ctx.current_schema.clone())
.timezone(parse_timezone(Some(&ctx.timezone)))
.extensions(ctx.extensions.clone())
.channel(ctx.channel.into());
.channel(ctx.channel.into())
.snapshot_seqs(Arc::new(RwLock::new(
ctx.snapshot_seqs.clone().unwrap_or_default().snapshot_seqs,
)));
}
builder.build()
}
Expand All @@ -130,6 +137,9 @@ impl From<api::v1::QueryContext> for QueryContext {
.timezone(parse_timezone(Some(&ctx.timezone)))
.extensions(ctx.extensions)
.channel(ctx.channel.into())
.snapshot_seqs(Arc::new(RwLock::new(
ctx.snapshot_seqs.clone().unwrap_or_default().snapshot_seqs,
)))
.build()
}
}
Expand All @@ -141,6 +151,7 @@ impl From<QueryContext> for api::v1::QueryContext {
mutable_session_data: mutable_inner,
extensions,
channel,
snapshot_seqs,
..
}: QueryContext,
) -> Self {
Expand All @@ -151,6 +162,9 @@ impl From<QueryContext> for api::v1::QueryContext {
timezone: mutable_inner.timezone.to_string(),
extensions,
channel: channel as u32,
snapshot_seqs: Some(api::v1::SnapshotSequences {
snapshot_seqs: snapshot_seqs.read().unwrap().clone(),
}),
}
}
}
Expand Down Expand Up @@ -324,6 +338,14 @@ impl QueryContext {
let rb = guard.cursors.get(name);
rb.cloned()
}

/// Returns a copy of the whole snapshot sequence map held by this context.
///
/// The map is cloned under the read lock, so later changes to the context are
/// not reflected in the returned value.
pub fn snapshots(&self) -> HashMap<u64, u64> {
    let seqs = self.snapshot_seqs.read().unwrap();
    seqs.clone()
}

/// Looks up the snapshot sequence stored for `region_id`.
///
/// Returns `None` when the map has no entry for that region.
pub fn get_snapshot(&self, region_id: u64) -> Option<u64> {
    let seqs = self.snapshot_seqs.read().unwrap();
    seqs.get(&region_id).copied()
}
}

impl QueryContextBuilder {
Expand All @@ -333,6 +355,7 @@ impl QueryContextBuilder {
current_catalog: self
.current_catalog
.unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string()),
snapshot_seqs: self.snapshot_seqs.unwrap_or_default(),
mutable_session_data: self.mutable_session_data.unwrap_or_default(),
mutable_query_context_data: self.mutable_query_context_data.unwrap_or_default(),
sql_dialect: self
Expand Down
2 changes: 2 additions & 0 deletions src/store-api/src/mito_engine_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub const APPEND_MODE_KEY: &str = "append_mode";
pub const MERGE_MODE_KEY: &str = "merge_mode";
/// Option key for TTL(time-to-live)
pub const TTL_KEY: &str = "ttl";
/// Option key for snapshot read.
pub const SNAPSHOT_READ: &str = "snapshot_read";
/// Option key for compaction type.
pub const COMPACTION_TYPE: &str = "compaction.type";
/// TWCS compaction strategy.
Expand Down
Loading

0 comments on commit 1e6d2fb

Please sign in to comment.