Skip to content

Commit

Permalink
feat: Procedure to create a mito engine (#1035)
Browse files Browse the repository at this point in the history
* feat: wip

* feat: Implement procedure to create mito table

* feat: Add create_table_procedure to TableEngine

* feat: Impl dump and lock for CreateMitoTable

* feat: Impl CreateMitoTable::execute and register it to manager

* feat(common-procedure): pub local mod

* feat: Add simple test for MitoCreateTable

* style: Fix clippy

* refactor: Move create_table_procedure to a new trait TableEngineProcedure
  • Loading branch information
evenyag authored Feb 21, 2023
1 parent 9161796 commit aaaf241
Show file tree
Hide file tree
Showing 14 changed files with 544 additions and 23 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion src/common/procedure/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ pub enum Error {
key: String,
source: object_store::Error,
},

#[snafu(display("Failed to deserialize from json, source: {}", source))]
FromJson {
source: serde_json::Error,
backtrace: Backtrace,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -81,7 +87,8 @@ impl ErrorExt for Error {
| Error::PutState { .. }
| Error::DeleteState { .. }
| Error::ListState { .. }
| Error::ReadState { .. } => StatusCode::Internal,
| Error::ReadState { .. }
| Error::FromJson { .. } => StatusCode::Internal,
Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {
StatusCode::InvalidArguments
}
Expand Down
5 changes: 1 addition & 4 deletions src/common/procedure/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
//! Common traits and structures for the procedure framework.
pub mod error;
#[allow(dead_code)]
mod local;
pub mod local;
mod procedure;
// TODO(yingwen): Remove this attribute once ProcedureManager is implemented.
#[allow(dead_code)]
mod store;

pub use crate::error::{Error, Result};
Expand Down
2 changes: 1 addition & 1 deletion src/common/procedure/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ impl ManagerContext {
#[derive(Debug)]
pub struct ManagerConfig {
/// Object store
object_store: ObjectStore,
pub object_store: ObjectStore,
}

/// A [ProcedureManager] that maintains procedure states locally.
Expand Down
4 changes: 0 additions & 4 deletions src/common/procedure/src/local/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,6 @@ impl ExecResult {
matches!(self, ExecResult::Done)
}

fn is_retry_later(&self) -> bool {
matches!(self, ExecResult::RetryLater)
}

fn is_failed(&self) -> bool {
matches!(self, ExecResult::Failed(_))
}
Expand Down
1 change: 1 addition & 0 deletions src/mito/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ async-trait = "0.1"
chrono.workspace = true
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
common-procedure = { path = "../common/procedure" }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-telemetry = { path = "../common/telemetry" }
Expand Down
46 changes: 36 additions & 10 deletions src/mito/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod procedure;

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_procedure::{BoxedProcedure, ProcedureManager};
use common_telemetry::tracing::log::info;
use common_telemetry::{debug, logging};
use datatypes::schema::{Schema, SchemaRef};
use datatypes::schema::Schema;
use object_store::ObjectStore;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{
ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId,
CreateOptions, EngineContext as StorageEngineContext, OpenOptions, Region,
RegionDescriptorBuilder, RegionId, RowKeyDescriptor, RowKeyDescriptorBuilder, StorageEngine,
};
use table::engine::{EngineContext, TableEngine, TableReference};
use table::engine::{EngineContext, TableEngine, TableEngineProcedure, TableReference};
use table::error::TableOperationSnafu;
use table::metadata::{
TableId, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion,
Expand All @@ -40,6 +43,7 @@ use table::{error as table_error, Result as TableResult, Table};
use tokio::sync::Mutex;

use crate::config::EngineConfig;
use crate::engine::procedure::CreateMitoTable;
use crate::error::{
self, BuildColumnDescriptorSnafu, BuildColumnFamilyDescriptorSnafu, BuildRegionDescriptorSnafu,
BuildRowKeyDescriptorSnafu, InvalidPrimaryKeySnafu, InvalidRawSchemaSnafu,
Expand Down Expand Up @@ -83,6 +87,14 @@ impl<S: StorageEngine> MitoEngine<S> {
inner: Arc::new(MitoEngineInner::new(config, storage_engine, object_store)),
}
}

/// Register all procedure loaders to the procedure manager.
///
/// # Panics
/// Panics on error.
pub fn register_procedure_loaders(&self, procedure_manager: &dyn ProcedureManager) {
procedure::register_procedure_loaders(self.inner.clone(), procedure_manager);
}
}

#[async_trait]
Expand Down Expand Up @@ -152,7 +164,22 @@ impl<S: StorageEngine> TableEngine for MitoEngine<S> {
}
}

struct MitoEngineInner<S: StorageEngine> {
impl<S: StorageEngine> TableEngineProcedure for MitoEngine<S> {
fn create_table_procedure(
&self,
_ctx: &EngineContext,
request: CreateTableRequest,
) -> TableResult<BoxedProcedure> {
validate_create_table_request(&request)
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;

let procedure = Box::new(CreateMitoTable::new(request, self.inner.clone()));
Ok(procedure)
}
}

pub(crate) struct MitoEngineInner<S: StorageEngine> {
/// All tables opened by the engine. Map key is formatted [TableReference].
///
/// Writing to `tables` should also hold the `table_mutex`.
Expand All @@ -167,7 +194,7 @@ struct MitoEngineInner<S: StorageEngine> {
fn build_row_key_desc(
mut column_id: ColumnId,
table_name: &str,
table_schema: &SchemaRef,
table_schema: &Schema,
primary_key_indices: &Vec<usize>,
) -> Result<(ColumnId, RowKeyDescriptor)> {
let ts_column_schema = table_schema
Expand Down Expand Up @@ -231,7 +258,7 @@ fn build_row_key_desc(
fn build_column_family(
mut column_id: ColumnId,
table_name: &str,
table_schema: &SchemaRef,
table_schema: &Schema,
primary_key_indices: &[usize],
) -> Result<(ColumnId, ColumnFamilyDescriptor)> {
let mut builder = ColumnFamilyDescriptorBuilder::default();
Expand Down Expand Up @@ -454,7 +481,8 @@ impl<S: StorageEngine> MitoEngineInner<S> {

let Some((manifest, table_info)) = self
.recover_table_manifest_and_info(table_name, &table_dir)
.await? else { return Ok(None) };
.await.map_err(BoxedError::new)
.context(TableOperationSnafu)? else { return Ok(None) };

debug!(
"Opening table {}, table info recovered: {:?}",
Expand Down Expand Up @@ -500,16 +528,14 @@ impl<S: StorageEngine> MitoEngineInner<S> {
&self,
table_name: &str,
table_dir: &str,
) -> TableResult<Option<(TableManifest, TableInfo)>> {
) -> Result<Option<(TableManifest, TableInfo)>> {
let manifest = MitoTable::<<S as StorageEngine>::Region>::build_manifest(
table_dir,
self.object_store.clone(),
);
let Some(table_info) =
MitoTable::<<S as StorageEngine>::Region>::recover_table_info(table_name, &manifest)
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)? else { return Ok(None) };
.await? else { return Ok(None) };

Ok(Some((manifest, table_info)))
}
Expand Down
96 changes: 96 additions & 0 deletions src/mito/src/engine/procedure.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod create;

use std::sync::Arc;

use common_procedure::ProcedureManager;
pub(crate) use create::CreateMitoTable;
use store_api::storage::StorageEngine;

use crate::engine::MitoEngineInner;

/// Register all procedure loaders to the procedure manager.
///
/// # Panics
/// Panics on error.
pub(crate) fn register_procedure_loaders<S: StorageEngine>(
engine_inner: Arc<MitoEngineInner<S>>,
procedure_manager: &dyn ProcedureManager,
) {
// The procedure names are expected to be unique, so we just panic on error.
CreateMitoTable::register_loader(engine_inner, procedure_manager);
}

#[cfg(test)]
mod procedure_test_util {
use async_trait::async_trait;
use common_procedure::{
BoxedProcedure, Context, ContextProvider, ProcedureId, ProcedureState, Result, Status,
};
use log_store::NoopLogStore;
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use tempdir::TempDir;

use super::*;
use crate::engine::{EngineConfig, MitoEngine};
use crate::table::test_util;

struct MockContextProvider {}

#[async_trait]
impl ContextProvider for MockContextProvider {
async fn procedure_state(
&self,
_procedure_id: ProcedureId,
) -> Result<Option<ProcedureState>> {
Ok(Some(ProcedureState::Done))
}
}

pub struct TestEnv {
pub table_engine: MitoEngine<EngineImpl<NoopLogStore>>,
pub dir: TempDir,
}

pub async fn setup_test_engine(path: &str) -> TestEnv {
let (dir, object_store) = test_util::new_test_object_store(path).await;
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let storage_engine = EngineImpl::new(
StorageEngineConfig::default(),
Arc::new(NoopLogStore::default()),
object_store.clone(),
compaction_scheduler,
);
let table_engine = MitoEngine::new(EngineConfig::default(), storage_engine, object_store);

TestEnv { table_engine, dir }
}

pub async fn execute_procedure_until_done(procedure: &mut BoxedProcedure) {
let ctx = Context {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider {}),
};

loop {
if let Status::Done = procedure.execute(&ctx).await.unwrap() {
break;
}
}
}
}
Loading

0 comments on commit aaaf241

Please sign in to comment.