Skip to content

Commit

Permalink
[Celery]! Use the start time.
Browse files Browse the repository at this point in the history
  • Loading branch information
cronosnull committed Jun 5, 2020
1 parent d124162 commit 9c3fbad
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions src/htcondor_es/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def query_schedd(
feed_es=feed_es,
es_index=es_index_template,
metadata={"spider_source": f"condor_{query_type}"},
start_time=start_time,
)
if query_type == "history":
getRedisConnection().set(schedd_ad["name"], hist_time)
Expand All @@ -132,6 +133,7 @@ def process_docs(
es_index="cms-test-k8s",
metadata=None,
feed_es_only_completed=True,
start_time=None,
):
"""
process the documents to a suitable format,
Expand All @@ -152,7 +154,11 @@ def process_docs(
for doc in docs:
try:
c_doc = convert_to_json(
doc, return_dict=True, reduce_data=reduce_data, pool_name=pool_name
doc,
return_dict=True,
reduce_data=reduce_data,
pool_name=pool_name,
start_time=start_time,
)
if c_doc:
converted_docs.append(
Expand Down Expand Up @@ -249,6 +255,7 @@ def send_data(
feed_es=False,
es_index="cms-test",
metadata=None,
start_time=None,
):
"""
Send the data to AMQ and, optionally, to ES.
Expand All @@ -274,11 +281,12 @@ def send_data(
feed_es=feed_es,
es_index=es_index,
metadata=metadata,
start_time=start_time,
)
for X in grouper(docs_bunch, chunk_size)
]
total_tasks += len(list(filter(None,docs_bunch)))
#responses.append(process_and_send.apply_async(serializer="pickle"))
total_tasks += len(list(filter(None, docs_bunch)))
# responses.append(process_and_send.apply_async(serializer="pickle"))
return total_tasks


Expand Down

0 comments on commit 9c3fbad

Please sign in to comment.