diff --git a/python/ray/_private/ray_logging.py b/python/ray/_private/ray_logging.py
index e9ee12195340b..5699251e31ced 100644
--- a/python/ray/_private/ray_logging.py
+++ b/python/ray/_private/ray_logging.py
@@ -284,11 +284,16 @@ def deduplicate(self, batch: LogBatch) -> List[LogBatch]:
                 continue
             dedup_key = _canonicalise_log_line(line)
 
+            if dedup_key == "":
+                # Don't dedup lines whose canonical form is empty (e.g. bare numbers);
+                # the raw line is all the information there is, so always emit it.
+                output[0]["lines"].append(line)
+                continue
+
             if dedup_key in self.recent:
                 sources = self.recent[dedup_key].sources
                 sources.add(source)
-                # We deduplicate the warnings/errorm essages from
-                # raylet by default.
+                # We deduplicate the warnings/error messages from raylet by default.
                 if len(sources) > 1 or batch["pid"] == "raylet":
                     state = self.recent[dedup_key]
                     self.recent[dedup_key] = DedupState(
diff --git a/python/ray/tests/test_log_dedup.py b/python/ray/tests/test_log_dedup.py
index c1353d7c9458f..833af274bea0d 100644
--- a/python/ray/tests/test_log_dedup.py
+++ b/python/ray/tests/test_log_dedup.py
@@ -19,6 +19,60 @@ def test_nodedup_logs_single_process():
     assert out1 == [batch1]
 
 
+def test_nodedup_logs_buffer_only_lines():
+    now = 142300000.0
+
+    def gettime():
+        return now
+
+    dedup = LogDeduplicator(5, None, None, _timesource=gettime)
+    batch1 = {
+        "ip": "node1",
+        "pid": 100,
+        # Numbers are canonicalised, so this line produces an empty dedup_key.
+        "lines": ["1"],
+    }
+
+    # The first batch is always printed immediately.
+    out1 = dedup.deduplicate(batch1)
+    assert out1 == [batch1]
+
+    now += 1.0
+
+    # A different number-only line from another ip/pid must be printed as well.
+    batch2 = {
+        "ip": "node2",
+        "pid": 200,
+        "lines": ["2"],
+    }
+    out2 = dedup.deduplicate(batch2)
+    assert out2 == [
+        {
+            "ip": "node2",
+            "pid": 200,
+            "lines": ["2"],
+        }
+    ]
+
+    now += 3.0
+
+    # The same number-only line from another ip/pid must still be printed.
+    batch3 = {
+        "ip": "node3",
+        "pid": 300,
+        "lines": ["2"],
+    }
+    # Must NOT be buffered: lines with an empty dedup_key are never deduplicated.
+    out3 = dedup.deduplicate(batch3)
+    assert out3 == [
+        {
+            "ip": "node3",
+            "pid": 300,
+            "lines": ["2"],
+        }
+    ]
+
+
 def test_dedup_logs_multiple_processes():
     now = 142300000.0
 