diff --git a/Dockerfile b/Dockerfile index 75dece9..8380644 100644 --- a/Dockerfile +++ b/Dockerfile @@ -23,7 +23,7 @@ RUN apt update && \ liblz4-1 liblz4-dev libldap2-dev libsasl2-dev libsasl2-modules-gssapi-mit libkrb5-dev wget \ libreadline-dev libudev-dev libev-dev libev4 libprocps-dev vim-common # Download boost and percona-xtrabackup -RUN wget https://boostorg.jfrog.io/artifactory/main/release/1.77.0/source/boost_1_77_0.tar.gz && \ +RUN wget https://archives.boost.io/release/1.77.0/source/boost_1_77_0.tar.gz && \ tar -zxvf boost_1_77_0.tar.gz @@ -83,9 +83,9 @@ COPY --from=builder-percona-server /usr/local/mysql/bin /usr/bin COPY --from=builder-percona-server /usr/local/mysql/lib /usr/lib ADD requirement* /src/ -RUN scripts/install-python-deps RUN sudo scripts/create-user ADD . /src/ +RUN scripts/install-python-deps RUN git config --global --add safe.directory /src RUN python -m pip install -e . diff --git a/myhoard.json b/myhoard.json index 78fae96..a1e1f85 100644 --- a/myhoard.json +++ b/myhoard.json @@ -7,7 +7,11 @@ "backup_interval_minutes": 1440, "backup_minute": 0, "forced_binlog_rotation_interval": 300, - "upload_site": "default" + "upload_site": "default", + "incremental": { + "enabled": false, + "full_backup_week_schedule": "sun,wed" + } }, "backup_sites": { "default": { diff --git a/myhoard/backup_stream.py b/myhoard/backup_stream.py index 0d84c25..9ab5540 100644 --- a/myhoard/backup_stream.py +++ b/myhoard/backup_stream.py @@ -79,6 +79,11 @@ class BaseBackupFailureReason(str, enum.Enum): xtrabackup_error = "xtrabackup_error" +class IncrementalBackupInfo(TypedDict): + last_checkpoint: str | None + required_streams: List[str] | None + + class BackupStream(threading.Thread): """Handles creating a single consistent backup stream. 'stream' here refers to uninterrupted sequence of backup data that can be used to restore the system to a consistent state. 
It includes the basebackup, @@ -132,6 +137,7 @@ class State(TypedDict): backup_reason: Optional["BackupStream.BackupReason"] created_at: float immediate_scan_required: bool + incremental: bool initial_latest_complete_binlog_index: Optional[int] last_binlog_upload_time: int last_processed_local_index: Optional[int] @@ -181,6 +187,7 @@ def __init__( temp_dir: str, xtrabackup_settings: Optional[Dict[str, int]] = None, split_size: Optional[int] = 0, + incremental_backup_info: IncrementalBackupInfo | None = None, ) -> None: super().__init__() stream_id = stream_id or self.new_stream_id() @@ -195,6 +202,7 @@ def __init__( self.file_storage_setup_fn = file_storage_setup_fn self.file_storage: Optional["BaseTransfer"] = None self.file_uploaded_callback = file_uploaded_callback + self.incremental_backup_info = incremental_backup_info self.is_running = True self.iteration_sleep = BackupStream.ITERATION_SLEEP self.last_basebackup_attempt: Optional[float] = None @@ -236,6 +244,7 @@ def __init__( "backup_reason": backup_reason, "created_at": time.time(), "immediate_scan_required": False, + "incremental": incremental_backup_info is not None, "initial_latest_complete_binlog_index": latest_complete_binlog_index, "last_binlog_upload_time": 0, "last_processed_local_index": None, @@ -1014,6 +1023,9 @@ def _take_basebackup(self) -> None: stats=self.stats, stream_handler=self._basebackup_stream_handler, temp_dir=self.temp_dir, + incremental_since_checkpoint=self.incremental_backup_info.get("last_checkpoint") + if self.incremental_backup_info + else None, ) try: self.basebackup_operation.create_backup() @@ -1070,12 +1082,17 @@ def _take_basebackup(self) -> None: "binlog_name": binlog_info["file_name"] if binlog_info else None, "binlog_position": binlog_info["file_position"] if binlog_info else None, "backup_reason": self.state["backup_reason"], + "checkpoints_file_content": self.basebackup_operation.checkpoints_file_content, "compressed_size": compressed_size, "encryption_key": rsa_encrypt_bytes(self.rsa_public_key_pem, encryption_key).hex(), "end_size": self.basebackup_operation.data_directory_size_end, "end_ts": end_time, "gtid": last_gtid, "gtid_executed": gtid_executed, + "incremental": self.basebackup_operation.incremental_since_checkpoint is not None, + "required_streams": self.incremental_backup_info.get("required_streams") + if self.incremental_backup_info + else None, "initiated_at": self.created_at, "lsn_info": self.basebackup_operation.lsn_info, "normalized_backup_time": self.state["normalized_backup_time"], diff --git a/myhoard/basebackup_operation.py b/myhoard/basebackup_operation.py index 729c606..a0a7233 100644 --- a/myhoard/basebackup_operation.py +++ b/myhoard/basebackup_operation.py @@ -1,10 +1,10 @@ # Copyright (c) 2019 Aiven, Helsinki, Finland. 
https://aiven.io/ from contextlib import suppress from myhoard.errors import BlockMismatchError, XtraBackupError -from myhoard.util import get_mysql_version, mysql_cursor +from myhoard.util import CHECKPOINT_FILENAME, get_mysql_version, mysql_cursor from packaging.version import Version from rohmu.util import increase_pipe_capacity, set_stream_nonblocking -from typing import Optional +from typing import Any, Dict, Optional import base64 import logging @@ -61,9 +61,11 @@ def __init__( stats, stream_handler, temp_dir, + incremental_since_checkpoint: str | None = None, ): self.abort_reason = None - self.binlog_info = None + self.binlog_info: Dict[str, Any] | None = None + self.checkpoints_file_content: str | None = None self.copy_threads = copy_threads self.compress_threads = compress_threads self.current_file = None @@ -75,7 +77,9 @@ def __init__( self.encryption_key = encryption_key self.has_block_mismatch = False self.log = logging.getLogger(self.__class__.__name__) - self.lsn_dir = None + self.incremental_since_checkpoint = incremental_since_checkpoint + self.prev_checkpoint_dir = None + self.lsn_dir: str | None = None self.lsn_info = None self.mysql_client_params = mysql_client_params with open(mysql_config_file_name, "r") as config: @@ -146,6 +150,12 @@ def create_backup(self): if self.register_redo_log_consumer: command_line.append("--register-redo-log-consumer") + if self.incremental_since_checkpoint: + self.prev_checkpoint_dir = tempfile.mkdtemp(dir=self.temp_dir_base, prefix="xtrabackupcheckpoint") + with open(os.path.join(self.prev_checkpoint_dir, CHECKPOINT_FILENAME), "w") as checkpoint_file: + checkpoint_file.write(self.incremental_since_checkpoint) + command_line.extend(["--incremental-basedir", self.prev_checkpoint_dir]) + with self.stats.timing_manager("myhoard.basebackup.xtrabackup_backup"): with subprocess.Popen( command_line, bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.PIPE @@ -262,6 +272,7 @@ def _process_input_output(self): reader_thread = None if exit_code == 0: + self._save_checkpoints_file() self._process_binlog_info() except AbortRequested as ex: @@ -285,10 +296,9 @@ def _process_input_output(self): if reader_thread: reader_thread.join() self.log.info("Thread joined") - if self.lsn_dir: - shutil.rmtree(self.lsn_dir) - if self.temp_dir: - shutil.rmtree(self.temp_dir) + for d in [self.lsn_dir, self.temp_dir, self.prev_checkpoint_dir]: + if d: + shutil.rmtree(d) self.proc = None if exit_code != 0: @@ -354,6 +364,12 @@ def _process_output_line_file_finished(self, line): self.current_file = None return True + def _save_checkpoints_file(self) -> None: + assert self.lsn_dir + + with open(os.path.join(self.lsn_dir, CHECKPOINT_FILENAME)) as checkpoints_file: + self.checkpoints_file_content = checkpoints_file.read() + def _process_binlog_info(self) -> None: assert self.lsn_dir diff --git a/myhoard/basebackup_restore_operation.py b/myhoard/basebackup_restore_operation.py index 37ba721..6eee786 100644 --- a/myhoard/basebackup_restore_operation.py +++ b/myhoard/basebackup_restore_operation.py @@ -1,9 +1,9 @@ # Copyright (c) 2019 Aiven, Helsinki, Finland. 
https://aiven.io/ from .errors import DiskFullError -from .util import get_xtrabackup_version, parse_version, parse_xtrabackup_info +from .util import CHECKPOINT_FILENAME, get_xtrabackup_version, parse_version, parse_xtrabackup_info from contextlib import suppress from rohmu.util import increase_pipe_capacity, set_stream_nonblocking -from typing import Final, Optional, Tuple +from typing import Dict, Final, Optional, Tuple import base64 import fnmatch @@ -38,7 +38,8 @@ def __init__( mysql_data_directory, stats, stream_handler, - temp_dir, + target_dir: str, + temp_dir: str, ): self.current_file = None self.data_directory_size_end = None @@ -51,34 +52,36 @@ def __init__( self.mysql_data_directory = mysql_data_directory self.number_of_files = 0 self.prepared_lsn = None - self.proc = None + self.proc: subprocess.Popen[bytes] | None = None self.stats = stats self.stream_handler = stream_handler - self.temp_dir = None - self.temp_dir_base = temp_dir - self.backup_xtrabackup_info = None - - def restore_backup(self): - if os.path.exists(self.mysql_data_directory): - raise ValueError(f"MySQL data directory {self.mysql_data_directory!r} already exists") + self.target_dir = target_dir + self.temp_dir = temp_dir + self.backup_xtrabackup_info: Dict[str, str] | None = None + def prepare_backup( + self, incremental: bool = False, apply_log_only: bool = False, checkpoints_file_content: str | None = None + ): # Write encryption key to file to avoid having it on command line. NamedTemporaryFile has mode 0600 - with tempfile.NamedTemporaryFile() as encryption_key_file: - encryption_key_file.write(base64.b64encode(self.encryption_key)) - encryption_key_file.flush() - cpu_count = multiprocessing.cpu_count() + incremental_dir = None + try: + if incremental: + incremental_dir = tempfile.mkdtemp(dir=self.temp_dir, prefix="myhoard_inc_") + + with tempfile.NamedTemporaryFile() as encryption_key_file: + encryption_key_file.write(base64.b64encode(self.encryption_key)) + encryption_key_file.flush() - self.temp_dir = tempfile.mkdtemp(dir=self.temp_dir_base) + cpu_count = multiprocessing.cpu_count() - try: command_line = [ "xbstream", # TODO: Check if it made sense to restore directly to MySQL data directory so that # we could skip the move phase. 
It's not clear if move does anything worthwhile # except skips a few extra files, which could instead be deleted explicitly "--directory", - self.temp_dir, + self.target_dir if not incremental_dir else incremental_dir, "--extract", "--decompress", "--decompress-threads", @@ -101,8 +104,9 @@ def restore_backup(self): self.proc = xbstream self._process_xbstream_input_output() - self.data_directory_size_start = self._get_directory_size(self.temp_dir) - xtrabackup_info_path = os.path.join(self.temp_dir, "xtrabackup_info") + self.data_directory_size_start = self._get_directory_size(self.target_dir) + + xtrabackup_info_path = os.path.join(self.target_dir, "xtrabackup_info") if os.path.exists(xtrabackup_info_path): with open(xtrabackup_info_path) as fh: xtrabackup_info_text = fh.read() @@ -122,8 +126,20 @@ def restore_backup(self): "--no-version-check", "--prepare", "--target-dir", - self.temp_dir, + self.target_dir, ] + + if apply_log_only: + # This is needed to prepare all the backups preceding the one to be restored in case of incremental + command_line.append("--apply-log-only") + if incremental_dir: + command_line.extend(["--incremental-dir", incremental_dir]) + + if checkpoints_file_content: + checkpoints_file_dir = incremental_dir if incremental_dir else self.target_dir + with open(os.path.join(checkpoints_file_dir, CHECKPOINT_FILENAME), "w") as checkpoints_file: + checkpoints_file.write(checkpoints_file_content) + # --use-free-memory-pct introduced in 8.0.30, but it doesn't work in 8.0.30 and leads to PBX crash if self.free_memory_percentage is not None and get_xtrabackup_version() >= (8, 0, 32): command_line.insert(2, f"--use-free-memory-pct={self.free_memory_percentage}") @@ -133,42 +149,42 @@ def restore_backup(self): ) as prepare: self.proc = prepare self._process_prepare_input_output() + finally: + if incremental_dir: + shutil.rmtree(incremental_dir) - # As of Percona XtraBackup 8.0.5 the backup contains binlog.index and one binlog that --move-back - # tries to restore to appropriate location, presumable with the intent of making MySQL patch gtid_executed - # table based on what's in the log. We already have logic for patching the table and we don't need - # this logic from XtraBackup. The logic is also flawed as it tries to restore to location that existed - # on the source server, which may not be valid for destination server. Just delete the files to disable - # that restoration logic. 
- binlog_index = os.path.join(self.temp_dir, "binlog.index") - if os.path.exists(binlog_index): - with open(binlog_index) as f: - binlogs = f.read().split("\n") - binlogs = [binlog.rsplit("/", 1)[-1] for binlog in binlogs if binlog.strip()] - self.log.info("Deleting redundant binlog index %r and binlogs %r before move", binlog_index, binlogs) - os.remove(binlog_index) - for binlog_name in binlogs: - binlog_name = os.path.join(self.temp_dir, binlog_name) - if os.path.exists(binlog_name): - os.remove(binlog_name) - - command_line = [ - "xtrabackup", - # defaults file must be given with --defaults-file=foo syntax, space here does not work - f"--defaults-file={self.mysql_config_file_name}", - "--move-back", - "--no-version-check", - "--target-dir", - self.temp_dir, - ] - with self.stats.timing_manager("myhoard.basebackup_restore.xtrabackup_move"): - with subprocess.Popen( - command_line, bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) as move_back: - self.proc = move_back - self._process_move_input_output() - finally: - shutil.rmtree(self.temp_dir) + def restore_backup(self): + # As of Percona XtraBackup 8.0.5 the backup contains binlog.index and one binlog that --move-back + # tries to restore to appropriate location, presumable with the intent of making MySQL patch gtid_executed + # table based on what's in the log. We already have logic for patching the table and we don't need + # this logic from XtraBackup. The logic is also flawed as it tries to restore to location that existed + # on the source server, which may not be valid for destination server. Just delete the files to disable + # that restoration logic. + binlog_index = os.path.join(self.target_dir, "binlog.index") + if os.path.exists(binlog_index): + with open(binlog_index) as f: + binlogs = f.read().split("\n") + binlogs = [binlog.rsplit("/", 1)[-1] for binlog in binlogs if binlog.strip()] + self.log.info("Deleting redundant binlog index %r and binlogs %r before move", binlog_index, binlogs) + os.remove(binlog_index) + for binlog_name in binlogs: + binlog_name = os.path.join(self.target_dir, binlog_name) + if os.path.exists(binlog_name): + os.remove(binlog_name) + + command_line = [ + "xtrabackup", + # defaults file must be given with --defaults-file=foo syntax, space here does not work + f"--defaults-file={self.mysql_config_file_name}", + "--move-back", + "--no-version-check", + "--target-dir", + self.target_dir, + ] + with self.stats.timing_manager("myhoard.basebackup_restore.xtrabackup_move"): + with subprocess.Popen(command_line, bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as move_back: + self.proc = move_back + self._process_move_input_output() self.data_directory_size_end = self._get_directory_size(self.mysql_data_directory, cleanup=True) diff --git a/myhoard/controller.py b/myhoard/controller.py index f143ef0..ded5de3 100644 --- a/myhoard/controller.py +++ b/myhoard/controller.py @@ -1,5 +1,5 @@ # Copyright (c) 2019 Aiven, Helsinki, Finland. 
https://aiven.io/ -from .backup_stream import BackupStream, RemoteBinlogInfo +from .backup_stream import BackupStream, IncrementalBackupInfo, RemoteBinlogInfo from .binlog_scanner import BinlogScanner from .errors import BadRequest, UnknownBackupSite from .restore_coordinator import BinlogStream, RestoreCoordinator @@ -14,6 +14,7 @@ GtidRangeDict, make_gtid_range_string, mysql_cursor, + parse_dow_schedule, parse_fs_metadata, RateTracker, relay_log_name, @@ -48,6 +49,8 @@ class BaseBackup(TypedDict): end_ts: float + checkpoints_file_content: str | None + incremental: bool class Backup(TypedDict): @@ -65,6 +68,7 @@ class Backup(TypedDict): class BackupRequest(TypedDict): backup_reason: BackupStream.BackupReason normalized_backup_time: str + incremental_backup_info: IncrementalBackupInfo | None class BackupSiteInfo(TypedDict): @@ -84,12 +88,12 @@ class RestoreOptions(TypedDict): target_time_approximate_ok: bool -def sort_completed_backups(backups: List[Backup]) -> List[Backup]: +def sort_completed_backups(backups: List[Backup], reverse: bool = False) -> List[Backup]: def key(backup): assert backup["completed_at"] is not None return backup["completed_at"] - return sorted((backup for backup in backups if backup["completed_at"]), key=key) + return sorted((backup for backup in backups if backup["completed_at"]), key=key, reverse=reverse) class Controller(threading.Thread): @@ -253,10 +257,18 @@ def is_safe_to_reload(self) -> bool: return True def mark_backup_requested( - self, *, backup_reason: BackupStream.BackupReason, normalized_backup_time: Optional[str] = None + self, + *, + backup_reason: BackupStream.BackupReason, + normalized_backup_time: str | None = None, + incremental_backup_info: IncrementalBackupInfo | None = None, ) -> None: backup_time: str = normalized_backup_time or self._current_normalized_backup_timestamp() - new_request: BackupRequest = {"backup_reason": backup_reason, "normalized_backup_time": backup_time} + new_request: BackupRequest = { + "backup_reason": backup_reason, + "normalized_backup_time": backup_time, + "incremental_backup_info": incremental_backup_info, + } with self.lock: if self.state["backup_request"]: old_request: BackupRequest = self.state["backup_request"] @@ -854,6 +866,7 @@ def _create_new_backup_stream_if_requested_and_max_streams_not_exceeded(self): self._start_new_backup( backup_reason=request["backup_reason"], # pylint: disable=unsubscriptable-object normalized_backup_time=request["normalized_backup_time"], # pylint: disable=unsubscriptable-object + incremental_backup_info=request["incremental_backup_info"], # pylint: disable=unsubscriptable-object ) def _create_restore_coordinator_if_missing(self): @@ -864,6 +877,7 @@ def _create_restore_coordinator_if_missing(self): backup_site = self._lookup_backup_site(options["site"]) storage_config = backup_site["object_storage"] self.log.info("Creating new restore coordinator") + # TODO site change triggers full backup self.restore_coordinator = RestoreCoordinator( binlog_streams=options["binlog_streams"], file_storage_config=storage_config, @@ -1248,6 +1262,54 @@ def _mark_failed_restore_backup_as_broken(self) -> None: self._build_backup_stream(broken_backup).mark_as_broken() + def _should_schedule_incremental_backup(self) -> bool: + incremental_settings = self.backup_settings.get("incremental", {}) + if not incremental_settings.get("enabled", False): + self.log.info("Incremental backup is disabled in configuration") + return False + + dow_idx = datetime.datetime.now(datetime.timezone.utc).weekday() + 
dow_schedule = incremental_settings.get("full_backup_week_schedule") + if dow_idx not in parse_dow_schedule(dow_schedule): + self.log.info( + "According to `full_backup_week_schedule` incremental backup should be scheduled (day: %r)", dow_idx + ) + return True + + self.log.info("According to `full_backup_week_schedule` full backup should be scheduled (day: %r)", dow_idx) + return False + + def get_incremental_backup_info(self) -> IncrementalBackupInfo | None: + """ + Incremental backup is possible when all the backups starting from the recent full backup have checkpoints stored + """ + self.log.warning("Determining if incremental backup is possible") + + backups = sort_completed_backups(self.state["backups"], reverse=True) + required_streams: List[str] = [] + prev_backup = None + for backup in backups: + required_streams.append(backup["stream_id"]) + if not prev_backup: + prev_backup = backup + if backup.get("broken_at"): + self.log.warning("Incremental backup is not possible - found broken backup %r", backup["stream_id"]) + return None + if not backup["basebackup_info"].get("checkpoints_file_content"): + self.log.warning( + "Incremental backup is not possible - backup %r does not have a checkpoint", backup["stream_id"] + ) + return None + if not backup["basebackup_info"].get("incremental"): + self.log.info("Incremental backup requirements are met.") + return { + "last_checkpoint": prev_backup["basebackup_info"]["checkpoints_file_content"], + "required_streams": list(reversed(required_streams)), + } + + self.log.warning("Incremental is not possible - no previous backups found") + return None + def _mark_periodic_backup_requested_if_interval_exceeded(self): normalized_backup_time = self._current_normalized_backup_timestamp() last_normalized_backup_time = self._previous_normalized_backup_timestamp() @@ -1273,13 +1335,24 @@ def _mark_periodic_backup_requested_if_interval_exceeded(self): ) and (not most_recent_scheduled or time.time() - most_recent_scheduled >= half_backup_interval_s) ): + incremental_backup_info = None + if self._should_schedule_incremental_backup(): + incremental_backup_info = self.get_incremental_backup_info() + if not incremental_backup_info: + self.log.warning( + "Incremental backup is configured but not possible to take, proceeding with full backup" + ) + self.log.info( - "New normalized time %r differs from previous %r, adding new backup request", + "New normalized time %r differs from previous %r, adding new backup request (incremental_backup_info: %r)", normalized_backup_time, last_normalized_backup_time, + incremental_backup_info, ) self.mark_backup_requested( - backup_reason=BackupStream.BackupReason.scheduled, normalized_backup_time=normalized_backup_time + backup_reason=BackupStream.BackupReason.scheduled, + normalized_backup_time=normalized_backup_time, + incremental_backup_info=incremental_backup_info, ) def _prepare_streams_for_promotion(self): @@ -1310,7 +1383,7 @@ def _process_removed_binlogs(self, binlogs): for stream in self.backup_streams: stream.remove_binlogs(binlogs) - def _purge_old_backups(self): + def _purge_old_backups(self): # # pylint: disable=too-many-return-statements purgeable = [backup for backup in self.state["backups"] if backup["completed_at"] and not backup["recovery_site"]] broken_backups_count = sum(backup["broken_at"] is not None for backup in purgeable) # do not consider broken backups for the count, they will still be purged @@ -1328,21 +1401,43 @@ def _purge_old_backups(self): if non_broken_backups_count <= 
self.backup_settings["backup_count_min"]: return + # TODO mark broken all incremental backups up until the full one + # TODO Create a full backup if the previous one is broken + # For simplicity only ever drop one backup here. This function # is called repeatedly so if there are for any reason more backups # to drop they will be dropped soon enough purgeable = sort_completed_backups(purgeable) backup = purgeable[0] + def is_backup_incremental(b: Dict[str, Any]) -> bool: + return b["basebackup_info"].get("incremental", False) + + # If this is the only full backup, we can't purge it, as following incremental backups depend on this one + if not is_backup_incremental(backup) and all(is_backup_incremental(p) for p in purgeable[1:]): + self.log.info( + "Not purging old backup %r, because following incremental backups depend on it", backup["stream_id"] + ) + return + if not backup["closed_at"]: return - # do not purge backup if its preserved + # do not purge backup if this one or any dependant incremental backup is preserved preserve_until = backup["preserve_until"] + if not is_backup_incremental(backup): + for p in purgeable[1:]: + if not is_backup_incremental(p): + break + preserve_until = max(preserve_until, p["preserve_until"]) + if preserve_until and datetime.datetime.now(datetime.timezone.utc) < datetime.datetime.fromisoformat(preserve_until): return - if time.time() > backup["closed_at"] + self.backup_settings["backup_age_days_max"] * 24 * 60 * 60: + # If the backup is incremental, we should drop it because there is no leading full backup anyway + if is_backup_incremental(backup): + self.log.info("Backup %r is incremental, dropping it due to full backup being dropped", backup["stream_id"]) + elif time.time() > backup["closed_at"] + self.backup_settings["backup_age_days_max"] * 24 * 60 * 60: self.log.info("Backup %r is older than max backup age, dropping it", backup["stream_id"]) elif non_broken_backups_count > self.backup_settings["backup_count_max"]: self.log.info( @@ -1725,7 +1820,13 @@ def _should_purge_binlogs(self, *, backup_streams, binlogs, purge_settings, repl return True - def _start_new_backup(self, *, backup_reason: BackupStream.BackupReason, normalized_backup_time: str) -> None: + def _start_new_backup( + self, + *, + backup_reason: BackupStream.BackupReason, + normalized_backup_time: str, + incremental_backup_info: IncrementalBackupInfo | None = None, + ) -> None: stream_id = BackupStream.new_stream_id() site_id, backup_site = self._get_upload_backup_site() stream = BackupStream( @@ -1752,6 +1853,7 @@ def _start_new_backup(self, *, backup_reason: BackupStream.BackupReason, normali temp_dir=self.temp_dir, xtrabackup_settings=self.xtrabackup_settings, split_size=backup_site.get("split_size", 0), + incremental_backup_info=incremental_backup_info, ) self.backup_streams.append(stream) self.state_manager.update_state( diff --git a/myhoard/myhoard.py b/myhoard/myhoard.py index f7f626b..cb2aef5 100644 --- a/myhoard/myhoard.py +++ b/myhoard/myhoard.py @@ -2,7 +2,7 @@ from myhoard import version from myhoard.controller import Controller from myhoard.statsd import StatsClient -from myhoard.util import DEFAULT_XTRABACKUP_SETTINGS, detect_running_process_id, wait_for_port +from myhoard.util import DEFAULT_XTRABACKUP_SETTINGS, detect_running_process_id, parse_dow_schedule, wait_for_port from myhoard.web_server import WebServer import argparse @@ -98,6 +98,13 @@ def _load_configuration(self): if (ival > 1440 and ival // 1440 * 1440 != ival) or (ival < 1440 and 1440 // ival * ival != 1440): 
raise Exception("Backup interval must be 1440, multiple of 1440, or integer divisor of 1440") + incremental = backup_settings.get("incremental", {}) + if incremental and incremental.get("enabled", False): + dow_schedule = incremental.get("full_backup_week_schedule") + if not dow_schedule: + raise ValueError("Incremental backups require `full_backup_week_schedule`") + parse_dow_schedule(dow_schedule) + if self.config["http_address"] not in {"127.0.0.1", "::1", "localhost"}: self.log.warning("Binding to non-localhost address %r is highly discouraged", self.config["http_address"]) diff --git a/myhoard/restore_coordinator.py b/myhoard/restore_coordinator.py index a352101..e7b29fd 100644 --- a/myhoard/restore_coordinator.py +++ b/myhoard/restore_coordinator.py @@ -31,6 +31,7 @@ track_rate, ) from contextlib import suppress +from functools import partial from pymysql import OperationalError from rohmu import errors as rohmu_errors from rohmu.transfer_pool import TransferPool @@ -44,6 +45,7 @@ import os import pymysql import queue +import tempfile import threading import time @@ -147,6 +149,8 @@ class State(TypedDict): promotions: List remote_read_errors: int restore_errors: int + required_backups: List[Dict] + required_backups_restored: int server_uuid: Optional[str] target_time_reached: bool write_relay_log_manually: bool @@ -258,6 +262,8 @@ def __init__( "prefetched_binlogs": {}, "promotions": [], "remote_read_errors": 0, + "required_backups": [], + "required_backups_restored": 0, "restore_errors": 0, "server_uuid": None, "target_time_reached": False, @@ -409,8 +415,20 @@ def get_backup_info(self) -> None: basebackup_info = self._load_file_data("basebackup.json") if not basebackup_info: return + + required_backups = [] + if basebackup_info["incremental"]: + # We need to load all backups infos until the latest full one + for stream_id in basebackup_info["required_streams"]: + info = self._load_file_data("basebackup.json", stream_id=stream_id) + if not info: + self.log.error("Required backup %r is not complete, cannot restore", stream_id) + return + required_backups.append((stream_id, info)) + self.update_state( basebackup_info=basebackup_info, + required_backups=required_backups, phase=self.Phase.initiating_binlog_downloads, ) @@ -418,52 +436,120 @@ def initiate_binlog_downloads(self) -> None: self._fetch_more_binlog_infos() self.update_state(phase=self.Phase.restoring_basebackup) - def restore_basebackup(self) -> None: - start_time = time.monotonic() - encryption_key = rsa_decrypt_bytes( - self.rsa_private_key_pem, bytes.fromhex(self.state["basebackup_info"]["encryption_key"]) - ) + def _run_basebackup_restore_operation( + self, + backup_info: Dict[str, Any], + target_dir: str, + temp_dir: str, + stream_id: str, + prepare_only: bool = False, + apply_log_only: bool = False, + incremental: bool = False, + ) -> None: + encryption_key = rsa_decrypt_bytes(self.rsa_private_key_pem, bytes.fromhex(backup_info["encryption_key"])) self.basebackup_restore_operation = BasebackupRestoreOperation( encryption_algorithm="AES256", encryption_key=encryption_key, mysql_config_file_name=self.mysql_config_file_name, mysql_data_directory=self.mysql_data_directory, stats=self.stats, - stream_handler=self._basebackup_data_provider, - temp_dir=self.temp_dir, + stream_handler=partial(self._basebackup_data_provider, stream_id=stream_id), + target_dir=target_dir, + temp_dir=temp_dir, free_memory_percentage=self.free_memory_percentage, ) try: - try: - self.basebackup_restore_operation.restore_backup() - except 
DiskFullError: - self.stats.increase("myhoard.disk_full_errors") - self.update_state(phase=self.Phase.failed) - raise - duration = time.monotonic() - start_time - self.log.info("Basebackup restored in %.2f seconds", duration) - next_phase = self.Phase.rebuilding_tables if self.should_rebuild_tables else self.Phase.refreshing_binlogs - self.update_state( - phase=next_phase, - basebackup_restore_duration=duration, - last_rebuilt_table=None, + self.basebackup_restore_operation.prepare_backup( + apply_log_only=apply_log_only, + incremental=incremental, + checkpoints_file_content=backup_info.get("checkpoints_file_content"), ) - except Exception as ex: # pylint: disable=broad-except - self.log.exception("Failed to restore basebackup: %r", ex) - self.state_manager.increment_counter(name="basebackup_restore_errors") - self.state_manager.increment_counter(name="restore_errors") - self.stats.increase("myhoard.restore_errors", tags={"ex": ex.__class__.__name__}) - if self.state["basebackup_restore_errors"] >= self.MAX_BASEBACKUP_ERRORS: - self.log.error( - "Restoring basebackup failed %s times, assuming the backup is broken", self.MAX_BASEBACKUP_ERRORS + if not prepare_only: + self.basebackup_restore_operation.restore_backup() + except DiskFullError: + self.stats.increase("myhoard.disk_full_errors") + self.update_state(phase=self.Phase.failed) + raise + + def restore_basebackup(self) -> None: + if os.path.exists(self.mysql_data_directory): + raise ValueError(f"MySQL data directory {self.mysql_data_directory!r} already exists") + + start_time = time.monotonic() + restore_basebackup_info = self.state["basebackup_info"] + required_backups = self.state.get("required_backups", []) + + with tempfile.TemporaryDirectory(dir=self.temp_dir, prefix="myhoard_target_") as temp_target_dir: + try: + if required_backups: + self.log.info( + "Restoring previous basebackups required by currently restored backup: %r", + restore_basebackup_info["required_streams"], + ) + for idx, required_backup in enumerate(required_backups, 1): + stream_id, required_basebackup_info = required_backup + current_start_time = time.monotonic() + self.log.info( + "Preparing required backup (incremental: %r): %r (%r/%r)", + required_basebackup_info["incremental"], + stream_id, + idx, + len(required_backups), + ) + assert required_basebackup_info["incremental"] == (idx != 1) + self._run_basebackup_restore_operation( + required_basebackup_info, + target_dir=temp_target_dir, + temp_dir=self.temp_dir, + stream_id=stream_id, + prepare_only=True, + apply_log_only=True, + incremental=(idx != 1), # Only the first one should be full + ) + self.update_state(required_backups_restored=idx) + current_duration = time.monotonic() - current_start_time + self.log.info( + "Basebackup %r/%r prepared in %.2f seconds", idx, len(required_backups), current_duration + ) + + self.log.info("Preparing and restoring backup: %r", self.stream_id) + self._run_basebackup_restore_operation( + restore_basebackup_info, + stream_id=self.stream_id, + target_dir=temp_target_dir, + prepare_only=False, + apply_log_only=False, + temp_dir=self.temp_dir, + incremental=restore_basebackup_info.get("incremental", False), ) - self.update_state(phase=self.Phase.failed_basebackup) - self.stats.increase("myhoard.basebackup_broken") - finally: - self.update_state( - backup_xtrabackup_version=self.basebackup_restore_operation.backup_xtrabackup_version, - ) - self.basebackup_restore_operation = None + + duration = time.monotonic() - start_time + self.log.info("Basebackup fully restored in %.2f 
seconds", duration) + + next_phase = self.Phase.rebuilding_tables if self.should_rebuild_tables else self.Phase.refreshing_binlogs + self.update_state( + phase=next_phase, + basebackup_restore_duration=duration, + last_rebuilt_table=None, + ) + + except Exception as ex: # pylint: disable=broad-except + self.log.exception("Failed to restore basebackup: %r", ex) + self.state_manager.increment_counter(name="basebackup_restore_errors") + self.state_manager.increment_counter(name="restore_errors") + self.stats.increase("myhoard.restore_errors", tags={"ex": ex.__class__.__name__}) + if self.state["basebackup_restore_errors"] >= self.MAX_BASEBACKUP_ERRORS: + self.log.error( + "Restoring basebackup failed %s times, assuming the backup is broken", self.MAX_BASEBACKUP_ERRORS + ) + self.update_state(phase=self.Phase.failed_basebackup) + self.stats.increase("myhoard.basebackup_broken") + finally: + if self.basebackup_restore_operation: + self.update_state( + backup_xtrabackup_version=self.basebackup_restore_operation.backup_xtrabackup_version, + ) + self.basebackup_restore_operation = None def rebuild_tables(self) -> None: excluded_tables = { @@ -853,13 +939,13 @@ def _build_binlog_full_name(self, name: str) -> str: stream_id = binlog_stream["stream_id"] return f"{site}/{stream_id}/{name}" - def _build_full_name(self, name: str) -> str: - return f"{self.site}/{self.stream_id}/{name}" + def _build_full_name(self, name: str, stream_id: str | None = None) -> str: + return f"{self.site}/{stream_id or self.stream_id}/{name}" - def _load_file_data(self, name, missing_ok=False): + def _load_file_data(self, name, missing_ok=False, stream_id: str | None = None): try: with self.file_storage_pool.with_transfer(self.file_storage_config) as file_storage: - info_str, _ = file_storage.get_contents_to_string(self._build_full_name(name)) + info_str, _ = file_storage.get_contents_to_string(self._build_full_name(name, stream_id)) return json.loads(info_str) except rohmu_errors.FileNotFoundFromStorageError as ex: if not missing_ok: @@ -874,7 +960,7 @@ def _load_file_data(self, name, missing_ok=False): self.stats.increase("myhoard.remote_read_errors") return None - def _basebackup_data_provider(self, target_stream) -> None: + def _basebackup_data_provider(self, target_stream, stream_id: str | None = None) -> None: compressed_size = self.state["basebackup_info"].get("compressed_size") with self.file_storage_pool.with_transfer(self.file_storage_config) as file_storage: last_time = [time.monotonic()] @@ -903,7 +989,7 @@ def download_progress(progress, max_progress): ) num_splits = self.state["basebackup_info"].get("number_of_splits", 1) - name = self._build_full_name("basebackup.xbstream") + name = self._build_full_name("basebackup.xbstream", stream_id=stream_id) current_split = 0 while num_splits > 0: num_splits -= 1 diff --git a/myhoard/util.py b/myhoard/util.py index 8f7a1dd..3893f0a 100644 --- a/myhoard/util.py +++ b/myhoard/util.py @@ -39,6 +39,9 @@ "register_redo_log_consumer": False, } +DOW_TO_IDX = {"mon": 0, "tue": 1, "wed": 2, "thu": 3, "fri": 4, "sat": 5, "sun": 6} +CHECKPOINT_FILENAME = "xtrabackup_checkpoints" + GtidRangeTuple = tuple[int, int, str, int, int] @@ -56,6 +59,16 @@ class GtidRangeDict(TypedDict): GtidExecuted = Dict[str, List[List[int]]] +def parse_dow_schedule(dow_schedule: str) -> set[int]: + impossible_idx = 7 # mon..sun=0..6 + res = set(DOW_TO_IDX.get(dow.lower().strip(), impossible_idx) for dow in dow_schedule.split(",")) + if impossible_idx in res: + raise ValueError( + 
f"`full_backup_week_schedule` must be a comma-separated list consisting of {', '.join(DOW_TO_IDX.keys())}" + ) + return res + + @contextlib.contextmanager def atomic_create_file( file_path, diff --git a/myhoard/web_server.py b/myhoard/web_server.py index 84d143e..9c383b7 100644 --- a/myhoard/web_server.py +++ b/myhoard/web_server.py @@ -3,7 +3,7 @@ from aiohttp.web_response import json_response from datetime import datetime, timezone from myhoard.backup_stream import BackupStream -from myhoard.controller import Controller +from myhoard.controller import Controller, IncrementalBackupInfo from myhoard.errors import BadRequest import asyncio @@ -42,12 +42,20 @@ async def backup_create(self, request): body = await self._get_request_json(request) log_index = None backup_type = body.get("backup_type") + incremental = body.get("incremental", False) wait_for_upload = body.get("wait_for_upload") with self.controller.lock: if backup_type == self.BackupType.basebackup: if wait_for_upload: raise BadRequest("wait_for_upload currently not supported for basebackup") - self.controller.mark_backup_requested(backup_reason=BackupStream.BackupReason.requested) + incremental_backup_info: IncrementalBackupInfo | None = None + if incremental: + incremental_backup_info = self.controller.get_incremental_backup_info() + if not incremental_backup_info: + raise BadRequest("Can't schedule incremental checkpoint (missing checkpoints)") + self.controller.mark_backup_requested( + backup_reason=BackupStream.BackupReason.requested, incremental_backup_info=incremental_backup_info + ) elif backup_type == self.BackupType.binlog: log_index = self.controller.rotate_and_back_up_binlog() else: @@ -71,12 +79,19 @@ async def backup_create(self, request): async def backup_list(self, _request): with self._handle_request(name="backup_list"): + order = _request.rel_url.query.get("order") response = { "backups": None, } with self.controller.lock: if self.controller.state["backups_fetched_at"]: - response["backups"] = self.controller.state["backups"] + if order is None: + response["backups"] = self.controller.state["backups"] + else: + response["backups"] = sorted( + self.controller.state["backups"], key=lambda b: b["stream_id"], reverse=order.lower() != "asc" + ) + return json_response(response) async def backup_preserve(self, request): diff --git a/test/conftest.py b/test/conftest.py index d219714..48fcb8e 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -334,6 +334,7 @@ def fixture_myhoard_config(default_backup_site, mysql_master, session_tmpdir): "backup_minute": 0, "forced_binlog_rotation_interval": 300, "upload_site": "default", + "incremental": {"enabled": False, "full_backup_week_schedule": "sun,wed"}, }, "backup_sites": { "default": default_backup_site, diff --git a/test/helpers/incremental.py b/test/helpers/incremental.py new file mode 100644 index 0000000..e99b77b --- /dev/null +++ b/test/helpers/incremental.py @@ -0,0 +1,11 @@ +# Copyright (c) 2025 Aiven, Helsinki, Finland. https://aiven.io/ +from typing import Dict + + +def parse_checkpoint_file_content(checkpoint_file_content) -> Dict[str, str]: + res = {} + for line in checkpoint_file_content.splitlines(): + var, val = line.split("=") + res[var.strip()] = val.strip() + + return res diff --git a/test/test_basebackup_operation.py b/test/test_basebackup_operation.py index 51c4258..a69a6d8 100644 --- a/test/test_basebackup_operation.py +++ b/test/test_basebackup_operation.py @@ -1,5 +1,6 @@ # Copyright (c) 2019 Aiven, Helsinki, Finland. https://aiven.io/ from . 
import build_statsd_client, MySQLConfig, restart_mysql +from .helpers.incremental import parse_checkpoint_file_content from myhoard.basebackup_operation import BasebackupOperation from packaging.version import Version from typing import IO @@ -95,6 +96,30 @@ def stream_handler(stream): # Even almost empty backup is at least 1.5 megs due to standard files that are always included assert bytes_read[0] > 1.5 * 1024 * 1024 + # Now add incremental backup + assert op.checkpoints_file_content is not None + inc_op = BasebackupOperation( + encryption_algorithm="AES256", + encryption_key=encryption_key, + mysql_client_params=mysql_master.connect_options, + mysql_config_file_name=mysql_master.config_name, + mysql_data_directory=mysql_master.config_options.datadir, + progress_callback=progress_callback, + stats=build_statsd_client(), + stream_handler=stream_handler, + temp_dir=mysql_master.base_dir, + incremental_since_checkpoint=op.checkpoints_file_content, + ) + inc_op.create_backup() + assert inc_op.checkpoints_file_content is not None + + full_backup_checkpoint = parse_checkpoint_file_content(op.checkpoints_file_content) + inc_backup_checkpoint = parse_checkpoint_file_content(inc_op.checkpoints_file_content) + assert full_backup_checkpoint["backup_type"] == "full-backuped" + assert inc_backup_checkpoint["backup_type"] == "incremental" + assert full_backup_checkpoint["from_lsn"] == "0" + assert full_backup_checkpoint["to_lsn"] == inc_backup_checkpoint["from_lsn"] + def test_stream_handler_error_is_propagated(mysql_master): def stream_handler(_stream): diff --git a/test/test_basebackup_restore_operation.py b/test/test_basebackup_restore_operation.py index 1357d04..410def3 100644 --- a/test/test_basebackup_restore_operation.py +++ b/test/test_basebackup_restore_operation.py @@ -49,17 +49,20 @@ def input_stream_handler(stream): shutil.copyfileobj(backup_file, stream) stream.close() - restore_op = BasebackupRestoreOperation( - encryption_algorithm="AES256", - encryption_key=encryption_key, - free_memory_percentage=80, - mysql_config_file_name=mysql_empty.config_name, - mysql_data_directory=mysql_empty.config_options.datadir, - stats=build_statsd_client(), - stream_handler=input_stream_handler, - temp_dir=mysql_empty.base_dir, - ) - restore_op.restore_backup() + with tempfile.TemporaryDirectory(dir=mysql_empty.base_dir, prefix="myhoard_target_") as temp_target_dir: + restore_op = BasebackupRestoreOperation( + encryption_algorithm="AES256", + encryption_key=encryption_key, + free_memory_percentage=80, + mysql_config_file_name=mysql_empty.config_name, + mysql_data_directory=mysql_empty.config_options.datadir, + stats=build_statsd_client(), + stream_handler=input_stream_handler, + target_dir=temp_target_dir, + temp_dir=mysql_empty.base_dir, + ) + restore_op.prepare_backup() + restore_op.restore_backup() assert restore_op.number_of_files >= backup_op.number_of_files diff --git a/test/test_controller.py b/test/test_controller.py index 9cfc759..d16fd3e 100644 --- a/test/test_controller.py +++ b/test/test_controller.py @@ -1812,7 +1812,11 @@ def remove_backup(backup_stream) -> None: def _append_backup(stream_id: str, ts: float, read_only: bool = True) -> None: controller.state["backups"].append( { - "basebackup_info": {"end_ts": ts}, + "basebackup_info": { + "checkpoints_file_content": None, + "end_ts": ts, + "incremental": False, + }, "closed_at": ts, "completed_at": ts, "broken_at": None, @@ -1868,7 +1872,9 @@ def _add_fake_backup(stream_id: str) -> None: controller.state["backups"].append( { 
"basebackup_info": { + "checkpoints_file_content": None, "end_ts": now - 20 * 60, + "incremental": False, }, "closed_at": now - 3 * 60, "completed_at": now - 5 * 60, @@ -1996,3 +2002,68 @@ def __call__(self, *args, **kwargs) -> Any: raise Exception() return self.original_function_to_mock(*args, **kwargs) + + +def test_get_incremental_backup_info(default_backup_site, mysql_empty, session_tmpdir): + backup1 = Backup( + basebackup_info=BaseBackup(incremental=False, end_ts=1.0, checkpoints_file_content="backup_type = full-backuped..."), + closed_at=1, + completed_at=1, + recovery_site=False, + stream_id="1000", + resumable=False, + site="not_default_site", + broken_at=None, + preserve_until=None, + ) + controller = build_controller( + Controller, + default_backup_site=default_backup_site, + mysql_config=mysql_empty, + session_tmpdir=session_tmpdir, + ) + import copy + + backup2 = copy.deepcopy(backup1) + backup2["basebackup_info"]["incremental"] = True + backup2["basebackup_info"]["checkpoints_file_content"] = "backup_type = incremental..." + backup2["completed_at"] = 2 + backup2["closed_at"] = 2 + backup2["stream_id"] = "1001" + + backup3 = copy.deepcopy(backup1) + backup3["basebackup_info"]["incremental"] = True + backup3["basebackup_info"]["checkpoints_file_content"] = "backup_type = incremental..." + backup3["completed_at"] = 3 + backup3["closed_at"] = 3 + backup3["stream_id"] = "1002" + + controller.state["backups"] = [backup1, backup2, backup3] + info = controller.get_incremental_backup_info() + assert info["required_streams"] == ["1000", "1001", "1002"] + assert info["last_checkpoint"] == "backup_type = incremental..." + + # Breaking one of the backups or having no checkpoint should return nothing, as incremental backup is not possible + for b in [backup1, backup2, backup3]: + b["broken_at"] = 1 + assert controller.get_incremental_backup_info() is None + b["broken_at"] = None + + for b in [backup1, backup2, backup3]: + checkpoint = b["basebackup_info"]["checkpoints_file_content"] + b["basebackup_info"]["checkpoints_file_content"] = None + assert controller.get_incremental_backup_info() is None + b["basebackup_info"]["checkpoints_file_content"] = checkpoint + + assert controller.get_incremental_backup_info() is not None + + # For whatever reason there is no full backup in the list + backup1["basebackup_info"]["incremental"] = True + assert controller.get_incremental_backup_info() is None + + # We have only one full backup + backup1["basebackup_info"]["incremental"] = False + controller.state["backups"] = [backup1] + info = controller.get_incremental_backup_info() + assert info["required_streams"] == ["1000"] + assert info["last_checkpoint"] == "backup_type = full-backuped..."