Skip to content

Commit

Permalink
Merge pull request #327 from rajulkumar/reduce_copy_repo_calls
Browse files Browse the repository at this point in the history
copy-repo makes only two copy calls for a repo pair [RHELDST-28235]
  • Loading branch information
rajulkumar authored Dec 16, 2024
2 parents fa6b407 + 1788ebd commit 49e0556
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 50 deletions.
98 changes: 71 additions & 27 deletions src/pubtools/_pulp/tasks/copy_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from itertools import chain

import attr
from more_executors.futures import f_map, f_sequence, f_proxy
from more_executors.futures import f_map, f_sequence, f_proxy, f_flat_map, f_return
from pubtools.pulplib import (
ContainerImageRepository,
Criteria,
Expand Down Expand Up @@ -73,6 +73,23 @@
ContentType(("package_langpacks",)),
)

# Content type IDs that are copied using a per-unit-type criteria
# (Criteria.with_unit_type) restricted to selected unit fields.
RPM_CONTENT_TYPES = ("rpm", "srpm")

# Content type IDs that are aggregated into a single "content_type_id"
# in-matcher criteria when copying.
NON_RPM_CONTENT_TYPES = (
    "iso",
    "erratum",
    "modulemd",
    "modulemd_defaults",
    "yum_repo_metadata_file",
    "package_group",
    "package_category",
    "package_environment",
    "package_langpacks",
)


@attr.s(slots=True)
class RepoCopy(object):
Expand All @@ -93,38 +110,56 @@ def content_type_criteria(self):
out = None

def str_to_content_type(content_type_id):
out = None
for item in CONTENT_TYPES:
if content_type_id in item.content_type_ids:
out = item
break

if out is None:
self.fail("Unsupported content type: %s", content_type_id)

return out
return item

if self.args.content_type:
# replace srpm with rpm (srpm doesn't need to be specified separately) and remove duplicate entries
content_types = set(
map(lambda x: x.replace("srpm", "rpm"), self.args.content_type)
map(
lambda x: x.lower().strip().replace("srpm", "rpm"),
self.args.content_type,
)
)

# check for unsupported content types
unsupported = content_types.difference(
RPM_CONTENT_TYPES + NON_RPM_CONTENT_TYPES
)
content_types = [
str_to_content_type(t.lower().strip()) for t in content_types
if unsupported:
self.fail("Unsupported content type(s): %s", ",".join(unsupported))

rpm_content_types = [
str_to_content_type(t) for t in content_types if t in RPM_CONTENT_TYPES
]
non_rpm_content_types = [
t for t in content_types if t in NON_RPM_CONTENT_TYPES
]

# NOTE: The order in which the criteria are appended is critical here.
# Non-rpm content types should be copied first, as they may include the
# modulemd content type. Modulemd units should be copied before the modular
# rpms so that, in case of a failure or partial copy, the modular rpms aren't
# available to the users. Hence, the non-rpm content type criteria is appended
# first in the list of criteria.
criteria = []
in_matcher = [] # to aggregate content types for Criteria.with_field()

for item in sorted(content_types):
if item.klass:
criteria.append(
Criteria.with_unit_type(item.klass, unit_fields=item.fields)
)
else:
in_matcher.extend(item.content_type_ids)
if in_matcher:

# criteria for all non-rpm content types
# unit_fields are not specified here, as non-rpm units are small in size and
# the repos have small unit counts for non-rpm content types
criteria.append(
Criteria.with_field(
"content_type_id", Matcher.in_(sorted(non_rpm_content_types))
)
)

# criteria for rpm content types
# unit_fields are specified to keep memory consumption in check when the
# repo has large rpm unit counts
for item in sorted(rpm_content_types):
criteria.append(
Criteria.with_field("content_type_id", Matcher.in_(in_matcher))
Criteria.with_unit_type(item.klass, unit_fields=item.fields)
)

out = criteria
Expand Down Expand Up @@ -214,14 +249,23 @@ def repo_copy(copy_tasks, repo):
tasks = list(chain.from_iterable(copy_tasks))
return RepoCopy(tasks=tasks, repo=repo)

for src_repo, dest_repo in repo_pairs:
for src_repo, dest_repo in sorted(repo_pairs):
one_pair_copies = []
tasks_f = f_return()
for item in criteria or [None]:
tasks_f = self.pulp_client.copy_content(
src_repo, dest_repo, criteria=item
# ensure the criteria are processed and completed/resolved in order
# so that the non-rpm copy completes before the rpm copy
# pylint:disable=cell-var-from-loop
tasks_f = f_flat_map(
tasks_f,
lambda _: self.pulp_client.copy_content(
src_repo,
dest_repo,
criteria=item,
),
)
# pylint:enable=cell-var-from-loop
one_pair_copies.append(tasks_f)

f = f_map(f_sequence(one_pair_copies), partial(repo_copy, repo=dest_repo))
f = f_map(f, self.log_copy)
fts.append(f)
Expand Down
82 changes: 63 additions & 19 deletions tests/copy_repo/test_copy_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,16 @@ def test_copy_repo_multiple_content_types(command_tester, fake_collector):
relative_url="another/publish/url",
mutable_urls=["repomd.xml"],
)
repoC = YumRepository(
id="other-yumrepo",
relative_url="other/publish/url",
mutable_urls=["repomd.xml"],
)
repoD = YumRepository(
id="yet-another-yumrepo",
relative_url="yet/another/publish/url",
mutable_urls=["repomd.xml"],
)

files = [
RpmUnit(
Expand All @@ -400,11 +410,28 @@ def test_copy_repo_multiple_content_types(command_tester, fake_collector):
name="mymod", stream="s1", version=123, context="a1c2", arch="s390x"
),
]
files2 = [
RpmUnit(
name="dash",
version="1.24",
release="1.test8",
arch="x86_64",
sha256sum="a" * 64,
md5sum="b" * 32,
signing_key="aabbcc",
),
ModulemdUnit(
name="othermod", stream="s1", version=123, context="a1c2", arch="s390x"
),
]

with FakeCopyRepo() as task_instance:
task_instance.pulp_client_controller.insert_repository(repoA)
task_instance.pulp_client_controller.insert_repository(repoB)
task_instance.pulp_client_controller.insert_repository(repoC)
task_instance.pulp_client_controller.insert_repository(repoD)
task_instance.pulp_client_controller.insert_units(repoA, files)
task_instance.pulp_client_controller.insert_units(repoC, files2)

# It should run with expected output.
command_tester.test(
Expand All @@ -422,6 +449,7 @@ def test_copy_repo_multiple_content_types(command_tester, fake_collector):
"--content-type",
"erratum",
"some-yumrepo,another-yumrepo",
"other-yumrepo,yet-another-yumrepo",
],
)

Expand All @@ -441,6 +469,16 @@ def test_copy_repo_multiple_content_types(command_tester, fake_collector):
"signing_key": None,
"build": None,
},
{
"state": "PUSHED",
"origin": "pulp",
"src": None,
"dest": "yet-another-yumrepo",
"filename": "dash-1.24-1.test8.x86_64.rpm",
"checksums": {"sha256": "a" * 64},
"signing_key": None,
"build": None,
},
{
"state": "PUSHED",
"origin": "pulp",
Expand All @@ -451,6 +489,16 @@ def test_copy_repo_multiple_content_types(command_tester, fake_collector):
"signing_key": None,
"build": None,
},
{
"state": "PUSHED",
"origin": "pulp",
"src": None,
"dest": "yet-another-yumrepo",
"filename": "othermod:s1:123:a1c2:s390x",
"checksums": None,
"signing_key": None,
"build": None,
},
]


Expand Down Expand Up @@ -482,11 +530,11 @@ def test_copy_repo_criteria(command_tester):
"--content-type", # duplicate
"modulemd",
"--content-type",
"iso",
"ISO",
"--content-type",
"erratum",
"Erratum",
"--content-type",
"package_group",
"package_group ",
"--content-type",
"package_langpacks",
"some-yumrepo,another-yumrepo",
Expand All @@ -507,6 +555,18 @@ def test_copy_repo_criteria(command_tester):
[
str(item)
for item in [
Criteria.with_field(
"content_type_id",
Matcher.in_(
[
"erratum",
"iso",
"modulemd",
"package_group",
"package_langpacks",
]
),
),
Criteria.with_unit_type(
RpmUnit,
unit_fields=(
Expand All @@ -519,22 +579,6 @@ def test_copy_repo_criteria(command_tester):
"signing_key",
),
),
Criteria.with_unit_type(ErratumUnit, unit_fields=("unit_id",)),
Criteria.with_unit_type(
ModulemdUnit,
unit_fields=(
"name",
"stream",
"version",
"context",
"arch",
),
),
Criteria.with_unit_type(FileUnit, unit_fields=("unit_id",)),
Criteria.with_field(
"content_type_id",
Matcher.in_(["package_group", "package_langpacks"]),
),
]
]
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[ INFO] Check repos: started
[ INFO] Check repos: finished
[ INFO] Copy content: started
[ ERROR] Unsupported content type: container
[ ERROR] Unsupported content type(s): container
[ ERROR] Copy content: failed
# Raised: 30
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[ INFO] Check repos: started
[ INFO] Check repos: finished
[ INFO] Copy content: started
[ WARNING] another-yumrepo: no content copied, tasks: 23a7711a-8133-2876-37eb-dcd9e87a1613, 82e2e662-f728-b4fa-4248-5e3a0a5d2f34, d4713d60-c8a7-0639-eb11-67b367a9c378, e3e70682-c209-4cac-629f-6fbed82c07cd, e6f4590b-9a16-4106-cf6a-659eb4862b21
[ WARNING] another-yumrepo: no content copied, tasks: 82e2e662-f728-b4fa-4248-5e3a0a5d2f34, e3e70682-c209-4cac-629f-6fbed82c07cd
[ INFO] Copy content: finished
[ INFO] Record push items: started
[ INFO] Record push items: finished
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
[ INFO] Check repos: started
[ INFO] Check repos: finished
[ INFO] Copy content: started
[ INFO] another-yumrepo: copied 1 modulemd(s), 1 rpm(s), tasks: 23a7711a-8133-2876-37eb-dcd9e87a1613, 82e2e662-f728-b4fa-4248-5e3a0a5d2f34, d4713d60-c8a7-0639-eb11-67b367a9c378, e3e70682-c209-4cac-629f-6fbed82c07cd
[ INFO] yet-another-yumrepo: copied 1 modulemd(s), 1 rpm(s), tasks: 82e2e662-f728-b4fa-4248-5e3a0a5d2f34, e3e70682-c209-4cac-629f-6fbed82c07cd
[ INFO] another-yumrepo: copied 1 modulemd(s), 1 rpm(s), tasks: 23a7711a-8133-2876-37eb-dcd9e87a1613, d4713d60-c8a7-0639-eb11-67b367a9c378
[ INFO] Copy content: finished
[ INFO] Record push items: started
[ INFO] Record push items: finished
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[ INFO] Check repos: started
[ INFO] Check repos: finished
[ INFO] Copy content: started
[ ERROR] Unsupported content type: container
[ ERROR] Unsupported content type(s): container
[ ERROR] Copy content: failed
# Raised: 30

0 comments on commit 49e0556

Please sign in to comment.