diff --git a/CHANGELOG b/CHANGELOG index 605c23c..da29421 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,5 @@ +0.20.3 + - enh: improve strategy for stalling for slow writer 0.20.2 - ref: remove __init__ from SegmentThresh 0.20.1 diff --git a/src/dcnum/feat/event_extractor_manager_thread.py b/src/dcnum/feat/event_extractor_manager_thread.py index 7398bf6..a0e96c6 100644 --- a/src/dcnum/feat/event_extractor_manager_thread.py +++ b/src/dcnum/feat/event_extractor_manager_thread.py @@ -95,12 +95,14 @@ def run(self): while True: # If the writer_dq starts filling up, then this could lead to # an oom-kill signal. Stall for the writer to prevent this. - ldq = len(self.writer_dq) - if ldq > 1000: - stallsec = ldq / 1000 + if (ldq := len(self.writer_dq)) > 1000: + time.sleep(1) + ldq2 = len(self.writer_dq) + stall_time = (ldq2 - 200) / (ldq - ldq2) if ldq2 > 200 else 0 + time.sleep(stall_time) self.logger.warning( - f"Stalling {stallsec:.1f}s for slow writer") - time.sleep(stallsec) + f"Stalled {stall_time + 1:.1f}s for slow writer " + f"({ldq} chunks queued)") cur_slot = 0 unavailable_slots = 0