Add support for incremental backups
alexole committed Feb 5, 2025
1 parent 81bf125 commit 528f978
Showing 15 changed files with 522 additions and 135 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
@@ -23,7 +23,7 @@ RUN apt update && \
liblz4-1 liblz4-dev libldap2-dev libsasl2-dev libsasl2-modules-gssapi-mit libkrb5-dev wget \
libreadline-dev libudev-dev libev-dev libev4 libprocps-dev vim-common
# Download boost and percona-xtrabackup
RUN wget https://boostorg.jfrog.io/artifactory/main/release/1.77.0/source/boost_1_77_0.tar.gz && \
RUN wget https://archives.boost.io/release/1.77.0/source/boost_1_77_0.tar.gz && \
tar -zxvf boost_1_77_0.tar.gz


@@ -83,9 +83,9 @@ COPY --from=builder-percona-server /usr/local/mysql/bin /usr/bin
COPY --from=builder-percona-server /usr/local/mysql/lib /usr/lib

ADD requirement* /src/
RUN scripts/install-python-deps
RUN sudo scripts/create-user

ADD . /src/
RUN scripts/install-python-deps
RUN git config --global --add safe.directory /src
RUN python -m pip install -e .
6 changes: 5 additions & 1 deletion myhoard.json
@@ -7,7 +7,11 @@
"backup_interval_minutes": 1440,
"backup_minute": 0,
"forced_binlog_rotation_interval": 300,
"upload_site": "default"
"upload_site": "default",
"incremental": {
"enabled": false,
"full_backup_week_schedule": "sun,wed"
}
},
"backup_sites": {
"default": {
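The new "incremental" block only takes effect when "enabled" is true; the name full_backup_week_schedule suggests the weekdays on which a full basebackup is still taken, with backups on the other days becoming incrementals. A minimal sketch of that interpretation, assuming this semantics (this is not the project's actual scheduler and the helper name is hypothetical):

# Hedged sketch: full backups run on the weekdays listed in
# full_backup_week_schedule, other scheduled backups become incremental
# when "enabled" is true. Helper name is hypothetical.
import datetime

WEEKDAY_NAMES = ["mon", "tue", "wed", "thu", "fri", "sat", "sun"]

def is_full_backup_day(schedule: str, now: datetime.datetime | None = None) -> bool:
    """Return True if the current weekday appears in a schedule such as "sun,wed"."""
    now = now or datetime.datetime.now(datetime.timezone.utc)
    scheduled_days = {day.strip().lower() for day in schedule.split(",")}
    return WEEKDAY_NAMES[now.weekday()] in scheduled_days

# With "sun,wed", a backup scheduled on Wednesday or Sunday would be full;
# backups on the other weekdays would be taken as incrementals.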
17 changes: 17 additions & 0 deletions myhoard/backup_stream.py
@@ -79,6 +79,11 @@ class BaseBackupFailureReason(str, enum.Enum):
xtrabackup_error = "xtrabackup_error"


class IncrementalBackupInfo(TypedDict):
last_checkpoint: str | None
required_streams: List[str] | None


class BackupStream(threading.Thread):
"""Handles creating a single consistent backup stream. 'stream' here refers to uninterrupted sequence
of backup data that can be used to restore the system to a consistent state. It includes the basebackup,
@@ -132,6 +137,7 @@ class State(TypedDict):
backup_reason: Optional["BackupStream.BackupReason"]
created_at: float
immediate_scan_required: bool
incremental: bool
initial_latest_complete_binlog_index: Optional[int]
last_binlog_upload_time: int
last_processed_local_index: Optional[int]
@@ -181,6 +187,7 @@ def __init__(
temp_dir: str,
xtrabackup_settings: Optional[Dict[str, int]] = None,
split_size: Optional[int] = 0,
incremental_backup_info: IncrementalBackupInfo | None = None,
) -> None:
super().__init__()
stream_id = stream_id or self.new_stream_id()
@@ -195,6 +202,7 @@ def __init__(
self.file_storage_setup_fn = file_storage_setup_fn
self.file_storage: Optional["BaseTransfer"] = None
self.file_uploaded_callback = file_uploaded_callback
self.incremental_backup_info = incremental_backup_info
self.is_running = True
self.iteration_sleep = BackupStream.ITERATION_SLEEP
self.last_basebackup_attempt: Optional[float] = None
@@ -236,6 +244,7 @@ def __init__(
"backup_reason": backup_reason,
"created_at": time.time(),
"immediate_scan_required": False,
"incremental": incremental_backup_info is not None,
"initial_latest_complete_binlog_index": latest_complete_binlog_index,
"last_binlog_upload_time": 0,
"last_processed_local_index": None,
@@ -1014,6 +1023,9 @@ def _take_basebackup(self) -> None:
stats=self.stats,
stream_handler=self._basebackup_stream_handler,
temp_dir=self.temp_dir,
incremental_since_checkpoint=self.incremental_backup_info.get("last_checkpoint")
if self.incremental_backup_info
else None,
)
try:
self.basebackup_operation.create_backup()
@@ -1070,12 +1082,17 @@ def _take_basebackup(self) -> None:
"binlog_name": binlog_info["file_name"] if binlog_info else None,
"binlog_position": binlog_info["file_position"] if binlog_info else None,
"backup_reason": self.state["backup_reason"],
"checkpoints_file_content": self.basebackup_operation.checkpoints_file_content,
"compressed_size": compressed_size,
"encryption_key": rsa_encrypt_bytes(self.rsa_public_key_pem, encryption_key).hex(),
"end_size": self.basebackup_operation.data_directory_size_end,
"end_ts": end_time,
"gtid": last_gtid,
"gtid_executed": gtid_executed,
"incremental": self.basebackup_operation.incremental_since_checkpoint is not None,
"required_streams": self.incremental_backup_info.get("required_streams")
if self.incremental_backup_info
else None,
"initiated_at": self.created_at,
"lsn_info": self.basebackup_operation.lsn_info,
"normalized_backup_time": self.state["normalized_backup_time"],
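This diff only adds the plumbing inside BackupStream; the caller that decides when to pass incremental_backup_info is not shown. A hedged sketch of how such a caller could assemble it from the newest completed backup (prev_stream_id and prev_basebackup_info are hypothetical inputs shaped like the metadata dict written in _take_basebackup above):

# Hedged sketch, not the project's actual controller logic.
from typing import Any, Dict, Optional

from myhoard.backup_stream import IncrementalBackupInfo


def build_incremental_backup_info(
    prev_stream_id: str, prev_basebackup_info: Dict[str, Any]
) -> Optional[IncrementalBackupInfo]:
    checkpoint = prev_basebackup_info.get("checkpoints_file_content")
    if not checkpoint:
        # No checkpoint available (e.g. the previous backup predates this change):
        # return None so the next backup is taken as a full one.
        return None
    # A restore needs every stream in the chain, so carry forward the previous
    # backup's requirements and add the previous stream itself.
    required = list(prev_basebackup_info.get("required_streams") or []) + [prev_stream_id]
    return IncrementalBackupInfo(last_checkpoint=checkpoint, required_streams=required)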
32 changes: 24 additions & 8 deletions myhoard/basebackup_operation.py
@@ -1,10 +1,10 @@
# Copyright (c) 2019 Aiven, Helsinki, Finland. https://aiven.io/
from contextlib import suppress
from myhoard.errors import BlockMismatchError, XtraBackupError
from myhoard.util import get_mysql_version, mysql_cursor
from myhoard.util import CHECKPOINT_FILENAME, get_mysql_version, mysql_cursor
from packaging.version import Version
from rohmu.util import increase_pipe_capacity, set_stream_nonblocking
from typing import Optional
from typing import Any, Dict, Optional

import base64
import logging
@@ -61,9 +61,11 @@ def __init__(
stats,
stream_handler,
temp_dir,
incremental_since_checkpoint: str | None = None,
):
self.abort_reason = None
self.binlog_info = None
self.binlog_info: Dict[str, Any] | None = None
self.checkpoints_file_content: str | None = None
self.copy_threads = copy_threads
self.compress_threads = compress_threads
self.current_file = None
@@ -75,7 +77,9 @@ def __init__(
self.encryption_key = encryption_key
self.has_block_mismatch = False
self.log = logging.getLogger(self.__class__.__name__)
self.lsn_dir = None
self.incremental_since_checkpoint = incremental_since_checkpoint
self.prev_checkpoint_dir = None
self.lsn_dir: str | None = None
self.lsn_info = None
self.mysql_client_params = mysql_client_params
with open(mysql_config_file_name, "r") as config:
@@ -146,6 +150,12 @@ def create_backup(self):
if self.register_redo_log_consumer:
command_line.append("--register-redo-log-consumer")

if self.incremental_since_checkpoint:
self.prev_checkpoint_dir = tempfile.mkdtemp(dir=self.temp_dir_base, prefix="xtrabackupcheckpoint")
with open(os.path.join(self.prev_checkpoint_dir, CHECKPOINT_FILENAME), "w") as checkpoint_file:
checkpoint_file.write(self.incremental_since_checkpoint)
command_line.extend(["--incremental-basedir", self.prev_checkpoint_dir])

with self.stats.timing_manager("myhoard.basebackup.xtrabackup_backup"):
with subprocess.Popen(
command_line, bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.PIPE
@@ -262,6 +272,7 @@ def _process_input_output(self):
reader_thread = None

if exit_code == 0:
self._save_checkpoints_file()
self._process_binlog_info()

except AbortRequested as ex:
Expand All @@ -285,10 +296,9 @@ def _process_input_output(self):
if reader_thread:
reader_thread.join()
self.log.info("Thread joined")
if self.lsn_dir:
shutil.rmtree(self.lsn_dir)
if self.temp_dir:
shutil.rmtree(self.temp_dir)
for d in [self.lsn_dir, self.temp_dir, self.prev_checkpoint_dir]:
if d:
shutil.rmtree(d)
self.proc = None

if exit_code != 0:
@@ -354,6 +364,12 @@ def _process_output_line_file_finished(self, line):
self.current_file = None
return True

def _save_checkpoints_file(self) -> None:
assert self.lsn_dir

with open(os.path.join(self.lsn_dir, CHECKPOINT_FILENAME)) as checkpoints_file:
self.checkpoints_file_content = checkpoints_file.read()

def _process_binlog_info(self) -> None:
assert self.lsn_dir

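CHECKPOINT_FILENAME is imported from myhoard.util but its value is not shown in this diff; it is assumed to be XtraBackup's standard checkpoint file name. A small sketch of that assumption and of what the captured checkpoint content roughly looks like (values are illustrative, not from a real backup):

# Assumption, not shown in the diff:
CHECKPOINT_FILENAME = "xtrabackup_checkpoints"
# The text stored in checkpoints_file_content is the plain-text file XtraBackup
# writes next to its LSN info, roughly of this shape:
#   backup_type = full-backuped
#   from_lsn = 0
#   to_lsn = 26356897
#   last_lsn = 26356906
# Only this one file needs to exist in the --incremental-basedir directory for
# xtrabackup to determine which pages changed since the previous backup.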
128 changes: 72 additions & 56 deletions myhoard/basebackup_restore_operation.py
@@ -1,9 +1,9 @@
# Copyright (c) 2019 Aiven, Helsinki, Finland. https://aiven.io/
from .errors import DiskFullError
from .util import get_xtrabackup_version, parse_version, parse_xtrabackup_info
from .util import CHECKPOINT_FILENAME, get_xtrabackup_version, parse_version, parse_xtrabackup_info
from contextlib import suppress
from rohmu.util import increase_pipe_capacity, set_stream_nonblocking
from typing import Final, Optional, Tuple
from typing import Dict, Final, Optional, Tuple

import base64
import fnmatch
@@ -38,7 +38,8 @@ def __init__(
mysql_data_directory,
stats,
stream_handler,
temp_dir,
target_dir: str,
temp_dir: str,
):
self.current_file = None
self.data_directory_size_end = None
@@ -51,34 +52,36 @@ def __init__(
self.mysql_data_directory = mysql_data_directory
self.number_of_files = 0
self.prepared_lsn = None
self.proc = None
self.proc: subprocess.Popen[bytes] | None = None
self.stats = stats
self.stream_handler = stream_handler
self.temp_dir = None
self.temp_dir_base = temp_dir
self.backup_xtrabackup_info = None

def restore_backup(self):
if os.path.exists(self.mysql_data_directory):
raise ValueError(f"MySQL data directory {self.mysql_data_directory!r} already exists")
self.target_dir = target_dir
self.temp_dir = temp_dir
self.backup_xtrabackup_info: Dict[str, str] | None = None

def prepare_backup(
self, incremental: bool = False, apply_log_only: bool = False, checkpoints_file_content: str | None = None
):
# Write encryption key to file to avoid having it on command line. NamedTemporaryFile has mode 0600
with tempfile.NamedTemporaryFile() as encryption_key_file:
encryption_key_file.write(base64.b64encode(self.encryption_key))
encryption_key_file.flush()

cpu_count = multiprocessing.cpu_count()
incremental_dir = None
try:
if incremental:
incremental_dir = tempfile.mkdtemp(dir=self.temp_dir, prefix="myhoard_inc_")

with tempfile.NamedTemporaryFile() as encryption_key_file:
encryption_key_file.write(base64.b64encode(self.encryption_key))
encryption_key_file.flush()

self.temp_dir = tempfile.mkdtemp(dir=self.temp_dir_base)
cpu_count = multiprocessing.cpu_count()

try:
command_line = [
"xbstream",
# TODO: Check whether it would make sense to restore directly to the MySQL data directory so that
# we could skip the move phase. It's not clear if move does anything worthwhile
# except skip a few extra files, which could instead be deleted explicitly
"--directory",
self.temp_dir,
self.target_dir if not incremental_dir else incremental_dir,
"--extract",
"--decompress",
"--decompress-threads",
@@ -101,8 +104,9 @@ def restore_backup(self):
self.proc = xbstream
self._process_xbstream_input_output()

self.data_directory_size_start = self._get_directory_size(self.temp_dir)
xtrabackup_info_path = os.path.join(self.temp_dir, "xtrabackup_info")
self.data_directory_size_start = self._get_directory_size(self.target_dir)

xtrabackup_info_path = os.path.join(self.target_dir, "xtrabackup_info")
if os.path.exists(xtrabackup_info_path):
with open(xtrabackup_info_path) as fh:
xtrabackup_info_text = fh.read()
@@ -122,8 +126,20 @@ def restore_backup(self):
"--no-version-check",
"--prepare",
"--target-dir",
self.temp_dir,
self.target_dir,
]

if apply_log_only:
# This is needed to prepare all the backups preceding the one to be restored in case of incremental
command_line.append("--apply-log-only")
if incremental_dir:
command_line.extend(["--incremental-dir", incremental_dir])

if checkpoints_file_content:
checkpoints_file_dir = incremental_dir if incremental_dir else self.target_dir
with open(os.path.join(checkpoints_file_dir, CHECKPOINT_FILENAME), "w") as checkpoints_file:
checkpoints_file.write(checkpoints_file_content)

# --use-free-memory-pct introduced in 8.0.30, but it doesn't work in 8.0.30 and leads to PBX crash
if self.free_memory_percentage is not None and get_xtrabackup_version() >= (8, 0, 32):
command_line.insert(2, f"--use-free-memory-pct={self.free_memory_percentage}")
@@ -133,42 +149,42 @@ def restore_backup(self):
) as prepare:
self.proc = prepare
self._process_prepare_input_output()
finally:
if incremental_dir:
shutil.rmtree(incremental_dir)

# As of Percona XtraBackup 8.0.5 the backup contains binlog.index and one binlog that --move-back
# tries to restore to the appropriate location, presumably with the intent of making MySQL patch gtid_executed
# table based on what's in the log. We already have logic for patching the table and we don't need
# this logic from XtraBackup. The logic is also flawed as it tries to restore to location that existed
# on the source server, which may not be valid for destination server. Just delete the files to disable
# that restoration logic.
binlog_index = os.path.join(self.temp_dir, "binlog.index")
if os.path.exists(binlog_index):
with open(binlog_index) as f:
binlogs = f.read().split("\n")
binlogs = [binlog.rsplit("/", 1)[-1] for binlog in binlogs if binlog.strip()]
self.log.info("Deleting redundant binlog index %r and binlogs %r before move", binlog_index, binlogs)
os.remove(binlog_index)
for binlog_name in binlogs:
binlog_name = os.path.join(self.temp_dir, binlog_name)
if os.path.exists(binlog_name):
os.remove(binlog_name)

command_line = [
"xtrabackup",
# defaults file must be given with --defaults-file=foo syntax, space here does not work
f"--defaults-file={self.mysql_config_file_name}",
"--move-back",
"--no-version-check",
"--target-dir",
self.temp_dir,
]
with self.stats.timing_manager("myhoard.basebackup_restore.xtrabackup_move"):
with subprocess.Popen(
command_line, bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.PIPE
) as move_back:
self.proc = move_back
self._process_move_input_output()
finally:
shutil.rmtree(self.temp_dir)
def restore_backup(self):
# As of Percona XtraBackup 8.0.5 the backup contains binlog.index and one binlog that --move-back
# tries to restore to the appropriate location, presumably with the intent of making MySQL patch gtid_executed
# table based on what's in the log. We already have logic for patching the table and we don't need
# this logic from XtraBackup. The logic is also flawed as it tries to restore to location that existed
# on the source server, which may not be valid for destination server. Just delete the files to disable
# that restoration logic.
binlog_index = os.path.join(self.target_dir, "binlog.index")
if os.path.exists(binlog_index):
with open(binlog_index) as f:
binlogs = f.read().split("\n")
binlogs = [binlog.rsplit("/", 1)[-1] for binlog in binlogs if binlog.strip()]
self.log.info("Deleting redundant binlog index %r and binlogs %r before move", binlog_index, binlogs)
os.remove(binlog_index)
for binlog_name in binlogs:
binlog_name = os.path.join(self.target_dir, binlog_name)
if os.path.exists(binlog_name):
os.remove(binlog_name)

command_line = [
"xtrabackup",
# defaults file must be given with --defaults-file=foo syntax, space here does not work
f"--defaults-file={self.mysql_config_file_name}",
"--move-back",
"--no-version-check",
"--target-dir",
self.target_dir,
]
with self.stats.timing_manager("myhoard.basebackup_restore.xtrabackup_move"):
with subprocess.Popen(command_line, bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as move_back:
self.proc = move_back
self._process_move_input_output()

self.data_directory_size_end = self._get_directory_size(self.mysql_data_directory, cleanup=True)

Expand Down
