Skip to content

Commit

Permalink
[Celery]Only send to ES data from the history
Browse files Browse the repository at this point in the history
  • Loading branch information
cronosnull committed Jun 5, 2020
1 parent 9c3fbad commit 3449baf
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 6 deletions.
8 changes: 5 additions & 3 deletions celery_spider_cms.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from htcondor_es.utils import get_schedds, get_schedds_from_file

logging.basicConfig(level=os.environ.get("LOGLEVEL", "ERROR"))
__TYPE_HISTORY = "history"
__TYPE_QUEUE = "queue"


def main_driver(args):
Expand All @@ -35,9 +37,9 @@ def main_driver(args):
schedd_ads = get_schedds(args, collectors=args.collectors)
_types = []
if not args.skip_history:
_types.append("history")
_types.append(__TYPE_HISTORY)
if not args.skip_queues:
_types.append("queue")
_types.append(__TYPE_QUEUE)
aff_res = create_affiliation_dir.si().apply_async()
aff_res.get()
res = group(
Expand All @@ -50,7 +52,7 @@ def main_driver(args):
bunch=args.amq_bunch_size,
query_type=_type,
es_index_template=args.es_index_template,
feed_es=args.feed_es,
feed_es=args.feed_es and _type is __TYPE_HISTORY,
)
for _type in _types
for sched in schedd_ads
Expand Down
6 changes: 3 additions & 3 deletions src/htcondor_es/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@
@app.task(
max_retries=3,
autoretry_for=(RuntimeError,), # When a schedd cannot be contacted, retry.
acks_late=True,
retry_backoff=True,
acks_late=True, # Only ack the message when done
retry_backoff=True, # Wait between retries
reject_on_worker_lost=True, # If the worker is killed (e.g. by k8s) reasign the task
)
def query_schedd(
Expand Down Expand Up @@ -178,7 +178,7 @@ def process_docs(
continue
if es_docs:
post_ads_es.si(es_docs, es_index, metadata).apply_async()
return post_ads(converted_docs) if converted_docs else []
return post_ads(converted_docs, metadata=metadata) if converted_docs else []


@app.task(ignore_result=True, queue="es_post")
Expand Down

0 comments on commit 3449baf

Please sign in to comment.