
Commit

Run browser data now respects the 'stream' combobox.
canismarko committed Jan 16, 2025
1 parent c2ff5fc commit 4f304db
Showing 4 changed files with 98 additions and 79 deletions.
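In practice, every data request made by the run browser now names the Tiled stream explicitly instead of assuming "primary". A rough sketch of the intended call pattern, based on the signatures in the diff below (the client/run objects and the "baseline" stream name are illustrative placeholders, not values from the repository):

    # Before: the stream was implicitly "primary".
    #     xcols, ycols = await client.signal_names(hinted_only=True)
    # After: the stream selected in the combobox is passed through explicitly.
    xcols, ycols = await client.signal_names("baseline", hinted_only=True)
    df = await run.data(signals=xcols + ycols, stream="baseline")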
20 changes: 12 additions & 8 deletions src/firefly/run_browser/client.py
@@ -144,11 +144,13 @@ async def load_all_runs(self, filters: Mapping = {}):
all_runs.append(run_data)
return all_runs

async def signal_names(self, hinted_only: bool = False):
async def signal_names(self, stream: str, *, hinted_only: bool = False):
"""Get a list of valid signal names (data columns) for selected runs.
Parameters
==========
stream
The Tiled stream name to fetch.
hinted_only
If true, only signals with the kind="hinted" parameter get
picked.
@@ -157,7 +159,7 @@ async def signal_names(self, hinted_only: bool = False):
xsignals, ysignals = [], []
for run in self.selected_runs:
if hinted_only:
xsig, ysig = await run.hints()
xsig, ysig = await run.hints(stream=stream)
else:
df = await run.data()
xsig = ysig = df.columns
@@ -186,32 +188,32 @@ async def load_selected_runs(self, uids):
self.selected_runs = runs
return runs

async def images(self, signal):
async def images(self, signal: str, stream: str):
"""Load the selected runs as 2D or 3D images suitable for plotting."""
images = OrderedDict()
for idx, run in enumerate(self.selected_runs):
# Load datasets from the database
try:
image = await run[signal]
image = await run.__getitem__(signal, stream=stream)
except KeyError as exc:
log.exception(exc)
else:
images[run.uid] = image
return images

async def all_signals(self, hinted_only=False) -> dict:
async def all_signals(self, stream: str, *, hinted_only=False) -> dict:
"""Produce dataframes with all signals for each run.
The keys of the dictionary are the labels for each curve, and
the corresponding value is a pandas dataframe with the scan data.
"""
xsignals, ysignals = await self.signal_names(hinted_only=hinted_only)
xsignals, ysignals = await self.signal_names(hinted_only=hinted_only, stream=stream)
# Build the dataframes
dfs = OrderedDict()
for run in self.selected_runs:
# Get data from the database
df = await run.data(signals=xsignals + ysignals)
df = await run.data(signals=xsignals + ysignals, stream=stream)
dfs[run.uid] = df
return dfs

@@ -220,6 +222,8 @@ async def signals(
x_signal,
y_signal,
r_signal=None,
*,
stream: str,
use_log=False,
use_invert=False,
use_grad=False,
@@ -259,7 +263,7 @@ async def signals(
if uids is not None and run.uid not in uids:
break
# Get data from the database
df = await run.data(signals=signals)
df = await run.data(signals=signals, stream=stream)
# Check for missing signals
missing_x = x_signal not in df.columns and df.index.name != x_signal
missing_y = y_signal not in df.columns
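A hedged usage sketch of the updated client methods above; the `client` object, signal names, and stream are assumptions for illustration only:

    # signals() now requires the stream as a keyword-only argument.
    runs = await client.signals(
        "energy_energy",        # x_signal
        "It_net_counts",        # y_signal
        stream="primary",
        use_log=True,
    )
    # images() likewise takes the stream alongside the value signal.
    images = await client.images("It_net_counts", stream="primary")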
16 changes: 11 additions & 5 deletions src/firefly/run_browser/display.py
@@ -345,6 +345,11 @@ async def update_streams(self, *args):
sorted(stream_names, key=lambda x: x != "primary")
self.ui.stream_combobox.addItems(stream_names)

@property
def stream(self):
current_text = self.ui.stream_combobox.currentText()
return current_text or "primary"

@asyncSlot()
@cancellable
async def update_multi_signals(self, *args):
@@ -355,7 +360,7 @@ async def update_multi_signals(self, *args):
# Determine valid list of columns to choose from
use_hints = self.ui.plot_multi_hints_checkbox.isChecked()
signals_task = self.db_task(
self.db.signal_names(hinted_only=use_hints), "multi signals"
self.db.signal_names(hinted_only=use_hints, stream=self.stream), "multi signals"
)
xcols, ycols = await signals_task
# Update the comboboxes with new signals
@@ -379,7 +384,7 @@ async def update_1d_signals(self, *args):
# Determine valid list of columns to choose from
use_hints = self.ui.plot_1d_hints_checkbox.isChecked()
signals_task = self.db_task(
self.db.signal_names(hinted_only=use_hints), "1D signals"
self.db.signal_names(hinted_only=use_hints, stream=self.stream), name="1D signals"
)
xcols, ycols = await signals_task
self.multi_y_signals = ycols
@@ -405,7 +410,7 @@ async def update_2d_signals(self, *args):
# Determine valid list of dependent signals to choose from
use_hints = self.ui.plot_2d_hints_checkbox.isChecked()
xcols, vcols = await self.db_task(
self.db.signal_names(hinted_only=use_hints), "2D signals"
self.db.signal_names(hinted_only=use_hints, stream=self.stream), "2D signals"
)
# Update the UI with the list of controls
val_cb.clear()
@@ -421,7 +426,7 @@ async def update_multi_plot(self, *args):
return
use_hints = self.ui.plot_multi_hints_checkbox.isChecked()
runs = await self.db_task(
self.db.all_signals(hinted_only=use_hints), "multi-plot"
self.db.all_signals(hinted_only=use_hints, stream=self.stream), "multi-plot"
)
self.ui.plot_multi_view.plot_runs(runs, xsignal=x_signal)

@@ -482,6 +487,7 @@ async def update_1d_plot(self, *args, uids: Sequence[str] = None):
use_invert=use_invert,
use_grad=use_grad,
uids=uids,
stream=self.stream,
),
"1D plot",
)
@@ -518,7 +524,7 @@ async def update_2d_plot(self):
use_log = self.ui.logarithm_checkbox_2d.isChecked()
use_invert = self.ui.invert_checkbox_2d.isChecked()
use_grad = self.ui.gradient_checkbox_2d.isChecked()
images = await self.db_task(self.db.images(value_signal), "2D plot")
images = await self.db_task(self.db.images(value_signal, stream=self.stream), "2D plot")
# Get axis labels
# Eventually this will be replaced with robust choices for plotting multiple images
metadata = await self.db_task(self.db.metadata(), "2D plot")
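The new `stream` property is the single point where the combobox selection is read, falling back to "primary" when nothing is selected yet. A minimal stand-alone sketch of the same fallback logic, outside of Qt:

    def stream_or_default(combobox_text: str) -> str:
        """Return the selected stream name, defaulting to 'primary' when empty."""
        return combobox_text or "primary"

    assert stream_or_default("") == "primary"
    assert stream_or_default("baseline") == "baseline"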
10 changes: 6 additions & 4 deletions src/firefly/run_browser/tests/test_display.py
@@ -41,7 +41,7 @@ async def display(qtbot, tiled_client, catalog, mocker):
run = [run async for run in catalog.values()][0]
display.db.selected_runs = [run]
await display.update_1d_signals()
run_data = await run.data()
run_data = await run.data(stream="primary")
# Set the controls to describe the data we want to test
x_combobox = display.ui.signal_x_combobox
x_combobox.addItem("energy_energy")
@@ -251,7 +251,7 @@ async def test_update_2d_plot(catalog, display):
# Update the plots
await display.update_2d_plot()
# Determine what the image data should look like
expected_data = await run["It_net_counts"]
expected_data = await run.__getitem__("It_net_counts", stream="primary")
expected_data = expected_data.reshape((5, 21)).T
# Check that the data were added
image = display.plot_2d_item.image
@@ -267,8 +267,10 @@

async def test_update_multi_plot(catalog, display):
run = await catalog["7d1daf1d-60c7-4aa7-a668-d1cd97e5335f"]
expected_xdata = await run["energy_energy"]
expected_ydata = np.log(await run["I0_net_counts"] / await run["It_net_counts"])
expected_xdata = await run.__getitem__("energy_energy", stream="primary")
I0 = await run.__getitem__("I0_net_counts", stream="primary")
It = await run.__getitem__("It_net_counts", stream="primary")
expected_ydata = np.log(I0 / It)
expected_ydata = np.gradient(expected_ydata, expected_xdata)
# Configure signals
display.ui.multi_signal_x_combobox.addItem("energy_energy")
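The updated tests read their expected data through the stream-aware API directly. A further check one might add for the combobox fallback is sketched here; the fixture and widget names mirror the tests above, but the test itself is hypothetical:

    async def test_stream_defaults_to_primary(display):
        display.ui.stream_combobox.clear()
        assert display.stream == "primary"
        display.ui.stream_combobox.addItem("baseline")
        display.ui.stream_combobox.setCurrentText("baseline")
        assert display.stream == "baseline"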
131 changes: 69 additions & 62 deletions src/haven/catalog.py
@@ -86,61 +86,61 @@ def load_catalog(name: str = "bluesky"):
return databroker.catalog[name]


def load_result(uid: str, catalog_name: str = "bluesky", stream: str = "primary"):
"""Load a past experiment from the database.
The result contains metadata and scan parameters. The data
themselves are accessible from the result's *read()* method.
Parameters
==========
uid
The universal identifier for this scan, as return by a bluesky
RunEngine.
catalog_name
The name of the catalog as defined in the Intake file
(e.g. ~/.local/share/intake/catalogs.yml)
stream
The data stream defined by the bluesky RunEngine.
Returns
=======
result
The experiment result, with data available via the *read()*
method.
"""
cat = load_catalog(name=catalog_name)
result = cat[uid][stream]
return result


def load_data(uid, catalog_name="bluesky", stream="primary"):
"""Load a past experiment's data from the database.
The result is an xarray with the data collected.
Parameters
==========
uid
The universal identifier for this scan, as return by a bluesky
RunEngine.
catalog_name
The name of the catalog as defined in the Intake file
(e.g. ~/.local/share/intake/catalogs.yml)
stream
The data stream defined by the bluesky RunEngine.
Returns
=======
data
The experimental data, as an xarray.
"""

res = load_result(uid=uid, catalog_name=catalog_name, stream=stream)
data = res.read()
return data
# def load_result(uid: str, catalog_name: str = "bluesky", stream: str):
# """Load a past experiment from the database.

# The result contains metadata and scan parameters. The data
# themselves are accessible from the result's *read()* method.

# Parameters
# ==========
# uid
# The universal identifier for this scan, as return by a bluesky
# RunEngine.
# catalog_name
# The name of the catalog as defined in the Intake file
# (e.g. ~/.local/share/intake/catalogs.yml)
# stream
# The data stream defined by the bluesky RunEngine.

# Returns
# =======
# result
# The experiment result, with data available via the *read()*
# method.

# """
# cat = load_catalog(name=catalog_name)
# result = cat[uid][stream]
# return result


# def load_data(uid, catalog_name: str="bluesky", stream: str):
# """Load a past experiment's data from the database.

# The result is an xarray with the data collected.

# Parameters
# ==========
# uid
# The universal identifier for this scan, as return by a bluesky
# RunEngine.
# catalog_name
# The name of the catalog as defined in the Intake file
# (e.g. ~/.local/share/intake/catalogs.yml)
# stream
# The data stream defined by the bluesky RunEngine.

# Returns
# =======
# data
# The experimental data, as an xarray.

# """

# res = load_result(uid=uid, catalog_name=catalog_name, stream=stream)
# data = res.read()
# return data


def with_thread_lock(fn):
Expand Down Expand Up @@ -232,7 +232,7 @@ def stream_names(self):

@run_in_executor
def _read_data(
self, signals: Sequence | None, dataset: str = "primary/internal/events"
self, signals: Sequence | None, dataset: str
):
data = self.container[dataset]
if signals is None:
@@ -254,26 +254,32 @@ async def export(self, filename: str, format: str):
def formats(self):
return self.container.formats

async def data(self, signals=None, stream="primary"):
async def data(self, *, signals=None, stream: str):
return await self._read_data(signals, f"{stream}/internal/events/")

@property
def loop(self):
return asyncio.get_running_loop()

@run_in_executor
def data_keys(self, stream="primary"):
def data_keys(self, stream):
return self.container[stream]["internal/events"].columns

async def hints(self):
async def hints(self, stream: str):
"""Retrieve the data hints for this scan.
Parameters
==========
stream
The name of the Tiled data stream to look up hints for.
Returns
=======
independent
The hints for the independent scanning axis.
dependent
The hints for the dependent scanning axis.
"""
metadata = await self.metadata
# Get hints for the independent (X)
@@ -283,14 +289,15 @@ async def hints(self):
warnings.warn("Could not get independent hints")
# Get hints for the dependent (X)
dependent = []
primary_metadata = await self._read_metadata("primary")
primary_metadata = await self._read_metadata(stream)
hints = primary_metadata["hints"]
for device, dev_hints in hints.items():
dependent.extend(dev_hints["fields"])
return independent, dependent

@run_in_executor
def _read_metadata(self, keys=None):
assert keys != "", "Metadata keys cannot be ''."
container = self.container
if keys is not None:
container = container[keys]
@@ -300,9 +307,9 @@ def _read_metadata(self, keys=None):
async def metadata(self):
return await self._read_metadata()

async def __getitem__(self, signal):
async def __getitem__(self, signal, stream: str):
"""Retrieve a signal from the dataset, with reshaping etc."""
arr = await self._read_data([signal])
arr = await self._read_data([f"{stream}/{signal}"], dataset=f"{stream}/internal/events")
arr = np.asarray(arr[signal])
# Re-shape to match the scan dimensions
metadata = await self.metadata
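Taken together, the catalog layer now folds the stream name into every Tiled lookup (e.g. the "{stream}/internal/events/" dataset path) rather than hard-coding "primary". A hedged sketch of the stream-aware run API; the `run` object and signal names are assumptions for illustration:

    independent, dependent = await run.hints(stream="primary")
    frame = await run.data(signals=independent + dependent, stream="primary")
    image = await run.__getitem__("It_net_counts", stream="primary")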
