
Commit c41d56b

Merge branch 'main' of https://github.com/risingwavelabs/risingwave into wcy/azblob_file_scan.pr
wcy-fdu committed Jan 9, 2025
2 parents 14ef860 + bb4dec2 commit c41d56b
Showing 65 changed files with 2,872 additions and 2,144 deletions.
145 changes: 62 additions & 83 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions Cargo.toml
@@ -379,8 +379,6 @@ tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev =
sqlx = { git = "https://github.com/madsim-rs/sqlx.git", rev = "3efe6d0065963db2a2b7f30dee69f647e28dec81" }
# patch to remove preserve_order from serde_json
bson = { git = "https://github.com/risingwavelabs/bson-rust", rev = "e5175ec" }
# TODO: unpatch after PR merged https://github.com/tokio-rs/prost/pull/1210
prost-build = { git = "https://github.com/xxchan/prost.git", rev = "0eb1c7b09976cf6b5231e4b8d87bb5908ae6a163" }

[workspace.metadata.dylint]
libraries = [{ path = "./lints" }]
6 changes: 6 additions & 0 deletions ci/scripts/check.sh
@@ -23,6 +23,12 @@ configure_static_openssl
echo "--- Run trailing spaces check"
scripts/check/check-trailing-spaces.sh

echo "--- Check protobuf code format && Lint protobuf"
cd proto
buf format -d --exit-code
buf lint
cd ..

echo "--- Rust cargo-sort check"
cargo sort --check --workspace --grouped

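A note on the new step: `buf format -d --exit-code` prints a formatting diff and fails when any .proto file is unformatted, and `buf lint` then applies the module's configured lint rules. A contributor can presumably reproduce the check locally like this (assuming `buf` is installed, which this diff does not show):

# Hypothetical local reproduction of the new CI step.
cd proto
buf format -d --exit-code   # show formatting diff; exit non-zero if any file differs
buf lint                    # apply the lint rules configured for the proto module
cd ..
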
5 changes: 5 additions & 0 deletions ci/scripts/common.sh
@@ -20,6 +20,11 @@ export RW_SECRET_STORE_PRIVATE_KEY_HEX="0123456789abcdef0123456789abcdef"
unset LANG

function dump_diagnose_info() {
ret=$?
if [ $ret -eq 0 ]; then
exit 0
fi

echo "^^^ +++"
echo "--- Failed to run command! Dumping diagnose info..."
if [ -f .risingwave/config/risedev-env ]; then
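The new guard makes `dump_diagnose_info` a no-op when the last command succeeded, which only makes sense if the function runs unconditionally on shell exit. A minimal sketch of the usual wiring (the `trap` registration is an assumption; it is not part of this hunk):

# Hypothetical sketch: register dump_diagnose_info as an EXIT handler.
# Inside the handler, $? is the exit status of the last command, so a
# successful run (ret == 0) returns early and skips the diagnostics.
trap dump_diagnose_info EXIT
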
3 changes: 3 additions & 0 deletions ci/scripts/e2e-iceberg-sink-v2-test.sh
@@ -35,6 +35,7 @@ echo "--- preparing iceberg"
cd e2e_test/iceberg
bash ./start_spark_connect_server.sh

echo "--- Running tests"
# Don't remove the `--quiet` option since poetry has a bug when printing output, see
# https://github.com/python-poetry/poetry/issues/3412
poetry update --quiet
@@ -52,6 +53,8 @@ poetry run python main.py -t ./test_case/iceberg_source_all_delete.toml
poetry run python main.py -t ./test_case/iceberg_source_explain_for_delete.toml
poetry run python main.py -t ./test_case/iceberg_predicate_pushdown.toml

echo "--- Running benchmarks"
poetry run python main.py -t ./benches/predicate_pushdown.toml

echo "--- Kill cluster"
cd ../../
11 changes: 0 additions & 11 deletions ci/scripts/misc-check.sh

This file was deleted.

2 changes: 1 addition & 1 deletion ci/workflows/main-cron.yml
@@ -190,7 +190,7 @@ steps:
<<: *docker-compose-common
run: source-test-env
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 15
timeout_in_minutes: 20
retry: *auto-retry

- label: "end-to-end sink test ({{matrix.backend}} backend)"
15 changes: 0 additions & 15 deletions ci/workflows/pull-request.yml
@@ -656,21 +656,6 @@ steps:
cancel_on_build_failing: true
retry: *auto-retry

- label: "misc check"
command: "ci/scripts/misc-check.sh"
if: |
!(build.pull_request.labels includes "ci/pr/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-misc-check"
|| build.env("CI_STEPS") =~ /(^|,)misc-check(,|$$)/
plugins:
- docker-compose#v5.5.0:
run: rw-build-env
config: ci/docker-compose.yml
- shellcheck#v1.2.0:
files: ./**/*.sh
timeout_in_minutes: 5
retry: *auto-retry

# The following jobs are triggered only when PR has corresponding labels.

# Generates cpu flamegraph env
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions docs/dev/src/tests/intro.md
@@ -16,8 +16,6 @@ RisingWave requires all code to pass fmt, clippy, sort and hakari checks. Run th
./risedev c # Run all checks. Shortcut for ./risedev check
```

There are also some miscellaneous checks. See `ci/scripts/misc-check.sh`.

## Unit and integration tests

RiseDev runs unit tests with cargo-nextest. To run unit tests:
65 changes: 65 additions & 0 deletions e2e_test/iceberg/benches/predicate_pushdown.slt
@@ -0,0 +1,65 @@
statement ok
CREATE TABLE t1 (i1 int, i2 varchar, i3 varchar);

statement ok
INSERT INTO t1 select key, 'some long string of text', 'another long string of text' from generate_series(1, 1000000) as key;

statement ok
CREATE SINK sink1 AS select * from t1 WITH (
connector = 'iceberg',
type = 'append-only',
force_append_only = 'true',
database.name = 'demo_db',
table.name = 't1',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
commit_checkpoint_interval = 1,
create_table_if_not_exists = 'true'
);

statement ok
CREATE SOURCE iceberg_t1_source
WITH (
connector = 'iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
s3.path.style.access = 'true',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
database.name = 'demo_db',
table.name = 't1',
);

statement ok
flush;

query I
select count(*) from iceberg_t1_source;
----
1000000

# warmup
include ./predicate_pushdown/point_get.slt.part
# bench
include ./predicate_pushdown/point_get.slt.part

# warmup
include ./predicate_pushdown/filter.slt.part
# bench
include ./predicate_pushdown/filter.slt.part

statement ok
DROP SINK sink1;

statement ok
DROP SOURCE iceberg_t1_source;

statement ok
DROP TABLE t1 cascade;
11 changes: 11 additions & 0 deletions e2e_test/iceberg/benches/predicate_pushdown.toml
@@ -0,0 +1,11 @@
init_sqls = [
'CREATE SCHEMA IF NOT EXISTS demo_db',
'DROP TABLE IF EXISTS demo_db.t1',
]

slt = 'benches/predicate_pushdown.slt'

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.t1',
'DROP SCHEMA IF EXISTS demo_db',
]
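This config presumably runs through the same harness as the existing test cases, matching the invocation added to the CI script above:

# Run the new benchmark through the existing iceberg test harness.
cd e2e_test/iceberg
poetry run python main.py -t ./benches/predicate_pushdown.toml
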
4 changes: 4 additions & 0 deletions e2e_test/iceberg/benches/predicate_pushdown/filter.slt.part
@@ -0,0 +1,4 @@
query I
select count(*) from iceberg_t1_source where i1 > 1001 and i1 < 1110;
----
108
4 changes: 4 additions & 0 deletions e2e_test/iceberg/benches/predicate_pushdown/point_get.slt.part
@@ -0,0 +1,4 @@
query I
select * from iceberg_t1_source where i1 = 100000;
----
100000 some long string of text another long string of text
55 changes: 55 additions & 0 deletions e2e_test/s3/file_sink.py
@@ -62,6 +62,46 @@ def do_test(config, file_num, item_num_per_file, prefix):
def _table():
return 's3_test_parquet'

print("test table function file scan")
cur.execute(f'''
SELECT
id,
name,
sex,
mark,
test_int,
test_int8,
test_uint8,
test_uint16,
test_uint32,
test_uint64,
test_float_16,
test_real,
test_double_precision,
test_varchar,
test_bytea,
test_date,
test_time,
test_timestamp_s,
test_timestamp_ms,
test_timestamp_us,
test_timestamp_ns,
test_timestamptz_s,
test_timestamptz_ms,
test_timestamptz_us,
test_timestamptz_ns
FROM file_scan(
'parquet',
's3',
'http://127.0.0.1:9301',
'hummockadmin',
'hummockadmin',
's3://hummock001/test_file_scan/test_file_scan.parquet'
);''')
result = cur.fetchone()
assert result[0] == 0, f'file scan assertion failed: the first column is {result[0]}, expect 0.'

print("file scan test pass")
# Execute a SELECT statement
cur.execute(f'''CREATE TABLE {_table()}(
id bigint primary key,
@@ -491,6 +531,21 @@ def _assert_greater(field, got, expect):
_s3(idx),
_local(idx)
)
# Upload a parquet file so the table-function file scan above has data to read.
if data:
first_file_data = data[0]
first_table = pa.Table.from_pandas(pd.DataFrame(first_file_data))

first_file_name = "test_file_scan.parquet"
first_file_path = f"test_file_scan/{first_file_name}"

pq.write_table(first_table, "data_0.parquet")

client.fput_object(
"hummock001",
first_file_path,
"data_0.parquet"
)

# do test
do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id)
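Together the two hunks exercise the `file_scan` table function end to end: the test uploads `test_file_scan.parquet` to the `hummock001` bucket, then reads it back through SQL. The same read can presumably be issued by hand (the psql connection parameters below are assumptions based on RisingWave's usual local defaults):

# Hypothetical manual check against a locally running cluster.
psql -h 127.0.0.1 -p 4566 -d dev -U root -c "
SELECT count(*)
FROM file_scan(
    'parquet', 's3', 'http://127.0.0.1:9301',
    'hummockadmin', 'hummockadmin',
    's3://hummock001/test_file_scan/test_file_scan.parquet'
);"
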
108 changes: 108 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -4661,6 +4661,113 @@ def section_udf(outer_panels):
)
]

def section_alert_overview(panels):
return [
panels.row("Alert Overview"),
panels.timeseries_count(
"Alerts",
"""Alerts in the system group by type:
- Too Many Barriers: there are too many uncommitted barriers generated. This means the streaming graph is stuck or under heavy load. Check 'Barrier Latency' panel.
- Recovery Triggered: cluster recovery is triggered. Check 'Errors by Type' / 'Node Count' panels.
- Lagging Version: the checkpointed or pinned version id is lagging behind the current version id. Check 'Hummock Manager' section in dev dashboard.
- Lagging Compaction: there are too many ssts in L0. This can be caused by compactor failure or lag of compactor resource. Check 'Compaction' section in dev dashboard, and take care of the type of 'Commit Flush Bytes' and 'Compaction Throughput', whether the throughput is too low.
- Lagging Vacuum: there are too many stale files waiting to be cleaned. This can be caused by compactor failure or lag of compactor resource. Check 'Compaction' section in dev dashboard.
- Abnormal Meta Cache Memory: the meta cache memory usage is too large, exceeding the expected 10 percent.
- Abnormal Block Cache Memory: the block cache memory usage is too large, exceeding the expected 10 percent.
- Abnormal Uploading Memory Usage: uploading memory is more than 70 percent of the expected, and is about to spill.
- Write Stall: Compaction cannot keep up. Stall foreground write, Check 'Compaction' section in dev dashboard.
- Abnormal Version Size: the size of the version is too large, exceeding the expected 300MB. Check 'Hummock Manager' section in dev dashboard.
- Abnormal Delta Log Number: the number of delta logs is too large, exceeding the expected 5000. Check 'Hummock Manager' and `Compaction` section in dev dashboard and take care of the type of 'Compaction Success Count', whether the number of trivial-move tasks spiking.
- Abnormal Pending Event Number: the number of pending events is too large, exceeding the expected 10000000. Check 'Hummock Write' section in dev dashboard and take care of the 'Event handle latency', whether the time consumed exceeds the barrier latency.
- Abnormal Object Storage Failure: the number of object storage failures is too large, exceeding the expected 50. Check 'Object Storage' section in dev dashboard and take care of the 'Object Storage Failure Rate', whether the rate is too high.
""",
[
panels.target(
f"{metric('all_barrier_nums')} >= bool 200",
"Too Many Barriers {{database_id}}",
),
panels.target(
f"sum(rate({metric('recovery_latency_count')}[$__rate_interval])) > bool 0 + sum({metric('recovery_failure_cnt')}) > bool 0",
"Recovery Triggered",
),
panels.target(
f"(({metric('storage_current_version_id')} - {metric('storage_checkpoint_version_id')}) >= bool 100) + "
+ f"(({metric('storage_current_version_id')} - {metric('storage_min_pinned_version_id')}) >= bool 100)",
"Lagging Version",
),
panels.target(
f"sum(label_replace({metric('storage_level_total_file_size')}, 'L0', 'L0', 'level_index', '.*_L0') unless "
+ f"{metric('storage_level_total_file_size')}) by (L0) >= bool 52428800",
"Lagging Compaction",
),
panels.target(
f"{metric('storage_stale_object_count')} >= bool 200",
"Lagging Vacuum",
),
panels.target(
f"{metric('state_store_meta_cache_usage_ratio')} >= bool 1.1",
"Abnormal Meta Cache Memory",
),
panels.target(
f"{metric('state_store_block_cache_usage_ratio')} >= bool 1.1",
"Abnormal Block Cache Memory",
),
panels.target(
f"{metric('state_store_uploading_memory_usage_ratio')} >= bool 0.7",
"Abnormal Uploading Memory Usage",
),
panels.target(
f"{metric('storage_write_stop_compaction_groups')} > bool 0",
"Write Stall",
),
panels.target(
f"{metric('storage_version_size')} >= bool 314572800",
"Abnormal Version Size",
),
panels.target(
f"{metric('storage_delta_log_count')} >= bool 5000",
"Abnormal Delta Log Number",
),
panels.target(
f"{metric('state_store_event_handler_pending_event')} >= bool 10000000",
"Abnormal Pending Event Number",
),
panels.target(
f"{metric('object_store_failure_count')} >= bool 50",
"Abnormal Object Storage Failure",
),
],
["last"],
),
panels.timeseries_count(
"Errors",
"Errors in the system group by type",
[
panels.target(
f"sum({metric('user_compute_error')}) by (error_type, executor_name, fragment_id)",
"{{error_type}} @ {{executor_name}} (fragment_id={{fragment_id}})",
),
panels.target(
f"sum({metric('user_source_error')}) by (error_type, source_id, source_name, fragment_id)",
"{{error_type}} @ {{source_name}} (source_id={{source_id}} fragment_id={{fragment_id}})",
),
panels.target(
f"sum({metric('user_sink_error')}) by (error_type, sink_id, sink_name, fragment_id)",
"{{error_type}} @ {{sink_name}} (sink_id={{sink_id}} fragment_id={{fragment_id}})",
),
panels.target(
f"{metric('source_status_is_up')} == 0",
"source error: source_id={{source_id}}, source_name={{source_name}} @ {{%s}}"
% NODE_LABEL,
),
panels.target(
f"sum(rate({metric('object_store_failure_count')}[$__rate_interval])) by ({NODE_LABEL}, {COMPONENT_LABEL}, type)",
"remote storage error {{type}}: {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
],
),
]

templating_list = []
if dynamic_source_enabled:
@@ -4840,5 +4947,6 @@ def section_udf(outer_panels):
*section_network_connection(panels),
*section_iceberg_metrics(panels),
*section_udf(panels),
*section_alert_overview(panels),
],
).auto_panel_ids()
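
Each alert target uses PromQL's `bool` comparison modifier, which turns the comparison into a 0/1 series instead of filtering out non-matching samples: every alert renders as a flat line that jumps to 1 while firing, which is why the panel passes the "last" reducer. A single expression can be spot-checked against Prometheus directly (the endpoint below is an assumed local default):

# Hypothetical spot-check of one alert expression via Prometheus' HTTP API.
curl -s 'http://localhost:9090/api/v1/query' \
  --data-urlencode 'query=all_barrier_nums >= bool 200'
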
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

(Diffs for the remaining changed files are not shown.)
