Skip to content

Commit

Permalink
Run without gevent #notests
Browse files Browse the repository at this point in the history
  • Loading branch information
squeaky-pl committed Nov 6, 2024
1 parent 01f9b55 commit eb30cdb
Showing 1 changed file with 31 additions and 14 deletions.
45 changes: 31 additions & 14 deletions bin/recompress-raw-mime.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
#!/usr/bin/env python
from gevent import monkey

monkey.patch_all()

import datetime
import enum
Expand Down Expand Up @@ -40,7 +37,7 @@ class Resolution(enum.Enum):


# https://stackoverflow.com/questions/73395864/how-do-i-wait-when-all-threadpoolexecutor-threads-are-busy
class RecompressThreadPoolExecutor(ThreadPoolExecutor):
class CompressThreadPoolExecutor(ThreadPoolExecutor):
"""ThreadPoolExecutor that keeps track of the number of available workers.
Refs:
Expand Down Expand Up @@ -182,18 +179,34 @@ def overwrite_parallel(compressed_raw_mime_by_sha256: "dict[str, bytes]") -> Non
)


def recompress_batch(
recompress_sha256s: "dict[str, int]", *, dry_run=True, compression_level: int = 3
def compress_batch(
recompress_sha256s: "dict[str, int]",
*,
dry_run=True,
compression_level: int = 3,
recompress: bool = False,
) -> None:
if not recompress_sha256s:
return

data_by_sha256 = {
data_sha256: data
for data_sha256, data in download_parallel(set(recompress_sha256s))
if data is not None and not data.startswith(blockstore.ZSTD_MAGIC_NUMBER_PREFIX)
if data is not None
}

if recompress:
data_by_sha256 = {
data_sha256: blockstore.maybe_decompress_raw_mime(data)
for data_sha256, data in data_by_sha256.items()
}
else:
data_by_sha256 = {
data_sha256: data
for data_sha256, data in data_by_sha256.items()
if not data.startswith(blockstore.ZSTD_MAGIC_NUMBER_PREFIX)
}

if not data_by_sha256:
return

Expand Down Expand Up @@ -276,6 +289,7 @@ def recompress_batch(
"--max-recompress-batch-bytes", type=int, default=MAX_RECOMPRESS_BATCH_BYTES
)
@click.option("--fraction", type=str, default=None)
@click.option("--recompress/--no-recompress", default=False)
def run(
limit: "int | None",
after: "str | None",
Expand All @@ -294,6 +308,7 @@ def run(
min_size: "int | None",
max_recompress_batch_bytes: int,
fraction: "str | None",
recompress: bool,
) -> int:
shutting_down = False

Expand All @@ -317,7 +332,7 @@ def shutdown(signum, frame):
assert batch_size > 0
assert recompress_batch_size > 0

recompress_executor = RecompressThreadPoolExecutor(
compress_executor = CompressThreadPoolExecutor(
max_workers=recompress_executor_workers
)

Expand Down Expand Up @@ -377,24 +392,26 @@ def shutdown(signum, frame):
len(recompress_sha256s) >= recompress_batch_size
or recompress_bytes > max_recompress_batch_bytes
):
recompress_executor.wait_for_available_worker()
recompress_executor.submit(
recompress_batch,
compress_executor.wait_for_available_worker()
compress_executor.submit(
compress_batch,
recompress_sha256s.copy(),
dry_run=dry_run,
compression_level=compression_level,
recompress=recompress,
)
recompress_sha256s.clear()
recompress_bytes = 0

if shutting_down:
break

recompress_executor.submit(
recompress_batch,
compress_executor.submit(
compress_batch,
recompress_sha256s.copy(),
dry_run=dry_run,
compression_level=compression_level,
recompress=recompress,
)

if shutting_down:
Expand All @@ -405,7 +422,7 @@ def shutdown(signum, frame):

after_id = max_id + 1

recompress_executor.shutdown(wait=True)
compress_executor.shutdown(wait=True)


if __name__ == "__main__":
Expand Down

0 comments on commit eb30cdb

Please sign in to comment.