fix: Trino scale-down to zero with KEDA #726

Open · wants to merge 9 commits into base: main
3 changes: 2 additions & 1 deletion distributed-databases/trino/addons.tf
@@ -63,7 +63,7 @@ module "aws_ebs_csi_pod_identity" {
#---------------------------------------------------------------
module "eks_blueprints_addons" {
source = "aws-ia/eks-blueprints-addons/aws"
version = "1.3"
version = "1.19"

cluster_name = module.eks.cluster_name
cluster_endpoint = module.eks.cluster_endpoint
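The version pin is the only change in this hunk. For context, a minimal sketch of how KEDA could be installed through the module's generic helm_releases input (an assumption — how this PR actually installs KEDA is not shown in this diff):

# Sketch only: goes inside the module "eks_blueprints_addons" block above.
# The chart version pin and namespace are assumptions, not taken from this PR.
helm_releases = {
  keda = {
    chart            = "keda"
    chart_version    = "2.14.2" # example pin
    repository       = "https://kedacore.github.io/charts"
    namespace        = "keda"
    create_namespace = true
  }
}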
@@ -222,3 +222,4 @@ resource "aws_secretsmanager_secret_version" "grafana" {
  secret_id     = aws_secretsmanager_secret.grafana.id
  secret_string = random_password.grafana.result
}

@@ -0,0 +1,134 @@
use tpcds.sf10000;
select * from tpcds.sf10000.item limit 10;
select * from tpcds.sf10000.warehouse limit 10;

/*

drop schema iceberg.iceberg_schema;
drop table iceberg.iceberg_schema.warehouse;
drop table iceberg.iceberg_schema.item;
drop table iceberg.iceberg_schema.inventory;
drop table iceberg.iceberg_schema.date_dim;
*/
/* Iceberg schema creation */

create schema if not exists iceberg.iceberg_schema
with (LOCATION = 's3://$BUCKET/iceberg/');

/* Iceberg Table Creation with CTAS from tpcds tables */

create table if not exists iceberg.iceberg_schema.inventory
with (FORMAT = 'PARQUET')
as select *
from tpcds.sf10000.inventory;

create table if not exists iceberg.iceberg_schema.date_dim
with (FORMAT = 'PARQUET')
as select d_date_sk,
cast(d_date_id as varchar(16)) as d_date_id,
d_date,
d_month_seq,
d_week_seq,
d_quarter_seq,
d_year,
d_dow,
d_moy,
d_dom,
d_qoy,
d_fy_year,
d_fy_quarter_seq,
d_fy_week_seq,
cast(d_day_name as varchar(9)) as d_day_name,
cast(d_quarter_name as varchar(6)) as d_quarter_name,
cast(d_holiday as varchar(1)) as d_holiday,
cast(d_weekend as varchar(1)) as d_weekend,
cast(d_following_holiday as varchar(1)) as d_following_holiday,
d_first_dom,
d_last_dom,
d_same_day_ly,
d_same_day_lq,
cast(d_current_day as varchar(1)) as d_current_day,
cast(d_current_week as varchar(1)) as d_current_week,
cast(d_current_month as varchar(1)) as d_current_month,
cast(d_current_quarter as varchar(1)) as d_current_quarter
from tpcds.sf10000.date_dim;

create table if not exists iceberg.iceberg_schema.warehouse
with (FORMAT = 'PARQUET')
as select
w_warehouse_sk,
cast(w_warehouse_id as varchar(16)) as w_warehouse_id,
w_warehouse_name,
w_warehouse_sq_ft,
cast(w_street_number as varchar(10)) as w_street_number,
w_street_name,
cast(w_street_type as varchar(15)) as w_street_type,
cast(w_suite_number as varchar(10)) as w_suite_number,
w_city,
w_county,
cast(w_state as varchar(2)) as w_state,
cast(w_zip as varchar(10)) as w_zip,
w_country,
w_gmt_offset
from tpcds.sf10000.warehouse;

create table if not exists iceberg.iceberg_schema.item
with (FORMAT = 'PARQUET')
as select
i_item_sk,
cast(i_item_id as varchar(16)) as i_item_id,
i_rec_start_date,
i_rec_end_date,
i_item_desc,
i_current_price,
i_wholesale_cost,
i_brand_id,
cast(i_brand as varchar(50)) as i_brand,
i_class_id,
cast(i_class as varchar(50)) as i_class,
i_category_id,
cast(i_category as varchar(50)) as i_category,
i_manufact_id,
cast(i_manufact as varchar(50)) as i_manufact,
cast(i_size as varchar(50)) as i_size,
cast(i_formulation as varchar(20)) as i_formulation,
cast(i_color as varchar(20)) as i_color,
cast(i_units as varchar(10)) as i_units,
cast(i_container as varchar(10)) as i_container,
i_manager_id,
cast(i_product_name as varchar(50)) as i_product_name
from tpcds.sf10000.item;


/* Select from Iceberg table */

select * from iceberg.iceberg_schema.date_dim limit 10;
select * from iceberg.iceberg_schema.item limit 10;
select * from iceberg.iceberg_schema.inventory limit 10;

/* Run a query against the Iceberg tables */

with inv as
(select w_warehouse_name,w_warehouse_sk,i_item_sk,d_moy
,stdev,mean, case mean when 0 then null else stdev/mean end cov
from(select w_warehouse_name,w_warehouse_sk,i_item_sk,d_moy
,stddev_samp(inv_quantity_on_hand) stdev,avg(inv_quantity_on_hand) mean
from iceberg.iceberg_schema.inventory
,iceberg.iceberg_schema.item
,iceberg.iceberg_schema.warehouse
,iceberg.iceberg_schema.date_dim
where inv_item_sk = i_item_sk
and inv_warehouse_sk = w_warehouse_sk
and inv_date_sk = d_date_sk
and d_year =1999
group by w_warehouse_name,w_warehouse_sk,i_item_sk,d_moy) foo
where case mean when 0 then 0 else stdev/mean end > 1)
select inv1.w_warehouse_sk,inv1.i_item_sk,inv1.d_moy,inv1.mean, inv1.cov
,inv2.w_warehouse_sk,inv2.i_item_sk,inv2.d_moy,inv2.mean, inv2.cov
from inv inv1,inv inv2
where inv1.i_item_sk = inv2.i_item_sk
and inv1.w_warehouse_sk = inv2.w_warehouse_sk
and inv1.d_moy=4
and inv2.d_moy=4+1
and inv1.cov > 1.5
order by inv1.w_warehouse_sk,inv1.i_item_sk,inv1.d_moy,inv1.mean,inv1.cov,inv2.d_moy,inv2.mean, inv2.cov;
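The script leaves $BUCKET unresolved. A hypothetical way to run it end to end with the Trino CLI (the file name, endpoint, and the envsubst step are assumptions, not part of this PR):

# Hypothetical invocation: substitute the bucket, then stream the script
# through the Trino CLI in non-interactive mode.
export BUCKET=my-iceberg-bucket   # must be writable by the Iceberg catalog
envsubst '$BUCKET' < iceberg-tpcds.sql | trino --server http://localhost:8080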
14 changes: 5 additions & 9 deletions distributed-databases/trino/helm-values/trino.yaml
@@ -15,16 +15,12 @@
  tag: 447
  pullPolicy: IfNotPresent
server:
-  workers: 3
+  workers: 1
  exchangeManager:
    name: filesystem
    baseDir: "s3://${exchange_bucket_id}"
  autoscaling:
    enabled: false
-    minReplicas: 1
-    maxReplicas: 20
-    targetCPUUtilizationPercentage: 75
-    targetMemoryUtilizationPercentage: 80
    # Add advanced scaling behavior from https://github.com/trinodb/charts/blob/4630167a839e6730c04cecc2af09ff038f522903/charts/trino/values.yaml#L94
  config:
    query:
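Dropping the chart's min/max/target fields only makes sense if something else owns the worker replica count. A minimal sketch of the KEDA side this change makes room for (the object below is not part of this diff; the names, namespace, Prometheus address, and metric names are all assumptions):

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: trino-worker-scaler
  namespace: trino                  # assumed namespace
spec:
  scaleTargetRef:
    name: trino-worker              # assumed name of the chart's worker Deployment
  minReplicaCount: 0                # the scale-down-to-zero this PR is after
  maxReplicaCount: 20
  cooldownPeriod: 300               # idle this long before dropping the last worker
  triggers:
    - type: prometheus
      metadata:
        serverAddress: http://prometheus.monitoring:9090   # assumed in-cluster address
        query: sum(trino_execution_QueryManager_RunningQueries) + sum(trino_execution_QueryManager_QueuedQueries)
        threshold: "1"

With minReplicaCount: 0 the workers disappear entirely when the cluster is idle; the chart's workers: 1 then presumably just seeds the Deployment before KEDA takes over.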
@@ -61,9 +57,10 @@
  limits:
    cpu: "6000m" # Higher limit for spikes
    memory: 40Gi
annotations:
+  karpenter.sh/do-not-disrupt: "true"
nodeSelector:
  NodePool: trino-sql-karpenter
  karpenter.sh/capacity-type: on-demand
topologySpreadConstraints:
  - maxSkew: 1
    topologyKey: kubernetes.io/hostname
@@ -95,7 +92,6 @@
    memory: 112Gi
nodeSelector:
  NodePool: trino-sql-karpenter
-  karpenter.sh/capacity-type: on-demand
topologySpreadConstraints:
  - maxSkew: 1
    topologyKey: kubernetes.io/hostname
Expand All @@ -110,7 +106,7 @@
- "query.remote-task.max-error-duration=1m"
- "query.max-hash-partition-count=100" # Updated from query.hash-partition-count
- "spill-enabled=true" # Updated from experimental.spill-enabled
- "spiller-spill-path=/tmp/spill" # Chagne this to SSD mount for faster

Check failure on line 109 in distributed-databases/trino/helm-values/trino.yaml

View workflow job for this annotation

GitHub Actions / Check for spelling errors

Chagne ==> Change
- "memory.heap-headroom-per-node=9.6GB"
- "optimizer.join-reordering-strategy=AUTOMATIC" # Updated from join-reordering-strategy
- "query.max-history=100"
@@ -166,7 +162,7 @@
ssl: false
lowercaseOutputName: false
lowercaseOutputLabelNames: false
-whitelistObjectNames: ["trino.execution:name=QueryManager","trino.execution:name=SqlTaskManager","trino.execution.executor:name=TaskExecutor","trino.memory:name=ClusterMemoryManager","java.lang:type=Runtime","trino.memory:type=ClusterMemoryPool,name=general","java.lang:type=Memory","trino.memory:type=MemoryPool,name=general"]
+whitelistObjectNames: ["trino.execution:name=QueryManager","trino.execution:name=ClusterSizeMonitor","trino.execution:name=SqlTaskManager","trino.execution.executor:name=TaskExecutor","trino.memory:name=ClusterMemoryManager","java.lang:type=Runtime","trino.memory:type=ClusterMemoryPool,name=general","java.lang:type=Memory","trino.memory:type=MemoryPool,name=general"]
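ClusterSizeMonitor is presumably added to the whitelist so the exporter publishes Trino's own estimate of how many workers the pending workload needs, which a scaler can target directly. Exported metric names depend on the JMX exporter's rewrite rules, so the query below is an assumption:

# Hypothetical alternative trigger for the ScaledObject sketched earlier:
query: sum(trino_execution_ClusterSizeMonitor_RequiredWorkers)
threshold: "1"

The 15s serviceMonitor interval below serves the same end: fresher samples let the scaler react sooner.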
autoExcludeObjectNameAttributes: true
excludeObjectNameAttributes:
"java.lang:type=OperatingSystem":
@@ -188,7 +184,7 @@
enabled: true
labels:
prometheus: kube-prometheus
interval: "30s"
interval: "15s"
coordinator:
enabled: true
labels: