feat: Separate cache operation log environment and index environments (
chubei authored Mar 10, 2023
1 parent 488df7f commit a569032
Showing 48 changed files with 2,200 additions and 1,633 deletions.
4 changes: 1 addition & 3 deletions dozer-api/src/errors.rs
@@ -21,8 +21,6 @@ pub enum ApiError {
     OpenCache(#[source] CacheError),
     #[error("Failed to open cache: {0}")]
     CacheNotFound(String),
-    #[error("Cannot find schema by name")]
-    SchemaNotFound(#[source] CacheError),
     #[error("Get by primary key is not supported when there is no primary key")]
     NoPrimaryKey,
     #[error("Get by primary key is not supported when it is composite: {0:?}")]
@@ -141,7 +139,7 @@ impl actix_web::error::ResponseError for ApiError {
             ApiError::TypeError(_) => StatusCode::BAD_REQUEST,
             ApiError::ApiAuthError(_) => StatusCode::UNAUTHORIZED,
             ApiError::NotFound(_) => StatusCode::NOT_FOUND,
-            ApiError::SchemaNotFound(_) | ApiError::NoPrimaryKey | ApiError::MultiIndexFetch(_) => {
+            ApiError::NoPrimaryKey | ApiError::MultiIndexFetch(_) => {
                 StatusCode::UNPROCESSABLE_ENTITY
             }
             ApiError::InternalError(_)
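The dropped ApiError::SchemaNotFound variant reflects the core API change visible in this diff: CacheReader::get_schema() appears to stop returning a Result, so callers no longer have a cache error to map. A minimal sketch of the caller-side difference, using stand-in types rather than the real dozer-cache and dozer-types items; the old signature in the comment is an assumption inferred from the map_err calls removed in the files below, not copied from the repository.

// Sketch only: Schema, IndexDefinition and CacheReader are stand-ins
// for the real dozer-types / dozer-cache items.
struct Schema;
struct IndexDefinition;
type SchemaWithIndex = (Schema, Vec<IndexDefinition>);

struct CacheReader {
    schema: SchemaWithIndex,
}

impl CacheReader {
    // Before (assumed): fn get_schema(&self) -> Result<&SchemaWithIndex, CacheError>,
    // which forced every caller to map_err into something like ApiError::SchemaNotFound.
    //
    // After, as the call sites in this commit suggest: the schema is fixed when the
    // reader is opened, so the getter cannot fail.
    fn get_schema(&self) -> &SchemaWithIndex {
        &self.schema
    }
}

fn main() {
    let reader = CacheReader { schema: (Schema, Vec::new()) };
    // No map_err, no `?`: take the schema and its secondary indexes directly.
    let (_schema, _secondary_indexes) = reader.get_schema();
}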
11 changes: 2 additions & 9 deletions dozer-api/src/grpc/common/service.rs
@@ -86,10 +86,7 @@ impl CommonGrpcService for CommonService {

         let cache_reader = cache_endpoint.cache_reader();
         let records = shared_impl::query(&cache_reader, query_request.query.as_deref(), access)?;
-        let schema = &cache_reader
-            .get_schema()
-            .map_err(|_| Status::invalid_argument(&cache_endpoint.endpoint.name))?
-            .0;
+        let schema = &cache_reader.get_schema().0;

         let fields = map_field_definitions(schema.fields.clone());
         let records = records.into_iter().map(map_record).collect();
@@ -113,7 +110,6 @@ impl CommonGrpcService for CommonService {

         shared_impl::on_event(
             &cache_endpoint.cache_reader(),
-            &cache_endpoint.endpoint.name,
             query_request.filter.as_deref(),
             self.event_notifier.as_ref().map(|r| r.resubscribe()),
             access.cloned(),
@@ -147,10 +143,7 @@ impl CommonGrpcService for CommonService {
             .map_or(Err(Status::invalid_argument(&endpoint)), Ok)?;

         let cache_reader = cache_endpoint.cache_reader();
-        let schema = &cache_reader
-            .get_schema()
-            .map_err(|_| Status::invalid_argument(endpoint))?
-            .0;
+        let schema = &cache_reader.get_schema().0;

         let fields = map_field_definitions(schema.fields.clone());

7 changes: 1 addition & 6 deletions dozer-api/src/grpc/shared_impl/mod.rs
@@ -58,7 +58,6 @@ pub fn query(

 pub fn on_event<T: Send + 'static>(
     reader: &CacheReader,
-    endpoint_name: &str,
     filter: Option<&str>,
     mut broadcast_receiver: Option<Receiver<Operation>>,
     _access: Option<Access>,
@@ -82,11 +81,7 @@
         }
         None => None,
     };
-    let schema = reader
-        .get_schema()
-        .map_err(|_| Status::invalid_argument(endpoint_name))?
-        .0
-        .clone();
+    let schema = reader.get_schema().0.clone();

     let (tx, rx) = tokio::sync::mpsc::channel(1);
21 changes: 7 additions & 14 deletions dozer-api/src/grpc/typed/service.rs
@@ -347,20 +347,13 @@ fn on_event(
         .transpose()?;

     let endpoint_to_be_streamed = endpoint_name.to_string();
-    shared_impl::on_event(
-        reader,
-        endpoint_name,
-        filter,
-        event_notifier,
-        access.cloned(),
-        move |op| {
-            if endpoint_to_be_streamed == op.endpoint_name {
-                Some(Ok(on_event_to_typed_response(op, event_desc.clone())))
-            } else {
-                None
-            }
-        },
-    )
+    shared_impl::on_event(reader, filter, event_notifier, access.cloned(), move |op| {
+        if endpoint_to_be_streamed == op.endpoint_name {
+            Some(Ok(on_event_to_typed_response(op, event_desc.clone())))
+        } else {
+            None
+        }
+    })
 }

 fn token(
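The endpoint_name argument to shared_impl::on_event only served to label the Status::invalid_argument error raised when the schema lookup failed; with get_schema infallible it is dropped, and the typed service now passes its filtering closure inline. The trimmed helper presumably looks roughly like the sketch below, with placeholder types; the closure parameter is inferred from the call sites and the return type is simplified so the sketch compiles on its own (this is not the actual dozer-api code).

// Placeholder types; the real ones come from dozer-cache, dozer-types,
// tonic, and tokio's broadcast channel.
struct CacheReader;
struct Receiver<T>(std::marker::PhantomData<T>);
struct Operation {
    endpoint_name: String,
}
struct Access;
struct Status;

// Presumed shape of shared_impl::on_event after this commit: no endpoint
// name, because there is no longer a fallible schema lookup to report on.
fn on_event<T: Send + 'static>(
    reader: &CacheReader,
    filter: Option<&str>,
    broadcast_receiver: Option<Receiver<Operation>>,
    _access: Option<Access>,
    on_op: impl FnMut(Operation) -> Option<Result<T, Status>> + Send + 'static,
) -> Result<(), Status> {
    // Body elided: the real helper parses the filter, subscribes to the
    // broadcast receiver, and streams whatever on_op maps each operation to.
    let _ = (reader, filter, broadcast_receiver, on_op);
    Ok(())
}

fn main() {
    // Call shape mirroring typed/service.rs above: the endpoint filter now
    // lives inside the closure instead of being a separate argument.
    let endpoint_to_be_streamed = "films".to_string();
    let _ = on_event(&CacheReader, None, None, None, move |op| {
        (op.endpoint_name == endpoint_to_be_streamed).then(|| Ok(()))
    });
}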
12 changes: 3 additions & 9 deletions dozer-api/src/rest/api_generator.rs
@@ -23,7 +23,7 @@ use dozer_types::serde_json;
 use dozer_types::serde_json::{json, Map, Value};

 fn generate_oapi3(reader: &CacheReader, endpoint: ApiEndpoint) -> Result<OpenAPI, ApiError> {
-    let (schema, secondary_indexes) = reader.get_schema().map_err(ApiError::SchemaNotFound)?;
+    let (schema, secondary_indexes) = reader.get_schema();

     let oapi_generator = OpenApiGenerator::new(
         schema,
@@ -53,10 +53,7 @@ pub async fn get(
     path: web::Path<String>,
 ) -> Result<HttpResponse, ApiError> {
     let cache_reader = &cache_endpoint.cache_reader();
-    let schema = &cache_reader
-        .get_schema()
-        .map_err(ApiError::SchemaNotFound)?
-        .0;
+    let schema = &cache_reader.get_schema().0;

     let key = path.as_str();
     let key = if schema.primary_index.is_empty() {
@@ -151,10 +148,7 @@ fn get_records_map(
     let mut maps = vec![];
     let cache_reader = &cache_endpoint.cache_reader();
     let records = get_records(cache_reader, exp, access.map(|a| a.into_inner()))?;
-    let schema = &cache_reader
-        .get_schema()
-        .map_err(ApiError::SchemaNotFound)?
-        .0;
+    let schema = &cache_reader.get_schema().0;
     for record in records.into_iter() {
         let map = record_to_map(record, schema)?;
         maps.push(map);
6 changes: 3 additions & 3 deletions dozer-api/src/test_utils.rs
@@ -1,13 +1,13 @@
 use dozer_types::serde_json::{json, Value};
-use dozer_types::types::{Field, Record, SourceDefinition};
+use dozer_types::types::{Field, Record, SchemaWithIndex, SourceDefinition};
 use dozer_types::{
     models::api_endpoint::{ApiEndpoint, ApiIndex},
     types::{FieldDefinition, FieldType, IndexDefinition, Schema, SchemaIdentifier},
 };

 use dozer_cache::cache::{CacheManager, LmdbCacheManager, RecordWithId};

-pub fn get_schema() -> (Schema, Vec<IndexDefinition>) {
+pub fn get_schema() -> SchemaWithIndex {
     let fields = vec![
         FieldDefinition {
             name: "film_id".to_string(),
@@ -101,7 +101,7 @@ fn get_films() -> Vec<Value> {

 pub fn initialize_cache(
     schema_name: &str,
-    schema: Option<(Schema, Vec<IndexDefinition>)>,
+    schema: Option<SchemaWithIndex>,
 ) -> Box<dyn CacheManager> {
     let cache_manager = LmdbCacheManager::new(Default::default()).unwrap();
     let (schema, secondary_indexes) = schema.unwrap_or_else(get_schema);
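The test utilities swap the spelled-out (Schema, Vec<IndexDefinition>) pair for SchemaWithIndex. Judging by the one-for-one replacement in both signatures and the tuple destructuring that still follows, it is presumably a plain type alias in dozer-types along these lines (a sketch; the stand-in structs here are not the real definitions):

// Presumed alias, inferred from how (Schema, Vec<IndexDefinition>) is
// replaced one-for-one in get_schema and initialize_cache above.
pub type SchemaWithIndex = (Schema, Vec<IndexDefinition>);

// Minimal stand-ins so the alias compiles on its own; the real structs carry
// field definitions, schema identifiers, and index descriptors.
pub struct Schema {
    pub primary_index: Vec<usize>,
}
pub struct IndexDefinition;

fn main() {
    // Used the way initialize_cache uses it: one value that unpacks into the
    // schema and its secondary index definitions.
    let with_index: SchemaWithIndex = (Schema { primary_index: vec![] }, vec![IndexDefinition]);
    let (_schema, _secondary_indexes) = with_index;
}

Whatever its exact definition, the alias keeps a schema and its secondary indexes travelling together, which is what the cache reader now hands back directly.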