Add initial ReplicationLogAnalyzer logic
droideck committed Dec 24, 2024
1 parent f5732e1 commit 960b13a
Showing 2 changed files with 278 additions and 0 deletions.
58 changes: 58 additions & 0 deletions src/lib389/lib389/cli_conf/replication.py
@@ -18,6 +18,7 @@
from lib389.cli_base import _get_arg, CustomHelpFormatter
from lib389.utils import is_a_dn, copy_with_permissions, ds_supports_new_changelog, get_passwd_from_file
from lib389.replica import Replicas, ReplicationMonitor, BootstrapReplicationManager, Changelog5, ChangelogLDIF, Changelog
from lib389.repltools import ReplicationLogAnalyzer
from lib389.tasks import CleanAllRUVTask, AbortCleanAllRUVTask
from lib389._mapped_object import DSLdapObjects

@@ -1276,6 +1277,38 @@ def restore_cl_ldif(inst, basedn, log, args):
        os.rename(f'{target_ldif}.backup', target_ldif)


def analyze_replication_logs(inst, log, args):
    """Analyze replication logs across multiple servers"""
    try:
        analyzer = ReplicationLogAnalyzer(
            args.log_dirs,
            anonymous=args.anonymous
        )

        generated_files = analyzer.generate_report(
            args.output_dir,
            formats=args.formats,
            start_time=args.start_time,
            end_time=args.end_time,
            repl_lag_threshold=args.repl_lag_threshold,
            report_name=args.report_name
        )

        # Get and display statistics
        stats = analyzer.get_lag_statistics()

        log.info('Replication analysis completed successfully')
        log.info(f'Generated files: {", ".join(generated_files.values())}')
        log.info('Statistics:')
        log.info(f' Average lag: {stats["avg_lag"]:.2f} seconds')
        log.info(f' Maximum lag: {stats["max_lag"]:.2f} seconds')
        log.info(f' Total updates analyzed: {stats["total_updates"]}')

    except Exception as e:
        log.error(f'Failed to analyze replication logs: {e}')
        raise


def create_parser(subparsers):

    ############################################
@@ -1777,3 +1810,28 @@ def create_parser(subparsers):
    task_abort_cleanallruv_list = task_subcommands.add_parser('list-abortruv-tasks', help='List all the running CleanAllRUV abort tasks', formatter_class=CustomHelpFormatter)
    task_abort_cleanallruv_list.set_defaults(func=list_abort_cleanallruv)
    task_abort_cleanallruv_list.add_argument('--suffix', help="Lists only tasks for the specified suffix")

    # Add new analysis commands
    analyze_parser = subparsers.add_parser('analyze-logs',
                                           help='Analyze replication logs across multiple servers')
    analyze_parser.add_argument('--log-dirs', required=True, nargs='+',
                                help='Directories containing server logs')
    analyze_parser.add_argument('--output-dir', required=True,
                                help='Output directory for analysis reports')
    analyze_parser.add_argument('--formats', nargs='+',
                                default=['html'],
                                choices=['csv', 'png', 'html'],
                                help='Output formats for analysis')
    analyze_parser.add_argument('--start-time',
                                help='Start time filter (YYYY-MM-DD HH:MM:SS)')
    analyze_parser.add_argument('--end-time',
                                help='End time filter (YYYY-MM-DD HH:MM:SS)')
    analyze_parser.add_argument('--repl-lag-threshold', type=float,
                                default=0,
                                help='Replication lag threshold in seconds')
    analyze_parser.add_argument('--anonymous', action='store_true',
                                help='Anonymize server names in reports')
    analyze_parser.add_argument('--report-name',
                                default='replication_analysis',
                                help='Base name for report files')
    analyze_parser.set_defaults(func=analyze_replication_logs)
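
Note (illustrative only, not part of this commit): the handler depends solely on the attributes registered on analyze_parser above, so it can also be driven directly with an argparse.Namespace. The log directories and threshold below are placeholders, and inst is passed as None because the handler does not use it:

import argparse
import logging

from lib389.cli_conf.replication import analyze_replication_logs

# Placeholder values mirroring the --log-dirs/--output-dir/... options above
args = argparse.Namespace(
    log_dirs=['/var/log/dirsrv/slapd-supplier1', '/var/log/dirsrv/slapd-supplier2'],
    output_dir='/tmp/repl-analysis',
    formats=['html', 'csv'],
    start_time=None,
    end_time=None,
    repl_lag_threshold=5.0,
    anonymous=True,
    report_name='replication_analysis',
)

log = logging.getLogger('repl-analysis-example')
analyze_replication_logs(None, log, args)  # inst is unused by the handler
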
220 changes: 220 additions & 0 deletions src/lib389/lib389/repltools.py
@@ -14,6 +14,7 @@
import logging
from lib389._constants import *
from lib389.properties import *
from typing import List, Dict, Optional, Union

logging.getLogger(__name__).setLevel(logging.INFO)
log = logging.getLogger(__name__)
@@ -304,3 +305,222 @@ def createReplManager(server, repl_manager_dn=None, repl_manager_pw=None):
             'passwordExpirationTime': '20381010000000Z'}
    server.setupBindDN(repl_manager_dn, repl_manager_pw, attrs)



class ReplicationLogAnalyzer:
    """Analyzes replication logs across multiple servers in a topology.
    This class processes access logs from multiple Directory Server instances
    to analyze replication performance, lag times, and potential issues.
    """

    def __init__(self, log_dirs: List[str], anonymous: bool = False):
        """Initialize the replication log analyzer
        :param log_dirs: List of directories containing server logs
        :type log_dirs: list
        :param anonymous: Whether to anonymize server names in reports
        :type anonymous: bool
        """
        self.log_dirs = log_dirs
        self.anonymous = anonymous
        self.servers_data = {}
        self._logger = logging.getLogger(__name__)

    def process_logs(self) -> Dict:
        """Process all logs from provided directories
        :returns: Dictionary containing processed replication data
        :raises: ValueError if no valid logs found
                 OSError if log files cannot be accessed
        """
        for log_dir in self.log_dirs:
            server_name = os.path.basename(log_dir)
            log_files = self._get_log_files(log_dir)

            if not log_files:
                self._logger.warning(f"No valid log files found in {log_dir}")
                continue

            parser = ReplLag({
                'server_name': server_name,
                'logfiles': log_files,
                'anonymous': self.anonymous
            })

            try:
                parser.parse_files()
                self.servers_data[server_name] = parser.build_result()
            except Exception as e:
                self._logger.error(f"Failed to process logs for {server_name}: {e}")
                raise

        if not self.servers_data:
            raise ValueError("No valid replication data found in any log directory")

        return self._merge_results()

    def _get_log_files(self, log_dir: str) -> List[str]:
        """Get all relevant log files from directory
        :param log_dir: Directory containing log files
        :type log_dir: str
        :returns: List of log file paths sorted by name
        :raises: OSError if directory cannot be accessed
        """
        if not os.path.exists(log_dir):
            raise OSError(f"Log directory does not exist: {log_dir}")

        log_files = []
        for file in os.listdir(log_dir):
            if file.startswith('access'):
                full_path = os.path.join(log_dir, file)
                if os.path.isfile(full_path) and os.access(full_path, os.R_OK):
                    log_files.append(full_path)
                else:
                    self._logger.warning(f"Cannot access log file: {full_path}")

        return sorted(log_files)

    def _merge_results(self) -> Dict:
        """Merge results from all servers into a consolidated report
        :returns: Merged replication data dictionary
        :raises: ValueError if no data to merge
        """
        if not self.servers_data:
            raise ValueError("No replication data available to merge")

        merged = {
            'start-time': min(data['start-time'] for data in self.servers_data.values()),
            'servers': list(self.servers_data.keys()),
            'lag': {},
            'metadata': {
                'analyzed_at': datetime.datetime.now().isoformat(),
                'server_count': len(self.servers_data),
                'anonymous': self.anonymous
            }
        }

        for server_name, server_data in self.servers_data.items():
            for csn, lag_data in server_data['lag'].items():
                if csn not in merged['lag']:
                    merged['lag'][csn] = {}
                merged['lag'][csn].update({
                    server_name: lag_data
                })

        return merged

    def generate_report(self,
                        output_dir: str,
                        formats: List[str] = ['html'],
                        start_time: Optional[str] = None,
                        end_time: Optional[str] = None,
                        repl_lag_threshold: float = 0,
                        report_name: str = 'replication_analysis') -> Dict[str, str]:
        """Generate replication analysis report in specified formats
        :param output_dir: Directory for output files
        :type output_dir: str
        :param formats: List of output formats ('csv', 'png', 'html')
        :type formats: list
        :param start_time: Start time filter (YYYY-MM-DD HH:MM:SS)
        :type start_time: str
        :param end_time: End time filter (YYYY-MM-DD HH:MM:SS)
        :type end_time: str
        :param repl_lag_threshold: Replication lag threshold in seconds
        :type repl_lag_threshold: float
        :param report_name: Base name for report files
        :type report_name: str
        :returns: Dictionary mapping formats to generated file paths
        :raises: OSError if output directory issues
                 ValueError if invalid parameters
        """
        if not os.path.exists(output_dir):
            try:
                os.makedirs(output_dir)
            except OSError as e:
                raise OSError(f"Failed to create output directory: {e}")

        data = self.process_logs()

        lag_info = LagInfo({
            'start_time': start_time,
            'end_time': end_time,
            'repl_lag_threshold': repl_lag_threshold,
            'only_fully_replicated': False,
            'only_not_replicated': False,
            'utc_offset': None
        })

        generated_files = {}

        for fmt in formats:
            if fmt not in ['csv', 'png', 'html']:
                self._logger.warning(f"Unsupported format ignored: {fmt}")
                continue

            output_path = os.path.join(output_dir, f'{report_name}.{fmt}')
            try:
                if fmt == 'csv':
                    lag_info.plot_lag_csv(output_path)
                elif fmt == 'png':
                    lag_info.plot_lag_png(output_path)
                elif fmt == 'html':
                    lag_info.plot_interactive_html(output_path)
                generated_files[fmt] = output_path
            except Exception as e:
                self._logger.error(f"Failed to generate {fmt} report: {e}")
                raise

        # Generate summary JSON
        summary_path = os.path.join(output_dir, f'{report_name}_summary.json')
        try:
            with open(summary_path, 'w') as f:
                json.dump({
                    'analysis_summary': {
                        'total_servers': len(self.servers_data),
                        'analyzed_logs': sum(len(d.get('log-files', []))
                                             for d in self.servers_data.values()),
                        'time_range': {
                            'start': start_time,
                            'end': end_time
                        },
                        'lag_threshold': repl_lag_threshold,
                        'generated_files': generated_files
                    }
                }, f, indent=2)
            generated_files['summary'] = summary_path
        except Exception as e:
            self._logger.error(f"Failed to write summary JSON: {e}")
            raise

        return generated_files

    def get_lag_statistics(self) -> Dict[str, Union[float, int]]:
        """Calculate replication lag statistics across all servers
        :returns: Dictionary containing lag statistics
        """
        data = self.process_logs()
        lag_times = []

        for csn_data in data['lag'].values():
            for server_data in csn_data.values():
                if 'logtime' in server_data:
                    lag_times.append(float(server_data['logtime']))

        if not lag_times:
            return {
                'min_lag': 0,
                'max_lag': 0,
                'avg_lag': 0,
                'total_updates': 0
            }

        return {
            'min_lag': min(lag_times),
            'max_lag': max(lag_times),
            'avg_lag': sum(lag_times) / len(lag_times),
            'total_updates': len(lag_times)
        }
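
Note (illustrative only, and assuming the ReplLag and LagInfo helpers referenced above are available): the class can also be used without the CLI wrapper. The directories below are placeholders, and generate_report/get_lag_statistics are called exactly as defined in this file:

from lib389.repltools import ReplicationLogAnalyzer

# One directory per server, each containing that server's access* logs (placeholder paths)
analyzer = ReplicationLogAnalyzer(
    ['/var/log/dirsrv/slapd-supplier1', '/var/log/dirsrv/slapd-supplier2'],
    anonymous=False
)

report_files = analyzer.generate_report(
    '/tmp/repl-analysis',
    formats=['csv', 'html'],
    repl_lag_threshold=2.0
)
stats = analyzer.get_lag_statistics()
print(report_files.get('summary'), stats['max_lag'], stats['total_updates'])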
