Skip to content

Commit

Permalink
Integrate with AWS Services: MSK Kafka, Secrets Manager, KMS
Browse files Browse the repository at this point in the history
  • Loading branch information
lukaszlacinski committed Sep 18, 2024
1 parent 56409f6 commit f3138d5
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 33 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ mangum==0.17.0
stac-fastapi.types==3.0.1
stac-fastapi.extensions==3.0.1
stac-fastapi.api==3.0.1
confluent-kafka==2.3.0
16 changes: 16 additions & 0 deletions scripts/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM public.ecr.aws/sam/build-python3.10:latest-x86_64

ENV PATH="/sbin:/usr/sbin:${PATH}"

COPY confluent.repo /etc/yum.repos.d/confluent.repo
RUN rpm --import https://packages.confluent.io/rpm/7.0/archive.key && \
yum install -y librdkafka-devel && \
pip install --upgrade pip

ARG USER_ID
ARG GROUP_ID
RUN groupadd -g $GROUP_ID mygroup && \
useradd -m -u $USER_ID -g $GROUP_ID myuser

USER myuser
WORKDIR /var/task
6 changes: 6 additions & 0 deletions scripts/confluent.repo
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[Confluent-Clients]
name=Confluent Clients repository
baseurl=https://packages.confluent.io/clients/rpm/centos/7/x86_64
gpgcheck=1
gpgkey=https://packages.confluent.io/clients/rpm/archive.key
enabled=1
32 changes: 29 additions & 3 deletions src/api_settings.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import os
from utils import get_secret


# ESGF2 Globus Project
project_id = "cae45630-2a4b-47b9-b704-d870e341da67"

# ESGF2 STAC Transaction API client
api = {
# ESGF2 STAC Transaction API service
stac_api = {
"client_id": "6fa3b827-5484-42b9-84db-f00c7a183a6a",
"client_secret": os.environ.get("CLIENT_SECRET"),
"issuer": "https://auth.globus.org",
Expand All @@ -15,8 +17,32 @@
"url": "https://n08bs7a0hc.execute-api.us-east-1.amazonaws.com/dev",
}

# ESGF2 STAC Ingest API client
# ESGF2 STAC Transaction API client
publisher = {
"client_id": "ec5f07c0-7ed8-4f2b-94f2-ddb6f8fc91a3",
"redirect_uri": "https://auth.globus.org/v2/web/auth-code",
}

# ESGF2 Event Stream Service
secretsmanager = {
"region_name": "us-east-1",
"secret_name": os.environ.get("SECRET_NAME"),
}
sasl_secret = get_secret(secretsmanager)

event_stream = {
"config": {
"bootstrap.servers": "b-1.esgf2a.3wk15r.c9.kafka.us-east-1.amazonaws.com:9096,"
"b-2.esgf2a.3wk15r.c9.kafka.us-east-1.amazonaws.com:9096,"
"b-3.esgf2a.3wk15r.c9.kafka.us-east-1.amazonaws.com:9096",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "SCRAM-SHA-512",
"sasl.username": sasl_secret.get("username"),
"sasl.password": sasl_secret.get("password"),
},
"topic": "esgf2",
}

if os.environ.get("PRODUCER_DEBUG"):
event_stream["config"]["debug"] = "all"
event_stream["config"]["log_level"] = 7
49 changes: 25 additions & 24 deletions src/client.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,16 @@
from typing import Optional, Union
import json
import urllib3
from datetime import datetime
from fastapi import Request, HTTPException, status, Response
from stac_fastapi.types.core import BaseTransactionsClient
from stac_fastapi.types.core import Collection, Item
import api_settings as settings


http = urllib3.PoolManager()
class TransactionClient(BaseTransactionsClient):


def load_access_control_policy(url):
response = http.request("GET", url)
if response.status == 200:
return json.loads(response.data.decode("utf-8"))
else:
return {}


access_control_policy = load_access_control_policy(settings.api.get("access_control_policy"))


class Producer(BaseTransactionsClient):

def __init__(self):
pass
def __init__(self, producer, acl):
self.producer = producer
self.acl = acl

def allowed_groups(self, properties, acp) -> list:
if isinstance(acp, list):
Expand All @@ -48,7 +33,7 @@ def authorize(self, item: Item, event: dict, collection_id: str) -> dict:
raise ValueError("Item collection must match path collection_id")
if getattr(properties, "project", None) != collection_id:
raise ValueError("Item project must match path collection_id")
allowed_groups = self.allowed_groups(properties, access_control_policy)
allowed_groups = self.allowed_groups(properties, self.acl)
print(json.dumps(allowed_groups))
allowed_groups_uuid = [g.get("uuid") for g in allowed_groups]
print(json.dumps(allowed_groups_uuid))
Expand Down Expand Up @@ -141,10 +126,19 @@ async def create_item(
},
}

print(json.dumps(message, default=str).encode("utf-8"))
try:
self.producer.produce(
topic="esgf2",
key=item.id.encode("utf-8"),
value=json.dumps(message, default=str).encode("utf-8"),
)
except Exception as e:
print(f"Error producing message: {e}")
raise HTTPException(status_code=500, detail=str(e))

return Response(
content="Item queued for publication",
status_code=status.HTTP_202_ACCEPTED,
content="Item queued for publication",
)

async def update_item(
Expand Down Expand Up @@ -183,7 +177,11 @@ async def update_item(
},
}

print(json.dumps(message, default=str).encode("utf-8"))
self.producer.produce(
None,
json.dumps(message, default=str).encode("utf-8"),
)

return Response(
content="Item queued for update",
status_code=status.HTTP_202_ACCEPTED,
Expand Down Expand Up @@ -223,7 +221,10 @@ async def delete_item(
},
}

print(json.dumps(message, default=str).encode("utf-8"))
self.producer.produce(
None,
json.dumps(message, default=str).encode("utf-8"),
)
return Response(
content="Item queued for deletion",
status_code=status.HTTP_202_ACCEPTED,
Expand Down
10 changes: 7 additions & 3 deletions src/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@
from mangum import Mangum
from stac_fastapi.extensions.core.transaction import TransactionExtension
from stac_fastapi.types.config import ApiSettings
from client import Producer
from client import TransactionClient
from producer import KafkaProducer
from utils import load_access_control_policy
from api_settings import event_stream, stac_api


app = FastAPI(debug=True)


core_client = Producer()
access_control_policy = load_access_control_policy(url=stac_api.get("access_control_policy"))
producer = KafkaProducer(config=event_stream.get("config"))
core_client = TransactionClient(producer=producer, acl=access_control_policy)

settings = ApiSettings(
api_title="STAC Transaction API",
Expand Down
36 changes: 33 additions & 3 deletions src/producer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,35 @@
import json
from abc import ABC, abstractmethod
import attr
from confluent_kafka import Producer


def stdout(message):
print(json.dumps(message))
@attr.s
class BaseProducer(ABC):
@abstractmethod
def produce(self, topic, message):
pass


class StdoutProducer(BaseProducer):
def produce(self, topic, message):
print(message)


class KafkaProducer(BaseProducer):
def __init__(self, config):
self.producer = Producer(config)
print("KafkaProducer initialized")

def produce(self, topic, key, value):
delivery_reports = []

def delivery_report(err, msg):
if err is not None:
print(f"Delivery failed for message {msg.key()}: {err}")
else:
print(f"Message {msg.key()} successfully delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
delivery_reports.append((err, msg))

self.producer.produce(topic=topic, key=key, value=value, callback=delivery_report)
self.producer.flush()
return delivery_reports
28 changes: 28 additions & 0 deletions src/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import json
import boto3
import urllib3


def get_secret(config):
client = boto3.client("secretsmanager", region_name=config.get("region_name"))
try:
response = client.get_secret_value(SecretId=config.get("secret_name"))
if "SecretString" in response:
secret = response["SecretString"]
else:
secret = response["SecretBinary"]
secret_dict = json.loads(secret)
return secret_dict
except Exception as e:
print(f"Error retrieving secret: {e}")
raise e


def load_access_control_policy(url):
http = urllib3.PoolManager()
response = http.request("GET", url)
if response.status == 200:
print("Access Control Policy loaded")
return json.loads(response.data.decode("utf-8"))
else:
return {}

0 comments on commit f3138d5

Please sign in to comment.