Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DNM Maybe working migrations #30980

Closed
wants to merge 50 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
a8ad4ba
storage: Simplify dataflow as_of computation
jkosh44 Jan 8, 2025
b84e290
storage: Always initialize ingestion statistics
jkosh44 Jan 8, 2025
785d6a7
More generic sender for `ReadHold` changes
teskje Oct 21, 2024
92f56f3
controller/compute: sequence external read hold changes
teskje Oct 21, 2024
062dcd8
Revert "controller/compute: defer application of early read hold chan…
teskje Dec 20, 2024
bbe7a98
compute: correctly propagate read holds in shutdown check
teskje Oct 23, 2024
84cdf01
controller/compute: send all read hold changes through command_tx
teskje Dec 20, 2024
ca7c42c
envd: make tests robust against slower read hold downgrade
teskje Dec 20, 2024
3dedd37
kafka-source: detect topic deletion in metadata fetcher
teskje Dec 19, 2024
bc83246
kafka: don't shutdown metadata fetcher on error
teskje Jan 6, 2025
3fe9148
storage: in upsert StateValue allow converting finalized tombstones
aljoscha Jan 8, 2025
2a90439
build: Use "limited" debug info instead of "line-tables" (#30962)
ParkMyCar Jan 8, 2025
2ec6f53
storage: Correct output index for source exports
jkosh44 Jan 8, 2025
a0b1299
Opt-in catalog migration for converting subsources to source tables
rjobanp Sep 16, 2024
4ce1748
Add migration logic for source statements
rjobanp Oct 23, 2024
f9b3c01
Rename the feature flag
rjobanp Oct 23, 2024
1094734
Add new table to audit log
rjobanp Oct 23, 2024
aa9d33c
Add platform check scenario to test migration
rjobanp Oct 23, 2024
b810598
Switch to using system vars instead of flags to allow console access …
rjobanp Oct 24, 2024
a9ac9dc
Fixes to migration based on testing
rjobanp Oct 24, 2024
869dde3
Also test the migration in the legacy upgrade tests
rjobanp Oct 24, 2024
7daeefc
platform checks: unique source name
nrainer-materialize Oct 25, 2024
3144c3e
Migration structure cleanup from feedback
rjobanp Oct 25, 2024
b08465c
Address more feedback; ensure new source name is unique
rjobanp Oct 25, 2024
8cc26c9
Fix legacy-upgrade checks
rjobanp Oct 25, 2024
a061b34
Fixes caused by rebase on main
rjobanp Oct 25, 2024
12428f7
ci: print source table migration issues
nrainer-materialize Oct 28, 2024
ebcd57e
migration tests: pg-cdc-old-syntax
nrainer-materialize Oct 25, 2024
db36e65
migration tests: extract logic
nrainer-materialize Oct 28, 2024
7a36b2a
migration tests: improve verification
nrainer-materialize Oct 28, 2024
093bc6c
migration tests: mysql-cdc-old-syntax
nrainer-materialize Oct 28, 2024
e2ec8a6
migration tests: testdrive-old-kafka-syntax
nrainer-materialize Oct 28, 2024
08b6748
migration tests: improve output
nrainer-materialize Oct 29, 2024
9879b1b
migration tests: fixes
nrainer-materialize Oct 29, 2024
6ac9b65
migration tests: fixes
nrainer-materialize Oct 29, 2024
03f7f3a
Fix for mysql source being restarted after new table added
rjobanp Oct 29, 2024
cb52018
Avoid needing to rewrite ids of dependent statements by changing the …
rjobanp Oct 29, 2024
6a42fca
Fix merge skew
jkosh44 Nov 14, 2024
5cb65cf
Fix dependency tracking
jkosh44 Nov 14, 2024
dc1a48a
Fix lint
jkosh44 Nov 14, 2024
b14cc63
Fix some issues
jkosh44 Nov 14, 2024
1763a86
Fix dependency tracking
jkosh44 Nov 14, 2024
ace0355
Fix merge skew
jkosh44 Nov 15, 2024
bdb5783
Update test versions
jkosh44 Nov 15, 2024
032cc84
More merge skew fixes
jkosh44 Dec 12, 2024
31fb760
Update item sorting to only sort within item groups
jkosh44 Dec 16, 2024
ac821fa
Experiment for migrate
jkosh44 Dec 17, 2024
f56fac0
Fix migration idempotency
jkosh44 Dec 18, 2024
51647b6
Fixup
jkosh44 Dec 18, 2024
6462bfc
resolve merge conflicts
jkosh44 Jan 2, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ build:debuginfo-full --copt=-g2
build:debuginfo-full --strip=never
build:debuginfo-full --@rules_rust//:extra_rustc_flag=-Cstrip=none

build:debuginfo-limited --@rules_rust//:extra_rustc_flag=-Cdebuginfo=line-tables-only
build:debuginfo-limited --copt=-gline-tables-only
build:debuginfo-limited --@rules_rust//:extra_rustc_flag=-Cdebuginfo=1
build:debuginfo-limited --copt=-g1
build:debuginfo-limited --strip=never
build:debuginfo-limited --@rules_rust//:extra_rustc_flag=-Cstrip=none

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions ci/nightly/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,17 @@ steps:
agents:
queue: hetzner-aarch64-4cpu-8gb

- id: mysql-cdc-migration
label: MySQL CDC source-versioning migration tests
depends_on: build-aarch64
timeout_in_minutes: 360
plugins:
- ./ci/plugins/mzcompose:
composition: mysql-cdc-old-syntax
run: migration
agents:
queue: hetzner-aarch64-4cpu-8gb

- id: mysql-cdc-resumption-old-syntax
label: MySQL CDC resumption tests (before source versioning)
depends_on: build-aarch64
Expand Down Expand Up @@ -602,6 +613,17 @@ steps:
queue: hetzner-aarch64-4cpu-8gb
# the mzbuild postgres version will be used, which depends on the Dockerfile specification

- id: pg-cdc-migration
label: Postgres CDC source-versioning migration tests
depends_on: build-aarch64
timeout_in_minutes: 360
plugins:
- ./ci/plugins/mzcompose:
composition: pg-cdc-old-syntax
run: migration
agents:
queue: hetzner-aarch64-4cpu-8gb

- id: pg-cdc-resumption-old-syntax
label: Postgres CDC resumption tests (before source versioning)
depends_on: build-aarch64
Expand Down Expand Up @@ -632,6 +654,17 @@ steps:
agents:
queue: hetzner-aarch64-8cpu-16gb

- id: testdrive-kafka-migration
label: "Testdrive %N migration tests"
depends_on: build-aarch64
timeout_in_minutes: 180
parallelism: 8
plugins:
- ./ci/plugins/mzcompose:
composition: testdrive-old-kafka-src-syntax
run: migration
agents:
queue: hetzner-aarch64-8cpu-16gb

- group: AWS
key: aws
Expand Down
56 changes: 56 additions & 0 deletions misc/python/materialize/checks/all_checks/upsert.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,59 @@ def validate(self) -> Testdrive:
"""
)
)


class UpsertLegacy(Check):
"""
An upsert source test that uses the legacy syntax to create the source
on all versions to ensure the source is properly migrated with the
ActivateSourceVersioningMigration scenario
"""

def initialize(self) -> Testdrive:
return Testdrive(
schemas()
+ dedent(
"""
$ kafka-create-topic topic=upsert-legacy-syntax

$ kafka-ingest format=avro key-format=avro topic=upsert-legacy-syntax key-schema=${keyschema} schema=${schema} repeat=10000
{"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}

> CREATE SOURCE upsert_insert_legacy
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-legacy-syntax-${testdrive.seed}')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE UPSERT

> CREATE MATERIALIZED VIEW upsert_insert_legacy_view AS SELECT COUNT(DISTINCT key1 || ' ' || f1) FROM upsert_insert_legacy;
"""
)
)

def manipulate(self) -> list[Testdrive]:
return [
Testdrive(schemas() + dedent(s))
for s in [
"""
$ kafka-ingest format=avro key-format=avro topic=upsert-legacy-syntax key-schema=${keyschema} schema=${schema} repeat=10000
{"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
""",
"""
$ kafka-ingest format=avro key-format=avro topic=upsert-legacy-syntax key-schema=${keyschema} schema=${schema} repeat=10000
{"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
""",
]
]

def validate(self) -> Testdrive:
return Testdrive(
dedent(
"""
> SELECT COUNT(*), COUNT(DISTINCT key1), COUNT(DISTINCT f1) FROM upsert_insert_legacy
10000 10000 10000

> SELECT * FROM upsert_insert_legacy_view;
10000
"""
)
)
46 changes: 46 additions & 0 deletions misc/python/materialize/checks/scenarios_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,3 +383,49 @@ def actions(self) -> list[Action]:
),
Validate(self),
]


class ActivateSourceVersioningMigration(Scenario):
"""
Starts MZ, initializes and manipulates, then forces the migration
of sources to the new table model (introducing Source Versioning).
"""

def base_version(self) -> MzVersion:
return get_last_version()

def actions(self) -> list[Action]:
print(f"Upgrading from tag {self.base_version()}")
return [
StartMz(
self,
tag=self.base_version(),
),
Initialize(self),
Manipulate(self, phase=1),
KillMz(
capture_logs=True
), # We always use True here otherwise docker-compose will lose the pre-upgrade logs
StartMz(
self,
tag=None,
# Activate the `force_source_table_syntax` flag
# which should trigger the migration of sources
# using the old syntax to the new table model.
additional_system_parameter_defaults={
"force_source_table_syntax": "true",
},
),
Manipulate(self, phase=2),
Validate(self),
# A second restart while already on the new version
KillMz(capture_logs=True),
StartMz(
self,
tag=None,
additional_system_parameter_defaults={
"force_source_table_syntax": "true",
},
),
Validate(self),
]
2 changes: 2 additions & 0 deletions misc/python/materialize/cli/ci_annotate_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@
| (FAIL|TIMEOUT)\s+\[\s*\d+\.\d+s\]
# parallel-workload
| worker_.*\ still\ running: [\s\S]* Threads\ have\ not\ stopped\ within\ 5\ minutes,\ exiting\ hard
# source-table migration
| source-table-migration\ issue
)
.* $
""",
Expand Down
5 changes: 4 additions & 1 deletion misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@


def get_default_system_parameters(
version: MzVersion | None = None, zero_downtime: bool = False
version: MzVersion | None = None,
zero_downtime: bool = False,
force_source_table_syntax: bool = False,
) -> dict[str, str]:
"""For upgrade tests we only want parameters set when all environmentd /
clusterd processes have reached a specific version (or higher)
Expand Down Expand Up @@ -123,6 +125,7 @@ def get_default_system_parameters(
"persist_record_schema_id": (
"true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false"
),
"force_source_table_syntax": "true" if force_source_table_syntax else "false",
"persist_batch_columnar_format": "both_v2",
"persist_batch_columnar_format_percent": "100",
"persist_batch_delete_enabled": "true",
Expand Down
71 changes: 71 additions & 0 deletions misc/python/materialize/source_table_migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Utilities for testing the source table migration"""
from materialize.mz_version import MzVersion
from materialize.mzcompose.composition import Composition


def verify_sources_after_source_table_migration(
c: Composition, file: str, fail: bool = False
) -> None:
source_names_rows = c.sql_query(
"SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';"
)
source_names = [row[0] for row in source_names_rows]

print(f"Sources created in {file} are: {source_names}")

c.sql("SET statement_timeout = '20s'")

for source_name in source_names:
_verify_source(c, file, source_name, fail=fail)


def _verify_source(
c: Composition, file: str, source_name: str, fail: bool = False
) -> None:
try:
print(f"Checking source: {source_name}")

# must not crash
statement = f"SELECT count(*) FROM {source_name};"
print(statement)
c.sql_query(statement)

statement = f"SHOW CREATE SOURCE {source_name};"
print(statement)
result = c.sql_query(statement)
sql = result[0][1]
assert "FOR TABLE" not in sql, f"FOR TABLE found in: {sql}"
assert "FOR ALL TABLES" not in sql, f"FOR ALL TABLES found in: {sql}"

if not source_name.endswith("_progress"):
assert "CREATE SUBSOURCE" not in sql, f"CREATE SUBSOURCE found in: {sql}"

print("OK.")
except Exception as e:
print(f"source-table-migration issue in {file}: {str(e)}")

if fail:
raise e


def check_source_table_migration_test_sensible() -> None:
assert MzVersion.parse_cargo() < MzVersion.parse_mz(
"v0.133.0"
), "migration test probably no longer needed"


def get_old_image_for_source_table_migration_test() -> str:
return "materialize/materialized:v0.125.0"


def get_new_image_for_source_table_migration_test() -> str | None:
return None
Loading
Loading