Replica job stability 1.8 #375

197 changes: 163 additions & 34 deletions replication.py
@@ -9,6 +9,7 @@

import genquery
import irods_types
import psutil

from util import *

@@ -41,18 +42,22 @@ def replicate_asynchronously(ctx, path, source_resource, target_resource):


@rule.make()
def rule_replicate_batch(ctx, verbose):
def rule_replicate_batch(ctx, verbose, rss_limit='1000000000', num_copies=2, dry_run='0'):
"""Scheduled replication batch job.

Performs replication for all data objects marked with 'org_replication_scheduled' metadata.
The metadata value indicates the source and destination resource.

:param ctx: Combined type of a callback and rei struct
:param verbose: Whether to log verbose messages for troubleshooting ('1': yes, anything else: no)
:param rss_limit: When not '0', the maximum amount of RSS memory in bytes before stopping, checked after each data object
:param num_copies: Number of good current replicas that is considered enough
:param dry_run: When '1' do not actually replicate, only log what would have been replicated
"""
count = 0
count_ok = 0
print_verbose = (verbose == '1')
no_action = (dry_run == '1')

attr = constants.UUORGMETADATAPREFIX + "replication_scheduled"
errorattr = constants.UUORGMETADATAPREFIX + "replication_failed"
@@ -65,38 +70,84 @@ def rule_replicate_batch(ctx, verbose):

minimum_timestamp = int(time.time() - config.async_replication_delay_time)

log.write(ctx, "[replication] verbose = {}".format(verbose))
log.write(ctx, "[replication] async_replication_delay_time = {} seconds".format(config.async_replication_delay_time))
log.write(ctx, "[replication] rss_limit = {} bytes".format(rss_limit))
log.write(ctx, "[replication] dry_run = {}".format(dry_run))
show_memory_usage(ctx)

iter = list(genquery.Query(ctx,
['ORDER(DATA_ID)', 'COLL_NAME', 'DATA_NAME', 'META_DATA_ATTR_VALUE'],
"META_DATA_ATTR_NAME = '{}' AND DATA_MODIFY_TIME n<= '{}'".format(attr, minimum_timestamp),
output=genquery.AS_LIST))

# Workaround for multiple "org_replication_scheduled" AVUs having been
# created for the same data object. The rows are sorted on the AVU value,
# so the first occurrence is processed, and all org_replication_scheduled
# AVUs for the object are removed from within the following loop.
# Extra occurrences are ignored, but reported.
data_objects_seen = set()

for row in iter:
# Stop further execution if admin has blocked replication process.
if is_replication_blocked_by_admin(ctx):
log.write(ctx, "[replication] Batch replication job is stopped")
break

count += 1
path = row[1] + "/" + row[2]
# Check current memory usage and stop if it is above the limit.
if memory_limit_exceeded(rss_limit):
show_memory_usage(ctx)
log.write(ctx, "[replication] Memory used is now above specified limit of {} bytes, stopping further processing".format(rss_limit))
break

coll_name = row[1]
data_name = row[2]
path = coll_name + "/" + data_name
rescs = row[3]

# Have we seen this data object before in this run? Multiple scheduled-replication AVUs on one object indicate a bug in current Yoda.
if path in data_objects_seen:
log.write(ctx, "[replication] WARNING - ignoring extra AVU <{}: {}> for <{}>".format(attr, rescs, path))
continue

data_objects_seen.add(path)
count += 1

xs = rescs.split(',')
if len(xs) != 2:
# not replicable
avu.set_on_data(ctx, path, errorattr, "true")
log.write(ctx, "[replication] ERROR - Invalid replication data for {}".format(path))
log.write(ctx, "[replication] ERROR - Invalid replication data for <{}>: <{}>".format(path, rescs))
if no_action:
log.write(ctx, "[replication] Skipping removing and setting AVU's (dry_run)")
continue
set_flag_on_data(ctx=ctx, path=path, attr=errorattr, value="true")
remove_replication_scheduled_flag(ctx=ctx, path=path, attr=attr)
# Go to next record and skip further processing
continue

from_path = xs[0]
to_path = xs[1]

if already_has_enough_good_replicas(ctx, coll_name, data_name, num_copies):
log.write(ctx, "[replication] Skipping batch replication: already at least {} good replicas of {}".format(num_copies, path, to_path))
if not no_action:
remove_replication_scheduled_flag(ctx=ctx, path=path, attr=attr)
continue

# "No action" is meant for easier memory usage debugging.
if no_action:
show_memory_usage(ctx)
log.write(ctx, "[replication] Skipping batch replication (dry_run): would have replicated \"{}\" from {} to {}".format(path, from_path, to_path))
continue

if print_verbose:
log.write(ctx, "[replication] Batch replication: copying {} from {} to {}".format(path, from_path, to_path))

# Actual replication
try:
# Ensure first replica has checksum before replication.
msi.data_obj_chksum(ctx, path, "replNum=0", irods_types.BytesBuf())
# Ensure replica has checksum before replication.
msi.data_obj_chksum(ctx, path, "irodsAdmin=", irods_types.BytesBuf())

# Workaround the PREP deadlock issue: Restrict threads to 1.
ofFlags = "numThreads=1++++rescName={}++++destRescName={}++++irodsAdmin=++++verifyChksum=".format(from_path, to_path)
@@ -105,43 +156,121 @@ def rule_replicate_batch(ctx, verbose):
count_ok += 1
except msi.Error as e:
log.write(ctx, '[replication] ERROR - The file could not be replicated: {}'.format(str(e)))
avu.set_on_data(ctx, path, errorattr, "true")
set_flag_on_data(ctx=ctx, path=path, attr=errorattr, value="true")

# Remove the replication_scheduled flag whether or not replication succeeded.
# rods should have been given 'own' access via policy to allow AVU changes.
avu_deleted = False
try:
avu.rmw_from_data(ctx, path, attr, "%")  # use wildcard because rm_from_data causes problems
avu_deleted = True
except Exception:
avu_deleted = False

# Try removing the attr/resc metadata again with different ACLs
if not avu_deleted:
try:
# The object's ACLs may have changed.
# Force the ACL and try one more time.
msi.sudo_obj_acl_set(ctx, "", "own", user.full_name(ctx), path, "")
avu.rmw_from_data(ctx, path, attr, "%")  # use wildcard because rm_from_data causes problems
except Exception:
# error => report it but still continue
log.write(ctx, "[replication] ERROR - Scheduled replication of <{}>: could not remove schedule flag".format(path))
remove_replication_scheduled_flag(ctx=ctx, path=path, attr=attr)


if print_verbose:
show_memory_usage(ctx)

# Total replication process completed
log.write(ctx, "[replication] Batch replication job finished. {}/{} objects replicated successfully.".format(count_ok, count))

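# For reference, a hypothetical way to trigger this batch job from a cron
# wrapper, assuming the rule is exposed through the iRODS Python rule engine
# plugin and callable via irule; the plugin instance name and argument
# handling below are assumptions, not taken from this changeset.
import subprocess

def run_replication_batch(verbose='1', rss_limit='1000000000',
                          num_copies='2', dry_run='0'):
    rule_text = "rule_replicate_batch('{}', '{}', '{}', '{}')".format(
        verbose, rss_limit, num_copies, dry_run)
    subprocess.check_call([
        "irule",
        "-r", "irods_rule_engine_plugin-python-instance",  # assumed instance name
        rule_text,
        "null",         # no input parameters
        "ruleExecOut",  # output parameter
    ])

# Example: a dry run that only logs what would have been replicated.
# run_replication_batch(dry_run='1')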

def set_flag_on_data(ctx, path, attr, value):
"""Set attribute on data-object, if necessary as rodsAdmin.

When setting fails, fall back to first giving the user "own" access as a workaround.
"""
avu_set = False
attempt = 0
try:
attempt += 1
avu.set_on_data(ctx, path, attr, value)
avu_set = True
except msi.Error as e:
log.debug(ctx, "[replication] Cannot set <{}> in attempt {}: <{}>".format(attr, attempt, str(e)))

# Try setting the attribute again with different ACLs
if not avu_set:
try:
# The object's ACLs may have changed.
# Force the ACL and try one more time.
attempt += 1
msi.sudo_obj_acl_set(ctx, "", "own", user.full_name(ctx), path, "")
avu.set_on_data(ctx, path, attr, value)
avu_set = True
except Exception as e:
log.debug(ctx, "[replication] Cannot set <{}> in attempt {}: <{}>".format(attr, attempt, str(e)))

if not avu_set:
log.write(ctx, "[replication] ERROR - Cannot set <{}> on <{}> after {} attempts".format(attr, path, attempt))


def remove_replication_scheduled_flag(ctx, path, attr):
"""Remove the replication_scheduled flag, whether or not replication succeeded.

rods should have been given 'own' access via policy to allow AVU changes.
"""
avu_deleted = False
try:
avu.rmw_from_data(ctx, path, attr, "%")  # use wildcard because rm_from_data causes problems
avu_deleted = True
except Exception:
avu_deleted = False

# Try removing the attr/resc metadata again with different ACLs
if not avu_deleted:
try:
# The object's ACLs may have changed.
# Force the ACL and try one more time.
msi.sudo_obj_acl_set(ctx, "", "own", user.full_name(ctx), path, "")
avu.rmw_from_data(ctx, path, attr, "%")  # use wildcard because rm_from_data causes problems
except Exception:
# error => report it but still continue
log.write(ctx, "[replication] ERROR - Scheduled replication of <{}>: could not remove schedule flag".format(path))


def already_has_enough_good_replicas(ctx, coll_name, data_name, num_copies):
"""Are there at least num_copies good (status 1) replica's?

This version just checks if there are at least num_copies with status '1', without
taking in consideration on which resources, e.g. if they are in different data centers.

The main motive for now is being able to prevent extra unnecessary actions while doing
large-scale maintenance, e.g. moving all data objects from one resource to another with
an "irepl" followed (possibly much later) by an "itrim".
"""
count = 0
rows = list(genquery.Query(ctx,
['COUNT(DATA_REPL_NUM)', 'COLL_NAME', 'DATA_NAME', 'DATA_REPL_NUM', 'RESC_NAME'],
"COLL_NAME = '{}' AND DATA_NAME = '{}' AND DATA_REPL_STATUS = '1'".format(coll_name, data_name),
output=genquery.AS_LIST))
for row in rows:
count += 1
log.debug(ctx, "[replication] row = {}".format(row))

return count >= int(num_copies)  # num_copies may arrive as a string from the rule arguments

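# To illustrate the maintenance scenario mentioned in the docstring above, a
# minimal sketch of the irepl/itrim workflow, assuming the standard iRODS
# icommands are on the PATH; the resource names and path are placeholders.
import subprocess

def move_object(path, old_resc="oldResc", new_resc="newResc"):
    # Create a replica on the new resource (a no-op if a good one already exists).
    subprocess.check_call(["irepl", "-R", new_resc, path])
    # Possibly much later: trim the replica on the old resource while
    # keeping at least two current copies overall.
    subprocess.check_call(["itrim", "-N", "2", "-S", old_resc, path])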

def is_replication_blocked_by_admin(ctx):
"""Admin can put the replication process on a hold by adding a file called 'stop_replication' in collection /yoda/flags.

:param ctx: Combined type of a callback and rei struct

:returns: Boolean indicating if admin put replication on hold.
"""
zone = user.zone(ctx)
iter = genquery.row_iterator(
"DATA_ID",
"COLL_NAME = '" + "/{}/yoda/flags".format(zone) + "' AND DATA_NAME = 'stop_replication'",
genquery.AS_LIST, ctx
)
return len(list(iter)) > 0
return data_object.exists(ctx, "/{}{}".format(user.zone(ctx), "/yoda/flags/stop_replication"))

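# As an aside, a hypothetical helper for toggling the stop flag checked above,
# assuming the iput/irm icommands and a writable /<zone>/yoda/flags collection;
# the zone name below is a placeholder, not taken from this changeset.
import subprocess
import tempfile

FLAG_PATH = "/tempZone/yoda/flags/stop_replication"  # zone name is a placeholder

def block_replication():
    with tempfile.NamedTemporaryFile() as f:  # empty local file as the flag
        subprocess.check_call(["iput", "-f", f.name, FLAG_PATH])

def unblock_replication():
    subprocess.check_call(["irm", "-f", FLAG_PATH])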

def memory_rss_usage():
"""
Return the RSS (resident) memory size in bytes for the current process.
"""
p = psutil.Process()
return p.memory_info().rss


def show_memory_usage(ctx):
"""
For debugging purposes, log the current RSS usage.
"""
log.write(ctx, "[replication] current RSS usage: {} bytes".format(memory_rss_usage()))


def memory_limit_exceeded(rss_limit):
"""
Return True when a limit other than 0 was specified and memory usage is
currently above this limit, otherwise False.
"""
rss_limit = int(rss_limit)
return rss_limit and memory_rss_usage() > rss_limit
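# The memory guard added in this changeset boils down to the following
# self-contained pattern; a minimal sketch, assuming only psutil.
import psutil

def rss_bytes():
    """RSS (resident) memory size in bytes for the current process."""
    return psutil.Process().memory_info().rss

def rss_limit_exceeded(rss_limit):
    """True when a non-zero limit is set and current RSS exceeds it."""
    rss_limit = int(rss_limit)
    return bool(rss_limit) and rss_bytes() > rss_limit

def process_all(items, handle, rss_limit='1000000000'):
    """Process items until done, or stop early once the RSS limit is hit."""
    for item in items:
        if rss_limit_exceeded(rss_limit):
            print("RSS above {} bytes, stopping".format(rss_limit))
            break
        handle(item)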
61 changes: 52 additions & 9 deletions revisions.py
@@ -11,6 +11,7 @@

import genquery
import irods_types
import psutil

import folder
import groups
@@ -275,18 +276,21 @@ def resource_modified_post_revision(ctx, resource, zone, path):


@rule.make()
def rule_revision_batch(ctx, verbose):
def rule_revision_batch(ctx, verbose, rss_limit='1000000000', dry_run='0'):
"""Scheduled revision creation batch job.

Creates revisions for all data objects marked with 'org_revision_scheduled' metadata.

:param ctx: Combined type of a callback and rei struct
:param verbose: Whether to log verbose messages for troubleshooting ('1': yes, anything else: no)
:param rss_limit: When not '0', the maximum amount of RSS memory in bytes before stopping, checked after each data object
:param dry_run: When '1' do not actually create revisions, only log what would have been created
"""
count = 0
count_ok = 0
count_ignored = 0
print_verbose = (verbose == '1')
no_action = (dry_run == '1')

attr = constants.UUORGMETADATAPREFIX + "revision_scheduled"
errorattr = constants.UUORGMETADATAPREFIX + "revision_failed"
@@ -299,6 +303,12 @@ def rule_revision_batch(ctx, verbose):

minimum_timestamp = int(time.time() - config.async_revision_delay_time)

log.write(ctx, "[revisions] verbose = {}".format(verbose))
log.write(ctx, "[revisions] async_revision_delay_time = {} seconds".format(config.async_revision_delay_time))
log.write(ctx, "[revisions] rss_limit = {} bytes".format(rss_limit))
log.write(ctx, "[revisions] dry_run = {}".format(dry_run))
show_memory_usage(ctx)

iter = list(genquery.Query(ctx,
['ORDER(DATA_ID)', 'COLL_NAME', 'DATA_NAME', 'META_DATA_ATTR_VALUE'],
"META_DATA_ATTR_NAME = '{}' AND COLL_NAME like '/{}/home/{}%' AND DATA_MODIFY_TIME n<= '{}'".format(
@@ -316,10 +326,22 @@ def rule_revision_batch(ctx, verbose):
log.write(ctx, "[revisions] Batch revision job is stopped")
break

# Check current memory usage and stop if it is above the limit.
if memory_limit_exceeded(rss_limit):
show_memory_usage(ctx)
log.write(ctx, "[revisions] Memory used is now above specified limit of {} bytes, stopping further processing".format(rss_limit))
break

# Perform scheduled revision creation for one data object.
path = row[1] + "/" + row[2]
resc = row[3]

# "No action" is meant for easier memory usage debugging.
if no_action:
show_memory_usage(ctx)
log.write(ctx, "[revisions] Skipping creating revision (dry_run): would have created revision for {} on resc {}".format(path, resc))
continue

if print_verbose:
log.write(ctx, "[revisions] Batch revision: creating revision for {} on resc {}".format(path, resc))

@@ -372,6 +394,9 @@ def rule_revision_batch(ctx, verbose):
log.write(ctx, "[revisions] ERROR - Scheduled revision creation of <{}> failed".format(path))
avu.set_on_data(ctx, path, errorattr, "true")

if print_verbose:
show_memory_usage(ctx)

# Total revision process completed
log.write(ctx, "[revisions] Batch revision job finished. {}/{} objects processed successfully. {} objects ignored.".format(count_ok, count, count_ignored))

Expand All @@ -381,15 +406,9 @@ def is_revision_blocked_by_admin(ctx):

:param ctx: Combined type of a callback and rei struct

:returns: Boolean indicating if admin put replication on hold.
:returns: Boolean indicating if admin put revisions on hold.
"""
zone = user.zone(ctx)
iter = genquery.row_iterator(
"DATA_ID",
"COLL_NAME = '" + "/{}/yoda/flags".format(zone) + "' AND DATA_NAME = 'stop_revisions'",
genquery.AS_LIST, ctx
)
return (len(list(iter)) > 0)
return data_object.exists(ctx, "/{}{}".format(user.zone(ctx), "/yoda/flags/stop_revisions"))


def revision_create(ctx, resource, path, max_size, verbose):
@@ -844,3 +863,27 @@ def revision_remove(ctx, revision_id, revision_path):

log.write(ctx, "[revisions] ERROR - Revision ID <{}> not found or permission denied.".format(revision_id))
return False


def memory_rss_usage():
"""
Return the RSS (resident) memory size in bytes for the current process.
"""
p = psutil.Process()
return p.memory_info().rss


def show_memory_usage(ctx):
"""
For debugging purposes, log the current RSS usage.
"""
log.write(ctx, "[revisions] current RSS usage: {} bytes".format(memory_rss_usage()))


def memory_limit_exceeded(rss_limit):
"""
Return True when a limit other than 0 was specified and memory usage is
currently above this limit, otherwise False.
"""
rss_limit = int(rss_limit)
return rss_limit and memory_rss_usage() > rss_limit