diff --git a/prompt-processing/groups.ipynb b/prompt-processing/groups.ipynb index 8ecb5ef..f847b57 100644 --- a/prompt-processing/groups.ipynb +++ b/prompt-processing/groups.ipynb @@ -36,6 +36,7 @@ "outputs": [], "source": [ "from astropy.time import Time, TimeDelta\n", + "import boto3\n", "import pandas\n", "from lsst_efd_client import EfdClient" ] @@ -84,8 +85,9 @@ " # Ignore the explicitly canceled groups\n", " if not all_canceled.empty:\n", " canceled = df.index.intersection(all_canceled.set_index(\"groupId\").index).tolist()\n", - " logger.info(\"Removing the canceled events: %s\", canceled)\n", - " df = df.drop(canceled)\n", + " if canceled:\n", + " logger.info(f\"{len(canceled)} events were canceled {canceled}\")\n", + " df = df.drop(canceled)\n", "\n", " return df" ] @@ -100,16 +102,6 @@ "df_efd = await get_df_from_next_visit_events(date)" ] }, - { - "cell_type": "code", - "execution_count": null, - "id": "40c67ddd-4bfa-4f45-8775-bfbe9edf8927", - "metadata": {}, - "outputs": [], - "source": [ - "df_efd.head()" - ] - }, { "cell_type": "code", "execution_count": null, @@ -153,16 +145,6 @@ "df_butler = pandas.DataFrame.from_records([embargo_records[num].toDict() for num in embargo_records]).set_index(\"group_name\")" ] }, - { - "cell_type": "code", - "execution_count": null, - "id": "e518792e-590a-4af5-8b21-1c39089c2129", - "metadata": {}, - "outputs": [], - "source": [ - "df_butler.head()" - ] - }, { "cell_type": "code", "execution_count": null, @@ -171,7 +153,8 @@ "outputs": [], "source": [ "groups_no_raw = set(df_efd.index) - set(df_butler.index)\n", - "logger.info(f\"{len(groups_no_raw)} group had records in EFD but no raws in the embargo butler: {groups_no_raw}\")" + "if groups_no_raw:\n", + " logger.info(f\"{len(groups_no_raw)} group had records in EFD but no raws in the embargo butler: {groups_no_raw}\")" ] }, { @@ -207,11 +190,7 @@ " suffixes=('_efd', '_butler'),\n", " validate=\"one_to_one\",).set_index(\"groupId\")\n", "\n", - "logger.info(f\"{len(df_md)} groups in the table\")\n", - "\n", - "fields_efd = [\"filters\", \"position0\", \"position1\"]\n", - "fields_butler = [\"seq_num\", \"tracking_ra\", \"tracking_dec\", \"physical_filter\", \"target_name\", \"id\"]\n", - "df_md[fields_efd + fields_butler].head()" + "logger.info(f\"Total: {len(df_md)} groups in the table\")" ] }, { @@ -224,30 +203,16 @@ "boring_cols = [\"instrument_efd\", \"instrument_butler\", \"science_program\", \"observation_reason\", \"observation_type\", \n", " \"cameraAngle\", \"has_simulated\", \"dome\", \"coordinateSystem\", \"rotationSystem\",\n", " \"private_identity\", \"private_origin\", \"private_revCode\", \"salIndex\", \"totalCheckpoints\",\n", - " \"nimages\"]\n", + " \"nimages\",\n", + " \"day_obs\", \"survey\", \"exposure_time\"]\n", "for col in boring_cols:\n", " if df_md[col].nunique() == 1:\n", - " logger.info(f\"Dropping column {col} with only {df_md[col].unique()}\")\n", + " logger.info(f\"column {col} has only {df_md[col].unique()}\")\n", " df_md.drop(columns=[col,], inplace=True) \n", " else:\n", " logger.warning(f\"Column {col} has {df_md[col].unique()}\")" ] }, - { - "cell_type": "code", - "execution_count": null, - "id": "b14d0660-34da-44e2-ab50-3a83f576621c", - "metadata": {}, - "outputs": [], - "source": [ - "for (col1, col2, name) in ((\"filters\", \"physical_filter\", \"filter\"),):\n", - " if df_md[col1].equals(df_md[col2]):\n", - " df_md.drop(columns=[col1,], inplace=True) \n", - " df_md.rename(columns={col2: name}, inplace=True)\n", - " else:\n", - " print(f\"Some {name} did not match\")" - ] - }, { "cell_type": "code", "execution_count": null, @@ -255,7 +220,7 @@ "metadata": {}, "outputs": [], "source": [ - "for col in (\"filter\", \"exposure_time\", \"survey\", \"day_obs\" ):\n", + "for col in (\"physical_filter\", ):\n", " logger.info(f\"Column {col} has {df_md[col].unique()}\")" ] }, @@ -287,7 +252,7 @@ "metadata": {}, "outputs": [], "source": [ - "df_md[[\"seq_num\", \"position0\", \"tracking_ra\", \"offset0\", \"position1\", \"tracking_dec\", \"offset1\", \"filter\" ]]" + "df_md[[\"seq_num\", \"position0\", \"tracking_ra\", \"offset0\", \"position1\", \"tracking_dec\", \"offset1\", \"physical_filter\", \"id\"]]" ] }, { @@ -297,13 +262,376 @@ "metadata": {}, "outputs": [], "source": [ - "df_md[\"offset0\"].describe(), df_md[\"offset1\"].describe()" + "df_md[[\"offset0\", \"offset1\"]].describe()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c5e8101a-036e-4a14-9941-e1f156c85d88", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1a182ce5-4541-40c3-bc75-e5be75feede1", + "metadata": {}, + "outputs": [], + "source": [ + "def get_exposure_id(dayobs: int):\n", + " \"\"\"\n", + " Returns\n", + " -------\n", + " df : `pandas.DataFrame`\n", + " \"\"\"\n", + " butler = Butler(\"/repo/embargo\", writeable=False)\n", + "\n", + " results = butler.registry.queryDimensionRecords(\n", + " \"exposure\",\n", + " datasets=\"raw\",\n", + " collections=\"LATISS/raw/all\",\n", + " where=\"instrument='LATISS' and exposure.observation_type='science' and\"\n", + " \" exposure.day_obs=dayobs\",\n", + " bind={\"dayobs\": dayobs},\n", + " )\n", + "\n", + " df = pandas.DataFrame(\n", + " [(_.group_name, _.id) for _ in results], columns=[\"groupId\", \"expId\"]\n", + " ).set_index(\"groupId\")\n", + "\n", + " return df" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fbdd3851-6ec4-4706-bd74-dd35ccb4839c", + "metadata": {}, + "outputs": [], + "source": [ + "def get_df_file(df_expId, butler, datasetType, where=\"\", collections=...):\n", + " \"\"\"\n", + " Get the last-moditied timestamps of the dataset files in a bucket-based butler repo at USDF\n", + "\n", + " Returns\n", + " -------\n", + " df : `pandas.DataFrame`\n", + " \"\"\" \n", + " s3_endpoint = \"https://s3dfrgw.slac.stanford.edu\"\n", + " s3client = boto3.client(\"s3\", endpoint_url=s3_endpoint)\n", + " refs = butler.registry.queryDatasets(\n", + " datasetType=datasetType,\n", + " collections=collections,\n", + " where=where,\n", + " )\n", + "\n", + " timestamps = dict()\n", + " dimension = None\n", + " for ref in refs:\n", + " if not dimension:\n", + " if \"visit\" in ref.dataId:\n", + " dimension = \"visit\"\n", + " else:\n", + " dimension = \"exposure\"\n", + " fits_uri = butler.getURI(ref)\n", + " time_written = s3client.head_object(\n", + " Bucket=fits_uri.netloc,\n", + " Key=fits_uri.relativeToPathRoot,\n", + " )[\"LastModified\"]\n", + " timestamps[ref.dataId[dimension]] = time_written\n", + " \n", + " df = pandas.DataFrame.from_dict(data=timestamps, orient='index', columns=[\"file\"]) \n", + " df = pandas.merge(df, df_expId.reset_index(), left_index=True, right_on=\"expId\", \n", + " how=\"left\", validate=\"one_to_one\",).set_index(\"groupId\")\n", + " return df" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "55208bb3-d4a8-4d5e-a1c1-53e464db29f9", + "metadata": {}, + "outputs": [], + "source": [ + "df_expId = get_exposure_id(dayobs)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0b423ff9-f41a-47cc-a331-15700334e910", + "metadata": {}, + "outputs": [], + "source": [ + "df_raw = get_df_file(df_expId, butler, \"raw\", \n", + " collections=[\"LATISS/raw/all\"],\n", + " where=f\"exposure.science_program IN ('AUXTEL_PHOTO_IMAGING') and instrument='LATISS' and exposure.day_obs={dayobs}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3fd370c4-8389-4006-b23f-ee6a7f5852f2", + "metadata": {}, + "outputs": [], + "source": [ + "if len(df_raw) != len(df_md):\n", + " logger.warning(\"Counts of raw files do not match; need attention\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "71fc7e31-3c39-4bde-a2cd-86e255b44ac0", + "metadata": {}, + "outputs": [], + "source": [ + "df_md = df_md.merge(df_raw[[\"file\"]], how=\"outer\", left_index=True, right_index=True, validate=\"one_to_one\",)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "345f7d89-fa0d-4594-8d83-7b13b838d30d", + "metadata": {}, + "outputs": [], + "source": [ + "df_md.rename(columns={\"file\": \"ts_raw\"}, inplace=True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "014c2f3c-1441-4567-aadf-bf14da1369ff", + "metadata": {}, + "outputs": [], + "source": [ + "df_output = get_df_file(df_expId, butler, \"calexp\", \n", + " collections=[f\"LATISS/prompt/output-{date}/*/prompt-proto-service-*\"])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f3272504-db65-4e10-a7f3-450eb60a8ed8", + "metadata": {}, + "outputs": [], + "source": [ + "df_md = df_md.merge(df_output[[\"file\"]], how=\"outer\", left_index=True, right_index=True, validate=\"one_to_one\",)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "790edcc5-d023-43cf-a04b-4203a7c0fbf7", + "metadata": {}, + "outputs": [], + "source": [ + "df_md.rename(columns={\"file\": \"ts_output\"}, inplace=True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5a20abfc-5102-4ece-8dd0-58dba88f541a", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3c0e28e9-7b35-419f-ab5b-b1560de18e28", + "metadata": {}, + "outputs": [], + "source": [ + "def get_loki_command_with_phrase(time_start, time_end, phrase, extra=\"\"):\n", + " command = f\"\"\"~/binaries/logcli-linux-amd64 --output=jsonl --tls-skip-verify query --addr=http://sdfloki.slac.stanford.edu:80 --timezone=UTC -q --limit=200 --from=\"{time_start}\" --to=\"{time_end}\" --proxy-url=http://sdfproxy.sdf.slac.stanford.edu:3128 '{{namespace=\"vcluster--usdf-prompt-processing\", container=\"user-container\", pod=~\"prompt-proto-service-.+\"}} |~ \"{phrase}\" {extra} ' \"\"\"\n", + " return command" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5e783edc-f809-430d-9a3a-d340c1e761ea", + "metadata": {}, + "outputs": [], + "source": [ + "def get_df_from_loki(date, search_phrase=\"Waiting for snaps\"):\n", + " start = Time(date, scale=\"utc\", format=\"isot\") + TimeDelta(12*60*60, format=\"sec\")\n", + " end = start + TimeDelta(1, format=\"jd\")\n", + " command = get_loki_command_with_phrase(start.strftime('%Y-%m-%dT%H:%M:%SZ'), \n", + " end.strftime('%Y-%m-%dT%H:%M:%SZ'),\n", + " search_phrase)\n", + " results = !{command}\n", + " data = [json.loads(_) for _ in results]\n", + " df = pandas.json_normalize(data)\n", + " df = df.merge(pandas.json_normalize(df[\"line\"].apply(json.loads)),\n", + " left_index=True, right_index=True).drop(columns=[\"line\"])\n", + "\n", + " if \"group\" not in df.columns and \"message\" in df.columns:\n", + " df[\"group\"] = df[\"message\"].str.extract(r\"groupId='([T:.\\d-]*)',\")\n", + "\n", + " df[\"ts\"] = pandas.to_datetime(df[\"timestamp\"])\n", + " \n", + " return df[[\"group\", \"ts\"]]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "246b8632-4f90-49be-8376-5ef1911a67bc", + "metadata": {}, + "outputs": [], + "source": [ + "phrases = {\n", + " \"unpckMsg\": \"Unpacked message as \\\" |~ \\\"AUXTEL\",\n", + " \"prepBtlr\": \"Preparing Butler for visit\",\n", + " \"waitSnap\": \"Waiting for snaps\",\n", + " \"runPipe1\": \"Running pipeline\",\n", + " \"pipeSucc\": \"Pipeline successfully run\",\n", + " \"outputSa\": \"Pipeline products saved to collection\",\n", + " #\"timeout1\": \"Timed out waiting for image after receiving exposures \",\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9afa63c6-359f-486c-805f-684bf86c7ae2", + "metadata": {}, + "outputs": [], + "source": [ + "df_loki = pandas.DataFrame(columns=[\"group\"])\n", + "for phrase in phrases:\n", + " logger.debug(f\"Got Loki records for {phrase}\")\n", + " df2 = get_df_from_loki(date, phrases[phrase]).rename(columns={\"ts\": \"ts_\" + phrase})\n", + " df_loki = df_loki.merge(df2, on=\"group\", how=\"outer\", validate=\"one_to_one\",)\n", + "df_loki = df_loki.rename(columns={\"group\": \"groupId\"}).set_index(\"groupId\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f96dc090-8d7b-47b0-a86a-531f496d6c01", + "metadata": {}, + "outputs": [], + "source": [ + "# This plot only uses Loki timestamp. Hence it includes groups with no data taken. \n", + "df1 = pandas.DataFrame(index=df_loki.index)\n", + "ref = \"ts_unpckMsg\"\n", + "for col_name in df_loki.columns:\n", + " # Notes: if the ref column doesn't exist, all become NaN in df1 \n", + " # This can happens e.g. an exposure wasn't taken, so that group isn't in df_md\n", + " df1[col_name] = (df_loki[col_name] - df_loki[ref]).dt.total_seconds()\n", + " \n", + "ax = df1.drop(columns=[ref]).plot(kind=\"hist\", title=f\"{date}; ref={ref}\", xlabel=\"seconds\", ylabel=\"\", bins=50,\n", + " #subplots=True, layout=(2,4), figsize=(12,6),\n", + " alpha=0.5, rot=45, \n", + ")\n", + "\n", + "pandas.merge(df1, df_loki, how=\"outer\", left_index=True, right_index=True, \n", + " suffixes=('_diff', '_loki'),\n", + " validate=\"one_to_one\",).head()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2a254629-67e2-46cb-a984-00026396a0d5", + "metadata": {}, + "outputs": [], + "source": [ + "df_md[\"ts_begin\"] = df_md[\"timespan\"].apply(lambda _: pandas.Timestamp(_.begin.utc.datetime, tz=\"UTC\"))\n", + "df_md[\"ts_end\"] = df_md[\"timespan\"].apply(lambda _: pandas.Timestamp(_.end.utc.datetime, tz=\"UTC\"))\n", + "\n", + "# typically a small fraction of seconds before sndStamp \n", + "df_md[\"ts_group_utc\"] = df_md[\"group_name\"].apply(lambda _: pandas.Timestamp(Time(_, scale=\"tai\").utc.datetime, tz=\"UTC\"))\n", + "\n", + "# time of visit publication; TAI in unix seconds\n", + "df_md[\"ts_sndStamp\"] = df_md[\"private_sndStamp\"].apply(lambda _: pandas.Timestamp(Time(_, format=\"unix_tai\").utc.datetime, tz=\"UTC\"))\n", + "# time of visit publication; UTC in unix seconds\n", + "df_md[\"ts_efdStamp\"] = df_md[\"private_efdStamp\"].apply(lambda _: pandas.Timestamp.fromtimestamp(_, tz=\"UTC\")) \n", + "# Let them be private \n", + "df_md[\"ts_rcvStamp\"] = df_md[\"private_rcvStamp\"].apply(lambda _: pandas.Timestamp.fromtimestamp(_, tz=\"UTC\")) \n", + "df_md[\"ts_kafkaStamp\"] = df_md[\"private_kafkaStamp\"].apply(lambda _: pandas.Timestamp.fromtimestamp(_, tz=\"UTC\")) " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2cab28cd-2698-45b5-bcb5-0f95e3618642", + "metadata": {}, + "outputs": [], + "source": [ + "columns = [\"ts_sndStamp\", \"ts_begin\", \"ts_end\", \"ts_raw\", \"ts_output\"]\n", + "df0 = pandas.merge(df_md[columns],\n", + " df_loki, \n", + " # only those with data in butler, not those groups with events but no data taken.\n", + " how=\"left\",\n", + " left_index=True, right_index=True, validate=\"one_to_one\",)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a77cdfd2-297d-4926-b0f1-7a9f793c7ac3", + "metadata": {}, + "outputs": [], + "source": [ + "df2 = pandas.DataFrame(index=df0.index)\n", + "ref = \"ts_sndStamp\"\n", + "for col_name in df0.columns:\n", + " df2[col_name] = (df0[col_name] - df0[ref]).dt.total_seconds()\n", + "\n", + "df2[[\"ts_sndStamp\", \"ts_waitSnap\",\"ts_begin\",\"ts_end\", \"ts_runPipe1\", \"ts_outputSa\", \"ts_raw\", \"ts_output\"]].plot(\n", + " kind=\"hist\", title=f\"{date}; ref={ref}\", xlabel=\"seconds\", ylabel=\"\", bins=150,\n", + " alpha=0.5, rot=45, \n", + ")\n", + "\n", + "df2.describe()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c6ec5219-0964-414a-b3f5-21ebf88194f9", + "metadata": {}, + "outputs": [], + "source": [ + "df2 = pandas.DataFrame(index=df0.index)\n", + "ref = \"ts_end\"\n", + "for col_name in df0.columns:\n", + " df2[col_name] = (df0[col_name] - df0[ref]).dt.total_seconds()\n", + "\n", + "df2[[\"ts_sndStamp\", \"ts_waitSnap\",\"ts_begin\",\"ts_end\", \"ts_runPipe1\", \"ts_raw\", \"ts_output\"]].plot(\n", + " kind=\"hist\", title=f\"{date}; ref={ref}\", xlabel=\"seconds\", ylabel=\"\", bins=150,\n", + " alpha=0.5, rot=45, \n", + ")\n", + "\n", + "df2.describe()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "299216ec-0637-43f3-a153-82e0e2b930be", + "metadata": {}, + "outputs": [], + "source": [ + "ax = df2.plot.box(title=f\"{date}; ref={ref}\", ylabel=\"seconds\", figsize=(12,5),\n", + " column=df2.median().sort_values().index.tolist(),\n", + ")\n", + "ax.legend(labels=df2.median().sort_values().to_string().split(\"\\n\"), \n", + " loc=\"lower right\", title=\"median\", handlelength=0)" ] }, { "cell_type": "code", "execution_count": null, - "id": "ec7e51b3-807d-4406-bbab-ff97679559d6", + "id": "9ed3aa0b-cfde-47da-ab21-fc1799875e49", "metadata": {}, "outputs": [], "source": []