Skip to content

Commit

Permalink
[ServiceBus] stress optional rotate logs (Azure#38871)
Browse files Browse the repository at this point in the history
* [ServiceBus] make rotating logs optional

* add todo

* only add handler to loggers if one does not exist

* TimedRotatingFileHandler -> RotatingFileHandler
  • Loading branch information
swathipil authored Jan 8, 2025
1 parent 90da0a6 commit d26753e
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 22 deletions.
52 changes: 33 additions & 19 deletions sdk/servicebus/azure-servicebus/stress/scripts/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import os
import sys
import logging
from logging.handlers import TimedRotatingFileHandler
from logging.handlers import RotatingFileHandler

from opencensus.ext.azure.log_exporter import AzureLogHandler


def get_base_logger(log_filename, logger_name, level=logging.ERROR, print_console=False, log_format=None):
def get_base_logger(log_filename, logger_name, level=logging.ERROR, print_console=False, log_format=None, rotating_logs=True):
logger = logging.getLogger(logger_name)
logger.setLevel(level)
formatter = log_format or logging.Formatter(
Expand All @@ -24,20 +23,24 @@ def get_base_logger(log_filename, logger_name, level=logging.ERROR, print_consol
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
else:
# rotated hourly if small file (ERROR level and above), otherwise (DEBUG/INFO) rotated every 30 minutes
if level == logging.DEBUG or level == logging.INFO:
time = 30
if rotating_logs:
if not logger.handlers:
# 50 MB max file size, up to 300 backup files kept
mb = 50
bytes_in_mb = 1_048_576
bytes = mb * bytes_in_mb
file_handler = RotatingFileHandler(log_filename, maxBytes=bytes, backupCount=300)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
else:
time = 60
file_handler = TimedRotatingFileHandler(log_filename, when="M", interval=time, utc=True)
if not logger.handlers:
file_handler = logging.FileHandler(log_filename)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

return logger


def get_logger(log_filename, logger_name, level=logging.ERROR, print_console=False, log_format=None):
def get_logger(log_filename, logger_name, level=logging.ERROR, print_console=False, log_format=None, rotating_logs=True):
stress_logger = logging.getLogger(logger_name)
stress_logger.setLevel(level)
servicebus_logger = logging.getLogger("azure.servicebus")
Expand All @@ -48,16 +51,27 @@ def get_logger(log_filename, logger_name, level=logging.ERROR, print_console=Fal
formatter = log_format or logging.Formatter(
"%(asctime)s - [%(thread)d.%(threadName)s] - %(name)-12s %(levelname)-8s %(funcName)s(%(lineno)d) %(message)s"
)
# rotated hourly if small file (ERROR level and above), otherwise (DEBUG/INFO) rotated every 30 minutes
if level == logging.DEBUG or level == logging.INFO:
time = 30
if rotating_logs:
# If any do not have handlers, create a new file handler and add.
if not servicebus_logger.handlers or not pyamqp_logger.handlers or not stress_logger.handlers:
# 50 MB max file size, up to 300 backup files kept
mb = 50
bytes_in_mb = 1_048_576
bytes = mb * bytes_in_mb
file_handler = RotatingFileHandler(log_filename, maxBytes=bytes, backupCount=300)
file_handler.setFormatter(formatter)
if not servicebus_logger.handlers:
servicebus_logger.addHandler(file_handler)
if not pyamqp_logger.handlers:
pyamqp_logger.addHandler(file_handler)
if not stress_logger.handlers:
stress_logger.addHandler(file_handler)
else:
time = 60
file_handler = TimedRotatingFileHandler(log_filename, when="M", interval=time, utc=True)
file_handler.setFormatter(formatter)
servicebus_logger.addHandler(file_handler)
pyamqp_logger.addHandler(file_handler)
stress_logger.addHandler(file_handler)
console_handler = logging.FileHandler(log_filename)
console_handler.setFormatter(formatter)
servicebus_logger.addHandler(console_handler)
pyamqp_logger.addHandler(console_handler)
stress_logger.addHandler(console_handler)

return stress_logger

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def __init__(
process_monitor=None,
logging_level=logging.ERROR,
transport_type=False,
rotating_logs=True,
):
self.senders = senders
self.receivers = receivers
Expand All @@ -113,12 +114,13 @@ def __init__(
self.send_session_id = send_session_id
self.azure_monitor_metric = azure_monitor_metric or AbstractMonitorMetric("fake_test_name")
self.logging_level = logging_level
self.rotating_logs = rotating_logs
logfile_name = LOGFILE_NAME
if transport_type:
logfile_name += "_ws.log"
else:
logfile_name += ".log"
self.logger = get_logger(logfile_name, "stress_test", self.logging_level)
self.logger = get_logger(logfile_name, "stress_test", self.logging_level, rotating_logs=self.rotating_logs)
self.process_monitor = process_monitor or ProcessMonitor(
"monitor_{}".format(logfile_name),
"test_stress_queues",
Expand Down Expand Up @@ -349,6 +351,7 @@ def __init__(
process_monitor=None,
logging_level=logging.ERROR,
transport_type=False,
rotating_logs=True,
):
super(StressTestRunnerAsync, self).__init__(
senders,
Expand All @@ -369,6 +372,7 @@ def __init__(
process_monitor=process_monitor,
logging_level=logging_level,
transport_type=transport_type,
rotating_logs=rotating_logs,
)

async def _send_async(self, sender, end_time):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def test_stress_queue_send_and_receive(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_send_and_receive"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = stress_test.run()
Expand All @@ -63,6 +64,7 @@ def test_stress_queue_send_and_pull_receive(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_send_and_pull_receive"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = stress_test.run()
Expand All @@ -87,6 +89,7 @@ def test_stress_queue_batch_send_and_receive(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_batch_send_and_receive"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = stress_test.run()
Expand All @@ -111,6 +114,7 @@ def test_stress_queue_slow_send_and_receive(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_slow_send_and_receive"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = stress_test.run()
Expand Down Expand Up @@ -140,6 +144,7 @@ def test_stress_queue_receive_and_delete(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_slow_send_and_receive"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = stress_test.run()
Expand All @@ -164,6 +169,7 @@ def test_stress_queue_unsettled_messages(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_unsettled_messages"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = stress_test.run()
Expand All @@ -190,6 +196,7 @@ def test_stress_queue_receive_large_batch_size(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_receive_large_batch_size"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = stress_test.run()
Expand Down Expand Up @@ -224,6 +231,7 @@ def test_stress_queue_pull_receive_timeout(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_pull_receive_timeout"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = stress_test.run()
Expand Down Expand Up @@ -256,6 +264,7 @@ def test_stress_queue_long_renew_send_and_receive(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_long_renew_send_and_receive"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = stress_test.run()
Expand Down Expand Up @@ -299,6 +308,7 @@ def test_stress_queue_long_renew_session_send_and_receive(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_long_renew_session_send_and_receive"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = stress_test.run()
Expand Down Expand Up @@ -330,6 +340,7 @@ def test_stress_queue_peek_messages(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_peek_messages"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = stress_test.run()
Expand Down Expand Up @@ -370,6 +381,7 @@ def test_stress_queue_close_and_reopen(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_close_and_reopen"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = stress_test.run()
Expand Down Expand Up @@ -425,6 +437,7 @@ def test_stress_queue_check_for_dropped_messages(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_check_for_dropped_messages"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = stress_test.run()
Expand Down Expand Up @@ -465,8 +478,12 @@ def test_stress_queue_check_for_dropped_messages(args):
type=str,
default="Error",
)
# Logs are rotated by default; pass the --no-rotating-logs flag to disable rotation.
parser.add_argument("--no-rotating-logs", action="store_true")

args, _ = parser.parse_known_args()
# store rotating_logs in args for later use
args.rotating_logs = not args.no_rotating_logs

if args.transport:
TRANSPORT_TYPE = TransportType.AmqpOverWebsocket
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ async def test_stress_queue_send_and_receive(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_send_and_receive"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = await stress_test.run_async()
Expand All @@ -66,6 +67,7 @@ async def test_stress_queue_send_and_pull_receive(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_send_and_pull_receive"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = await stress_test.run_async()
Expand All @@ -90,6 +92,7 @@ async def test_stress_queue_batch_send_and_receive(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_batch_send_and_receive"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = await stress_test.run_async()
Expand All @@ -114,6 +117,7 @@ async def test_stress_queue_slow_send_and_receive(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_slow_send_and_receive"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = await stress_test.run_async()
Expand Down Expand Up @@ -144,6 +148,7 @@ async def test_stress_queue_receive_and_delete(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_slow_send_and_receive"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = await stress_test.run_async()
Expand All @@ -168,6 +173,7 @@ async def test_stress_queue_unsettled_messages(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_unsettled_messages"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = await stress_test.run_async()
Expand All @@ -194,6 +200,7 @@ async def test_stress_queue_receive_large_batch_size(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_receive_large_batch_size"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = await stress_test.run_async()
Expand Down Expand Up @@ -228,6 +235,7 @@ async def test_stress_queue_pull_receive_timeout(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_pull_receive_timeout"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = await stress_test.run_async()
Expand Down Expand Up @@ -260,6 +268,7 @@ async def test_stress_queue_long_renew_send_and_receive(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_long_renew_send_and_receive"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = await stress_test.run_async()
Expand Down Expand Up @@ -303,6 +312,7 @@ async def test_stress_queue_long_renew_session_send_and_receive(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_long_renew_session_send_and_receive"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = await stress_test.run_async()
Expand Down Expand Up @@ -334,6 +344,7 @@ async def test_stress_queue_peek_messages(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_peek_messages"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = await stress_test.run_async()
Expand Down Expand Up @@ -374,6 +385,7 @@ async def test_stress_queue_close_and_reopen(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_close_and_reopen"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = await stress_test.run_async()
Expand Down Expand Up @@ -429,6 +441,7 @@ async def test_stress_queue_check_for_dropped_messages(args):
azure_monitor_metric=AzureMonitorMetric("test_stress_queue_check_for_dropped_messages"),
logging_level=LOGGING_LEVEL,
transport_type=args.transport,
rotating_logs=args.rotating_logs,
)

result = await stress_test.run_async()
Expand Down Expand Up @@ -512,7 +525,12 @@ async def run(args):
type=str,
default="Error",
)
# Logs are rotated by default; pass the --no-rotating-logs flag to disable rotation.
parser.add_argument("--no-rotating-logs", action="store_true")

args, _ = parser.parse_known_args()
# store rotating_logs in args for later use
args.rotating_logs = not args.no_rotating_logs

if args.transport:
TRANSPORT_TYPE = TransportType.AmqpOverWebsocket
Expand Down
5 changes: 3 additions & 2 deletions sdk/servicebus/azure-servicebus/stress/templates/testjob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,13 @@ spec:
command: ['bash', '-c', 'mkdir -p $DEBUG_SHARE && python3 test_stress_queues.py --method send_receive_batch --duration 300000 --logging-enable --debug_level Debug ']
{{- end -}}

# TODO: Running with both memray and RotatingFileHandler is resulting in a memory leak. Turn off rotating logs as a workaround to avoid the memory leak when running memray.
{{ if eq .Stress.testTarget "amemray" }}
command: ['bash', '-c', 'mkdir -p $DEBUG_SHARE && memray run --output $DEBUG_SHARE/sb_async_memray_output.bin test_stress_queues_async.py --method send_pull_receive --duration 300000 --logging-enable --debug_level Debug']
command: ['bash', '-c', 'mkdir -p $DEBUG_SHARE && memray run --output $DEBUG_SHARE/sb_async_memray_output.bin test_stress_queues_async.py --method send_pull_receive --duration 300000 --logging-enable --debug_level Debug --no-rotating-logs']
{{- end -}}

{{ if eq .Stress.testTarget "memray" }}
command: ['bash', '-c', 'mkdir -p $DEBUG_SHARE && memray run --output $DEBUG_SHARE/sb_memray_output.bin test_stress_queues.py --method send_pull_receive --duration 300000 --logging-enable --debug_level Debug']
command: ['bash', '-c', 'mkdir -p $DEBUG_SHARE && memray run --output $DEBUG_SHARE/sb_memray_output.bin test_stress_queues.py --method send_pull_receive --duration 300000 --logging-enable --debug_level Debug --no-rotating-logs']
{{- end -}}

{{- include "stress-test-addons.container-env" . | nindent 6 }}
Expand Down

0 comments on commit d26753e

Please sign in to comment.