diff --git a/src/htcondor_es/celery/tasks.py b/src/htcondor_es/celery/tasks.py
index 1b61f35..4bf306c 100644
--- a/src/htcondor_es/celery/tasks.py
+++ b/src/htcondor_es/celery/tasks.py
@@ -111,6 +111,7 @@ def query_schedd(
         feed_es=feed_es,
         es_index=es_index_template,
         metadata={"spider_source": f"condor_{query_type}"},
+        start_time=start_time,
     )
     if query_type == "history":
         getRedisConnection().set(schedd_ad["name"], hist_time)
@@ -132,6 +133,7 @@ def process_docs(
     es_index="cms-test-k8s",
     metadata=None,
     feed_es_only_completed=True,
+    start_time=None,
 ):
     """
     process the documents to a suitable format,
@@ -152,7 +154,11 @@ def process_docs(
     for doc in docs:
         try:
             c_doc = convert_to_json(
-                doc, return_dict=True, reduce_data=reduce_data, pool_name=pool_name
+                doc,
+                return_dict=True,
+                reduce_data=reduce_data,
+                pool_name=pool_name,
+                start_time=start_time,
             )
             if c_doc:
                 converted_docs.append(
@@ -249,6 +255,7 @@ def send_data(
     feed_es=False,
     es_index="cms-test",
     metadata=None,
+    start_time=None,
 ):
     """
     Send the data to AMQ and, optionally, to ES.
@@ -274,11 +281,12 @@ def send_data(
             feed_es=feed_es,
             es_index=es_index,
             metadata=metadata,
+            start_time=start_time,
         )
         for X in grouper(docs_bunch, chunk_size)
     ]
 
-    total_tasks += len(list(filter(None,docs_bunch)))
-    #responses.append(process_and_send.apply_async(serializer="pickle"))
+    total_tasks += len(list(filter(None, docs_bunch)))
+    # responses.append(process_and_send.apply_async(serializer="pickle"))
     return total_tasks
 