Skip to content

Commit

Permalink
Adds new mapping tasks to Stanford and Cornell DAGs
Browse files Browse the repository at this point in the history
  • Loading branch information
jermnelson committed Dec 7, 2021
1 parent 55990a0 commit c152527
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 85 deletions.
129 changes: 83 additions & 46 deletions ils_middleware/dags/cornell.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,30 @@
"""DAG for Cornell University Libraries."""
import logging
from datetime import datetime, timedelta

from ils_middleware.tasks.folio.map import map_to_folio
from ils_middleware.tasks.amazon.sqs import SubscribeOperator
from ils_middleware.tasks.sinopia.local_metadata import new_local_admin_metadata
from ils_middleware.tasks.sinopia.login import sinopia_login
from ils_middleware.tasks.folio.request import FolioRequest
from ils_middleware.tasks.folio.login import FolioLogin

from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

from ils_middleware.tasks.amazon.sqs import SubscribeOperator, parse_messages
from ils_middleware.tasks.folio.login import FolioLogin
from ils_middleware.tasks.folio.graph import construct_graph
from ils_middleware.tasks.folio.map import FOLIO_FIELDS, map_to_folio
from ils_middleware.tasks.sinopia.local_metadata import new_local_admin_metadata
from ils_middleware.tasks.sinopia.login import sinopia_login
from ils_middleware.tasks.sinopia.email import (
notify_and_log,
send_update_success_emails,
)


def task_failure_callback(ctx_dict) -> None:
notify_and_log("Error executing task", ctx_dict)


def dag_failure_callback(ctx_dict) -> None:
notify_and_log("Error executing DAG", ctx_dict)


default_args = {
Expand All @@ -35,21 +48,14 @@
catchup=False,
) as dag:

# Monitors SNS for Cornell topic
# Monitors Cornell's SQS ILS
listen_sns = SubscribeOperator(queue="cornell-ils")

# Maps Documents from URLs in the SNS Message to FOLIO JSON
map_sinopia_to_inventory_records = PythonOperator(
task_id="folio_map",
python_callable=map_to_folio,
op_kwargs={"url": "http://example-instance.sinopia.io"},
process_message = PythonOperator(
task_id="sqs-message-parse",
python_callable=parse_messages,
)

logging.info(
"POST to Okapi's /inventory/items with map_sinopia_to_inventory_records result"
)
connect_okapi_cmd = """exit 0"""

folio_login = PythonOperator(
task_id="folio-login",
python_callable=FolioLogin,
Expand All @@ -60,34 +66,65 @@
},
)

send_to_folio = FolioRequest(
task_id="cornell_send_to_folio",
tenant="cornell",
token="{{ task_instance.xcom_pull(key='return_value', task_ids=['folio_login'])[0]}}",
endpoint="",
)
bf_graphs = PythonOperator(task_id="bf-graph", python_callable=construct_graph)

# Sinopia Login
login_sinopia = PythonOperator(
task_id="sinopia-login",
python_callable=sinopia_login,
op_kwargs={
"region": "us-west-1",
"sinopia_env": Variable.get("sinopia_env"),
},
)
with TaskGroup(group_id="folio_mapping") as folio_map_task_group:
for folio_field in FOLIO_FIELDS:
bf_to_folio = PythonOperator(
task_id=f"{folio_field}_task",
python_callable=map_to_folio,
op_kwargs={"folio_field": folio_field},
)

# Adds localAdminMetadata
local_admin_metadata = PythonOperator(
task_id="sinopia-new-metadata",
python_callable=new_local_admin_metadata,
op_kwargs={
"jwt": "{{ task_instance.xcom_pull(task_ids='update_sinopia.sinopia-login', key='return_value') }}",
"resource": "{{ task_instance.xcom_pull(task_ids='sqs-message-parse', key='resource') }}",
"instance_uri": "{{ task_instance.xcom_pull(task_ids='sqs-message-parse', key='resource_uri') }}",
"ils_identifiers": {
"folio": "{{ task_instance.xcom_pull(task_ids='send_to_folio', key='return_value') }}"
with TaskGroup(group_id="update_sinopia") as sinopia_update_group:

# Sinopia Login
login_sinopia = PythonOperator(
task_id="sinopia-login",
python_callable=sinopia_login,
op_kwargs={
"region": "us-west-2",
"sinopia_env": Variable.get("sinopia_env"),
},
},
)

# Adds localAdminMetadata
local_admin_metadata = PythonOperator(
task_id="sinopia-new-metadata",
python_callable=new_local_admin_metadata,
op_kwargs={
"jwt": "{{ task_instance.xcom_pull(task_ids='update_sinopia.sinopia-login', key='return_value') }}",
"ils_tasks": {
"SIRSI": [
"process_symphony.post_new_symphony",
"process_symphony.post_overlay_symphony",
]
},
},
)

login_sinopia >> local_admin_metadata

notify_sinopia_updated = PythonOperator(
task_id="sinopia_update_success_notification",
dag=dag,
trigger_rule="none_failed",
python_callable=send_update_success_emails,
)
listen_sns >> map_sinopia_to_inventory_records >> login_sinopia >> local_admin_metadata

# Dummy Operators
messages_received = DummyOperator(task_id="messages_received", dag=dag)
messages_timeout = DummyOperator(task_id="sqs_timeout", dag=dag)
processing_complete = DummyOperator(task_id="processing_complete", dag=dag)
processed_sinopia = DummyOperator(
task_id="processed_sinopia", dag=dag, trigger_rule="none_failed"
)


listen_sns >> [messages_received, messages_timeout]
messages_received >> process_message
process_message >> [folio_login, bf_graphs] >> folio_map_task_group
folio_map_task_group >> processed_sinopia >> sinopia_update_group
sinopia_update_group >> notify_sinopia_updated
notify_sinopia_updated >> processing_complete
messages_timeout >> processing_complete
17 changes: 12 additions & 5 deletions ils_middleware/dags/stanford.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from ils_middleware.tasks.amazon.s3 import get_from_s3, send_to_s3
from ils_middleware.tasks.amazon.sqs import SubscribeOperator, parse_messages

from ils_middleware.tasks.sinopia.local_metadata import new_local_admin_metadata
from ils_middleware.tasks.sinopia.email import (
notify_and_log,
Expand All @@ -21,6 +22,8 @@
from ils_middleware.tasks.symphony.mod_json import to_symphony_json
from ils_middleware.tasks.symphony.overlay import overlay_marc_in_symphony
from ils_middleware.tasks.folio.login import FolioLogin
from ils_middleware.tasks.folio.graph import construct_graph
from ils_middleware.tasks.folio.map import FOLIO_FIELDS, map_to_folio


def task_failure_callback(ctx_dict) -> None:
Expand Down Expand Up @@ -164,13 +167,17 @@ def dag_failure_callback(ctx_dict) -> None:
},
)

download_folio_marc = DummyOperator(task_id="download_folio_marc", dag=dag)

export_folio_json = DummyOperator(task_id="folio_json_to_s3", dag=dag)
bf_graphs = PythonOperator(task_id="bf-graph", python_callable=construct_graph)

send_to_folio = DummyOperator(task_id="folio_send", dag=dag)
with TaskGroup(group_id="folio_mapping") as folio_map_task_group:
for folio_field in FOLIO_FIELDS:
bf_to_folio = PythonOperator(
task_id=f"{folio_field}_task",
python_callable=map_to_folio,
op_kwargs={"folio_field": folio_field},
)

folio_login >> download_folio_marc >> export_folio_json >> send_to_folio
folio_login >> bf_graphs >> folio_map_task_group

# Dummy Operator
processed_sinopia = DummyOperator(
Expand Down
34 changes: 0 additions & 34 deletions ils_middleware/tasks/folio/request.py

This file was deleted.

0 comments on commit c152527

Please sign in to comment.