use dictionary instead of list for Controller.state['backups']
kathia-barahona committed Oct 19, 2023
1 parent 9fb9bcc commit 775ed44
Showing 6 changed files with 251 additions and 232 deletions.
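
The change itself is simple: Controller.state["backups"] was a List[Backup] and is now a Dict[str, Backup] keyed by stream id, so callers can look up a backup directly instead of scanning the whole list. A minimal sketch of the difference (illustrative only, not code from this commit):

    # Before: list-backed state, lookup needs a linear scan
    def find_backup_in_list(backups, stream_id):
        for backup in backups:
            if backup["stream_id"] == stream_id:
                return backup
        return None

    # After: dict-backed state keyed by stream_id, lookup is a single access
    def find_backup_in_dict(backups, stream_id):
        return backups.get(stream_id)
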
24 changes: 22 additions & 2 deletions myhoard/backup_stream.py
@@ -75,6 +75,26 @@ class BaseBackupFailureReason(str, enum.Enum):
xtrabackup_error = "xtrabackup_error"


class BaseBackup(TypedDict, total=False):
binlog_index: Optional[int]
binlog_name: Optional[str]
binlog_position: Optional[int]
backup_reason: Optional["BackupStream.BackupReason"]
compressed_size: float
encryption_key: bool
end_size: Optional[int]
end_ts: float
gtid: str
gtid_executed: Dict[str, List[List[int]]]
initiated_at: float
lsn_info: Optional[Dict[str, float]]
normalized_backup_time: Optional[str]
number_of_files: int
start_size: Optional[int]
start_ts: float
uploaded_from: int


class BackupStream(threading.Thread):
"""Handles creating a single consistent backup stream. 'stream' here refers to uninterrupted sequence
of backup data that can be used to restore the system to a consistent state. It includes the basebackup,
@@ -121,7 +141,7 @@ class State(TypedDict):
backup_errors: int
basebackup_errors: int
basebackup_file_metadata: Optional[Dict]
basebackup_info: Dict
basebackup_info: BaseBackup
broken_info: Dict
closed_info: Dict
completed_info: Dict
@@ -1007,7 +1027,7 @@ def _take_basebackup(self) -> None:
self.log.info("Last basebackup GTID %r, truncating GTID executed %r accordingly", last_gtid, gtid_executed)
truncate_gtid_executed(gtid_executed, last_gtid)

info = {
info: BaseBackup = {
"binlog_index": int(binlog_info["file_name"].split(".")[-1]) if binlog_info else None,
"binlog_name": binlog_info["file_name"] if binlog_info else None,
"binlog_position": binlog_info["file_position"] if binlog_info else None,
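
The new BaseBackup TypedDict above is declared with total=False, so every field is optional and partially populated dictionaries (for example cached entries in seen_basebackup_infos) still satisfy the type. A small standalone sketch of that behaviour, using a trimmed hypothetical class rather than the full field list from the diff:

    from typing import Optional, TypedDict

    class BaseBackupSketch(TypedDict, total=False):  # hypothetical stand-in, fields trimmed
        binlog_name: Optional[str]
        end_ts: float
        gtid: str

    # With total=False, a partial dict is still a valid BaseBackupSketch.
    info: BaseBackupSketch = {"end_ts": 1697712000.0}
    info["gtid"] = "3e11fa47-71ca-11e1-9e33-c80aa9429562:1-5"
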
147 changes: 63 additions & 84 deletions myhoard/controller.py
@@ -1,5 +1,5 @@
# Copyright (c) 2019 Aiven, Helsinki, Finland. https://aiven.io/
from .backup_stream import BackupStream, RemoteBinlogInfo
from .backup_stream import BackupStream, BaseBackup, RemoteBinlogInfo
from .binlog_scanner import BinlogScanner
from .errors import BadRequest, UnknownBackupSite
from .restore_coordinator import BinlogStream, RestoreCoordinator
@@ -46,10 +46,6 @@
ERR_BACKUP_IN_PROGRESS = 4085


class BaseBackup(TypedDict):
end_ts: float


class Backup(TypedDict):
basebackup_info: BaseBackup
closed_at: Optional[float]
@@ -83,12 +79,12 @@ class RestoreOptions(TypedDict):
target_time_approximate_ok: bool


def sort_completed_backups(backups: List[Backup]) -> List[Backup]:
def sort_completed_backups(backups: Dict[str, Backup]) -> List[Backup]:
def key(backup):
assert backup["completed_at"] is not None
return backup["completed_at"]

return sorted((backup for backup in backups if backup["completed_at"]), key=key)
return sorted((backup for backup in backups.values() if backup["completed_at"]), key=key)


class Controller(threading.Thread):
@@ -118,7 +114,7 @@ class Mode(str, enum.Enum):

class State(TypedDict):
backup_request: Optional[BackupRequest]
backups: List[Backup]
backups: Dict[str, Backup]
backups_fetched_at: int
binlogs_purged_at: int
errors: int
@@ -203,7 +199,7 @@ def __init__(
self.site_transfers: Dict[str, BaseTransfer] = {}
self.state: Controller.State = {
"backup_request": None,
"backups": [],
"backups": {},
"backups_fetched_at": 0,
"binlogs_purged_at": 0,
"errors": 0,
@@ -298,27 +294,25 @@ def restore_backup(
# Could consider allowing restore request also when mode is `restore`
raise ValueError(f"Current mode is {self.mode}, restore only allowed while in idle mode")

for backup in list(self.state["backups"]):
if backup["stream_id"] != stream_id or backup["site"] != site:
continue
if not backup["basebackup_info"]:
raise ValueError(f"Backup {backup!r} cannot be restored")

if backup.get("broken_at"):
raise ValueError(f"Cannot restore a broken backup: {backup!r}")

if target_time:
if target_time < backup["basebackup_info"]["end_ts"]:
raise ValueError(f"Requested target time {target_time} predates backup completion: {backup!r}")
# Caller must make sure they pick a backup that contains the requested target time. If this backup
# has been closed (will not get any further updates) at a time that is before the requested target
# time it is not possible to satisfy the request
if backup["closed_at"] and target_time > backup["closed_at"]:
raise ValueError(f"Requested target time {target_time} is after backup close: {backup!r}")
break
else:
backup = self.state["backups"].get(stream_id)
if not backup or backup["site"] != site:
raise ValueError(f"Requested backup {stream_id!r} for site {site!r} not found")

if not backup["basebackup_info"]:
raise ValueError(f"Backup {backup!r} cannot be restored")

if backup.get("broken_at"):
raise ValueError(f"Cannot restore a broken backup: {backup!r}")

if target_time:
if target_time < backup["basebackup_info"]["end_ts"]:
raise ValueError(f"Requested target time {target_time} predates backup completion: {backup!r}")
# Caller must make sure they pick a backup that contains the requested target time. If this backup
# has been closed (will not get any further updates) at a time that is before the requested target
# time it is not possible to satisfy the request
if backup["closed_at"] and target_time > backup["closed_at"]:
raise ValueError(f"Requested target time {target_time} is after backup close: {backup!r}")

self.log.info(
"Restoring backup stream %r, target time %r%s",
stream_id,
@@ -563,38 +557,28 @@ def collect_binlogs_to_purge(
break
return binlogs_to_purge, bool(only_binlogs_without_gtids or only_binlogs_that_are_too_new)

@staticmethod
def get_backup_list(backup_sites: Dict[str, BackupSiteInfo], *, seen_basebackup_infos=None, site_transfers=None):
if seen_basebackup_infos is None:
seen_basebackup_infos = {}
if site_transfers is None:
site_transfers = {}
backups = []
for site_name, site_config in backup_sites.items():
file_storage = site_transfers.get(site_name)
def get_backups(self) -> Dict[str, Backup]:
backups = {}
for site_name, site_config in self.backup_sites.items():
file_storage = self.site_transfers.get(site_name)
if file_storage is None:
file_storage = get_transfer(site_config["object_storage"])
site_transfers[site_name] = file_storage
self.site_transfers[site_name] = file_storage
streams = list(file_storage.list_prefixes(site_name))
for site_and_stream_id in streams:
basebackup_compressed_size = None
basebackup_info = {}
basebackup_info: BaseBackup = self.seen_basebackup_infos.get(site_and_stream_id, {})
broken_info = {}
closed_info = {}
completed_info = {}
preserved_info = {}
for info in file_storage.list_iter(site_and_stream_id):
file_name = info["name"].rsplit("/", 1)[-1]
if file_name == "basebackup.xbstream":
basebackup_compressed_size = info["size"]
elif file_name == "basebackup.json":
if file_name == "basebackup.json" and not basebackup_info:
# The basebackup info json contents never change after creation so we can use cached
# value if available to avoid re-fetching the same content over and over again
basebackup_info = seen_basebackup_infos.get(site_and_stream_id)
if basebackup_info is None:
info_str, _ = file_storage.get_contents_to_string(info["name"])
basebackup_info = json.loads(info_str.decode("utf-8"))
seen_basebackup_infos[site_and_stream_id] = basebackup_info
info_str, _ = file_storage.get_contents_to_string(info["name"])
basebackup_info = json.loads(info_str.decode("utf-8"))
self.seen_basebackup_infos[site_and_stream_id] = basebackup_info
elif file_name == "broken.json":
broken_info = parse_fs_metadata(info["metadata"])
elif file_name == "closed.json":
@@ -604,25 +588,23 @@
elif file_name == "preserved.json":
preserved_info = parse_fs_metadata(info["metadata"])

if basebackup_info and basebackup_compressed_size:
basebackup_info = dict(basebackup_info, compressed_size=basebackup_compressed_size)
resumable = basebackup_info and basebackup_compressed_size
resumable = basebackup_info and basebackup_info["compressed_size"]
completed = resumable and completed_info
closed = completed and closed_info

preserve_until = preserved_info.get("preserve_until")
backups.append(
{
"basebackup_info": basebackup_info,
"broken_at": broken_info.get("broken_at"),
"closed_at": closed_info["closed_at"] if closed else None,
"completed_at": completed_info["completed_at"] if completed else None,
"preserve_until": preserve_until,
"recovery_site": site_config.get("recovery_only", False),
"stream_id": site_and_stream_id.rsplit("/", 1)[-1],
"resumable": bool(resumable),
"site": site_name,
}

stream_id = site_and_stream_id.rsplit("/", 1)[-1]
backups[stream_id] = Backup(
basebackup_info=basebackup_info,
broken_at=broken_info.get("broken_at"),
closed_at=closed_info["closed_at"] if closed else None,
completed_at=completed_info["completed_at"] if completed else None,
preserve_until=preserve_until,
recovery_site=site_config.get("recovery_only", False),
stream_id=stream_id,
resumable=bool(resumable),
site=site_name,
)
return backups
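
With the static get_backup_list helper replaced by the get_backups instance method, the per-site transfer objects and the cached basebackup infos come from the controller itself, and the result is keyed by stream id. A hedged usage sketch of the new return shape (the controller variable is assumed here, not defined in the diff):

    # Assumed: `controller` is an initialized myhoard Controller instance.
    backups = controller.get_backups()
    for stream_id, backup in backups.items():
        # Each value follows the Backup TypedDict defined in controller.py.
        print(stream_id, backup["site"], backup["completed_at"], backup["resumable"])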

@@ -1110,13 +1092,14 @@ def _get_upload_backup_site(self):
def _get_site_for_stream_id(self, stream_id: str):
backup = self.get_backup_by_stream_id(stream_id)
if not backup:
KeyError(f"Stream {stream_id} not found in backups")
raise KeyError(f"Stream {stream_id} not found in backups")

return backup["site"]

def get_backup_by_stream_id(self, stream_id: str):
for backup in self.state["backups"]:
if backup["stream_id"] == stream_id:
return backup
with self.lock:
if stream_id in self.state["backups"]:
return self.state["backups"][stream_id]

return None

@@ -1290,7 +1273,7 @@ def _process_removed_binlogs(self, binlogs):
stream.remove_binlogs(binlogs)

def _purge_old_backups(self):
purgeable = [backup for backup in self.state["backups"] if backup["completed_at"]]
purgeable = sort_completed_backups(self.state["backups"])
broken_backups_count = sum(backup["broken_at"] is not None for backup in purgeable)
# do not consider broken backups for the count, they will still be purged
# but we should only purge when the count of non-broken backups has exceeded the limit.
@@ -1310,7 +1293,6 @@ def _purge_old_backups(self):
# For simplicity only ever drop one backup here. This function
# is called repeatedly so if there are for any reason more backups
# to drop they will be dropped soon enough
purgeable = sort_completed_backups(purgeable)
backup = purgeable[0]

if not backup["closed_at"]:
@@ -1342,11 +1324,11 @@ def _purge_old_backups(self):
# lock the controller, this way other requests do not access backups till backup is purged
with self.lock:
self.state_manager.update_state(stream_to_be_purged=None)
current_backups = [
current_backup
for current_backup in self.state["backups"]
if current_backup["stream_id"] != backup["stream_id"]
]
current_backups = {
stream_id: current_backup
for stream_id, current_backup in self.state["backups"].items()
if stream_id != backup["stream_id"]
}
self.state_manager.update_state(backups=current_backups)
owned_stream_ids = [sid for sid in self.state["owned_stream_ids"] if sid != backup["stream_id"]]
self.state_manager.update_state(owned_stream_ids=owned_stream_ids)
@@ -1450,21 +1432,19 @@ def _purge_old_binlogs(self, *, mysql_maybe_not_running=False):
self.stats.gauge_float("myhoard.binlog.time_since_any_purged", current_time - last_purge)
self.stats.gauge_float("myhoard.binlog.time_since_could_have_purged", current_time - last_could_have_purged)

def _refresh_backups_list(self, force_refresh: bool = False):
def _refresh_backups_list(self, force_refresh: bool = False) -> Optional[Dict[str, Backup]]:
interval = self.backup_refresh_interval_base
if self.mode == self.Mode.active:
interval *= self.BACKUP_REFRESH_ACTIVE_MULTIPLIER

if force_refresh is False and time.time() - self.state["backups_fetched_at"] < interval:
return None

backups = self.get_backup_list(
self.backup_sites, seen_basebackup_infos=self.seen_basebackup_infos, site_transfers=self.site_transfers
)
new_backups_ids = {backup["stream_id"] for backup in backups}
for backup in self.state["backups"]:
if backup["stream_id"] not in new_backups_ids:
self._delete_backup_stream_state(backup["stream_id"])
backups = self.get_backups()

stream_ids_to_delete = set(self.state["backups"]) - set(backups)
for stream_id in stream_ids_to_delete:
self._delete_backup_stream_state(stream_id)

with self.lock:
self.state_manager.update_state(backups=backups, backups_fetched_at=time.time())
@@ -1518,8 +1498,7 @@ def _refresh_backups_list_and_streams(self):
# Keep any streams that are in basebackup phase because those haven't necessarily
# yet uploaded any files so the remote backup directory might not exist
new_streams = basebackup_streams
for backup in backups:
stream_id = backup["stream_id"]
for stream_id, backup in backups.items():
site_info = self.backup_sites.get(backup["site"])
# We do not create backup streams for recovery sites. Those are only used for restoring
# basic backup data, never to stream any changes. Also, if config is updated not to
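
Because both the freshly listed backups and the stored state are now dictionaries keyed by stream id, _refresh_backups_list can detect streams that have disappeared from object storage with a plain set difference over the keys, as in the diff above. A self-contained sketch of that step with placeholder data:

    old_backups = {"stream-a": {}, "stream-b": {}}  # previous state, values elided
    new_backups = {"stream-b": {}}                  # freshly listed from storage
    stream_ids_to_delete = set(old_backups) - set(new_backups)
    assert stream_ids_to_delete == {"stream-a"}     # only the vanished stream's local state is dropped
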
2 changes: 1 addition & 1 deletion myhoard/web_server.py
@@ -76,7 +76,7 @@ async def backup_list(self, _request):
}
with self.controller.lock:
if self.controller.state["backups_fetched_at"]:
response["backups"] = self.controller.state["backups"]
response["backups"] = list(self.controller.state["backups"].values())
return json_response(response)

async def backup_preserve(self, request):
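
On the API side the only adjustment is in backup_list: the handler converts the dict values back into a list, so the JSON payload keeps the array shape clients saw before the refactor. A minimal illustration with made-up data:

    state_backups = {"stream-1": {"stream_id": "stream-1", "site": "default"}}
    response = {"backups": list(state_backups.values())}
    # response serializes to {"backups": [{"stream_id": "stream-1", "site": "default"}]}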