-
Notifications
You must be signed in to change notification settings - Fork 846
/
Copy pathmlperf_logger.py
112 lines (79 loc) · 2.8 KB
/
mlperf_logger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# @lint-ignore-every LICENSELINT
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
"""
Utilities for MLPerf logging
"""
import os
import torch
try:
    # The import is guarded so that this module can still be imported when
    # the mlperf_logging package is not installed.
    from mlperf_logging import mllog
    from mlperf_logging.mllog import constants
    # Module-level logger handle used by log_start / log_end / log_event.
    _MLLOGGER = mllog.get_mllogger()
except ImportError as error:
    # NOTE: only a message is printed here. `mllog`, `constants` and
    # `_MLLOGGER` stay undefined, so any helper in this module that touches
    # them will raise NameError when mlperf_logging is missing.
    print("Unable to import mlperf_logging, ", error)
def log_start(*args, **kwargs):
    """Emit a record through the MLPerf logger's ``start`` method.

    All positional and keyword arguments are handed to ``_log_print``
    unchanged, which decides whether the current rank actually logs.
    """
    emit = _MLLOGGER.start
    _log_print(emit, *args, **kwargs)
def log_end(*args, **kwargs):
    """Emit a record through the MLPerf logger's ``end`` method.

    All positional and keyword arguments are handed to ``_log_print``
    unchanged, which decides whether the current rank actually logs.
    """
    emit = _MLLOGGER.end
    _log_print(emit, *args, **kwargs)
def log_event(*args, **kwargs):
    """Emit a record through the MLPerf logger's ``event`` method.

    All positional and keyword arguments are handed to ``_log_print``
    unchanged, which decides whether the current rank actually logs.
    """
    emit = _MLLOGGER.event
    _log_print(emit, *args, **kwargs)
def _log_print(logger, *args, **kwargs):
"makes mlperf logger aware of distributed execution"
if "stack_offset" not in kwargs:
kwargs["stack_offset"] = 3
if "value" not in kwargs:
kwargs["value"] = None
if kwargs.pop("log_all_ranks", False):
log = True
else:
log = get_rank() == 0
if log:
logger(*args, **kwargs)
def config_logger(benchmark):
    """Configure mllog to write to ``<benchmark>.log`` beside this module.

    Args:
        benchmark: Value interpolated into the log file name.
    """
    module_dir = os.path.dirname(os.path.abspath(__file__))
    log_path = os.path.join(module_dir, f"{benchmark}.log")
    mllog.config(filename=log_path)
    # NOTE(review): `.logger` is presumably a stdlib logging.Logger, in which
    # case this keeps records from being passed up to ancestor loggers.
    _MLLOGGER.logger.propagate = False
def barrier():
    """
    Stand-in for a distributed barrier.

    When torch.distributed is available and initialized, runs an all_reduce
    over a one-element CUDA float tensor and then synchronizes with the GPU.
    Otherwise does nothing. (Written as a workaround for a missing NCCL
    barrier, per the original author's note.)
    """
    dist = torch.distributed
    if not dist.is_available() or not dist.is_initialized():
        return
    # The tensor's contents are irrelevant; only the collective call matters.
    dummy = torch.cuda.FloatTensor(1)
    dist.all_reduce(dummy)
    torch.cuda.synchronize()
def get_rank():
    """
    Returns this process's distributed rank.

    Falls back to 0 when torch.distributed is unavailable or has not been
    initialized.
    """
    dist = torch.distributed
    if not dist.is_available() or not dist.is_initialized():
        return 0
    return dist.get_rank()
def mlperf_submission_log(benchmark):
    """
    Configures the logger for ``benchmark`` and emits the submission events.

    Events are logged in a fixed order: benchmark, org, division, status,
    platform, entry, POC name, POC email.
    """
    config_logger(benchmark)

    reference = "reference_implementation"
    submission_fields = (
        (constants.SUBMISSION_BENCHMARK, benchmark),
        (constants.SUBMISSION_ORG, reference),
        (constants.SUBMISSION_DIVISION, "closed"),
        (constants.SUBMISSION_STATUS, "onprem"),
        (constants.SUBMISSION_PLATFORM, reference),
        (constants.SUBMISSION_ENTRY, reference),
        (constants.SUBMISSION_POC_NAME, reference),
        (constants.SUBMISSION_POC_EMAIL, reference),
    )
    # A plain loop in this function's own frame keeps the call depth seen by
    # log_event the same as a direct call would have.
    for field_key, field_value in submission_fields:
        log_event(key=field_key, value=field_value)