diff --git a/src/lib389/lib389/cli_conf/replication.py b/src/lib389/lib389/cli_conf/replication.py index 53a18ced6..b0447bc6c 100644 --- a/src/lib389/lib389/cli_conf/replication.py +++ b/src/lib389/lib389/cli_conf/replication.py @@ -18,6 +18,7 @@ from lib389.cli_base import _get_arg, CustomHelpFormatter from lib389.utils import is_a_dn, copy_with_permissions, ds_supports_new_changelog, get_passwd_from_file from lib389.replica import Replicas, ReplicationMonitor, BootstrapReplicationManager, Changelog5, ChangelogLDIF, Changelog +from lib389.repltools import ReplicationLogAnalyzer from lib389.tasks import CleanAllRUVTask, AbortCleanAllRUVTask from lib389._mapped_object import DSLdapObjects @@ -1276,6 +1277,38 @@ def restore_cl_ldif(inst, basedn, log, args): os.rename(f'{target_ldif}.backup', target_ldif) +def analyze_replication_logs(inst, log, args): + """Analyze replication logs across multiple servers""" + try: + analyzer = ReplicationLogAnalyzer( + args.log_dirs, + anonymous=args.anonymous + ) + + generated_files = analyzer.generate_report( + args.output_dir, + formats=args.formats, + start_time=args.start_time, + end_time=args.end_time, + repl_lag_threshold=args.repl_lag_threshold, + report_name=args.report_name + ) + + # Get and display statistics + stats = analyzer.get_lag_statistics() + + log.info('Replication analysis completed successfully') + log.info(f'Generated files: {", ".join(generated_files.values())}') + log.info('Statistics:') + log.info(f' Average lag: {stats["avg_lag"]:.2f} seconds') + log.info(f' Maximum lag: {stats["max_lag"]:.2f} seconds') + log.info(f' Total updates analyzed: {stats["total_updates"]}') + + except Exception as e: + log.error(f'Failed to analyze replication logs: {e}') + raise + + def create_parser(subparsers): ############################################ @@ -1777,3 +1810,28 @@ def create_parser(subparsers): task_abort_cleanallruv_list = task_subcommands.add_parser('list-abortruv-tasks', help='List all the running CleanAllRUV abort tasks', formatter_class=CustomHelpFormatter) task_abort_cleanallruv_list.set_defaults(func=list_abort_cleanallruv) task_abort_cleanallruv_list.add_argument('--suffix', help="Lists only tasks for the specified suffix") + + # Add new analysis commands + analyze_parser = subparsers.add_parser('analyze-logs', + help='Analyze replication logs across multiple servers') + analyze_parser.add_argument('--log-dirs', required=True, nargs='+', + help='Directories containing server logs') + analyze_parser.add_argument('--output-dir', required=True, + help='Output directory for analysis reports') + analyze_parser.add_argument('--formats', nargs='+', + default=['html'], + choices=['csv', 'png', 'html'], + help='Output formats for analysis') + analyze_parser.add_argument('--start-time', + help='Start time filter (YYYY-MM-DD HH:MM:SS)') + analyze_parser.add_argument('--end-time', + help='End time filter (YYYY-MM-DD HH:MM:SS)') + analyze_parser.add_argument('--repl-lag-threshold', type=float, + default=0, + help='Replication lag threshold in seconds') + analyze_parser.add_argument('--anonymous', action='store_true', + help='Anonymize server names in reports') + analyze_parser.add_argument('--report-name', + default='replication_analysis', + help='Base name for report files') + analyze_parser.set_defaults(func=analyze_replication_logs) \ No newline at end of file diff --git a/src/lib389/lib389/repltools.py b/src/lib389/lib389/repltools.py index 6f83e40a2..39d43615e 100644 --- a/src/lib389/lib389/repltools.py +++ b/src/lib389/lib389/repltools.py @@ -14,6 +14,7 @@ import logging from lib389._constants import * from lib389.properties import * +from typing import List, Dict, Optional, Union logging.getLogger(__name__).setLevel(logging.INFO) log = logging.getLogger(__name__) @@ -304,3 +305,222 @@ def createReplManager(server, repl_manager_dn=None, repl_manager_pw=None): 'passwordExpirationTime': '20381010000000Z'} server.setupBindDN(repl_manager_dn, repl_manager_pw, attrs) + + +class ReplicationLogAnalyzer: + """Analyzes replication logs across multiple servers in a topology. + This class processes access logs from multiple Directory Server instances + to analyze replication performance, lag times, and potential issues. + """ + + def __init__(self, log_dirs: List[str], anonymous: bool = False): + """Initialize the replication log analyzer + + :param log_dirs: List of directories containing server logs + :type log_dirs: list + :param anonymous: Whether to anonymize server names in reports + :type anonymous: bool + """ + self.log_dirs = log_dirs + self.anonymous = anonymous + self.servers_data = {} + self._logger = logging.getLogger(__name__) + + def process_logs(self) -> Dict: + """Process all logs from provided directories + + :returns: Dictionary containing processed replication data + :raises: ValueError if no valid logs found + OSError if log files cannot be accessed + """ + for log_dir in self.log_dirs: + server_name = os.path.basename(log_dir) + log_files = self._get_log_files(log_dir) + + if not log_files: + self._logger.warning(f"No valid log files found in {log_dir}") + continue + + parser = ReplLag({ + 'server_name': server_name, + 'logfiles': log_files, + 'anonymous': self.anonymous + }) + + try: + parser.parse_files() + self.servers_data[server_name] = parser.build_result() + except Exception as e: + self._logger.error(f"Failed to process logs for {server_name}: {e}") + raise + + if not self.servers_data: + raise ValueError("No valid replication data found in any log directory") + + return self._merge_results() + + def _get_log_files(self, log_dir: str) -> List[str]: + """Get all relevant log files from directory + + :param log_dir: Directory containing log files + :type log_dir: str + :returns: List of log file paths sorted by name + :raises: OSError if directory cannot be accessed + """ + if not os.path.exists(log_dir): + raise OSError(f"Log directory does not exist: {log_dir}") + + log_files = [] + for file in os.listdir(log_dir): + if file.startswith('access'): + full_path = os.path.join(log_dir, file) + if os.path.isfile(full_path) and os.access(full_path, os.R_OK): + log_files.append(full_path) + else: + self._logger.warning(f"Cannot access log file: {full_path}") + + return sorted(log_files) + + def _merge_results(self) -> Dict: + """Merge results from all servers into a consolidated report + + :returns: Merged replication data dictionary + :raises: ValueError if no data to merge + """ + if not self.servers_data: + raise ValueError("No replication data available to merge") + + merged = { + 'start-time': min(data['start-time'] for data in self.servers_data.values()), + 'servers': list(self.servers_data.keys()), + 'lag': {}, + 'metadata': { + 'analyzed_at': datetime.datetime.now().isoformat(), + 'server_count': len(self.servers_data), + 'anonymous': self.anonymous + } + } + + for server_name, server_data in self.servers_data.items(): + for csn, lag_data in server_data['lag'].items(): + if csn not in merged['lag']: + merged['lag'][csn] = {} + merged['lag'][csn].update({ + server_name: lag_data + }) + + return merged + + def generate_report(self, + output_dir: str, + formats: List[str] = ['html'], + start_time: Optional[str] = None, + end_time: Optional[str] = None, + repl_lag_threshold: float = 0, + report_name: str = 'replication_analysis') -> Dict[str, str]: + """Generate replication analysis report in specified formats + + :param output_dir: Directory for output files + :type output_dir: str + :param formats: List of output formats ('csv', 'png', 'html') + :type formats: list + :param start_time: Start time filter (YYYY-MM-DD HH:MM:SS) + :type start_time: str + :param end_time: End time filter (YYYY-MM-DD HH:MM:SS) + :type end_time: str + :param repl_lag_threshold: Replication lag threshold in seconds + :type repl_lag_threshold: float + :param report_name: Base name for report files + :type report_name: str + :returns: Dictionary mapping formats to generated file paths + :raises: OSError if output directory issues + ValueError if invalid parameters + """ + if not os.path.exists(output_dir): + try: + os.makedirs(output_dir) + except OSError as e: + raise OSError(f"Failed to create output directory: {e}") + + data = self.process_logs() + + lag_info = LagInfo({ + 'start_time': start_time, + 'end_time': end_time, + 'repl_lag_threshold': repl_lag_threshold, + 'only_fully_replicated': False, + 'only_not_replicated': False, + 'utc_offset': None + }) + + generated_files = {} + + for fmt in formats: + if fmt not in ['csv', 'png', 'html']: + self._logger.warning(f"Unsupported format ignored: {fmt}") + continue + + output_path = os.path.join(output_dir, f'{report_name}.{fmt}') + try: + if fmt == 'csv': + lag_info.plot_lag_csv(output_path) + elif fmt == 'png': + lag_info.plot_lag_png(output_path) + elif fmt == 'html': + lag_info.plot_interactive_html(output_path) + generated_files[fmt] = output_path + except Exception as e: + self._logger.error(f"Failed to generate {fmt} report: {e}") + raise + + # Generate summary JSON + summary_path = os.path.join(output_dir, f'{report_name}_summary.json') + try: + with open(summary_path, 'w') as f: + json.dump({ + 'analysis_summary': { + 'total_servers': len(self.servers_data), + 'analyzed_logs': sum(len(d.get('log-files', [])) + for d in self.servers_data.values()), + 'time_range': { + 'start': start_time, + 'end': end_time + }, + 'lag_threshold': repl_lag_threshold, + 'generated_files': generated_files + } + }, f, indent=2) + generated_files['summary'] = summary_path + except Exception as e: + self._logger.error(f"Failed to write summary JSON: {e}") + raise + + return generated_files + + def get_lag_statistics(self) -> Dict[str, Union[float, int]]: + """Calculate replication lag statistics across all servers + + :returns: Dictionary containing lag statistics + """ + data = self.process_logs() + lag_times = [] + + for csn_data in data['lag'].values(): + for server_data in csn_data.values(): + if 'logtime' in server_data: + lag_times.append(float(server_data['logtime'])) + + if not lag_times: + return { + 'min_lag': 0, + 'max_lag': 0, + 'avg_lag': 0, + 'total_updates': 0 + } + + return { + 'min_lag': min(lag_times), + 'max_lag': max(lag_times), + 'avg_lag': sum(lag_times) / len(lag_times), + 'total_updates': len(lag_times) + }