From 6182c7c6f585f2647249cbc041d48dbb1c8c453a Mon Sep 17 00:00:00 2001 From: Starbuck5 <46412508+Starbuck5@users.noreply.github.com> Date: Thu, 21 Mar 2024 00:50:10 -0700 Subject: [PATCH] Remove pygame.threads --- buildconfig/stubs/gen_stubs.py | 1 - buildconfig/stubs/pygame/__init__.pyi | 1 - setup.py | 6 +- src_py/__init__.py | 4 - src_py/threads/__init__.py | 271 -------------------------- test/image_test.py | 12 +- test/test_utils/run_tests.py | 57 +++--- test/threads_test.py | 238 ---------------------- 8 files changed, 32 insertions(+), 558 deletions(-) delete mode 100644 src_py/threads/__init__.py delete mode 100644 test/threads_test.py diff --git a/buildconfig/stubs/gen_stubs.py b/buildconfig/stubs/gen_stubs.py index b62bf166e4..a64c57646a 100644 --- a/buildconfig/stubs/gen_stubs.py +++ b/buildconfig/stubs/gen_stubs.py @@ -35,7 +35,6 @@ "surfarray", "transform", "scrap", - "threads", "version", "base", "bufferproxy", diff --git a/buildconfig/stubs/pygame/__init__.pyi b/buildconfig/stubs/pygame/__init__.pyi index d5431cc840..c05a428fb4 100644 --- a/buildconfig/stubs/pygame/__init__.pyi +++ b/buildconfig/stubs/pygame/__init__.pyi @@ -22,7 +22,6 @@ from pygame import ( surfarray as surfarray, transform as transform, scrap as scrap, - threads as threads, version as version, base as base, bufferproxy as bufferproxy, diff --git a/setup.py b/setup.py index 1fa4d803bb..a6d265e30a 100644 --- a/setup.py +++ b/setup.py @@ -951,7 +951,6 @@ def run(self): PACKAGEDATA = { "cmdclass": cmdclass, "packages": ['pygame', - 'pygame.threads', 'pygame._sdl2', 'pygame.tests', 'pygame.tests.test_utils', @@ -960,7 +959,6 @@ def run(self): 'pygame.__pyinstaller'], "package_dir": {'pygame': 'src_py', 'pygame._sdl2': 'src_py/_sdl2', - 'pygame.threads': 'src_py/threads', 'pygame.tests': 'test', 'pygame.docs': 'docs', 'pygame.examples': 'examples', @@ -981,11 +979,9 @@ def run(self): PACKAGEDATA = { "cmdclass": cmdclass, "packages": ['pygame', - 'pygame.threads', 'pygame._sdl2'], "package_dir": {'pygame': 'src_py', - 'pygame._sdl2': 'src_py/_sdl2', - 'pygame.threads': 'src_py/threads'}, + 'pygame._sdl2': 'src_py/_sdl2'}, "ext_modules": extensions, "zip_safe": False, "data_files": data_files diff --git a/src_py/__init__.py b/src_py/__init__.py index 6b1b206326..8338ad04bf 100644 --- a/src_py/__init__.py +++ b/src_py/__init__.py @@ -197,10 +197,6 @@ def Cursor(*args): # pylint: disable=unused-argument except (ImportError, OSError): sprite = MissingModule("sprite", urgent=1) -try: - import pygame.threads -except (ImportError, OSError): - threads = MissingModule("threads", urgent=1) try: import pygame.pixelcopy diff --git a/src_py/threads/__init__.py b/src_py/threads/__init__.py deleted file mode 100644 index 3e996fac4f..0000000000 --- a/src_py/threads/__init__.py +++ /dev/null @@ -1,271 +0,0 @@ -""" -* Experimental * - -Like the map function, but can use a pool of threads. - -Really easy to use threads. eg. tmap(f, alist) - -If you know how to use the map function, you can use threads. -""" - -__author__ = "Rene Dudfield" -__version__ = "0.3.0" -__license__ = "Python license" - -from queue import Queue, Empty -import threading - - -Thread = threading.Thread - -STOP = object() -FINISH = object() - -# DONE_ONE = object() -# DONE_TWO = object() - -# a default worker queue. -_wq = None - -# if we are using threads or not. This is the number of workers. -_use_workers = 0 - -# Set this to the maximum for the amount of Cores/CPUs -# Note, that the tests early out. -# So it should only test the best number of workers +2 -MAX_WORKERS_TO_TEST = 64 - - -def init(number_of_workers=0): - """Does a little test to see if threading is worth it. - Sets up a global worker queue if it's worth it. - - Calling init() is not required, but is generally better to do. - """ - global _wq, _use_workers - - if number_of_workers: - _use_workers = number_of_workers - else: - _use_workers = benchmark_workers() - - # if it is best to use zero workers, then use that. - _wq = WorkerQueue(_use_workers) - - -def quit(): - """cleans up everything.""" - global _wq, _use_workers - _wq.stop() - _wq = None - _use_workers = False - - -def benchmark_workers(a_bench_func=None, the_data=None): - """does a little test to see if workers are at all faster. - Returns the number of workers which works best. - Takes a little bit of time to run, so you should only really call - it once. - You can pass in benchmark data, and functions if you want. - a_bench_func - f(data) - the_data - data to work on. - """ - # TODO: try and make this scale better with slower/faster cpus. - # first find some variables so that using 0 workers takes about 1.0 seconds. - # then go from there. - - # note, this will only work with pygame 1.8rc3+ - # replace the doit() and the_data with something that releases the GIL - - import pygame - import pygame.transform - import time - - if not a_bench_func: - - def doit(x): - return pygame.transform.scale(x, (544, 576)) - - else: - doit = a_bench_func - - if not the_data: - thedata = [pygame.Surface((155, 155), 0, 32) for x in range(10)] - else: - thedata = the_data - - best = time.time() + 100000000 - best_number = 0 - # last_best = -1 - - for num_workers in range(0, MAX_WORKERS_TO_TEST): - wq = WorkerQueue(num_workers) - t1 = time.time() - for _ in range(20): - print(f"active count:{threading.active_count()}") - tmap(doit, thedata, worker_queue=wq) - t2 = time.time() - - wq.stop() - - total_time = t2 - t1 - print(f"total time num_workers:{num_workers}: time:{total_time}:") - - if total_time < best: - # last_best = best_number - best_number = num_workers - best = total_time - - if num_workers - best_number > 1: - # We tried to add more, but it didn't like it. - # so we stop with testing at this number. - break - - return best_number - - -class WorkerQueue: - def __init__(self, num_workers=20): - self.queue = Queue() - self.pool = [] - self._setup_workers(num_workers) - - def _setup_workers(self, num_workers): - """Sets up the worker threads - NOTE: undefined behaviour if you call this again. - """ - self.pool = [] - - for _ in range(num_workers): - self.pool.append(Thread(target=self.threadloop)) - - for a_thread in self.pool: - a_thread.daemon = True - a_thread.start() - - def do(self, f, *args, **kwArgs): - """puts a function on a queue for running later.""" - self.queue.put((f, args, kwArgs)) - - def stop(self): - """Stops the WorkerQueue, waits for all of the threads to finish up.""" - self.queue.put(STOP) - for thread in self.pool: - thread.join() - - def threadloop(self): # , finish=False): - """Loops until all of the tasks are finished.""" - while True: - args = self.queue.get() - if args is STOP: - self.queue.put(STOP) - self.queue.task_done() - break - try: - args[0](*args[1], **args[2]) - finally: - # clean up the queue, raise the exception. - self.queue.task_done() - # raise - - def wait(self): - """waits until all tasks are complete.""" - self.queue.join() - - -class FuncResult: - """Used for wrapping up a function call so that the results are stored - inside the instances result attribute. - """ - - def __init__(self, f, callback=None, errback=None): - """f - is the function we that we call - callback(result) - this is called when the function(f) returns - errback(exception) - this is called when the function(f) raises - an exception. - """ - self.f = f - self.exception = None - self.result = None - self.callback = callback - self.errback = errback - - def __call__(self, *args, **kwargs): - # we try to call the function here. If it fails we store the exception. - try: - self.result = self.f(*args, **kwargs) - if self.callback: - self.callback(self.result) - except Exception as e: - self.exception = e - if self.errback: - self.errback(self.exception) - - -def tmap(f, seq_args, num_workers=20, worker_queue=None, wait=True, stop_on_error=True): - """like map, but uses a thread pool to execute. - num_workers - the number of worker threads that will be used. If pool - is passed in, then the num_workers arg is ignored. - worker_queue - you can optionally pass in an existing WorkerQueue. - wait - True means that the results are returned when everything is finished. - False means that we return the [worker_queue, results] right away instead. - results, is returned as a list of FuncResult instances. - stop_on_error - - """ - - if worker_queue: - wq = worker_queue - else: - # see if we have a global queue to work with. - if _wq: - wq = _wq - else: - if num_workers == 0: - return map(f, seq_args) - - wq = WorkerQueue(num_workers) - - # we shortcut it here if the number of workers is 0. - # normal map should be faster in this case. - if len(wq.pool) == 0: - return map(f, seq_args) - - # print("queue size:%s" % wq.queue.qsize()) - - # TODO: divide the data (seq_args) into even chunks and - # then pass each thread a map(f, equal_part(seq_args)) - # That way there should be less locking, and overhead. - - results = [] - for sa in seq_args: - results.append(FuncResult(f)) - wq.do(results[-1], sa) - - # wq.stop() - - if wait: - # print("wait") - wq.wait() - # print("after wait") - # print("queue size:%s" % wq.queue.qsize()) - if wq.queue.qsize(): - raise RuntimeError("buggy threadmap") - # if we created a worker queue, we need to stop it. - if not worker_queue and not _wq: - # print("stopping") - wq.stop() - if wq.queue.qsize(): - um = wq.queue.get() - if not um is STOP: - raise RuntimeError("buggy threadmap") - - # see if there were any errors. If so raise the first one. This matches map behaviour. - # TODO: the traceback doesn't show up nicely. - # NOTE: TODO: we might want to return the results anyway? This should be an option. - if stop_on_error: - error_ones = list(filter(lambda x: x.exception, results)) - if error_ones: - raise error_ones[0].exception - - return map(lambda x: x.result, results) - return [wq, results] diff --git a/test/image_test.py b/test/image_test.py index 1ba2ffcb87..4ce2e792c4 100644 --- a/test/image_test.py +++ b/test/image_test.py @@ -6,6 +6,7 @@ import unittest import glob import pathlib +from concurrent.futures import ThreadPoolExecutor from pygame.tests.test_utils import example_path, png, tostring import pygame, pygame.image, pygame.pkgdata @@ -1356,12 +1357,11 @@ def test_save_extended(self): ) def threads_load(self, images): - import pygame.threads - - for i in range(10): - surfs = pygame.threads.tmap(pygame.image.load, images) - for s in surfs: - self.assertIsInstance(s, pygame.Surface) + for _ in range(10): + with ThreadPoolExecutor(max_workers=20) as executor: + surfs = executor.map(pygame.image.load, images) + for s in surfs: + self.assertIsInstance(s, pygame.Surface) def test_load_png_threads(self): self.threads_load(glob.glob(example_path("data/*.png"))) diff --git a/test/test_utils/run_tests.py b/test/test_utils/run_tests.py index d4a17eb82e..793cab7f04 100644 --- a/test/test_utils/run_tests.py +++ b/test/test_utils/run_tests.py @@ -25,7 +25,6 @@ TEST_RESULTS_START, ) import pygame -import pygame.threads import os import re @@ -33,6 +32,7 @@ import tempfile import time import random +from concurrent.futures import ThreadPoolExecutor from pprint import pformat was_run = False @@ -250,41 +250,34 @@ def sub_test(module): ), ) - if option_multi_thread > 1: + with ThreadPoolExecutor(max_workers=option_multi_thread) as executor: + t = time.time() - def tmap(f, args): - return pygame.threads.tmap( - f, args, stop_on_error=False, num_workers=option_multi_thread - ) - - else: - tmap = map - - t = time.time() + for module, cmd, (return_code, raw_return) in executor.map( + sub_test, test_modules + ): + test_file = f"{os.path.join(test_subdir, module)}.py" + cmd, test_env, working_dir = cmd - for module, cmd, (return_code, raw_return) in tmap(sub_test, test_modules): - test_file = f"{os.path.join(test_subdir, module)}.py" - cmd, test_env, working_dir = cmd - - test_results = get_test_results(raw_return) - if test_results: - results.update(test_results) - else: - results[module] = {} - - results[module].update( - dict( - return_code=return_code, - raw_return=raw_return, - cmd=cmd, - test_file=test_file, - test_env=test_env, - working_dir=working_dir, - module=module, + test_results = get_test_results(raw_return) + if test_results: + results.update(test_results) + else: + results[module] = {} + + results[module].update( + dict( + return_code=return_code, + raw_return=raw_return, + cmd=cmd, + test_file=test_file, + test_env=test_env, + working_dir=working_dir, + module=module, + ) ) - ) - t = time.time() - t + t = time.time() - t ########################################################################### # Output Results diff --git a/test/threads_test.py b/test/threads_test.py deleted file mode 100644 index 612e6900f8..0000000000 --- a/test/threads_test.py +++ /dev/null @@ -1,238 +0,0 @@ -import unittest -from pygame.threads import FuncResult, tmap, WorkerQueue, Empty, STOP -from pygame import threads, Surface, transform - - -import time - - -class WorkerQueueTypeTest(unittest.TestCase): - def test_usage_with_different_functions(self): - def f(x): - return x + 1 - - def f2(x): - return x + 2 - - wq = WorkerQueue() - fr = FuncResult(f) - fr2 = FuncResult(f2) - wq.do(fr, 1) - wq.do(fr2, 1) - wq.wait() - wq.stop() - - self.assertEqual(fr.result, 2) - self.assertEqual(fr2.result, 3) - - def test_do(self): - """Tests function placement on queue and execution after blocking function completion.""" - # __doc__ (as of 2008-06-28) for pygame.threads.WorkerQueue.do: - - # puts a function on a queue for running _later_. - - # TODO: This tests needs refactoring to avoid sleep. - # sleep is slow and unreliable (especially on VMs). - - # def sleep_test(): - # time.sleep(0.5) - - # def calc_test(x): - # return x + 1 - - # worker_queue = WorkerQueue(num_workers=1) - # sleep_return = FuncResult(sleep_test) - # calc_return = FuncResult(calc_test) - # init_time = time.time() - # worker_queue.do(sleep_return) - # worker_queue.do(calc_return, 1) - # worker_queue.wait() - # worker_queue.stop() - # time_diff = time.time() - init_time - - # self.assertEqual(sleep_return.result, None) - # self.assertEqual(calc_return.result, 2) - # self.assertGreaterEqual(time_diff, 0.5) - - def test_stop(self): - """Ensure stop() stops the worker queue""" - wq = WorkerQueue() - - self.assertGreater(len(wq.pool), 0) - - for t in wq.pool: - self.assertTrue(t.is_alive()) - - for i in range(200): - wq.do(lambda x: x + 1, i) - - wq.stop() - - for t in wq.pool: - self.assertFalse(t.is_alive()) - - self.assertIs(wq.queue.get(), STOP) - - def test_threadloop(self): - # __doc__ (as of 2008-06-28) for pygame.threads.WorkerQueue.threadloop: - - # Loops until all of the tasks are finished. - - # Make a worker queue with only one thread - wq = WorkerQueue(1) - - # Occupy the one worker with the threadloop - # wq threads are just threadloop, so this makes an embedded threadloop - wq.do(wq.threadloop) - - # Make sure wq can still do work - # If wq can still do work, threadloop works - l = [] - wq.do(l.append, 1) - # Wait won't work because the primary thread is in an infinite loop - time.sleep(0.5) - self.assertEqual(l[0], 1) - - # Kill the embedded threadloop by sending stop onto the stack - # Threadloop puts STOP back onto the queue when it STOPs so this kills both loops - wq.stop() - - # Make sure wq has stopped - self.assertFalse(wq.pool[0].is_alive()) - - def test_wait(self): - # __doc__ (as of 2008-06-28) for pygame.threads.WorkerQueue.wait: - - # waits until all tasks are complete. - - wq = WorkerQueue() - - for i in range(2000): - wq.do(lambda x: x + 1, i) - wq.wait() - - self.assertRaises(Empty, wq.queue.get_nowait) - - wq.stop() - - -class ThreadsModuleTest(unittest.TestCase): - def test_benchmark_workers(self): - """Ensure benchmark_workers performance measure functions properly with both default and specified inputs""" - "tags:long_running" - - # __doc__ (as of 2008-06-28) for pygame.threads.benchmark_workers: - - # does a little test to see if workers are at all faster. - # Returns the number of workers which works best. - # Takes a little bit of time to run, so you should only really call - # it once. - # You can pass in benchmark data, and functions if you want. - # a_bench_func - f(data) - # the_data - data to work on. - optimal_workers = threads.benchmark_workers() - self.assertIsInstance(optimal_workers, int) - self.assertTrue(0 <= optimal_workers < 64) - - # Test passing benchmark data and function explicitly - def smooth_scale_bench(data): - transform.smoothscale(data, (128, 128)) - - surf_data = [Surface((x, x), 0, 32) for x in range(12, 64, 12)] - best_num_workers = threads.benchmark_workers(smooth_scale_bench, surf_data) - self.assertIsInstance(best_num_workers, int) - - def test_init(self): - """Ensure init() sets up the worker queue""" - threads.init(8) - - self.assertIsInstance(threads._wq, WorkerQueue) - - threads.quit() - - def test_quit(self): - """Ensure quit() cleans up the worker queue""" - threads.init(8) - threads.quit() - - self.assertIsNone(threads._wq) - - def test_tmap(self): - # __doc__ (as of 2008-06-28) for pygame.threads.tmap: - - # like map, but uses a thread pool to execute. - # num_workers - the number of worker threads that will be used. If pool - # is passed in, then the num_workers arg is ignored. - # worker_queue - you can optionally pass in an existing WorkerQueue. - # wait - True means that the results are returned when everything is finished. - # False means that we return the [worker_queue, results] right away instead. - # results, is returned as a list of FuncResult instances. - # stop_on_error - - - ## test that the outcomes of map and tmap are the same - func, data = lambda x: x + 1, range(100) - - tmapped = list(tmap(func, data)) - mapped = list(map(func, data)) - - self.assertEqual(tmapped, mapped) - - ## Test that setting tmap to not stop on errors produces the expected result - data2 = range(100) - always_excepts = lambda x: 1 / 0 - - tmapped2 = list(tmap(always_excepts, data2, stop_on_error=False)) - - # Use list comprehension to check all entries are None as all function - # calls made by tmap will have thrown an exception (ZeroDivisionError) - # Condense to single bool with `all`, which will return true if all - # entries are true - self.assertTrue(all([x is None for x in tmapped2])) - - def todo_test_tmap__None_func_and_multiple_sequences(self): - """Using a None as func and multiple sequences""" - self.fail() - - res = tmap(None, [1, 2, 3, 4]) - res2 = tmap(None, [1, 2, 3, 4], [22, 33, 44, 55]) - res3 = tmap(None, [1, 2, 3, 4], [22, 33, 44, 55, 66]) - res4 = tmap(None, [1, 2, 3, 4, 5], [22, 33, 44, 55]) - - self.assertEqual([1, 2, 3, 4], res) - self.assertEqual([(1, 22), (2, 33), (3, 44), (4, 55)], res2) - self.assertEqual([(1, 22), (2, 33), (3, 44), (4, 55), (None, 66)], res3) - self.assertEqual([(1, 22), (2, 33), (3, 44), (4, 55), (5, None)], res4) - - def test_tmap__wait(self): - r = range(1000) - wq, results = tmap(lambda x: x, r, num_workers=5, wait=False) - wq.wait() - r2 = map(lambda x: x.result, results) - self.assertEqual(list(r), list(r2)) - - def test_FuncResult(self): - """Ensure FuncResult sets its result and exception attributes""" - # Results are stored in result attribute - fr = FuncResult(lambda x: x + 1) - fr(2) - - self.assertEqual(fr.result, 3) - - # Exceptions are store in exception attribute - self.assertIsNone(fr.exception, "no exception should be raised") - - exception = ValueError("rast") - - def x(sdf): - raise exception - - fr = FuncResult(x) - fr(None) - - self.assertIs(fr.exception, exception) - - -################################################################################ - -if __name__ == "__main__": - unittest.main()