Skip to content

Commit

Permalink
feat: support HTTP&gRPC&pg set timezone (GreptimeTeam#3125)
Browse files Browse the repository at this point in the history
* feat: support HTTP&gRPC&pg set timezone

* chore: fix code advice

* chore: fix code advice
  • Loading branch information
Taylor-lagrange authored Jan 15, 2024
1 parent 93f28c2 commit 816d948
Show file tree
Hide file tree
Showing 48 changed files with 568 additions and 132 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ etcd-client = "0.12"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a31ea166fc015ea7ff111ac94e26c3a5d64364d2" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2c1f17dce7af748c9a1255e82d6ceb7959f8919b" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/bin/nyc-taxi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr {
catalog_name: CATALOG_NAME.to_string(),
schema_name: SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
desc: "".to_string(),
desc: String::default(),
column_defs: vec![
ColumnDef {
name: "VendorID".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ mod tests {
assert_eq!(
StatusCode::StorageUnavailable,
Error::SystemCatalog {
msg: "".to_string(),
msg: String::default(),
location: Location::generate(),
}
.status_code()
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/kvbackend/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl KvBackendCatalogManager {
catalog_manager: me.clone(),
information_schema_provider: Arc::new(InformationSchemaProvider::new(
// The catalog name is not used in system_catalog, so let it empty
"".to_string(),
String::default(),
me.clone(),
)),
},
Expand Down
2 changes: 1 addition & 1 deletion src/client/examples/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn run() {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "test_logical_dist_exec".to_string(),
desc: "".to_string(),
desc: String::default(),
column_defs: vec![
ColumnDef {
name: "timestamp".to_string(),
Expand Down
20 changes: 17 additions & 3 deletions src/client/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ pub struct Database {
// The dbname follows naming rule as out mysql, postgres and http
// protocol. The server treat dbname in priority of catalog/schema.
dbname: String,
// The time zone indicates the time zone where the user is located.
// Some queries need to be aware of the user's time zone to perform some specific actions.
timezone: String,

client: Client,
ctx: FlightContext,
Expand All @@ -58,7 +61,8 @@ impl Database {
Self {
catalog: catalog.into(),
schema: schema.into(),
dbname: "".to_string(),
dbname: String::default(),
timezone: String::default(),
client,
ctx: FlightContext::default(),
}
Expand All @@ -73,8 +77,9 @@ impl Database {
/// environment
pub fn new_with_dbname(dbname: impl Into<String>, client: Client) -> Self {
Self {
catalog: "".to_string(),
schema: "".to_string(),
catalog: String::default(),
schema: String::default(),
timezone: String::default(),
dbname: dbname.into(),
client,
ctx: FlightContext::default(),
Expand Down Expand Up @@ -105,6 +110,14 @@ impl Database {
self.dbname = dbname.into();
}

pub fn timezone(&self) -> &String {
&self.timezone
}

pub fn set_timezone(&mut self, timezone: impl Into<String>) {
self.timezone = timezone.into();
}

pub fn set_auth(&mut self, auth: AuthScheme) {
self.ctx.auth_header = Some(AuthHeader {
auth_scheme: Some(auth),
Expand Down Expand Up @@ -161,6 +174,7 @@ impl Database {
schema: self.schema.clone(),
authorization: self.ctx.auth_header.clone(),
dbname: self.dbname.clone(),
timezone: self.timezone.clone(),
// TODO(Taylor-lagrange): add client grpc tracing
tracing_context: W3cTrace::new(),
}),
Expand Down
4 changes: 2 additions & 2 deletions src/client/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,15 +230,15 @@ mod test {
let result = check_response_header(Some(ResponseHeader {
status: Some(PbStatus {
status_code: StatusCode::Success as u32,
err_msg: "".to_string(),
err_msg: String::default(),
}),
}));
assert!(result.is_ok());

let result = check_response_header(Some(ResponseHeader {
status: Some(PbStatus {
status_code: u32::MAX,
err_msg: "".to_string(),
err_msg: String::default(),
}),
}));
assert!(matches!(
Expand Down
2 changes: 1 addition & 1 deletion src/common/base/src/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ mod tests {
let bytes = StringBytes::from(hello.clone());
assert_eq!(bytes.len(), hello.len());

let zero = "".to_string();
let zero = String::default();
let bytes = StringBytes::from(zero);
assert!(bytes.is_empty());
}
Expand Down
10 changes: 5 additions & 5 deletions src/common/grpc-expr/src/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ mod tests {
#[test]
fn test_alter_expr_to_request() {
let expr = AlterExpr {
catalog_name: "".to_string(),
schema_name: "".to_string(),
catalog_name: String::default(),
schema_name: String::default(),
table_name: "monitor".to_string(),

kind: Some(Kind::AddColumns(AddColumns {
Expand Down Expand Up @@ -186,8 +186,8 @@ mod tests {
#[test]
fn test_alter_expr_with_location_to_request() {
let expr = AlterExpr {
catalog_name: "".to_string(),
schema_name: "".to_string(),
catalog_name: String::default(),
schema_name: String::default(),
table_name: "monitor".to_string(),

kind: Some(Kind::AddColumns(AddColumns {
Expand All @@ -204,7 +204,7 @@ mod tests {
}),
location: Some(Location {
location_type: LocationType::First.into(),
after_column_name: "".to_string(),
after_column_name: String::default(),
}),
},
AddColumn {
Expand Down
2 changes: 1 addition & 1 deletion src/common/procedure/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ pub struct ManagerConfig {
impl Default for ManagerConfig {
fn default() -> Self {
Self {
parent_path: "".to_string(),
parent_path: String::default(),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
remove_outdated_meta_task_interval: Duration::from_secs(60 * 10),
Expand Down
2 changes: 1 addition & 1 deletion src/common/query/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl From<&AddColumnLocation> for Location {
match value {
AddColumnLocation::First => Location {
location_type: LocationType::First.into(),
after_column_name: "".to_string(),
after_column_name: String::default(),
},
AddColumnLocation::After { column_name } => Location {
location_type: LocationType::After.into(),
Expand Down
8 changes: 4 additions & 4 deletions src/common/time/src/interval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ impl IntervalFormat {
return "PT0S".to_string();
}
let fract_str = match self.microseconds {
0 => "".to_string(),
0 => String::default(),
_ => format!(".{:06}", self.microseconds)
.trim_end_matches('0')
.to_string(),
Expand Down Expand Up @@ -446,7 +446,7 @@ impl IntervalFormat {
if self.is_zero() {
return "00:00:00".to_string();
}
let mut result = "".to_string();
let mut result = String::default();
if self.has_year_month() {
if self.years != 0 {
result.push_str(&format!("{} year ", self.years));
Expand All @@ -464,7 +464,7 @@ impl IntervalFormat {

/// get postgres time part(include hours, minutes, seconds, microseconds)
fn get_postgres_time_part(&self) -> String {
let mut time_part = "".to_string();
let mut time_part = String::default();
if self.has_time_part() {
let sign = if !self.has_time_part_positive() {
"-"
Expand Down Expand Up @@ -516,7 +516,7 @@ fn get_time_part(
is_time_part_positive: bool,
is_only_time: bool,
) -> String {
let mut interval = "".to_string();
let mut interval = String::default();
if is_time_part_positive && is_only_time {
interval.push_str(&format!("{}:{:02}:{:02}", hours, mins, secs));
} else {
Expand Down
10 changes: 10 additions & 0 deletions src/common/time/src/timezone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ pub fn get_timezone(tz: Option<Timezone>) -> Timezone {
})
}

#[inline(always)]
/// If the `tz = Some("") || None || Some(Invalid timezone)`, return system timezone,
/// or return parsed `tz` as timezone.
pub fn parse_timezone(tz: Option<&str>) -> Timezone {
match tz {
None | Some("") => Timezone::Named(Tz::UTC),
Some(tz) => Timezone::from_tz_string(tz).unwrap_or(Timezone::Named(Tz::UTC)),
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Timezone {
Offset(FixedOffset),
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,8 @@ pub fn check_permission(
// show create table and alter are not supported yet
Statement::ShowCreateTable(_) | Statement::CreateExternalTable(_) | Statement::Alter(_) => {
}
// set/show variable now only alter/show variable in session
Statement::SetVariables(_) | Statement::ShowVariables(_) => {}

Statement::Insert(insert) => {
validate_param(insert.table_name(), query_ctx)?;
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/procedure/region_failover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ mod tests {
let nodes = (1..=region_distribution.len())
.map(|id| Peer {
id: id as u64,
addr: "".to_string(),
addr: String::default(),
})
.collect();
Arc::new(RandomNodeSelector { nodes })
Expand Down Expand Up @@ -751,7 +751,7 @@ mod tests {
peers: Arc::new(Mutex::new(vec![
Some(Peer {
id: 42,
addr: "".to_string(),
addr: String::default(),
}),
None,
])),
Expand Down
2 changes: 1 addition & 1 deletion src/meta-srv/src/procedure/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub mod mock {
header: Some(ResponseHeader {
status: Some(PbStatus {
status_code: 0,
err_msg: "".to_string(),
err_msg: String::default(),
}),
}),
affected_rows: 0,
Expand Down
4 changes: 2 additions & 2 deletions src/operator/src/expr_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ pub(crate) async fn create_external_expr(
catalog_name,
schema_name,
table_name,
desc: "".to_string(),
desc: String::default(),
column_defs,
time_index,
primary_keys,
Expand Down Expand Up @@ -198,7 +198,7 @@ pub fn create_to_expr(create: &CreateTable, query_ctx: QueryContextRef) -> Resul
catalog_name,
schema_name,
table_name,
desc: "".to_string(),
desc: String::default(),
column_defs: columns_to_expr(&create.columns, &time_index, &primary_keys)?,
time_index,
primary_keys,
Expand Down
49 changes: 45 additions & 4 deletions src/operator/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::tracing;
use common_time::range::TimestampRange;
use common_time::Timestamp;
use common_time::{Timestamp, Timezone};
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use query::parser::QueryStatement;
use query::plan::LogicalPlan;
Expand All @@ -45,14 +45,14 @@ use sql::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument};
use sql::statements::statement::Statement;
use sql::statements::OptionMap;
use sql::util::format_raw_object_name;
use sqlparser::ast::ObjectName;
use sqlparser::ast::{Expr, ObjectName, Value};
use table::engine::TableReference;
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
use table::TableRef;

use crate::error::{
self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, PlanStatementSnafu,
Result, TableNotFoundSnafu,
self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
PlanStatementSnafu, Result, TableNotFoundSnafu,
};
use crate::insert::InserterRef;
use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
Expand Down Expand Up @@ -188,6 +188,20 @@ impl StatementExecutor {
self.show_create_table(table_name, table_ref, query_ctx)
.await
}
Statement::SetVariables(set_var) => {
let var_name = set_var.variable.to_string().to_uppercase();
match var_name.as_str() {
"TIMEZONE" | "TIME_ZONE" => set_timezone(set_var.value, query_ctx)?,
_ => {
return NotSupportedSnafu {
feat: format!("Unsupported set variable {}", var_name),
}
.fail()
}
}
Ok(Output::AffectedRows(0))
}
Statement::ShowVariables(show_variable) => self.show_variable(show_variable, query_ctx),
}
}

Expand Down Expand Up @@ -228,6 +242,33 @@ impl StatementExecutor {
}
}

fn set_timezone(exprs: Vec<Expr>, ctx: QueryContextRef) -> Result<()> {
let tz_expr = exprs.first().context(NotSupportedSnafu {
feat: "No timezone find in set variable statement",
})?;
match tz_expr {
Expr::Value(Value::SingleQuotedString(tz)) | Expr::Value(Value::DoubleQuotedString(tz)) => {
match Timezone::from_tz_string(tz.as_str()) {
Ok(timezone) => ctx.set_timezone(timezone),
Err(_) => {
return NotSupportedSnafu {
feat: format!("Invalid timezone expr {} in set variable statement", tz),
}
.fail()
}
}
Ok(())
}
expr => NotSupportedSnafu {
feat: format!(
"Unsupported timezone expr {} in set variable statement",
expr
),
}
.fail(),
}
}

fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
let direction = match stmt {
CopyTable::To(_) => CopyDirection::Export,
Expand Down
7 changes: 6 additions & 1 deletion src/operator/src/statement/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use session::context::QueryContextRef;
use snafu::ResultExt;
use sql::ast::{Ident, Value as SqlValue};
use sql::statements::create::{PartitionEntry, Partitions};
use sql::statements::show::{ShowDatabases, ShowTables};
use sql::statements::show::{ShowDatabases, ShowTables, ShowVariables};
use sql::{statements, MAXVALUE};
use table::TableRef;

Expand Down Expand Up @@ -71,6 +71,11 @@ impl StatementExecutor {
query::sql::show_create_table(table, partitions, query_ctx)
.context(error::ExecuteStatementSnafu)
}

#[tracing::instrument(skip_all)]
pub fn show_variable(&self, stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
query::sql::show_variable(stmt, query_ctx).context(error::ExecuteStatementSnafu)
}
}

fn create_partitions_stmt(partitions: Vec<PartitionInfo>) -> Result<Option<Partitions>> {
Expand Down
Loading

0 comments on commit 816d948

Please sign in to comment.