[WIP] Celery spider #156 (Draft)

wants to merge 37 commits into base: master

37 commits:
275a351
Autoflake
cronosnull Jul 22, 2019
f5570e1
Proof of concept
cronosnull Aug 21, 2019
92c8632
Update the docker-compose file with the user
cronosnull Feb 3, 2020
1417ec1
Working POC after the rebase
cronosnull Feb 4, 2020
00d8bf0
[Celery] History queries, main file (WIP)
cronosnull Feb 5, 2020
b7ac271
[Celery][Houskeeping]
cronosnull Feb 5, 2020
52faeb5
[Celery][ES] Adding the support to ES to the celery version
cronosnull Feb 7, 2020
060329d
[Celery]Explicitly setting the serializer
cronosnull Feb 11, 2020
17e2935
[Celery][Docker] Updating names to avoid problems with k8s
cronosnull Feb 12, 2020
c79f382
First version of the k8s manifest
cronosnull Feb 13, 2020
e5f1855
Spliting the k8s manifest
cronosnull Feb 28, 2020
5f29a19
[Celery]fix for the cron k8s definition
cronosnull Mar 19, 2020
cc2e839
[Celery] change the create affiliation to be a task
cronosnull May 28, 2020
1407d99
[Spider][k8s]Environment cleanup in the affiliation cron
cronosnull Jun 2, 2020
376fb10
[k8s][Celery] Affiliation manager created the first time is run
cronosnull Jun 2, 2020
c8d7fb3
[k8s] Change the required resources
cronosnull Jun 2, 2020
4350adc
[k8s] imagePullPolicy always
cronosnull Jun 2, 2020
9075e8a
[Celery]Bug Fix
cronosnull Jun 2, 2020
aa5d280
[Celery]Enabling task compression
cronosnull Jun 3, 2020
ee293e4
[k8s] Trying the 32bit redis to decrease memory requirements
cronosnull Jun 3, 2020
d051064
[Celery][k8s] Retry on worker kill
cronosnull Jun 4, 2020
74e63fa
[Celery][Flower] Using the worker image as flower base
cronosnull Jun 4, 2020
38c656a
[Celery]Using multiple queues
cronosnull Jun 4, 2020
6ca7af9
[k8s][Celery] updating the k8s flower deployment
cronosnull Jun 4, 2020
d124162
[celery] No over parallelism
cronosnull Jun 4, 2020
9c3fbad
[Celery]! Use the start time.
cronosnull Jun 5, 2020
3449baf
[Celery]Only send to ES data from the history
cronosnull Jun 5, 2020
83996f2
[Celery]Fixing the most common key error
cronosnull Jun 9, 2020
e65cbfd
[Celery] Test parameters
cronosnull Jun 11, 2020
213621f
[ConvertToJson] bugfix GLIDEIN Memory
cronosnull Jun 11, 2020
d2847ba
[amq] Use logging instead of print
cronosnull Jun 11, 2020
3f682e3
[Celery] Test parameters
cronosnull Jun 11, 2020
c52f534
[Celery] Documentation
cronosnull Jun 18, 2020
05e0521
[Celery] Adding metadata and remove unused imports
cronosnull Jun 19, 2020
88c9d7b
[Celery] Log schedd query errors
cronosnull Jun 19, 2020
4d7107d
Adding the schema to the repository root
cronosnull Jun 24, 2020
73eda00
[Celery] solve logging bug
cronosnull Jun 25, 2020
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -10,3 +10,5 @@ username
password
venv
*.pyc

.venv/
10 changes: 5 additions & 5 deletions Dockerfile
@@ -1,16 +1,16 @@
FROM cern/cc7-base:20190724
COPY ./requirements.txt /cms-htcondor-es/requirements.txt
RUN yum install -y git sudo python36 python36-virtualenv python36-pip && \
RUN yum install -y git python36 python36-virtualenv python36-pip && \
ln -fs /usr/bin/python3 /usr/bin/python && \
ln -fs /usr/bin/pip3.6 /usr/bin/pip &&\
ln -fs /usr/bin/pip3.6 /usr/bin/pip && \
pip install -r /cms-htcondor-es/requirements.txt
ENV PYTHONPATH "${PYTHONPATH}:/cms-htcondor-es/src"
ENV REQUESTS_CA_BUNDLE "/etc/pki/ca-trust/extracted/openssl/ca-bundle.trust.crt"

# Install latest kubectl for using with crons
RUN curl -k -O -L https://storage.googleapis.com/kubernetes-release/release/$(curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt)/bin/linux/amd64/kubectl
RUN mv kubectl /usr/bin
RUN chmod +x /usr/bin/kubectl
RUN curl -k -O -L https://storage.googleapis.com/kubernetes-release/release/$(curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt)/bin/linux/amd64/kubectl && \
mv kubectl /usr/bin && \
chmod +x /usr/bin/kubectl

# copy spider codebase
COPY . /cms-htcondor-es
661 changes: 661 additions & 0 deletions JobMonitoring.json

Large diffs are not rendered by default.

189 changes: 189 additions & 0 deletions celery_spider_cms.py
@@ -0,0 +1,189 @@
# -*- coding: utf-8 -*-
# Author: Christian Ariza <christian.ariza AT gmail [DOT] com>
"""
Celery version of the cms htcondor_es spider.
This version has some major changes:
- Celery based
- The same function is used for both the queue and
history queries.
- The history checkpoint for each schedd is stored in Redis instead of a JSON file.
- Parallelism is managed by Celery
"""
import os
import logging
import argparse
import time
import traceback
from celery import group
from htcondor_es.celery.tasks import query_schedd, create_affiliation_dir
from htcondor_es.celery.celery import app
from htcondor_es.utils import get_schedds, get_schedds_from_file

logging.basicConfig(level=os.environ.get("LOGLEVEL", "ERROR"))
__TYPE_HISTORY = "history"
__TYPE_QUEUE = "queue"


def main_driver(args):
logging.debug(app.conf.humanize(with_defaults=False))
schedd_ads = []
start_time = time.time()
if args.collectors_file:
schedd_ads = get_schedds_from_file(args, collectors_file=args.collectors_file)
del (
args.collectors_file
) # sending a file through postprocessing will cause problems.
else:
schedd_ads = get_schedds(args, collectors=args.collectors)
_types = []
if not args.skip_history:
_types.append(__TYPE_HISTORY)
if not args.skip_queues:
_types.append(__TYPE_QUEUE)
aff_res = create_affiliation_dir.si().apply_async()
aff_res.get()
res = group(
query_schedd.si(
sched,
dry_run=args.dry_run,
start_time=start_time,
keep_full_queue_data=args.keep_full_queue_data,
chunk_size=args.query_queue_batch_size,
bunch=args.amq_bunch_size,
query_type=_type,
es_index_template=args.es_index_template,
feed_es=args.feed_es and _type == __TYPE_HISTORY,
)
for _type in _types
for sched in schedd_ads
).apply_async()
# Use get() to wait for the results.
# We could also chain it to a chord to process the responses
# for logging purposes.
# propagate=False prevents get() from raising
# an exception if any of the schedd queries failed.
_query_res = res.get(propagate=False)
logging.debug(_query_res)
if res.failed():
logging.warning("At least one of the schedd queries failed")
print(time.time() - start_time)


def main():
"""
Main method for the spider_cms script.
Parses arguments and invokes main_driver
"""
parser = argparse.ArgumentParser()
parser.add_argument(
"--schedd_filter",
default="",
type=str,
dest="schedd_filter",
help=(
"Comma separated list of schedd names to process "
"[default is to process all]"
),
)
parser.add_argument(
"--skip_history",
action="store_true",
dest="skip_history",
help="Skip processing the history. (Only do queues.)",
)
parser.add_argument(
"--skip_queues",
action="store_true",
dest="skip_queues",
help="Skip processing the queues. (Only do history.)",
)
parser.add_argument(
"--read_only",
action="store_true",
dest="read_only",
help="Only read the info, don't submit it.",
)
parser.add_argument(
"--dry_run",
action="store_true",
dest="dry_run",
help=(
"Don't even read info, just pretend to. (Still "
"query the collector for the schedd's though.)"
),
)
parser.add_argument(
"--max_documents_to_process",
default=0,
type=int,
dest="max_documents_to_process",
help=(
"Abort after this many documents (per schedd). "
"[default: %(default)d (process all)]"
),
)
parser.add_argument(
"--keep_full_queue_data",
action="store_true",
dest="keep_full_queue_data",
help="Keep the full queue data (by default all but a few fields are dropped for running jobs).",
)
parser.add_argument(
"--amq_bunch_size",
default=5000,
type=int,
dest="amq_bunch_size",
help=("Send docs to AMQ in bunches of this number " "[default: %(default)d]"),
)

parser.add_argument(
"--query_queue_batch_size",
default=500,
type=int,
dest="query_queue_batch_size",
help=(
"Send docs to listener in batches of this number " "[default: %(default)d]"
),
)
parser.add_argument(
"--collectors",
default=[],
action="append",
dest="collectors",
help="Collectors' addresses",
)
parser.add_argument(
"--collectors_file",
default=os.getenv("COLLECTORS_FILE_LOCATION", None),
action="store",
type=argparse.FileType("r"),
dest="collectors_file",
help="File defining the pools and collectors",
)
parser.add_argument(
"--es_index_template",
default="cms-test-k8s",
type=str,
dest="es_index_template",
help=(
"Trunk of index pattern. "
"Needs to start with 'cms' "
"[default: %(default)s]"
),
)
parser.add_argument(
"--feed_es",
action="store_true",
dest="feed_es",
help="Send data also to the es-cms ES instance",
)
args = parser.parse_args()

# --dry_run implies read_only
args.read_only = args.read_only or args.dry_run

main_driver(args)


if __name__ == "__main__":
main()
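The fan-out in `main_driver` builds one task per (query type, schedd) pair — the nested `for` inside `group(...)` is a cartesian product. A minimal pure-Python sketch of that product, without Celery; the helper name `build_task_args` is hypothetical:

```python
from itertools import product

def build_task_args(schedd_ads, skip_history=False, skip_queues=False):
    """One (query_type, schedd) pair per task, mirroring the group()
    built in main_driver. Hypothetical helper; pure Python, no Celery."""
    query_types = []
    if not skip_history:
        query_types.append("history")
    if not skip_queues:
        query_types.append("queue")
    # Cartesian product: every query type runs against every schedd.
    return list(product(query_types, schedd_ads))

tasks = build_task_args(["schedd1", "schedd2"])
# 2 types x 2 schedds -> 4 task signatures
```

In the real script each pair becomes a `query_schedd.si(...)` signature and the whole group is dispatched with `apply_async()`.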
3 changes: 2 additions & 1 deletion cronAffiliation.sh
@@ -5,9 +5,10 @@ export PYTHONPATH="$DIR/src/:$DIR/../CMSMonitoring/src/python:$PYTHONPATH"
source "$DIR/venv/bin/activate"
export REQUESTS_CA_BUNDLE="/etc/pki/ca-trust/extracted/openssl/ca-bundle.trust.crt"
create=$(cat <<EOF
import os
from htcondor_es.AffiliationManager import AffiliationManager, AffiliationManagerException
try:
AffiliationManager(recreate_older_days=1)
AffiliationManager(recreate_older_days=1, dir_file=os.getenv('AFFILIATION_DIR_LOCATION', AffiliationManager._AffiliationManager__DEFAULT_DIR_PATH))
except AffiliationManagerException as e:
import traceback
traceback.print_exc()
Binary file added doc/celery-spider/img/celery_spider.png
47 changes: 47 additions & 0 deletions doc/celery-spider/readme.md
@@ -0,0 +1,47 @@
# Celery HTCondor Spider

![celery spider](img/celery_spider.png)

The Celery version of the htcondor-spider is designed for horizontal scalability and maintainability. The [celery spider docker image](https://hub.docker.com/r/cronosnull/cms_htcondor_es/tags) is built from this repository; the image is around 420 MB.

## Components

### spider cronjob

Queries the collectors for the schedd names and triggers the execution of the spider.

### Redis and Redis-checkpoint

Redis is used as the celery message queue and results store. We also use another Redis instance, `redis-checkpoint`, to store the last execution time of the history query on each schedd.
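The checkpoint logic can be sketched as follows. This is an illustrative sketch, not the spider's actual code: the dict-backed store stands in for the `redis-checkpoint` client (`SPIDER_CHECKPOINT`), and the 12-hour lookback floor is an assumed default.

```python
import time

class CheckpointStore:
    """Per-schedd history checkpoint, as kept in redis-checkpoint.
    A plain dict stands in for the Redis client so the sketch is
    self-contained; the lookback window is an assumption."""

    def __init__(self, backend=None, max_lookback=12 * 3600):
        self.backend = {} if backend is None else backend
        self.max_lookback = max_lookback  # never re-query further back than this

    def last_run(self, schedd_name, now=None):
        now = time.time() if now is None else now
        floor = now - self.max_lookback
        raw = self.backend.get(schedd_name)
        # Unknown (or very stale) schedd: start from the bounded window.
        return floor if raw is None else max(float(raw), floor)

    def record_run(self, schedd_name, ts):
        # Redis stores strings, so serialize the timestamp.
        self.backend[schedd_name] = str(ts)
```

With a real Redis client the same `get`/`set` calls apply, keyed by schedd name, so every worker sees the same checkpoint regardless of which pod ran the previous query.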

### Spider worker

The Celery workers that execute the tasks.

### Flower

Celery monitoring interface. It allows us to see the number of messages in each queue.

### Shared spider volume

Persistent shared storage used to store the affiliation directory cache.

## Secrets

- collectors: a JSON file with the list of collectors for each pool
- AMQ credentials:
  - amq-password
  - amq-username
- ElasticSearch credentials: es-conf

## Environment Variables

In addition to the environment variables defining the location of the secrets, these variables need to be set according to the execution environment:

- CMS_HTCONDOR_BROKER
- CMS_HTCONDOR_PRODUCER
- CMS_HTCONDOR_TOPIC
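For a local shell session they could be set like this; the values mirror the docker-compose file in this pull request and point at a test broker, not production:

```shell
# Test-endpoint values taken from the docker-compose file -- adjust per environment.
export CMS_HTCONDOR_BROKER="cms-test-mb.cern.ch"
export CMS_HTCONDOR_PRODUCER="condor-test"
export CMS_HTCONDOR_TOPIC="/topic/cms.jobmon.condor"
```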
92 changes: 92 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,92 @@
version: "3.4"
services:
spider-worker:
build: .
image: spider-worker
user: 1414:1414
volumes:
- shared_spider:/cms_shared
depends_on:
- redis
- redis_checkpoint
restart: 'no'
environment: &env
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/1
- SPIDER_CHECKPOINT=redis://redis_checkpoint:6379/0
- AFFILIATION_DIR_LOCATION=/cms_shared/affiliation_dir.json
- COLLECTORS_FILE_LOCATION=/run/secrets/collectors.json
- PYTHONPATH=/cms-htcondor-es/src:$$PYTHONPATH
- CMS_HTCONDOR_PRODUCER=condor-test
- CMS_HTCONDOR_TOPIC=/topic/cms.jobmon.condor
- CMS_HTCONDOR_BROKER=cms-test-mb.cern.ch
- C_FORCE_ROOT="true"
- CELERY_TEST="true"
- CMS_AMQ_USERNAME_FILE=/run/secrets/amq-username
- CMS_AMQ_PASSWORD_FILE=/run/secrets/amq-password
- CMS_ES_CONF_FILE=/run/secrets/es-conf
- REQUESTS_CA_BUNDLE=/etc/pki/ca-trust/extracted/openssl/ca-bundle.trust.crt
- LOGLEVEL=DEBUG
entrypoint: ['celery', "-A", "htcondor_es.celery.celery", "worker","-Q","celery,es_post,convert"]
secrets: &secrets
- amq-username
- amq-password
- es-conf
- source: collectors
target: collectors.json
spider-app:
image: spider-worker
user: 1414:1414
depends_on:
- spider-worker
volumes:
- shared_spider:/cms_shared
secrets: *secrets

restart: 'no'
environment: *env
# entrypoint: ["python","/cms-htcondor-es/tests/celery_test.py"]
entrypoint: ["python","/cms-htcondor-es/celery_spider_cms.py","--feed_es","--query_queue_batch_size","1000", "--amq_bunch_size", "500"]


redis:
image: redis:alpine
restart: 'unless-stopped'
volumes:
- $HOME/tmp/redis_data:/data

redis_checkpoint:
image: redis:alpine
entrypoint: redis-server --appendonly yes
restart: always
volumes:
- $HOME/tmp/redis_cp_data:/data

spider_flower:
image: spider-worker
command: ["celery","flower","-A", "htcondor_es.celery.celery", "--broker=redis://redis:6379/0", "--port=8888"]
ports:
- 8888:8888
depends_on:
- spider-worker
volumes:
shared_spider:
driver: local
driver_opts:
o: bind
device: $HOME/tmp/shared_spider
type: none
secrets:
collectors:
file: ./etc/collectors.json
amq-username:
file: ./username
amq-password:
file: ./password
es-conf:
file: ./es.conf
19 changes: 19 additions & 0 deletions k8s/accounts/spider-accounts.yaml
@@ -0,0 +1,19 @@
## ACCOUNTS
apiVersion: v1
kind: ServiceAccount
metadata:
namespace: spider
name: spider-account
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
namespace: spider
name: spider-account
subjects:
- kind: ServiceAccount
name: spider-account
roleRef:
kind: ClusterRole
name: edit
apiGroup: rbac.authorization.k8s.io