Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get it going #11

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions obsplan.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@

CREATE TABLE "ObsPlan" (
t_planning DOUBLE PRECISION NOT NULL,
target_name VARCHAR,
obs_id VARCHAR NOT NULL,
obs_collection VARCHAR,
s_ra DOUBLE PRECISION,
s_dec DOUBLE PRECISION,
s_fov DOUBLE PRECISION,
s_region VARCHAR,
s_resolution DOUBLE PRECISION,
t_min DOUBLE PRECISION NOT NULL,
t_max DOUBLE PRECISION NOT NULL,
t_exptime DOUBLE PRECISION NOT NULL,
t_resolution DOUBLE PRECISION,
em_min DOUBLE PRECISION,
em_max DOUBLE PRECISION,
em_res_power DOUBLE PRECISION,
o_ucd VARCHAR,
pol_states VARCHAR,
pol_xel INTEGER,
facility_name VARCHAR NOT NULL,
instrument_name VARCHAR,
t_plan_exptime DOUBLE PRECISION,
category VARCHAR NOT NULL,
priority INTEGER NOT NULL,
execution_status VARCHAR NOT NULL,
tracking_type VARCHAR NOT NULL,
rubin_rot_sky_pos FLOAT,
rubin_nexp INTEGER
)

;
COMMENT ON TABLE "ObsPlan" IS 'Metadata in the ObsLocTap relational realization of the IVOA ObsLocTap data model';
COMMENT ON COLUMN "ObsPlan".t_planning IS 'Time in MJD when this observation has been added or modified into the planning log';
COMMENT ON COLUMN "ObsPlan".target_name IS 'Astronomical object observed, if any';
COMMENT ON COLUMN "ObsPlan".obs_id IS 'Observation ID';
COMMENT ON COLUMN "ObsPlan".obs_collection IS 'Name of the data collection';
COMMENT ON COLUMN "ObsPlan".s_ra IS 'Central right ascension, ICRS';
COMMENT ON COLUMN "ObsPlan".s_dec IS 'Central declination, ICRS';
COMMENT ON COLUMN "ObsPlan".s_fov IS 'Diameter (bounds) of the covered region';
COMMENT ON COLUMN "ObsPlan".s_region IS 'Sky region covered by the data product (expressed in ICRS frame)';
COMMENT ON COLUMN "ObsPlan".s_resolution IS 'Spatial resolution of data as FWHM';
COMMENT ON COLUMN "ObsPlan".t_min IS 'Start time in MJD';
COMMENT ON COLUMN "ObsPlan".t_max IS 'Stop time in MJD';
COMMENT ON COLUMN "ObsPlan".t_exptime IS 'Total exposure time';
COMMENT ON COLUMN "ObsPlan".t_resolution IS 'Temporal resolution FWHM';
COMMENT ON COLUMN "ObsPlan".em_min IS 'Start in spectral coordinates';
COMMENT ON COLUMN "ObsPlan".em_max IS 'Stop in spectral coordinates';
COMMENT ON COLUMN "ObsPlan".em_res_power IS 'Spectral resolving power';
COMMENT ON COLUMN "ObsPlan".o_ucd IS 'UCD of observable (e.g., phot.flux.density, phot.count, etc.)';
COMMENT ON COLUMN "ObsPlan".pol_states IS 'List of polarization states or NULL if not applicable';
COMMENT ON COLUMN "ObsPlan".pol_xel IS 'Number of polarization samples';
COMMENT ON COLUMN "ObsPlan".facility_name IS 'Name of the facility used for this observation';
COMMENT ON COLUMN "ObsPlan".instrument_name IS 'Name of the instrument used for this observation';
COMMENT ON COLUMN "ObsPlan".t_plan_exptime IS 'Planned or scheduled exposure time';
COMMENT ON COLUMN "ObsPlan".category IS 'Observation category. One of the following values\\u0003A Fixed, Coordinated, Window, Other';
COMMENT ON COLUMN "ObsPlan".priority IS 'Priority level { 0, 1, 2}';
COMMENT ON COLUMN "ObsPlan".execution_status IS 'One of the following values\\u0003A Planned, Scheduled, Unscheduled, Performed, Aborted';
COMMENT ON COLUMN "ObsPlan".tracking_type IS 'One of the following values\\u0003A Sidereal, Solar-system-object-tracking, Fixed-az-el-transit';
COMMENT ON COLUMN "ObsPlan".rubin_rot_sky_pos IS 'From scheduler rotation angle Not in the IVOA ObsPlan table';
COMMENT ON COLUMN "ObsPlan".rubin_nexp IS 'number of exposures to take usually 1 or 2 Not in the IVOA ObsPlan table';
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ disallow_untyped_defs = true
disallow_incomplete_defs = true
ignore_missing_imports = true
local_partial_types = true
plugins = ["pydantic.mypy"]
no_implicit_reexport = true
show_error_codes = true
strict_equality = true
Expand Down
2 changes: 2 additions & 0 deletions requirements/dev.in
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

-c main.txt

felis
asgi-lifespan
coverage
httpx
Expand All @@ -18,3 +19,4 @@ pydantic
pytest
pytest-asyncio
pytest-cov
greenlet
535 changes: 312 additions & 223 deletions requirements/dev.txt

Large diffs are not rendered by default.

14 changes: 12 additions & 2 deletions requirements/main.in
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ starlette
uvicorn

# Other dependencies.
safir>=3.4.0
lsst-efd-client
pandas
safir[db]>=3.4.0
requests
aiokafka
astro_metadata_translator
lsst.resources
sqlalchemy
asyncpg
pydantic_settings
#urllib3<1.27
#pydantic<2.0.0
#botocore<1.32.0
# this causes lot of whacky errors lsst-efd-client
1,866 changes: 833 additions & 1,033 deletions requirements/main.txt

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions scripts/local-postgress.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
LC_ALL="C" /opt/homebrew/opt/postgresql@15/bin/postgres -D /opt/homebrew/var/postgresql@15

45 changes: 44 additions & 1 deletion src/obsloctap/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

from __future__ import annotations

from pydantic import BaseSettings, Field
from pydantic import Field
from pydantic_settings import BaseSettings
from safir.logging import LogLevel, Profile

__all__ = ["Configuration", "config"]
Expand Down Expand Up @@ -35,6 +36,48 @@ class Configuration(BaseSettings):
env="SAFIR_LOG_LEVEL",
)

database: str = Field(
"",
title="postgres database name",
env="database",
)

database_url: str = Field(
"",
title="URL for postgres database",
env="database_url",
)

database_user: str = Field(
"",
title="user for postgres database",
env="database_user",
)

database_password: str = Field(
"",
title="password for postgres database",
env="database_password",
)

database_schema: str = Field(
"obsloctap",
title="schema for postgres database",
env="database_schema",
)

obsplanLimit: int = Field(
1000,
title="Limit to use on obsplan query",
env="obsplanLimit",
)

obsplanTimeSpan: int = Field(
24,
title="Time to look back in obsplan query with time in hours",
env="obsplanTimeSpan",
)


config = Configuration()
"""Configuration for obsloctap."""
222 changes: 222 additions & 0 deletions src/obsloctap/db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
"""Helper for the efd -so it may be mocked in test."""

__all__ = ["DbHelp", "DbHelpProvider", "OBSPLAN_FIELDS"]

import logging
import os
from io import StringIO
from typing import Any, Sequence

from pandas import Timedelta, Timestamp
from sqlalchemy import Row, text
from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
create_async_engine,
)

from obsloctap.models import Obsplan

from .config import Configuration

OBSPLAN_FIELDS = [
"t_planning", # DOUBLE PRECISION NOT NULL,
"target_name", # VARCHAR,
"obs_id", # VARCHAR NOT NULL,
"obs_collection", # VARCHAR,
"s_ra", # DOUBLE PRECISION,
"s_dec", # DOUBLE PRECISION,
"s_fov", # DOUBLE PRECISION,
"s_region", # VARCHAR,
"s_resolution", # DOUBLE PRECISION,
"t_min", # DOUBLE PRECISION NOT NULL,
"t_max", # DOUBLE PRECISION NOT NULL,
"t_exptime", # DOUBLE PRECISION NOT NULL,
"t_resolution", # DOUBLE PRECISION,
"em_min", # DOUBLE PRECISION,
"em_max", # DOUBLE PRECISION,
"em_res_power", # DOUBLE PRECISION,
"o_ucd", # VARCHAR,
"pol_states", # VARCHAR,
"pol_xel", # INTEGER,
"facility_name", # VARCHAR NOT NULL,
"instrument_name", # VARCHAR,
"t_plan_exptime", # DOUBLE PRECISION,
"category", # VARCHAR NOT NULL,
"priority", # INTEGER NOT NULL,
"execution_status", # VARCHAR NOT NULL,
"tracking_type", # VARCHAR NOT NULL,
"rubin_rot_sky_pos", # FLOAT,
"rubin_nexp", # INTEGER
]

# Configure logging
log = logging.getLogger(__name__)
handler = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s [%(name)-12s] %(levelname)-8s %(message)s"
)
handler.setFormatter(formatter)
log.addHandler(handler)
if "LOG_LEVEL" in os.environ:
log.setLevel(os.environ["LOG_LEVEL"].upper())
else:
log.setLevel("DEBUG")


class DbHelp:
def __init__(self, engine: AsyncEngine | None) -> None:
"""
Setup helper with the sqlclient client passed in.

Parameters
----------
engine : SqlAlchemy Engine

:return: None
"""
self.engine = engine
self.schema = ""
self.insert_fields = ",".join(OBSPLAN_FIELDS)

def process(self, result: Sequence[Row[Any]]) -> list[Obsplan]:
"""
Process the result of the query.

:type self: DbHelp
:type result: Obsplan[]
:return: list[Obsplan]
"""
obslist = list[Obsplan]()
for o in result:
obs = Obsplan()
for c, key in enumerate(OBSPLAN_FIELDS):
setattr(obs, key, o[c])
obslist.append(obs)
return obslist

async def get_schedule(self, time: float = 0) -> list[Obsplan]:
"""Return the latest schedule item from the DB.
We should consider how much that is.. 24 hours worth?
if time is zero we just take top obsplanLimit rows."""

config = Configuration()

whereclause = ""
if time != 0:
window = Timestamp.now() - Timedelta(hours=12)
whereclause = f" where t_planning > {window.to_julian_date()}"

statement = (
f"select {self.insert_fields} from "
f'{self.schema}."{Obsplan.__tablename__}"'
f" {whereclause}"
f" order by t_planning limit {config.obsplanLimit}"
)
logging.debug(statement)
session = AsyncSession(self.engine)
result = await session.execute(text(statement))
obs = result.all()
await session.close()
return self.process(obs)

async def insert_obs(
self, session: AsyncSession, observation: Obsplan
) -> None:
"""Insert an observation into the DB"""

value_str = StringIO()
for count, key in enumerate(OBSPLAN_FIELDS):
if getattr(observation, key) is None:
value_str.write("NULL")
else:
value_str.write(f"'{getattr(observation,key)}'")
if (count + 1) < len(OBSPLAN_FIELDS):
value_str.write(",")

stmt = (
f'insert into {self.schema}."{Obsplan.__tablename__}" '
f"values ({value_str.getvalue()})"
)
logging.debug(stmt)
result = await session.execute(text(stmt))
logging.info(f"Inserted 1 Observation. {result}")

async def insert_obsplan(self, observations: list[Obsplan]) -> int:
"""Insert observations into the DB -
return the count of inserted rows."""
session = AsyncSession(self.engine)
for observation in observations:
await self.insert_obs(session, observation)
await session.commit()
await session.close()
logging.info(
f"Inserted and commited {len(observations)} Observations."
)
return len(observations)

async def tidyup(self, t: float) -> None:
session = AsyncSession(self.engine)
stmt = (
f'delete from {self.schema}."{Obsplan.__tablename__}"'
f" where t_planning = {t}"
)
logging.debug(stmt)
await session.execute(text(stmt))
await session.commit()
await session.close()


class MockDbHelp(DbHelp):
obslist = list[Obsplan]()

async def get_schedule(self, time: float = 0) -> list[Obsplan]:
observations = []
obs = Obsplan()
obs.t_planning = 60032.194918981484
obs.s_ra = 90.90909091666666
obs.s_dec = -74.60384434722222
obs.rubin_rot_sky_pos = 18.33895879413964
obs.rubin_nexp = 3
observations.append(obs)
return observations

async def insert_obsplan(self, observations: list[Obsplan]) -> int:
MockDbHelp.obslist.extend(observations)
return len(observations)


# sort of singleton
dbHelper: DbHelp | None = None


class DbHelpProvider:
@staticmethod
async def getHelper() -> DbHelp:
"""
:return: EfdHelp the helper
"""
global dbHelper
if dbHelper is None:
if "database_url" in os.environ:
config = Configuration()
full_url = (
f"postgresql+asyncpg://{config.database_user}:"
f"{config.database_password}@"
f"{config.database_url}/{config.database}"
)
logging.info(
f"Creating SQlAlchemy engine with "
f"{config.database_user}@{config.database_url}"
f"/config.database"
f" and schema: {config.database_schema}."
)
engine = create_async_engine(full_url)
dbHelper = DbHelp(engine=engine)
dbHelper.schema = config.database_schema
logging.info("Got engine")
else:
dbHelper = MockDbHelp(None)
logging.warning("Using MOCK DB - database_url env not set.")

return dbHelper
Loading
Loading