From b3a97fa41dafb5c0d490f67b03fa5f13d38a3972 Mon Sep 17 00:00:00 2001 From: Nicolas Chiaruttini Date: Wed, 3 Jun 2026 23:36:00 +0200 Subject: [PATCH] Fix spurious "thread death" TOCTOU race in python_worker (#15) In Worker._process_input(), non-main tasks assigned task._thread before calling start(). The janitor thread (_cleanup_threads) flags any task where task._thread is set and the thread is not alive. A just-constructed, not-yet-started thread is not alive, so if the janitor's 0.05s poll landed in the window between Thread() construction and start(), it would wrongly fail the task with "thread death" even though the thread was about to run fine. Assign task._thread only after start() returns, closing the window. Also add test_thread_death_stress, which floods the worker with many concurrent tiny tasks (none of which can legitimately die) to surface the race. It failed reliably before the fix and passes consistently after. Co-Authored-By: Claude Opus 4.8 --- src/appose/python_worker.py | 11 +++++++++-- tests/test_service.py | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/src/appose/python_worker.py b/src/appose/python_worker.py index d4c2429..12c1371 100644 --- a/src/appose/python_worker.py +++ b/src/appose/python_worker.py @@ -222,8 +222,15 @@ def _process_input(self) -> None: else: # Create a thread and save a reference to it, in case its script # kills the thread. This happens e.g. if it calls sys.exit. - task._thread = Thread(target=task._run, name=f"Appose-{uuid}") - task._thread.start() + # + # Assign task._thread only AFTER start() returns. Otherwise the + # janitor (_cleanup_threads) can observe task._thread set while + # the thread is not yet alive (the window between Thread() + # construction and start()) and spuriously fail the task with + # "thread death". See apposed/appose#15. + t = Thread(target=task._run, name=f"Appose-{uuid}") + t.start() + task._thread = t elif request_type == RequestType.CANCEL: task = self.tasks.get(uuid) diff --git a/tests/test_service.py b/tests/test_service.py index 5a4afcc..798d577 100644 --- a/tests/test_service.py +++ b/tests/test_service.py @@ -354,3 +354,40 @@ def test_task_result_null(): # result() should return None. assert task.result() is None + + +def test_thread_death_stress(): + """Floods the worker with many concurrent tiny tasks to surface the + spurious 'thread death' race (apposed/appose#15). No task here can + legitimately die, so any 'thread death' is the bug.""" + import threading + env = appose.system() + n_threads = 16 + n_tasks = 200 # per thread + errors = [] + err_lock = threading.Lock() + submit_lock = threading.Lock() # serialize stdin writes only + + with env.python() as service: + maybe_debug(service) + + def worker(): + for _ in range(n_tasks): + with submit_lock: + task = service.task("task.outputs['result'] = 1") + task.start() + try: + task.wait_for() + except Exception as e: + with err_lock: + errors.append(str(e)) + + threads = [threading.Thread(target=worker) for _ in range(n_threads)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors, ( + f"{len(errors)}/{n_threads * n_tasks} tasks failed; sample: {errors[:5]}" + )