From 9b2e21ab44d18d227cbb20a8091a1c407a96157a Mon Sep 17 00:00:00 2001 From: Aishwarya Mathuria Date: Tue, 12 Oct 2021 14:27:33 +0000 Subject: [PATCH 1/3] cluster: Addition of new tests to run Scrub and Recovery Add 2 new tests similar to the background Recovery test for running Scrub load with client IO and the other one for running Scrub, Recovery, and Client load. The Scrub and client IO test performs the following steps: - Create a pool and image to populate scrub objects (scrub pool) - Create scrub thread - Populate the scrub pool with objects using radosbench - Initiate deep-scrub on the scrub pool - Create a second pool and an image in it to run client IO - Initiate fio job on the second image at the same time the deep-scrub starts In the second test, we have an additional recovery pool that is populated after an OSD is marked down and out. Once the pool is populated we mark the OSD up and in which starts backfill. At the same time, we begin deep-scrub on the scrub pool and client IO. Signed-off-by: Aishwarya Mathuria --- benchmark/fio.py | 30 ++ benchmark/librbdfio.py | 39 ++ client_endpoints/ceph_client_endpoints.py | 11 + client_endpoints/client_endpoints.py | 3 + client_endpoints/librbd_client_endpoints.py | 3 + client_endpoints/rbdfuse_client_endpoints.py | 3 + .../rbdkernel_client_endpoints.py | 3 + client_endpoints/rbdnbd_client_endpoints.py | 3 + client_endpoints/rbdtcmu_client_endpoints.py | 3 + cluster/ceph.py | 356 +++++++++++++++++- 10 files changed, 445 insertions(+), 9 deletions(-) diff --git a/benchmark/fio.py b/benchmark/fio.py index b504b389..7ccfc631 100644 --- a/benchmark/fio.py +++ b/benchmark/fio.py @@ -66,6 +66,11 @@ def initialize_endpoints(self): # Create the recovery image based on test type requested if 'recovery_test' in self.cluster.config and self.recov_test_type == 'background': self.client_endpoints_object.create_recovery_image() + if 'scrubbing_test' in self.cluster.config: + self.client_endpoints_object.create_scrubbing_image() + if 'scrub_recov_test' in self.cluster.config: + self.client_endpoints_object.create_recovery_image() + self.client_endpoints_object.create_scrubbing_image() self.create_endpoints() def create_endpoints(self): @@ -213,6 +218,18 @@ def run(self): # Wait for signal to start client IO self.cluster.wait_start_io() + if 'scrubbing_test' in self.cluster.config: + logger.info('Scrubbing test in config') + scrubbing_callback = self.scrubbing_callback + self.cluster.create_scrubbing_test(self.run_dir, scrubbing_callback) + self.cluster.wait_start_io() + + if 'scrub_recov_test' in self.cluster.config: + logger.info('Scrub+Recov') + scrub_recov_callback = self.scrub_recov_callback + self.cluster.create_scrub_recovery_test(self.run_dir, scrub_recov_callback) + self.cluster.wait_start_io() + monitoring.start(self.run_dir) logger.info('Running fio %s test.', self.mode) @@ -225,6 +242,13 @@ def run(self): # If we were doing recovery, wait until it's done. if 'recovery_test' in self.cluster.config: self.cluster.wait_recovery_done() + # If we were doing scrubbing, wait until it's done. + if 'scrubbing_test' in self.cluster.config: + self.cluster.wait_scrubbing_done() + + if 'scrub_recov_test' in self.cluster.config: + self.cluster.wait_scrub_recovery_done() + monitoring.stop(self.run_dir) @@ -239,6 +263,12 @@ def recovery_callback_blocking(self): def recovery_callback_background(self): logger.info('Recovery thread completed!') + def scrubbing_callback(self): + logger.info('Scrubbing thread completed') + + def scrub_recov_callback(self): + logger.info('Scrub+Recovery thread completed') + def analyze(self, out_dir): logger.info('Convert results to json format.') for client in settings.getnodes('clients').split(','): diff --git a/benchmark/librbdfio.py b/benchmark/librbdfio.py index 84ea0de3..66e487c6 100644 --- a/benchmark/librbdfio.py +++ b/benchmark/librbdfio.py @@ -46,6 +46,8 @@ def __init__(self, archive_dir, cluster, config): self.use_existing_volumes = config.get('use_existing_volumes', False) self.pool_name = config.get("poolname", "cbt-librbdfio") self.recov_pool_name = config.get("recov_pool_name", "cbt-librbdfio-recov") + self.scrub_pool_name = config.get("scrub_pool_name", "cbt-librbdfio-scrub") + self.scrub_pool_profile = config.get("scrub_pool_profile", "default") self.rbdname = config.get('rbdname', '') self.total_procs = self.procs_per_volume * self.volumes_per_client * len(settings.getnodes('clients').split(',')) @@ -80,9 +82,17 @@ def initialize(self): common.sync_files('%s/*' % self.run_dir, self.out_dir) + if 'scrubbing_test' in self.cluster.config: + self.mkscrubimage() + # Create the recovery image based on test type requested if 'recovery_test' in self.cluster.config and self.recov_test_type == 'background': self.mkrecovimage() + + if 'scrub_recov_test' in self.cluster.config: + self.mkrecovimage() + self.mkscrubimage() + self.mkimages() # populate the fio files ps = [] @@ -128,6 +138,17 @@ def run(self): # Wait for a signal from the recovery thread to initiate client IO self.cluster.wait_start_io() + if 'scrubbing_test' in self.cluster.config: + scrubbing_callback = self.scrubbing_callback + self.cluster.create_scrubbing_test(self.run_dir, scrubbing_callback) + self.cluster.wait_start_io() + + if 'scrub_recov_test' in self.cluster.config: + scrub_recov_callback = self.scrub_recov_callback + self.cluster.create_scrub_recovery_test(self.run_dir, scrub_recov_callback) + self.cluster.wait_start_io() + + monitoring.start(self.run_dir) logger.info('Running rbd fio %s test.', self.mode) @@ -142,6 +163,9 @@ def run(self): if 'recovery_test' in self.cluster.config: self.cluster.wait_recovery_done() + if 'scrub_recov_test' in self.cluster.config: + self.cluster.wait_scrub_recovery_done() + monitoring.stop(self.run_dir) # Finally, get the historic ops @@ -210,6 +234,18 @@ def mkrecovimage(self): self.cluster.mkimage('cbt-librbdfio-recov-%s-%d' % (node,volnum), self.vol_size, self.recov_pool_name, self.data_pool, self.vol_object_size) monitoring.stop() + def mkscrubimage(self): + logger.info('Creating scrubbing image...') + monitoring.start("%s/scrub_pool_monitoring" % self.run_dir) + if (self.use_existing_volumes == False): + self.cluster.rmpool(self.scrub_pool_name, self.scrub_pool_profile) + self.cluster.mkpool(self.scrub_pool_name, self.scrub_pool_profile, 'rbd') + for node in common.get_fqdn_list('clients'): + for volnum in range(0, self.volumes_per_client): + node = node.rpartition("@")[2] + self.cluster.mkimage('cbt-librbdfio-scrub-%s-%d' % (node,volnum), self.vol_size, self.scrub_pool_name, self.data_pool, self.vol_object_size) + monitoring.stop() + def mkimages(self): monitoring.start("%s/pool_monitoring" % self.run_dir) if (self.use_existing_volumes == False): @@ -231,6 +267,9 @@ def recovery_callback_blocking(self): def recovery_callback_background(self): logger.info('Recovery thread completed!') + def scrubbing_callback(self): + logger.info('Scrubbing thread completed!') + def parse(self, out_dir): for client in settings.getnodes('clients').split(','): host = settings.host_info(client)["host"] diff --git a/client_endpoints/ceph_client_endpoints.py b/client_endpoints/ceph_client_endpoints.py index f61fb9bb..fe4b450f 100644 --- a/client_endpoints/ceph_client_endpoints.py +++ b/client_endpoints/ceph_client_endpoints.py @@ -26,6 +26,8 @@ def __init__(self, cluster, config): self.data_pool_profile = config.get('data_pool_profile', None) self.recov_pool = None self.recov_pool_profile = config.get('recov_pool_profile', 'default') + self.scrub_pool = None + self.scrub_pool_profile = config.get('scrub_pool_profile', 'default') self.order = config.get('order', 22) self.disabled_features = config.get('disabled_features', None) @@ -114,6 +116,15 @@ def create_rbd_recovery(self): rbd_name = '%s-%s' % (self.pool, self.get_rbd_name(node, ep_num)) self.cluster.mkimage(rbd_name, self.endpoint_size, self.pool, self.data_pool, self.order) + def create_rbd_scrubbing(self): + self.pool = '%s-scrub' % self.name + self.cluster.rmpool(self.pool, self.scrub_pool_profile) + self.cluster.mkpool(self.pool, self.scrub_pool_profile, 'rbd') + for node in common.get_fqdn_list('clients'): + for ep_num in range(0, self.endpoints_per_client): + rbd_name = '%s-%s' % (self.pool, self.get_rbd_name(node, ep_num)) + self.cluster.mkimage(rbd_name, self.endpoint_size, self.pool, self.data_pool, self.order) + def mount_rbd(self): for ep_num in range(0, self.endpoints_per_client): dir_name = self.get_dir_name(ep_num) diff --git a/client_endpoints/client_endpoints.py b/client_endpoints/client_endpoints.py index a5b19252..97ed2a06 100644 --- a/client_endpoints/client_endpoints.py +++ b/client_endpoints/client_endpoints.py @@ -45,3 +45,6 @@ def remove(self): def create_recovery_image(self): pass + + def create_scrubbing_image(self): + pass diff --git a/client_endpoints/librbd_client_endpoints.py b/client_endpoints/librbd_client_endpoints.py index 64fd3bd3..57fd3bba 100644 --- a/client_endpoints/librbd_client_endpoints.py +++ b/client_endpoints/librbd_client_endpoints.py @@ -19,3 +19,6 @@ def mount(self): def create_recovery_image(self): self.create_rbd_recovery() + + def create_scrubbing_image(self): + self.create_rbd_scrubbing() diff --git a/client_endpoints/rbdfuse_client_endpoints.py b/client_endpoints/rbdfuse_client_endpoints.py index ce4586f6..fa94a1ae 100644 --- a/client_endpoints/rbdfuse_client_endpoints.py +++ b/client_endpoints/rbdfuse_client_endpoints.py @@ -31,3 +31,6 @@ def map_rbd(self, node, rbd_name): def create_recovery_image(self): self.create_rbd_recovery() + + def create_scrubbing_image(self): + self.create_rbd_scrubbing() diff --git a/client_endpoints/rbdkernel_client_endpoints.py b/client_endpoints/rbdkernel_client_endpoints.py index 66dbd0a7..8a75afb9 100644 --- a/client_endpoints/rbdkernel_client_endpoints.py +++ b/client_endpoints/rbdkernel_client_endpoints.py @@ -22,3 +22,6 @@ def map_rbd(self, node, rbd_name): def create_recovery_image(self): self.create_rbd_recovery() + + def create_scrubbing_image(self): + self.create_rbd_scrubbing() diff --git a/client_endpoints/rbdnbd_client_endpoints.py b/client_endpoints/rbdnbd_client_endpoints.py index 22a05a98..93abc3be 100644 --- a/client_endpoints/rbdnbd_client_endpoints.py +++ b/client_endpoints/rbdnbd_client_endpoints.py @@ -16,3 +16,6 @@ def map_rbd(self, node, rbd_name): def create_recovery_image(self): self.create_rbd_recovery() + + def create_scrubbing_image(self): + self.create_rbd_scrubbing() diff --git a/client_endpoints/rbdtcmu_client_endpoints.py b/client_endpoints/rbdtcmu_client_endpoints.py index 0ec2c0e7..236e69cd 100644 --- a/client_endpoints/rbdtcmu_client_endpoints.py +++ b/client_endpoints/rbdtcmu_client_endpoints.py @@ -23,3 +23,6 @@ def map_rbd(self, node, rbd_name): def create_recovery_image(self): self.create_rbd_recovery() + + def create_scrubbing_image(self): + self.create_rbd_scrubbing() diff --git a/cluster/ceph.py b/cluster/ceph.py index 93fc9f32..d23897a4 100644 --- a/cluster/ceph.py +++ b/cluster/ceph.py @@ -148,6 +148,13 @@ def __init__(self, config): self.prefill_recov_time = 0 self.recov_pool_name = '' + #Scrubbing tests + self.scrub_enabled = config.get('enable_scrub', False) + self.prefill_scrub_objects = 0 + self.prefill_scrub_object_size = 0 + self.prefill_scrub_time = 0 + self.scrub_pool_name = '' + def initialize(self): # Reset the rulesets self.ruleset_map = {} @@ -194,7 +201,9 @@ def initialize(self): monitoring.stop() # Disable scrub and wait for any scrubbing to complete - self.disable_scrub() + logger.info("Scrub enabled is %s", self.scrub_enabled) + if not self.scrub_enabled: + self.disable_scrub() if self.disable_bal: self.disable_balancer() @@ -532,6 +541,12 @@ def start_rgw(self): def disable_scrub(self): common.pdsh(settings.getnodes('head'), "ceph osd set noscrub; ceph osd set nodeep-scrub").communicate() + def disable_recovery(self): + common.pdsh(settings.getnodes('head'), "ceph osd set norecover; ceph osd set nobackfill").communicate() + + def enable_recovery(self): + common.pdsh(settings.getnodes('head'), "ceph osd unset norecover; ceph osd unset nobackfill").communicate() + def disable_balancer(self): common.pdsh(settings.getnodes('head'), "ceph balancer off").communicate() @@ -548,7 +563,6 @@ def check_health(self, check_list=None, logfile=None, recstatsfile=None): if recstatsfile: header = "Time, Num Deg Objs, Total Deg Objs" stdout, stderr = common.pdsh(settings.getnodes('head'), 'echo %s >> %s' % (header, recstatsfile)).communicate() - while True: stdout, stderr = common.pdsh(settings.getnodes('head'), '%s -c %s health %s' % (self.ceph_cmd, self.tmp_conf, logline)).communicate() self.log_recovery_stats(recstatsfile) @@ -563,6 +577,44 @@ def check_health(self, check_list=None, logfile=None, recstatsfile=None): return ret + def log_scrubbing_stats(self, scrubstatsfile=None, pgid=None): + if not scrubstatsfile: + return + fmtjson = "--format=json" + PG_MAP = "pg_map" + PG_STATS = "pg_stats" + separator = " " + PG_ID = "pgid" + NUM_OBJECTS = "num_objects" + LAST_SCRUB_STAMP = "last_scrub_stamp" + STAT_SUM = "stat_sum" + SCRUB_DURATION = "last_scrub_duration" + OBJECTS_SCRUBBED = "objects_scrubbed" + stdout, stderr = common.pdsh(settings.getnodes('head'), '%s -c %s pg dump %s' % (self.ceph_cmd, self.tmp_conf, fmtjson)).communicate() + stdout = stdout.split(':', 1)[1] + stdout = stdout.strip() + try: + jsondata = json.loads(stdout) + except ValueError as e: + logger.error(str(e)) + return + scrubstats = [] + scrubstats.append(str(time.time())) + if PG_STATS in jsondata[PG_MAP]: + no_pgs = len(jsondata[PG_MAP][PG_STATS]) + for i in range(0, no_pgs): + if str(jsondata[PG_MAP][PG_STATS][i][PG_ID]) == pgid: + scrubstats.append(str(jsondata[PG_MAP][PG_STATS][i][PG_ID])) + scrubstats.append(str(jsondata[PG_MAP][PG_STATS][i][STAT_SUM][NUM_OBJECTS])) + scrubstats.append(str(jsondata[PG_MAP][PG_STATS][i][SCRUB_DURATION])) + scrubstats.append(str(jsondata[PG_MAP][PG_STATS][i][OBJECTS_SCRUBBED])) + + if len(scrubstats): + print(scrubstatsfile) + message = separator.join(scrubstats) + stdout, stderr = common.pdsh(settings.getnodes('head'), 'echo -e %s >> %s' % (message, scrubstatsfile)).communicate() + + def log_recovery_stats(self, recstatsfile=None): if not recstatsfile: return @@ -623,15 +675,39 @@ def check_backfill(self, check_list=None, logfile=None, recstatsfile=None): time.sleep(1) return ret - def check_scrub(self): + def check_scrub(self, scrubstatsfile=None): logger.info('Waiting until Scrubbing completes...') + fmtjson = '--format=json' + SCRUB_DURATION = "last_scrub_duration" + PG_STATS = "pg_stats" + PG_ID = "pgid" + pgs_scrubbed = [] while True: - stdout, stderr = common.pdsh(settings.getnodes('head'), '%s -c %s pg dump | cut -f 16 | grep "0.000000" | wc -l' % (self.ceph_cmd, self.tmp_conf)).communicate() - if " 0\n" in stdout: - break - else: - logger.info(stdout) - time.sleep(1) + stdout, stderr = common.pdsh(settings.getnodes('head'), '%s pg ls-by-pool %s %s' %(self.ceph_cmd, self.scrub_pool_name, fmtjson)).communicate() + stdout = stdout.split(':', 1)[1] + stdout = stdout.strip() + scrubbed_pgs = 0 + try: + jsondata = json.loads(stdout) + except ValueError as e: + logger.error(str(e)) + return 0 + logger.info('PG STATS present') + for i in range(0, len(jsondata[PG_STATS])): + if jsondata[PG_STATS][i][SCRUB_DURATION] == 0: + time.sleep(1) + else: + scrubbed_pgs += 1 + logger.info(scrubbed_pgs) + logger.info(jsondata[PG_STATS][i][SCRUB_DURATION]) + logger.info('Scrub done for: ') + logger.info(jsondata[PG_STATS][i][PG_ID]) + if jsondata[PG_STATS][i][PG_ID] not in pgs_scrubbed: + pgs_scrubbed.append(jsondata[PG_STATS][i][PG_ID]) + self.log_scrubbing_stats(scrubstatsfile, str(jsondata[PG_STATS][i][PG_ID])) + if scrubbed_pgs == len(jsondata[PG_STATS]): + logger.info('Scrubbing is complete') + return 1 def dump_config(self, run_dir): common.pdsh(settings.getnodes('osds'), 'sudo %s -c %s daemon osd.0 config show > %s/ceph_settings.out' % (self.ceph_cmd, self.tmp_conf, run_dir)).communicate() @@ -654,6 +730,21 @@ def create_recovery_test(self, run_dir, callback, test_type='blocking'): self.rt = RecoveryTestThreadBackground(rt_config, self, callback, self.stoprequest, self.haltrequest, self.startiorequest) self.rt.start() + def create_scrubbing_test(self, run_dir, callback): + ''' + Only background type currently + ''' + st_config = self.config.get("scrubbing_test", {}) + st_config['run_dir'] = run_dir + self.st = ScrubbingTestThreadBackground(st_config, self, callback, self.stoprequest, self.haltrequest, self.startiorequest) + self.st.start() + + def create_scrub_recovery_test(self, run_dir, callback): + config = self.config.get("scrub_recov_test", {}) + config['run_dir'] = run_dir + self.srt = ScrubRecoveryThreadBackground(config, self, callback, self.stoprequest, self.haltrequest, self.startiorequest) + self.srt.start() + def wait_start_io(self): logger.info("Waiting for signal to start client io...") self.startiorequest.wait() @@ -672,6 +763,32 @@ def wait_recovery_done(self): break self.rt.join(1) + def maybe_populate_scrubbing_pool(self): + if self.prefill_scrub_objects > 0 or self.prefill_scrub_time > 0: + logger.info('prefilling %s %sbyte objects into scrubbing pool %s' % (self.prefill_scrub_objects, self.prefill_scrub_object_size, self.scrub_pool_name)) + common.pdsh(settings.getnodes('head'), 'sudo %s -p %s bench %s write -b %s --max-objects %s --no-cleanup' % (self.rados_cmd, self.scrub_pool_name, self.prefill_scrub_time, self.prefill_scrub_object_size, self.prefill_scrub_objects)).communicate() + #self.check_health() + + def initiate_scrubbing(self): + logger.info("Initiating scrub on pool %s" % self.scrub_pool_name) + common.pdsh(settings.getnodes('head'), '%s osd pool deep-scrub %s' % (self.ceph_cmd, self.scrub_pool_name)).communicate() + + def wait_scrubbing_done(self): + self.stoprequest.set() + while True: + threads = threading.enumerate() + if len(threads) == 1: + break + self.st.join(1) + + def wait_scrub_recovery_done(self): + self.stoprequest.set() + while True: + threads = threading.enumerate() + if len(threads) == 1: + break + self.srt.join(1) + def check_pg_autoscaler(self, timeout=-1, logfile=None): ret = 0 if not timeout: @@ -772,6 +889,14 @@ def mkpool(self, name, profile_name, application, base_name=None): if self.prefill_recov_objects > 0: self.recov_pool_name = name + scrub_pool = profile.get('scrub_pool', False) + if scrub_pool: + self.prefill_scrub_objects = profile.get('prefill_scrub_objects', 0) + self.prefill_scrub_object_size = profile.get('prefill_scrub_object_size', 0) + self.prefill_scrub_time = profile.get('prefill_scrub_time', 0) + if self.prefill_scrub_objects > 0: + self.scrub_pool_name = name + if replication and replication == 'erasure': common.pdsh(settings.getnodes('head'), 'sudo %s -c %s osd pool create %s %d %d erasure %s' % (self.ceph_cmd, self.tmp_conf, name, pg_size, pgp_size, erasure_profile), continue_if_error=False).communicate() @@ -1167,3 +1292,216 @@ def run(self): self.states[self.state]() common.pdsh(settings.getnodes('head'), self.logcmd('Exiting recovery test thread. Last state was: %s' % self.state)).communicate() + +class ScrubbingTestThreadBackground(threading.Thread): + def __init__(self, config, cluster, callback, stoprequest, haltrequest, startiorequest): + threading.Thread.__init__(self) + self.config = config + self.cluster = cluster + self.callback = callback + self.state = 'pre' + self.states = {'pre': self.pre, 'osdout': self.osdout, 'osdin':self.osdin, + 'post': self.post, 'done': self.done} + self.startiorequest = startiorequest + self.stoprequest = stoprequest + self.haltrequest = haltrequest + self.outhealthtries = 0 + self.inhealthtries = 0 + self.maxhealthtries = 60 + self.health_checklist = ["peering", "recovery_wait", "stuck", "inactive", "unclean", "recovery"] + self.ceph_cmd = self.cluster.ceph_cmd + self.lasttime = time.time() + + def logcmd(self, message): + return 'echo "[`date`] %s" >> %s/scrubbing.log' % (message, self.config.get('run_dir')) + + def pre(self): + pre_time = self.config.get("pre_time", 60) + common.pdsh(settings.getnodes('head'), self.logcmd('Starting Scrubbing Test Thread, waiting %s seconds.' % pre_time)).communicate() + time.sleep(pre_time) + self.state = 'osdout' + + def osdout(self): + scrub_log = "%s/scrub.log" % self.config.get('run_dir') + scrub_stats_log = "%s/scrub_stats.log" % self.config.get('run_dir') + ret = self.cluster.check_health(self.health_checklist, None, None) + + common.pdsh(settings.getnodes('head'), self.logcmd("ret: %s" % ret)).communicate() + + self.cluster.maybe_populate_scrubbing_pool() + common.pdsh(settings.getnodes('head'), self.logcmd("osdout state - Sleeping for 10 secs after populating scrubbing pool.")).communicate() + time.sleep(10) + self.lasttime = time.time() + self.state = "osdin" + + def osdin(self): + scrub_stats_log = "%s/scrub_stats.log" % self.config.get('run_dir') + self.startiorequest.set() + self.cluster.initiate_scrubbing() + ret = self.cluster.check_scrub(scrub_stats_log) + if ret == 1: + self.state = "post" + + def post(self): + if self.stoprequest.isSet(): + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but stoprequest is set, finishing now.')).communicate() + self.haltrequest.set() + return + + if self.config.get("repeat", False): + # reset counters + self.outhealthtries = 0 + self.inhealthtries = 0 + + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but repeat is set. Moving to "osdout" state.')).communicate() + self.state = "osdout" + return + + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, finishing up...')).communicate() + self.state = "done" + + def done(self): + common.pdsh(settings.getnodes('head'), self.logcmd("Done. Calling parent callback function.")).communicate() + self.callback() + self.haltrequest.set() + + def join(self, timeout=None): + common.pdsh(settings.getnodes('head'), self.logcmd('Received notification that parent is finished and waiting.')).communicate() + super(ScrubbingTestThreadBackground, self).join(timeout) + + def run(self): + self.haltrequest.clear() + self.stoprequest.clear() + self.startiorequest.clear() + while not self.haltrequest.isSet(): + self.states[self.state]() + common.pdsh(settings.getnodes('head'), self.logcmd('Exiting scrubbing test thread. Last state was: %s' % self.state)).communicate() + + +class ScrubRecoveryThreadBackground(threading.Thread): + def __init__(self, config, cluster, callback, stoprequest, haltrequest, startiorequest): + threading.Thread.__init__(self) + self.config = config + self.cluster = cluster + self.callback = callback + self.state = 'pre' + self.states = {'pre': self.pre, 'markdown': self.markdown, 'osdout': self.osdout, 'osdin':self.osdin, + 'post': self.post, 'done': self.done} + self.startiorequest = startiorequest + self.stoprequest = stoprequest + self.haltrequest = haltrequest + self.outhealthtries = 0 + self.inhealthtries = 0 + self.maxhealthtries = 60 + self.health_checklist = ["peering", "recovery_wait", "stuck", "inactive", "unclean", "recovery"] + self.ceph_cmd = self.cluster.ceph_cmd + self.lasttime = time.time() + + def logcmd(self, message): + return 'echo "[`date`] %s" >> %s/scrub_recov.log' % (message, self.config.get('run_dir')) + + def pre(self): + pre_time = self.config.get("pre_time", 60) + common.pdsh(settings.getnodes('head'), self.logcmd('Starting Scrub+Recovery Test Thread, waiting %s seconds.' % pre_time)).communicate() + time.sleep(pre_time) + lcmd = self.logcmd("Setting the ceph osd noup flag") + common.pdsh(settings.getnodes('head'), '%s -c %s osd set noup;%s' % (self.ceph_cmd, self.cluster.tmp_conf, lcmd)).communicate() + self.state = 'markdown' + + def markdown(self): + for osdnum in self.config.get('osds'): + lcmd = self.logcmd("Marking OSD %s down." % osdnum) + common.pdsh(settings.getnodes('head'), '%s -c %s osd down %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate() + lcmd = self.logcmd("Marking OSD %s out." % osdnum) + common.pdsh(settings.getnodes('head'), '%s -c %s osd out %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate() + common.pdsh(settings.getnodes('head'), self.logcmd('Waiting for the cluster to break and heal')).communicate() + self.lasttime = time.time() + self.state = 'osdout' + + + def osdout(self): + reclog = "%s/recovery.log" % self.config.get('run_dir') + recstatslog = "%s/recovery_stats.log" % self.config.get('run_dir') + ret = self.cluster.check_health(self.health_checklist, reclog, recstatslog) + + common.pdsh(settings.getnodes('head'), self.logcmd("ret: %s" % ret)).communicate() + + if ret == 0: + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster never went unhealthy.')).communicate() + else: + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster appears to have healed.')).communicate() + rectime = str(time.time() - self.lasttime) + common.pdsh(settings.getnodes('head'), 'echo Time: %s >> %s' % (rectime, recstatslog)).communicate() + common.pdsh(settings.getnodes('head'), self.logcmd('Time: %s' % rectime)).communicate() + + # Populate the recovery pool + self.cluster.maybe_populate_recovery_pool() + + common.pdsh(settings.getnodes('head'), self.logcmd("osdout state - Sleeping for 10 secs after populating recovery pool.")).communicate() + time.sleep(10) + lcmd = self.logcmd("Unsetting the ceph osd noup flag") + self.cluster.disable_recovery() + common.pdsh(settings.getnodes('head'), '%s -c %s osd unset noup;%s' % (self.ceph_cmd, self.cluster.tmp_conf, lcmd)).communicate() + for osdnum in self.config.get('osds'): + lcmd = self.logcmd("Marking OSD %s up." % osdnum) + common.pdsh(settings.getnodes('head'), '%s -c %s osd up %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate() + lcmd = self.logcmd("Marking OSD %s in." % osdnum) + common.pdsh(settings.getnodes('head'), '%s -c %s osd in %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate() + self.lasttime = time.time() + # Populate the scrub pool + logger.info("Sleep before scrub populate") + time.sleep(10) + self.cluster.maybe_populate_scrubbing_pool() + self.state = "osdin" + + + def osdin(self): + #Start scrub + self.startiorequest.set() + self.cluster.initiate_scrubbing() + self.cluster.enable_recovery() + recstatslog = "%s/recovery_backfill_stats.log" % self.config.get('run_dir') + scrub_stats_log = "%s/scrub_stats.log" % self.config.get('run_dir') + backfill = threading.Thread(target=self.cluster.check_backfill, args=(self.health_checklist, "%s/recovery.log" % self.config.get('run_dir'), recstatslog,)) + scrub_check = threading.Thread(target=self.cluster.check_scrub, args=(scrub_stats_log,)) + backfill.start() + scrub_check.start() + backfill.join() + scrub_check.join() + self.state = "post" + + + def post(self): + if self.stoprequest.isSet(): + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but stoprequest is set, finishing now.')).communicate() + self.haltrequest.set() + return + + if self.config.get("repeat", False): + # reset counters + self.outhealthtries = 0 + self.inhealthtries = 0 + + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but repeat is set. Moving to "markdown" state.')).communicate() + self.state = "markdown" + return + + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, finishing up...')).communicate() + self.state = "done" + + def done(self): + common.pdsh(settings.getnodes('head'), self.logcmd("Done. Calling parent callback function.")).communicate() + self.callback() + self.haltrequest.set() + + def join(self, timeout=None): + common.pdsh(settings.getnodes('head'), self.logcmd('Received notification that parent is finished and waiting.')).communicate() + super(ScrubRecoveryThreadBackground, self).join(timeout) + + def run(self): + self.haltrequest.clear() + self.stoprequest.clear() + self.startiorequest.clear() + while not self.haltrequest.isSet(): + self.states[self.state]() + common.pdsh(settings.getnodes('head'), self.logcmd('Exiting scrub+recovery test thread. Last state was: %s' % self.state)).communicate() From fd4de2dd07462b7a5070fa1ff356a44bdcbdf8b8 Mon Sep 17 00:00:00 2001 From: Aishwarya Mathuria Date: Tue, 6 Sep 2022 14:00:26 +0000 Subject: [PATCH 2/3] Moving out scrub tests to a separate file Signed-off-by: Aishwarya Mathuria --- benchmark/fio.py | 22 +- benchmark/librbdfio.py | 2 +- client_endpoints/ceph_client_endpoints.py | 2 +- client_endpoints/client_endpoints.py | 2 +- client_endpoints/librbd_client_endpoints.py | 4 +- client_endpoints/rbdfuse_client_endpoints.py | 4 +- .../rbdkernel_client_endpoints.py | 4 +- client_endpoints/rbdnbd_client_endpoints.py | 4 +- client_endpoints/rbdtcmu_client_endpoints.py | 4 +- cluster/ceph.py | 240 +----------------- cluster/scrub_tests.py | 221 ++++++++++++++++ 11 files changed, 259 insertions(+), 250 deletions(-) create mode 100644 cluster/scrub_tests.py diff --git a/benchmark/fio.py b/benchmark/fio.py index 7ccfc631..251b46ff 100644 --- a/benchmark/fio.py +++ b/benchmark/fio.py @@ -66,11 +66,11 @@ def initialize_endpoints(self): # Create the recovery image based on test type requested if 'recovery_test' in self.cluster.config and self.recov_test_type == 'background': self.client_endpoints_object.create_recovery_image() - if 'scrubbing_test' in self.cluster.config: - self.client_endpoints_object.create_scrubbing_image() + if 'scrub_test' in self.cluster.config: + self.client_endpoints_object.create_scrub_image() if 'scrub_recov_test' in self.cluster.config: self.client_endpoints_object.create_recovery_image() - self.client_endpoints_object.create_scrubbing_image() + self.client_endpoints_object.create_scrub_image() self.create_endpoints() def create_endpoints(self): @@ -218,10 +218,10 @@ def run(self): # Wait for signal to start client IO self.cluster.wait_start_io() - if 'scrubbing_test' in self.cluster.config: - logger.info('Scrubbing test in config') - scrubbing_callback = self.scrubbing_callback - self.cluster.create_scrubbing_test(self.run_dir, scrubbing_callback) + if 'scrub_test' in self.cluster.config: + logger.info('Scrub test in config') + scrub_callback = self.scrub_callback + self.cluster.create_scrub_test(self.run_dir, scrub_callback) self.cluster.wait_start_io() if 'scrub_recov_test' in self.cluster.config: @@ -242,9 +242,9 @@ def run(self): # If we were doing recovery, wait until it's done. if 'recovery_test' in self.cluster.config: self.cluster.wait_recovery_done() - # If we were doing scrubbing, wait until it's done. - if 'scrubbing_test' in self.cluster.config: - self.cluster.wait_scrubbing_done() + # If we were doing scrub, wait until it's done. + if 'scrub_test' in self.cluster.config: + self.cluster.wait_scrub_done() if 'scrub_recov_test' in self.cluster.config: self.cluster.wait_scrub_recovery_done() @@ -263,7 +263,7 @@ def recovery_callback_blocking(self): def recovery_callback_background(self): logger.info('Recovery thread completed!') - def scrubbing_callback(self): + def scrub_callback(self): logger.info('Scrubbing thread completed') def scrub_recov_callback(self): diff --git a/benchmark/librbdfio.py b/benchmark/librbdfio.py index 66e487c6..f901585b 100644 --- a/benchmark/librbdfio.py +++ b/benchmark/librbdfio.py @@ -267,7 +267,7 @@ def recovery_callback_blocking(self): def recovery_callback_background(self): logger.info('Recovery thread completed!') - def scrubbing_callback(self): + def scrub_callback(self): logger.info('Scrubbing thread completed!') def parse(self, out_dir): diff --git a/client_endpoints/ceph_client_endpoints.py b/client_endpoints/ceph_client_endpoints.py index fe4b450f..7c4d5637 100644 --- a/client_endpoints/ceph_client_endpoints.py +++ b/client_endpoints/ceph_client_endpoints.py @@ -116,7 +116,7 @@ def create_rbd_recovery(self): rbd_name = '%s-%s' % (self.pool, self.get_rbd_name(node, ep_num)) self.cluster.mkimage(rbd_name, self.endpoint_size, self.pool, self.data_pool, self.order) - def create_rbd_scrubbing(self): + def create_rbd_scrub_pool(self): self.pool = '%s-scrub' % self.name self.cluster.rmpool(self.pool, self.scrub_pool_profile) self.cluster.mkpool(self.pool, self.scrub_pool_profile, 'rbd') diff --git a/client_endpoints/client_endpoints.py b/client_endpoints/client_endpoints.py index 97ed2a06..7ff21eca 100644 --- a/client_endpoints/client_endpoints.py +++ b/client_endpoints/client_endpoints.py @@ -46,5 +46,5 @@ def remove(self): def create_recovery_image(self): pass - def create_scrubbing_image(self): + def create_scrub_image(self): pass diff --git a/client_endpoints/librbd_client_endpoints.py b/client_endpoints/librbd_client_endpoints.py index 57fd3bba..fa92e234 100644 --- a/client_endpoints/librbd_client_endpoints.py +++ b/client_endpoints/librbd_client_endpoints.py @@ -20,5 +20,5 @@ def mount(self): def create_recovery_image(self): self.create_rbd_recovery() - def create_scrubbing_image(self): - self.create_rbd_scrubbing() + def create_scrub_image(self): + self.create_rbd_scrub_pool() diff --git a/client_endpoints/rbdfuse_client_endpoints.py b/client_endpoints/rbdfuse_client_endpoints.py index fa94a1ae..a3a7947f 100644 --- a/client_endpoints/rbdfuse_client_endpoints.py +++ b/client_endpoints/rbdfuse_client_endpoints.py @@ -32,5 +32,5 @@ def map_rbd(self, node, rbd_name): def create_recovery_image(self): self.create_rbd_recovery() - def create_scrubbing_image(self): - self.create_rbd_scrubbing() + def create_scrub_image(self): + self.create_rbd_scrub_pool() diff --git a/client_endpoints/rbdkernel_client_endpoints.py b/client_endpoints/rbdkernel_client_endpoints.py index 8a75afb9..d1a776df 100644 --- a/client_endpoints/rbdkernel_client_endpoints.py +++ b/client_endpoints/rbdkernel_client_endpoints.py @@ -23,5 +23,5 @@ def map_rbd(self, node, rbd_name): def create_recovery_image(self): self.create_rbd_recovery() - def create_scrubbing_image(self): - self.create_rbd_scrubbing() + def create_scrub_image(self): + self.create_rbd_scrub_pool() diff --git a/client_endpoints/rbdnbd_client_endpoints.py b/client_endpoints/rbdnbd_client_endpoints.py index 93abc3be..63b6cbfc 100644 --- a/client_endpoints/rbdnbd_client_endpoints.py +++ b/client_endpoints/rbdnbd_client_endpoints.py @@ -17,5 +17,5 @@ def map_rbd(self, node, rbd_name): def create_recovery_image(self): self.create_rbd_recovery() - def create_scrubbing_image(self): - self.create_rbd_scrubbing() + def create_scrub_image(self): + self.create_rbd_scrub_pool() diff --git a/client_endpoints/rbdtcmu_client_endpoints.py b/client_endpoints/rbdtcmu_client_endpoints.py index 236e69cd..25cd5f9c 100644 --- a/client_endpoints/rbdtcmu_client_endpoints.py +++ b/client_endpoints/rbdtcmu_client_endpoints.py @@ -24,5 +24,5 @@ def map_rbd(self, node, rbd_name): def create_recovery_image(self): self.create_rbd_recovery() - def create_scrubbing_image(self): - self.create_rbd_scrubbing() + def create_scrub_image(self): + self.create_rbd_scrub_pool() diff --git a/cluster/ceph.py b/cluster/ceph.py index d23897a4..50a9d7b8 100644 --- a/cluster/ceph.py +++ b/cluster/ceph.py @@ -9,6 +9,7 @@ import json from .cluster import Cluster +from .scrub_tests import ScrubTestThreadBackground, ScrubRecoveryThreadBackground logger = logging.getLogger("cbt") @@ -148,7 +149,7 @@ def __init__(self, config): self.prefill_recov_time = 0 self.recov_pool_name = '' - #Scrubbing tests + #Scrub tests self.scrub_enabled = config.get('enable_scrub', False) self.prefill_scrub_objects = 0 self.prefill_scrub_object_size = 0 @@ -200,7 +201,7 @@ def initialize(self): self.check_health() monitoring.stop() - # Disable scrub and wait for any scrubbing to complete + # Disable scrub and wait for any scrubs to complete logger.info("Scrub enabled is %s", self.scrub_enabled) if not self.scrub_enabled: self.disable_scrub() @@ -577,7 +578,7 @@ def check_health(self, check_list=None, logfile=None, recstatsfile=None): return ret - def log_scrubbing_stats(self, scrubstatsfile=None, pgid=None): + def log_scrub_stats(self, scrubstatsfile=None, pgid=None): if not scrubstatsfile: return fmtjson = "--format=json" @@ -676,7 +677,7 @@ def check_backfill(self, check_list=None, logfile=None, recstatsfile=None): return ret def check_scrub(self, scrubstatsfile=None): - logger.info('Waiting until Scrubbing completes...') + logger.info('Waiting until Scrub completes...') fmtjson = '--format=json' SCRUB_DURATION = "last_scrub_duration" PG_STATS = "pg_stats" @@ -704,9 +705,9 @@ def check_scrub(self, scrubstatsfile=None): logger.info(jsondata[PG_STATS][i][PG_ID]) if jsondata[PG_STATS][i][PG_ID] not in pgs_scrubbed: pgs_scrubbed.append(jsondata[PG_STATS][i][PG_ID]) - self.log_scrubbing_stats(scrubstatsfile, str(jsondata[PG_STATS][i][PG_ID])) + self.log_scrub_stats(scrubstatsfile, str(jsondata[PG_STATS][i][PG_ID])) if scrubbed_pgs == len(jsondata[PG_STATS]): - logger.info('Scrubbing is complete') + logger.info('Scrub is complete') return 1 def dump_config(self, run_dir): @@ -730,13 +731,13 @@ def create_recovery_test(self, run_dir, callback, test_type='blocking'): self.rt = RecoveryTestThreadBackground(rt_config, self, callback, self.stoprequest, self.haltrequest, self.startiorequest) self.rt.start() - def create_scrubbing_test(self, run_dir, callback): + def create_scrub_test(self, run_dir, callback): ''' Only background type currently ''' - st_config = self.config.get("scrubbing_test", {}) + st_config = self.config.get("scrub_test", {}) st_config['run_dir'] = run_dir - self.st = ScrubbingTestThreadBackground(st_config, self, callback, self.stoprequest, self.haltrequest, self.startiorequest) + self.st = ScrubTestThreadBackground(st_config, self, callback, self.stoprequest, self.haltrequest, self.startiorequest) self.st.start() def create_scrub_recovery_test(self, run_dir, callback): @@ -763,17 +764,17 @@ def wait_recovery_done(self): break self.rt.join(1) - def maybe_populate_scrubbing_pool(self): + def maybe_populate_scrub_pool(self): if self.prefill_scrub_objects > 0 or self.prefill_scrub_time > 0: - logger.info('prefilling %s %sbyte objects into scrubbing pool %s' % (self.prefill_scrub_objects, self.prefill_scrub_object_size, self.scrub_pool_name)) + logger.info('prefilling %s %sbyte objects into scrub pool %s' % (self.prefill_scrub_objects, self.prefill_scrub_object_size, self.scrub_pool_name)) common.pdsh(settings.getnodes('head'), 'sudo %s -p %s bench %s write -b %s --max-objects %s --no-cleanup' % (self.rados_cmd, self.scrub_pool_name, self.prefill_scrub_time, self.prefill_scrub_object_size, self.prefill_scrub_objects)).communicate() #self.check_health() - def initiate_scrubbing(self): + def initiate_scrub(self): logger.info("Initiating scrub on pool %s" % self.scrub_pool_name) common.pdsh(settings.getnodes('head'), '%s osd pool deep-scrub %s' % (self.ceph_cmd, self.scrub_pool_name)).communicate() - def wait_scrubbing_done(self): + def wait_scrub_done(self): self.stoprequest.set() while True: threads = threading.enumerate() @@ -1292,216 +1293,3 @@ def run(self): self.states[self.state]() common.pdsh(settings.getnodes('head'), self.logcmd('Exiting recovery test thread. Last state was: %s' % self.state)).communicate() - -class ScrubbingTestThreadBackground(threading.Thread): - def __init__(self, config, cluster, callback, stoprequest, haltrequest, startiorequest): - threading.Thread.__init__(self) - self.config = config - self.cluster = cluster - self.callback = callback - self.state = 'pre' - self.states = {'pre': self.pre, 'osdout': self.osdout, 'osdin':self.osdin, - 'post': self.post, 'done': self.done} - self.startiorequest = startiorequest - self.stoprequest = stoprequest - self.haltrequest = haltrequest - self.outhealthtries = 0 - self.inhealthtries = 0 - self.maxhealthtries = 60 - self.health_checklist = ["peering", "recovery_wait", "stuck", "inactive", "unclean", "recovery"] - self.ceph_cmd = self.cluster.ceph_cmd - self.lasttime = time.time() - - def logcmd(self, message): - return 'echo "[`date`] %s" >> %s/scrubbing.log' % (message, self.config.get('run_dir')) - - def pre(self): - pre_time = self.config.get("pre_time", 60) - common.pdsh(settings.getnodes('head'), self.logcmd('Starting Scrubbing Test Thread, waiting %s seconds.' % pre_time)).communicate() - time.sleep(pre_time) - self.state = 'osdout' - - def osdout(self): - scrub_log = "%s/scrub.log" % self.config.get('run_dir') - scrub_stats_log = "%s/scrub_stats.log" % self.config.get('run_dir') - ret = self.cluster.check_health(self.health_checklist, None, None) - - common.pdsh(settings.getnodes('head'), self.logcmd("ret: %s" % ret)).communicate() - - self.cluster.maybe_populate_scrubbing_pool() - common.pdsh(settings.getnodes('head'), self.logcmd("osdout state - Sleeping for 10 secs after populating scrubbing pool.")).communicate() - time.sleep(10) - self.lasttime = time.time() - self.state = "osdin" - - def osdin(self): - scrub_stats_log = "%s/scrub_stats.log" % self.config.get('run_dir') - self.startiorequest.set() - self.cluster.initiate_scrubbing() - ret = self.cluster.check_scrub(scrub_stats_log) - if ret == 1: - self.state = "post" - - def post(self): - if self.stoprequest.isSet(): - common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but stoprequest is set, finishing now.')).communicate() - self.haltrequest.set() - return - - if self.config.get("repeat", False): - # reset counters - self.outhealthtries = 0 - self.inhealthtries = 0 - - common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but repeat is set. Moving to "osdout" state.')).communicate() - self.state = "osdout" - return - - common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, finishing up...')).communicate() - self.state = "done" - - def done(self): - common.pdsh(settings.getnodes('head'), self.logcmd("Done. Calling parent callback function.")).communicate() - self.callback() - self.haltrequest.set() - - def join(self, timeout=None): - common.pdsh(settings.getnodes('head'), self.logcmd('Received notification that parent is finished and waiting.')).communicate() - super(ScrubbingTestThreadBackground, self).join(timeout) - - def run(self): - self.haltrequest.clear() - self.stoprequest.clear() - self.startiorequest.clear() - while not self.haltrequest.isSet(): - self.states[self.state]() - common.pdsh(settings.getnodes('head'), self.logcmd('Exiting scrubbing test thread. Last state was: %s' % self.state)).communicate() - - -class ScrubRecoveryThreadBackground(threading.Thread): - def __init__(self, config, cluster, callback, stoprequest, haltrequest, startiorequest): - threading.Thread.__init__(self) - self.config = config - self.cluster = cluster - self.callback = callback - self.state = 'pre' - self.states = {'pre': self.pre, 'markdown': self.markdown, 'osdout': self.osdout, 'osdin':self.osdin, - 'post': self.post, 'done': self.done} - self.startiorequest = startiorequest - self.stoprequest = stoprequest - self.haltrequest = haltrequest - self.outhealthtries = 0 - self.inhealthtries = 0 - self.maxhealthtries = 60 - self.health_checklist = ["peering", "recovery_wait", "stuck", "inactive", "unclean", "recovery"] - self.ceph_cmd = self.cluster.ceph_cmd - self.lasttime = time.time() - - def logcmd(self, message): - return 'echo "[`date`] %s" >> %s/scrub_recov.log' % (message, self.config.get('run_dir')) - - def pre(self): - pre_time = self.config.get("pre_time", 60) - common.pdsh(settings.getnodes('head'), self.logcmd('Starting Scrub+Recovery Test Thread, waiting %s seconds.' % pre_time)).communicate() - time.sleep(pre_time) - lcmd = self.logcmd("Setting the ceph osd noup flag") - common.pdsh(settings.getnodes('head'), '%s -c %s osd set noup;%s' % (self.ceph_cmd, self.cluster.tmp_conf, lcmd)).communicate() - self.state = 'markdown' - - def markdown(self): - for osdnum in self.config.get('osds'): - lcmd = self.logcmd("Marking OSD %s down." % osdnum) - common.pdsh(settings.getnodes('head'), '%s -c %s osd down %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate() - lcmd = self.logcmd("Marking OSD %s out." % osdnum) - common.pdsh(settings.getnodes('head'), '%s -c %s osd out %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate() - common.pdsh(settings.getnodes('head'), self.logcmd('Waiting for the cluster to break and heal')).communicate() - self.lasttime = time.time() - self.state = 'osdout' - - - def osdout(self): - reclog = "%s/recovery.log" % self.config.get('run_dir') - recstatslog = "%s/recovery_stats.log" % self.config.get('run_dir') - ret = self.cluster.check_health(self.health_checklist, reclog, recstatslog) - - common.pdsh(settings.getnodes('head'), self.logcmd("ret: %s" % ret)).communicate() - - if ret == 0: - common.pdsh(settings.getnodes('head'), self.logcmd('Cluster never went unhealthy.')).communicate() - else: - common.pdsh(settings.getnodes('head'), self.logcmd('Cluster appears to have healed.')).communicate() - rectime = str(time.time() - self.lasttime) - common.pdsh(settings.getnodes('head'), 'echo Time: %s >> %s' % (rectime, recstatslog)).communicate() - common.pdsh(settings.getnodes('head'), self.logcmd('Time: %s' % rectime)).communicate() - - # Populate the recovery pool - self.cluster.maybe_populate_recovery_pool() - - common.pdsh(settings.getnodes('head'), self.logcmd("osdout state - Sleeping for 10 secs after populating recovery pool.")).communicate() - time.sleep(10) - lcmd = self.logcmd("Unsetting the ceph osd noup flag") - self.cluster.disable_recovery() - common.pdsh(settings.getnodes('head'), '%s -c %s osd unset noup;%s' % (self.ceph_cmd, self.cluster.tmp_conf, lcmd)).communicate() - for osdnum in self.config.get('osds'): - lcmd = self.logcmd("Marking OSD %s up." % osdnum) - common.pdsh(settings.getnodes('head'), '%s -c %s osd up %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate() - lcmd = self.logcmd("Marking OSD %s in." % osdnum) - common.pdsh(settings.getnodes('head'), '%s -c %s osd in %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate() - self.lasttime = time.time() - # Populate the scrub pool - logger.info("Sleep before scrub populate") - time.sleep(10) - self.cluster.maybe_populate_scrubbing_pool() - self.state = "osdin" - - - def osdin(self): - #Start scrub - self.startiorequest.set() - self.cluster.initiate_scrubbing() - self.cluster.enable_recovery() - recstatslog = "%s/recovery_backfill_stats.log" % self.config.get('run_dir') - scrub_stats_log = "%s/scrub_stats.log" % self.config.get('run_dir') - backfill = threading.Thread(target=self.cluster.check_backfill, args=(self.health_checklist, "%s/recovery.log" % self.config.get('run_dir'), recstatslog,)) - scrub_check = threading.Thread(target=self.cluster.check_scrub, args=(scrub_stats_log,)) - backfill.start() - scrub_check.start() - backfill.join() - scrub_check.join() - self.state = "post" - - - def post(self): - if self.stoprequest.isSet(): - common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but stoprequest is set, finishing now.')).communicate() - self.haltrequest.set() - return - - if self.config.get("repeat", False): - # reset counters - self.outhealthtries = 0 - self.inhealthtries = 0 - - common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but repeat is set. Moving to "markdown" state.')).communicate() - self.state = "markdown" - return - - common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, finishing up...')).communicate() - self.state = "done" - - def done(self): - common.pdsh(settings.getnodes('head'), self.logcmd("Done. Calling parent callback function.")).communicate() - self.callback() - self.haltrequest.set() - - def join(self, timeout=None): - common.pdsh(settings.getnodes('head'), self.logcmd('Received notification that parent is finished and waiting.')).communicate() - super(ScrubRecoveryThreadBackground, self).join(timeout) - - def run(self): - self.haltrequest.clear() - self.stoprequest.clear() - self.startiorequest.clear() - while not self.haltrequest.isSet(): - self.states[self.state]() - common.pdsh(settings.getnodes('head'), self.logcmd('Exiting scrub+recovery test thread. Last state was: %s' % self.state)).communicate() diff --git a/cluster/scrub_tests.py b/cluster/scrub_tests.py new file mode 100644 index 00000000..6ecb2335 --- /dev/null +++ b/cluster/scrub_tests.py @@ -0,0 +1,221 @@ +import common +import settings +import time +import threading +import logging + + +logger = logging.getLogger("cbt") + + +class ScrubTestThreadBackground(threading.Thread): + def __init__(self, config, cluster, callback, stoprequest, haltrequest, startiorequest): + threading.Thread.__init__(self) + self.config = config + self.cluster = cluster + self.callback = callback + self.state = 'pre' + self.states = {'pre': self.pre, 'fill_scrub_pool': self.fill_scrub_pool, 'start_scrub':self.start_scrub, + 'post': self.post, 'done': self.done} + self.startiorequest = startiorequest + self.stoprequest = stoprequest + self.haltrequest = haltrequest + self.outhealthtries = 0 + self.inhealthtries = 0 + self.maxhealthtries = 60 + self.health_checklist = ["peering", "recovery_wait", "stuck", "inactive", "unclean", "recovery"] + self.ceph_cmd = self.cluster.ceph_cmd + self.lasttime = time.time() + + def logcmd(self, message): + return 'echo "[`date`] %s" >> %s/scrub.log' % (message, self.config.get('run_dir')) + + def pre(self): + pre_time = self.config.get("pre_time", 60) + common.pdsh(settings.getnodes('head'), self.logcmd('Starting Scrub Test Thread, waiting %s seconds.' % pre_time)).communicate() + time.sleep(pre_time) + self.state = 'fill_scrub_pool' + + def fill_scrub_pool(self): + scrub_log = "%s/scrub.log" % self.config.get('run_dir') + scrub_stats_log = "%s/scrub_stats.log" % self.config.get('run_dir') + ret = self.cluster.check_health(self.health_checklist, None, None) + + common.pdsh(settings.getnodes('head'), self.logcmd("ret: %s" % ret)).communicate() + + self.cluster.maybe_populate_scrub_pool() + common.pdsh(settings.getnodes('head'), self.logcmd("osdout state - Sleeping for 10 secs after populating scrub pool.")).communicate() + time.sleep(10) + self.lasttime = time.time() + self.state = "start_scrub" + + def start_scrub(self): + scrub_stats_log = "%s/scrub_stats.log" % self.config.get('run_dir') + self.startiorequest.set() + self.cluster.initiate_scrub() + ret = self.cluster.check_scrub(scrub_stats_log) + if ret == 1: + self.state = "post" + + def post(self): + if self.stoprequest.isSet(): + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but stoprequest is set, finishing now.')).communicate() + self.haltrequest.set() + return + + if self.config.get("repeat", False): + # reset counters + self.outhealthtries = 0 + self.inhealthtries = 0 + + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but repeat is set. Moving to "osdout" state.')).communicate() + self.state = "fill_scrub_pool" + return + + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, finishing up...')).communicate() + self.state = "done" + + def done(self): + common.pdsh(settings.getnodes('head'), self.logcmd("Done. Calling parent callback function.")).communicate() + self.callback() + self.haltrequest.set() + + def join(self, timeout=None): + common.pdsh(settings.getnodes('head'), self.logcmd('Received notification that parent is finished and waiting.')).communicate() + super(ScrubTestThreadBackground, self).join(timeout) + + def run(self): + self.haltrequest.clear() + self.stoprequest.clear() + self.startiorequest.clear() + while not self.haltrequest.isSet(): + self.states[self.state]() + common.pdsh(settings.getnodes('head'), self.logcmd('Exiting scrub test thread. Last state was: %s' % self.state)).communicate() + + +class ScrubRecoveryThreadBackground(threading.Thread): + def __init__(self, config, cluster, callback, stoprequest, haltrequest, startiorequest): + threading.Thread.__init__(self) + self.config = config + self.cluster = cluster + self.callback = callback + self.state = 'pre' + self.states = {'pre': self.pre, 'markdown': self.markdown, 'fill_pools': self.fill_pools, + 'start_recovery_and_scrub':self.start_recovery_and_scrub, 'post': self.post, 'done': self.done} + self.startiorequest = startiorequest + self.stoprequest = stoprequest + self.haltrequest = haltrequest + self.outhealthtries = 0 + self.inhealthtries = 0 + self.maxhealthtries = 60 + self.health_checklist = ["peering", "recovery_wait", "stuck", "inactive", "unclean", "recovery"] + self.ceph_cmd = self.cluster.ceph_cmd + self.lasttime = time.time() + + def logcmd(self, message): + return 'echo "[`date`] %s" >> %s/scrub_recov.log' % (message, self.config.get('run_dir')) + + def pre(self): + pre_time = self.config.get("pre_time", 60) + common.pdsh(settings.getnodes('head'), self.logcmd('Starting Scrub+Recovery Test Thread, waiting %s seconds.' % pre_time)).communicate() + time.sleep(pre_time) + lcmd = self.logcmd("Setting the ceph osd noup flag") + common.pdsh(settings.getnodes('head'), '%s -c %s osd set noup;%s' % (self.ceph_cmd, self.cluster.tmp_conf, lcmd)).communicate() + self.state = 'markdown' + + def markdown(self): + for osdnum in self.config.get('osds'): + lcmd = self.logcmd("Marking OSD %s down." % osdnum) + common.pdsh(settings.getnodes('head'), '%s -c %s osd down %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate() + lcmd = self.logcmd("Marking OSD %s out." % osdnum) + common.pdsh(settings.getnodes('head'), '%s -c %s osd out %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate() + common.pdsh(settings.getnodes('head'), self.logcmd('Waiting for the cluster to break and heal')).communicate() + self.lasttime = time.time() + self.state = 'fill_pools' + + + def fill_pools(self): + reclog = "%s/recovery.log" % self.config.get('run_dir') + recstatslog = "%s/recovery_stats.log" % self.config.get('run_dir') + ret = self.cluster.check_health(self.health_checklist, reclog, recstatslog) + + common.pdsh(settings.getnodes('head'), self.logcmd("ret: %s" % ret)).communicate() + + if ret == 0: + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster never went unhealthy.')).communicate() + else: + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster appears to have healed.')).communicate() + rectime = str(time.time() - self.lasttime) + common.pdsh(settings.getnodes('head'), 'echo Time: %s >> %s' % (rectime, recstatslog)).communicate() + common.pdsh(settings.getnodes('head'), self.logcmd('Time: %s' % rectime)).communicate() + + # Populate the recovery pool + self.cluster.maybe_populate_recovery_pool() + + common.pdsh(settings.getnodes('head'), self.logcmd("osdout state - Sleeping for 10 secs after populating recovery pool.")).communicate() + time.sleep(10) + lcmd = self.logcmd("Unsetting the ceph osd noup flag") + self.cluster.disable_recovery() + common.pdsh(settings.getnodes('head'), '%s -c %s osd unset noup;%s' % (self.ceph_cmd, self.cluster.tmp_conf, lcmd)).communicate() + for osdnum in self.config.get('osds'): + lcmd = self.logcmd("Marking OSD %s up." % osdnum) + common.pdsh(settings.getnodes('head'), '%s -c %s osd up %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate() + lcmd = self.logcmd("Marking OSD %s in." % osdnum) + common.pdsh(settings.getnodes('head'), '%s -c %s osd in %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate() + self.lasttime = time.time() + # Populate the scrub pool + logger.info("Sleep before scrub populate") + time.sleep(10) + self.cluster.maybe_populate_scrub_pool() + self.state = "start_recovery_and_scrub" + + + def start_recovery_and_scrub(self): + self.startiorequest.set() + self.cluster.initiate_scrub() + self.cluster.enable_recovery() + recstatslog = "%s/recovery_backfill_stats.log" % self.config.get('run_dir') + scrub_stats_log = "%s/scrub_stats.log" % self.config.get('run_dir') + backfill = threading.Thread(target=self.cluster.check_backfill, args=(self.health_checklist, "%s/recovery.log" % self.config.get('run_dir'), recstatslog,)) + scrub_check = threading.Thread(target=self.cluster.check_scrub, args=(scrub_stats_log,)) + backfill.start() + scrub_check.start() + backfill.join() + scrub_check.join() + self.state = "post" + + + def post(self): + if self.stoprequest.isSet(): + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but stoprequest is set, finishing now.')).communicate() + self.haltrequest.set() + return + + if self.config.get("repeat", False): + # reset counters + self.outhealthtries = 0 + self.inhealthtries = 0 + + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but repeat is set. Moving to "markdown" state.')).communicate() + self.state = "markdown" + return + + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, finishing up...')).communicate() + self.state = "done" + + def done(self): + common.pdsh(settings.getnodes('head'), self.logcmd("Done. Calling parent callback function.")).communicate() + self.callback() + self.haltrequest.set() + + def join(self, timeout=None): + common.pdsh(settings.getnodes('head'), self.logcmd('Received notification that parent is finished and waiting.')).communicate() + super(ScrubRecoveryThreadBackground, self).join(timeout) + + def run(self): + self.haltrequest.clear() + self.stoprequest.clear() + self.startiorequest.clear() + while not self.haltrequest.isSet(): + self.states[self.state]() + common.pdsh(settings.getnodes('head'), self.logcmd('Exiting scrub+recovery test thread. Last state was: %s' % self.state)).communicate() From 4291252f4a48034bfd2f2100b0388c0344441822 Mon Sep 17 00:00:00 2001 From: Aishwarya Mathuria Date: Mon, 28 Nov 2022 13:51:30 +0000 Subject: [PATCH 3/3] cluster: Add option for shallow scrub for scrub tests Signed-off-by: Aishwarya Mathuria --- cluster/ceph.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/cluster/ceph.py b/cluster/ceph.py index 50a9d7b8..de10d4c7 100644 --- a/cluster/ceph.py +++ b/cluster/ceph.py @@ -151,6 +151,7 @@ def __init__(self, config): #Scrub tests self.scrub_enabled = config.get('enable_scrub', False) + self.scrub_type = config.get('scrub_type', '') self.prefill_scrub_objects = 0 self.prefill_scrub_object_size = 0 self.prefill_scrub_time = 0 @@ -771,8 +772,15 @@ def maybe_populate_scrub_pool(self): #self.check_health() def initiate_scrub(self): - logger.info("Initiating scrub on pool %s" % self.scrub_pool_name) - common.pdsh(settings.getnodes('head'), '%s osd pool deep-scrub %s' % (self.ceph_cmd, self.scrub_pool_name)).communicate() + if self.scrub_type: + if self.scrub_type == 'shallow': + scrub_command = 'scrub' + elif self.scrub_type == 'deep': + scrub_command = 'deep-scrub' + else: + scrub_command = 'deep-scrub' + logger.info("Initiating %s on pool %s" % (scrub_command, self.scrub_pool_name)) + common.pdsh(settings.getnodes('head'), '%s osd pool %s %s' % (self.ceph_cmd, scrub_command, self.scrub_pool_name)).communicate() def wait_scrub_done(self): self.stoprequest.set()