Merge pull request #280 from rbikar/fix-copy-repos
Fix CopyRepo task [RHELDST-23737, RHELDST-23709]
rbikar authored Apr 30, 2024
2 parents 85c89f2 + 7e6ee22 commit ececfa4
Showing 22 changed files with 441 additions and 79 deletions.
205 changes: 135 additions & 70 deletions pubtools/_pulp/tasks/copy_repo.py
@@ -1,17 +1,23 @@
import logging
from collections import namedtuple
from functools import partial
from itertools import chain

import attr
-from more_executors.futures import f_map, f_sequence
+from more_executors.futures import f_map, f_sequence, f_proxy
 from pubtools.pulplib import (
     ContainerImageRepository,
     Criteria,
-    ErratumUnit,
+    Matcher,
     FileUnit,
-    ModulemdUnit,
     RpmUnit,
+    ErratumUnit,
+    ModulemdUnit,
+    ModulemdDefaultsUnit,
+    YumRepoMetadataFileUnit,
 )


from pubtools._pulp.arguments import SplitAndExtend
from pubtools._pulp.services import CollectorService, PulpClientService
from pubtools._pulp.task import PulpTask
@@ -21,6 +27,52 @@

LOG = logging.getLogger("pubtools.pulp")

ContentType = namedtuple(
"ContentType", ["content_type_ids", "klass", "fields"], defaults=[None, None]
)

_RPM_FIELDS = (
"name",
"version",
"release",
"arch",
"sha256sum",
"md5sum",
"signing_key",
)
_MODULEMD_FIELDS = (
"name",
"stream",
"version",
"context",
"arch",
)
_FILE_FIELDS = (
"path",
"sha256sum",
)
_MINIMAL_FIELDS = ("unit_id",)

CONTENT_TYPES = (
ContentType(("iso",), FileUnit, _FILE_FIELDS),
ContentType(
(
"rpm",
"srpm",
),
RpmUnit,
_RPM_FIELDS,
),
ContentType(("erratum",), ErratumUnit, _MINIMAL_FIELDS),
ContentType(("modulemd",), ModulemdUnit, _MODULEMD_FIELDS),
ContentType(("modulemd_defaults",), ModulemdDefaultsUnit, _MINIMAL_FIELDS),
ContentType(("yum_repo_metadata_file",), YumRepoMetadataFileUnit, _MINIMAL_FIELDS),
ContentType(("package_group",)),
ContentType(("package_category",)),
ContentType(("package_environment",)),
ContentType(("package_langpacks",)),
)
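
For readers unfamiliar with `namedtuple` defaults: the `defaults` list applies to the rightmost fields, which is what lets the four comps-related entries above omit `klass` and `fields` entirely. A minimal sketch of that behavior (standalone, not part of the commit):

```python
from collections import namedtuple

ContentType = namedtuple(
    "ContentType", ["content_type_ids", "klass", "fields"], defaults=[None, None]
)

# The two defaults fill the rightmost fields, "klass" and "fields".
ct = ContentType(("package_group",))
print(ct.klass, ct.fields)  # None None
```

Entries with no `klass` are later routed to a plain `content_type_id` match rather than a typed unit search.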


@attr.s(slots=True)
class RepoCopy(object):
@@ -35,26 +87,62 @@ class RepoCopy(object):

class CopyRepo(CollectorService, PulpClientService, PulpRepositoryOperation):
@property
-    def content_type(self):
+    def content_type_criteria(self):
         # Only return non-None if there were really any types given.
         # Otherwise, return None to let library defaults apply
-        c = self.args.content_type or None
-        # Normalize content types (e.g., "ISO" -> "iso").
-        if c:
-            c = [t.lower() for t in c]
-        return c
+        out = None
+
+        def str_to_content_type(content_type_id):
+            out = None
+            for item in CONTENT_TYPES:
+                if content_type_id in item.content_type_ids:
+                    out = item
+                    break
+
+            if out is None:
+                self.fail("Unsupported content type: %s", content_type_id)
+
+            return out
+
+        if self.args.content_type:
+            # replace srpm with rpm - we don't need to specify it separately and remove duplicated entries
+            content_types = set(
+                map(lambda x: x.replace("srpm", "rpm"), self.args.content_type)
+            )
+            content_types = [
+                str_to_content_type(t.lower().strip()) for t in content_types
+            ]
+            criteria = []
+            in_matcher = []  # to aggregate content types for Criteria.with_field()
+
+            for item in sorted(content_types):
+                if item.klass:
+                    criteria.append(
+                        Criteria.with_unit_type(item.klass, unit_fields=item.fields)
+                    )
+                else:
+                    in_matcher.extend(item.content_type_ids)
+            if in_matcher:
+                criteria.append(
+                    Criteria.with_field("content_type_id", Matcher.in_(in_matcher))
+                )
+
+            out = criteria
+        return out
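
To see how the routing above plays out, here is a simplified standalone sketch (not part of the commit) mirroring the split: types with a unit class become typed criteria fetching only the fields the task needs, while classless comps types are aggregated into a single `content_type_id` matcher. The `criteria_for` helper and its field list are illustrative only:

```python
from pubtools.pulplib import Criteria, Matcher, RpmUnit

def criteria_for(requested):
    # Fold srpm into rpm and dedupe, roughly as content_type_criteria does.
    requested = {t.lower().strip().replace("srpm", "rpm") for t in requested}
    criteria, in_matcher = [], []
    for t in sorted(requested):
        if t == "rpm":
            # Typed search, restricted to the fields the task actually uses.
            criteria.append(
                Criteria.with_unit_type(RpmUnit, unit_fields=("name", "version"))
            )
        else:
            # Classless types (e.g. package_group) fall back to an id match.
            in_matcher.append(t)
    if in_matcher:
        criteria.append(Criteria.with_field("content_type_id", Matcher.in_(in_matcher)))
    return criteria

print(criteria_for(["RPM", "srpm", "package_group"]))
```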

@property
def repo_pairs(self):
-        out = []
+        out = set()
         for pair in self.args.repopairs:
-            pair = list(map(str, pair.split(",")))
-            if not len(pair) == 2 or any([not r_id.strip() for r_id in pair]):
+            parsed = tuple(pair.split(","))
+            out.add(parsed)
+
+            if not len(parsed) == 2 or any([not r_id.strip() for r_id in pair]):
                 self.fail(
-                    "Pair(s) must contain two repository IDs, source and destination. Got: %s",
+                    "Pair(s) must contain two repository IDs, source and destination. Got: '%s'",
                     pair,
                 )
-            out.append(pair)
 
         return out
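
The property now returns a set of tuples, so repeated pairs collapse into one, and any pair that is not exactly two non-empty repository IDs aborts the task. A rough standalone equivalent of the validation (names are illustrative, not from the commit):

```python
def parse_pair(arg):
    # "src,dest" -> ("src", "dest"); anything else is rejected.
    parsed = tuple(arg.split(","))
    if len(parsed) != 2 or any(not r_id.strip() for r_id in parsed):
        raise ValueError(
            "Pair(s) must contain two repository IDs, source and destination. "
            "Got: '%s'" % arg
        )
    return parsed

print(parse_pair("src-repo,dest-repo"))  # ('src-repo', 'dest-repo')
```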

def add_args(self):
@@ -80,34 +168,15 @@ def get_repos(self):
-        # Eagerly loads the repos so we fail early if the user passed any nonexistent
-        # repo.
-        repo_ids = []
-        found_repos = []
-        repo_pairs = []
-
-        for id_pair in self.repo_pairs:
-            # We'll need a flat list of given IDs later.
-            repo_ids.extend(id_pair)
-
-            search = self.pulp_client.search_repository(Criteria.with_id(id_pair))
-
-            src = None
-            dest = None
-            for repo in search.result():
-                # We'll need a flat list of all search results later.
-                found_repos.append(repo)
-
-                if repo.id == id_pair[0]:
-                    src = repo
-                if repo.id == id_pair[1]:
-                    dest = repo
-
-            repo_pairs.append((src, dest))
-
+        repo_ids = set(chain.from_iterable(self.repo_pairs))
+        # Eagerly load all repos to fail early if the user passed any nonexistent repo.
+        search = self.pulp_client.search_repository(Criteria.with_id(repo_ids))
+        found_repos_map = {repo.id: repo for repo in search.result()}
         # Bail out if user requested repos which don't exist
-        missing = set(repo_ids) - {repo.id for repo in found_repos}
-        missing = sorted(list(missing))
+        missing = repo_ids - set(found_repos_map)
         if missing:
-            self.fail("Requested repo(s) don't exist: %s", ", ".join(missing))
+            self.fail("Requested repo(s) don't exist: %s", ", ".join(sorted(missing)))

# Bail out if we'd be processing any container image repos.
# We don't support this now because:
@@ -121,7 +190,7 @@ def get_repos(self):
         container_repo_ids = sorted(
             [
                 repo.id
-                for repo in found_repos
+                for repo in found_repos_map.values()
                 if isinstance(repo, ContainerImageRepository)
             ]
         )
@@ -131,54 +200,50 @@ def get_repos(self):
% ", ".join(sorted(container_repo_ids))
)

-        return repo_pairs
+        return [
+            (found_repos_map[repo_id_src], found_repos_map[repo_id_dest])
+            for repo_id_src, repo_id_dest in self.repo_pairs
+        ]
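
The rewritten `get_repos` thus replaces one search per pair with a single batched search plus a dict lookup. A minimal sketch of the same pattern against pulplib's in-memory `FakeController` (assuming the test helper available in recent pubtools-pulplib releases):

```python
from pubtools.pulplib import Criteria, FakeController, YumRepository

controller = FakeController()
controller.insert_repository(YumRepository(id="src-repo"))
controller.insert_repository(YumRepository(id="dest-repo"))
client = controller.client

repo_ids = {"src-repo", "dest-repo", "no-such-repo"}
# One search for all IDs, then an id -> repo map for pairing.
found_repos_map = {
    repo.id: repo for repo in client.search_repository(Criteria.with_id(repo_ids))
}

missing = repo_ids - set(found_repos_map)
print(sorted(missing))  # ['no-such-repo']
```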

@step("Copy content")
def copy_content(self, src_repo, dest_repo):
futures = []
content_types = self.content_type or ["None"]
for t in content_types:
crit = None
if t == "iso":
crit = Criteria.with_unit_type(FileUnit)
if t in ("rpm", "srpm"):
crit = Criteria.with_unit_type(RpmUnit)
if t == "erratum":
crit = Criteria.with_unit_type(ErratumUnit)
if t == "modulemd":
crit = Criteria.with_unit_type(ModulemdUnit)

f = self.pulp_client.copy_content(src_repo, dest_repo, criteria=crit)
f = f_map(f, partial(RepoCopy, repo=dest_repo))
def copy_content(self, repo_pairs):
fts = []
criteria = self.content_type_criteria

def repo_copy(copy_tasks, repo):
tasks = list(chain.from_iterable(copy_tasks))
return RepoCopy(tasks=tasks, repo=repo)

for src_repo, dest_repo in repo_pairs:
one_pair_copies = []
for item in criteria or [None]:
tasks_f = self.pulp_client.copy_content(
src_repo, dest_repo, criteria=item
)
one_pair_copies.append(tasks_f)

f = f_map(f_sequence(one_pair_copies), partial(repo_copy, repo=dest_repo))
f = f_map(f, self.log_copy)
futures.append(f)
fts.append(f)

return futures
return fts
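
The future composition here is the crux of the fix: each per-criteria `copy_content` call returns a future resolving to a list of Pulp tasks, `f_sequence` merges one pair's futures into a single future of lists, and `repo_copy` flattens them. A self-contained sketch using `f_return` to stand in for the Pulp client:

```python
from itertools import chain
from functools import partial
from more_executors.futures import f_map, f_return, f_sequence

def repo_copy(copy_tasks, repo):
    # Same flattening as above: list-of-lists -> flat task list.
    return (repo, list(chain.from_iterable(copy_tasks)))

# Stand-ins for the futures returned by pulp_client.copy_content().
one_pair_copies = [f_return(["task-1"]), f_return(["task-2", "task-3"])]

f = f_map(f_sequence(one_pair_copies), partial(repo_copy, repo="dest-repo"))
print(f.result())  # ('dest-repo', ['task-1', 'task-2', 'task-3'])
```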

def run(self):
# Get a list of repo pairs we'll be dealing with.
# This is blocking so we'll fail early on missing/bad repos.
repo_pairs = self.get_repos()

# Start copying repos.
-        repos_to_flush = []
-        repo_copies_fs = []
-
-        for pair in repo_pairs:
-            repo_copies_fs.extend(self.copy_content(pair[0], pair[1]))
-
-            # We shouldn't need to flush the source repos, just the updated dest.
-            repos_to_flush.append(pair[1])
+        repo_copies_fs = self.copy_content(repo_pairs)

# As copying completes, record pushitem info on what was copied.
# We don't have to wait on this before continuing.
to_await = self.record_push_items(repo_copies_fs, "PUSHED")

# Don't need the repo copying tasks for anything more.
-        repos_fs = [f_map(f, lambda cr: cr.repo) for f in repo_copies_fs]
+        repos_fs = [f_proxy(f_map(f, lambda cr: cr.repo)) for f in repo_copies_fs]

# Now move repos into the desired state:

# They should be published.
publish_fs = self.publish(repos_fs)

@@ -189,10 +254,10 @@ def run(self):
f_sequence(publish_fs).result()

# They should have UD cache flushed.
-        to_await.extend(self.flush_ud(repos_to_flush))
+        to_await.extend(self.flush_ud(repos_fs))

# They should have CDN cache flushed.
-        to_await.extend(self.flush_cdn(repos_to_flush))
+        to_await.extend(self.flush_cdn(repos_fs))

# Now make sure we wait for everything to finish.
for f in to_await:
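A note on the `f_proxy` wrapper introduced in `run`: `flush_ud` and `flush_cdn` previously received plain repository objects, and proxy futures let them keep treating each future as if it were the resolved repository, while `publish` and the final awaits still see ordinary futures. A tiny sketch of that behavior, assuming more-executors' documented proxy semantics (attribute access blocks and forwards to the result):

```python
from collections import namedtuple
from more_executors.futures import f_proxy, f_return

Repo = namedtuple("Repo", ["id"])

repo = f_proxy(f_return(Repo(id="dest-repo")))

print(repo.id)           # 'dest-repo' - no explicit .result() needed
print(repo.result().id)  # the plain futures API still works too
```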
