Skip to content

Commit

Permalink
Merge branch 'edge' into flex_stacker_protocol_edits
Browse files Browse the repository at this point in the history
  • Loading branch information
rclarke0 authored Jan 6, 2025
2 parents 2a598db + db7e201 commit 5077716
Show file tree
Hide file tree
Showing 1,426 changed files with 54,865 additions and 57,058 deletions.
19 changes: 19 additions & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,32 @@ module.exports = {
files: ['./protocol-designer/src/**/*.@(ts|tsx)'],
rules: {
'opentrons/no-imports-up-the-tree-of-life': 'warn',
'opentrons/no-margins-in-css': 'warn',
'opentrons/no-margins-inline': 'warn',
},
},
// apply application structure import requirements to app
{
files: ['./app/src/**/*.@(ts|tsx)'],
rules: {
'opentrons/no-imports-across-applications': 'error',
'opentrons/no-margins-in-css': 'warn',
'opentrons/no-margins-inline': 'warn',
},
},
{
files: ['./opentrons-ai-client/src/**/*.@(ts|tsx)'],
rules: {
'opentrons/no-imports-up-the-tree-of-life': 'warn',
'opentrons/no-margins-in-css': 'warn',
'opentrons/no-margins-inline': 'warn',
},
},
{
files: ['./components/src/**/*.@(ts|tsx)'],
rules: {
'opentrons/no-margins-in-css': 'warn',
'opentrons/no-margins-inline': 'warn',
},
},
],
Expand Down
1,539 changes: 869 additions & 670 deletions abr-testing/Pipfile.lock

Large diffs are not rendered by default.

138 changes: 138 additions & 0 deletions abr-testing/abr_testing/tools/module_control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""Interface with opentrons modules!"""
from serial import Serial # type: ignore[import-untyped]
import asyncio
import subprocess
from typing import Any

# Generic
_READ_ALL = "readall"
_READ_LINE = "read"
_DONE = "done"

# TC commands
_MOVE_SEAL = "ms"
_MOVE_LID = "ml"
tc_gcode_shortcuts = {
"status": "M119",
_MOVE_SEAL: "M241.D", # move seal motor
_MOVE_LID: "M240.D", # move lid stepper motor
"ol": "M126", # open lid
"cl": "M127", # close lid
"sw": "M901.D", # status of all switches
"lt": "M141.D", # get lid temperature
"pt": "M105.D", # get plate temperature
}

# HS Commands
hs_gcode_shortcuts = {
"srpm": "M3 S{rpm}", # Set RPM
"grpm": "M123", # Get RPM
"home": "G28", # Home
"deactivate": "M106", # Deactivate
}

gcode_shortcuts = tc_gcode_shortcuts | hs_gcode_shortcuts


async def message_read(dev: Serial) -> Any:
"""Read message."""
response = dev.readline().decode()
while not response:
await asyncio.sleep(1)
response = dev.readline().decode()
return response


async def message_return(dev: Serial) -> Any:
"""Wait until message becomes available."""
try:
response = await asyncio.wait_for(message_read(dev), timeout=30)
return response
except asyncio.exceptions.TimeoutError:
print("response timed out.")
return ""


async def handle_module_gcode_shortcut(
dev: Serial, command: str, in_commands: bool, output: str = ""
) -> None:
"""Handle debugging commands that require followup."""
if in_commands:
if command == _MOVE_SEAL:
distance = input("enter distance in steps => ")
dev.write(
f"{gcode_shortcuts[command]} {distance}\n".encode()
) # (+) -> retract, (-) -> engage
# print(await message_return(dev))
elif command == _MOVE_LID:
distance = input(
"enter angular distance in degrees => "
) # (+) -> open, (-) -> close
dev.write(f"{gcode_shortcuts[command]} {distance}\n".encode())
# print(await message_return(dev))
# everything else
else:
dev.write(f"{gcode_shortcuts[command]}\n".encode())
else:
dev.write(f"{command}\n".encode())
try:
mr = await message_return(dev)
print(mr)
except TypeError:
print("Invalid input")
return

if output:
try:
with open(output, "a") as result_file:
if "OK" in mr:
status = command + ": SUCCESS"
else:
status = command + ": FAILURE"
result_file.write(status)
result_file.write(f" {mr}")
result_file.close()
except FileNotFoundError:
print(f"cannot open file: {output}")


async def comms_loop(dev: Serial, commands: list, output: str = "") -> bool:
"""Loop for commands."""
_exit = False
try:
command = commands.pop(0)
except IndexError:
command = input("\n>>> ")
if command == _READ_ALL:
print(dev.readlines())
elif command == _READ_LINE:
print(dev.readline())
elif command == _DONE:
_exit = True
elif command in gcode_shortcuts:
await handle_module_gcode_shortcut(dev, command, True, output)
else:
await handle_module_gcode_shortcut(dev, command, False, output)
return _exit


async def _main(module: str, commands: list = [], output: str = "") -> bool:
"""Main process."""
module_name = (
subprocess.check_output(["find", "/dev/", "-name", f"*{module}*"])
.decode()
.strip()
)
if not module_name:
print(f"{module} not found. Exiting.")
return False
dev = Serial(f"{module_name}", 9600, timeout=2)
_exit = False
while not _exit:
_exit = await comms_loop(dev, commands, output)
dev.close()
return True


if __name__ == "__main__":
asyncio.run(_main("heatershaker"))
155 changes: 155 additions & 0 deletions abr-testing/abr_testing/tools/test_modules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
"""Modules Tests Script!"""
import asyncio
import time
from datetime import datetime
import os
import module_control # type: ignore
from typing import Any, Tuple, Dict
import traceback

# To run:
# SSH into robot
# cd /opt/opentrons-robot-server/abr-testing/tools
# python3 test_modules.py


async def tc_test_1(module: str, path_to_file: str) -> None:
"""Thermocycler Test 1 Open and Close Lid."""
duration = int(input("How long to run this test for? (in seconds): "))
start = time.time()
while time.time() - start < duration:
try:
await (tc_open_lid(module, path_to_file))
except asyncio.TimeoutError:
return
time.sleep(5)
try:
await (tc_close_lid(module, path_to_file))
except asyncio.TimeoutError:
return
time.sleep(5)


async def hs_test_1(module: str, path_to_file: str) -> None:
"""Heater Shaker Test 1. (Home and Shake)."""
duration = int(input("How long to run this test for? (in seconds): "))
rpm = input("Target RPM (200-3000): ")
start = time.time()
while time.time() - start < duration:
try:
await (hs_test_home(module, path_to_file))
except asyncio.TimeoutError:
return
time.sleep(5)
try:
await (hs_test_set_shake(module, rpm, path_to_file))
except asyncio.TimeoutError:
return
time.sleep(10)
try:
await (hs_test_set_shake(module, "0", path_to_file))
except asyncio.TimeoutError:
return
time.sleep(10)


async def input_codes(module: str, path_to_file: str) -> None:
"""Opens serial for manual code input."""
await module_control._main(module, output=path_to_file)


hs_tests: Dict[str, Tuple[Any, str]] = {
"Test 1": (hs_test_1, "Repeatedly home heater shaker then set shake speed"),
"Input GCodes": (input_codes, "Input g codes"),
}

tc_tests: Dict[str, Tuple[Any, str]] = {
"Test 1": (tc_test_1, "Repeatedly open and close TC lid"),
"Input GCodes": (input_codes, "Input g codes"),
}

global modules

modules = {
"heatershaker": hs_tests,
"thermocycler": tc_tests,
}


async def main(module: str) -> None:
"""Select test to be run."""
# Select test to run
# Set directory for tests
BASE_DIRECTORY = "/userfs/data/testing_data/"
if not os.path.exists(BASE_DIRECTORY):
os.makedirs(BASE_DIRECTORY)
tests = modules[module]
for i, test in enumerate(tests.keys()):
function, description = tests[test]
print(f"{i}) {test} : {description}")
selected_test = int(input("Please select a test: "))
try:
function, description = tests[list(tests.keys())[selected_test]]
test_dir = BASE_DIRECTORY + f"{module}/test/{list(tests.keys())[selected_test]}"
print(f"{i}, {description}")
print(f"TEST DIR: {test_dir}")
date = datetime.now()
filename = f"results_{datetime.strftime(date, '%Y-%m-%d_%H:%M:%S')}.txt"
output_file = os.path.join(test_dir, filename)
try:
if not os.path.exists(test_dir):
os.makedirs(test_dir)
open(output_file, "a").close()
except Exception:
traceback.print_exc()
print(f"PATH: {output_file} ")
await (function(module, output_file))
except Exception:
print("Failed to run test")
traceback.print_exc()


# HS Test Functions
async def hs_test_home(module: str, path_to_file: str) -> None:
"""Home heater shaker."""
hs_gcodes = module_control.hs_gcode_shortcuts
home_gcode = hs_gcodes["home"]
await (module_control._main(module, [home_gcode, "done"], path_to_file))


async def hs_test_set_shake(module: str, rpm: str, path_to_file: str) -> None:
"""Shake heater shaker at specified speed."""
hs_gcodes = module_control.hs_gcode_shortcuts
set_shake_gcode = hs_gcodes["srpm"].format(rpm=rpm)
await (module_control._main(module, [set_shake_gcode, "done"], path_to_file))


async def hs_deactivate(module: str, path_to_file: str) -> None:
"""Deactivate Heater Shaker."""
hs_gcodes = module_control.hs_gcode_shortcuts
deactivate_gcode = hs_gcodes["deactivate"]
await (module_control._main(module, [deactivate_gcode, "done"], path_to_file))


# TC Test Functions
async def tc_open_lid(module: str, path_to_file: str) -> None:
"""Open thermocycler lid."""
tc_gcodes = module_control.tc_gcode_shortcuts
open_lid_gcode = tc_gcodes["ol"]
await (module_control._main(module, [open_lid_gcode, "done"], path_to_file))


async def tc_close_lid(module: str, path_to_file: str) -> None:
"""Open thermocycler lid."""
tc_gcodes = module_control.tc_gcode_shortcuts
close_lid_gcode = tc_gcodes["cl"]
await (module_control._main(module, [close_lid_gcode, "done"], path_to_file))


if __name__ == "__main__":
print("Modules:")
for i, module in enumerate(modules):
print(f"{i}) {module}")
module_int = int(input("Please select a module: "))
module = list(modules.keys())[module_int]
asyncio.run(main(module))
17 changes: 17 additions & 0 deletions analyses-snapshot-testing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,20 @@ You have the option to specify one or many protocols to run the analyses on. Thi
### Updating the snapshots locally

- `make snapshot-test-update-local` - this target builds the base image, builds the local code into the base image, then runs the analyses battery against the image you just created, updating the snapshots by passing the `--update-snapshots` flag to the test

### Add some protocols to the analyses battery

> The below instructions avoid needing docker and executing snapshot tests locally.

1. create new protocol file(s) in the [files/protocols](./files/protocols) directory following the naming convention in [files/README.md](./files/README.md)
1. add the protocol(s) to the [protocols.py](./automation/data/protocols.py)
1. `make format` (make sure you have followed setup instructions)
1. commit and push your branch
1. open a PR and add the label `gen-analyses-snapshot-pr`
1. when the snapshot fails because your new protocols don't have snapshots a PR will be created that heals.
1. merge the healing PR if the snapshots are as expected
1. get a review and merge! 🎉 now your protocols are a part of the test
### Add a protocol with overrides to the analyses battery
> TODO when we have a more straight forward example
42 changes: 42 additions & 0 deletions analyses-snapshot-testing/automation/data/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,48 @@ class Protocols:
file_extension="py",
robot="Flex",
)
# analyses-snapshot-testing/files/protocols/Flex_X_v2_21_plate_reader_no_trash.py
Flex_X_v2_21_plate_reader_no_trash: Protocol = Protocol(
file_stem="Flex_X_v2_21_plate_reader_no_trash",
file_extension="py",
robot="Flex",
)
# analyses-snapshot-testing/files/protocols/Flex_X_v2_21_plate_reader_wrong_plate.py
Flex_X_v2_21_plate_reader_wrong_plate: Protocol = Protocol(
file_stem="Flex_X_v2_21_plate_reader_wrong_plate",
file_extension="py",
robot="Flex",
)
# analyses-snapshot-testing/files/protocols/Flex_X_v2_21_plate_reader_wrong_plate2.py
Flex_X_v2_21_plate_reader_wrong_plate2: Protocol = Protocol(
file_stem="Flex_X_v2_21_plate_reader_wrong_plate2",
file_extension="py",
robot="Flex",
)
# analyses-snapshot-testing/files/protocols/Flex_X_v2_21_plate_reader_bad_slot.py
Flex_X_v2_21_plate_reader_bad_slot: Protocol = Protocol(
file_stem="Flex_X_v2_21_plate_reader_bad_slot",
file_extension="py",
robot="Flex",
)
# analyses-snapshot-testing/files/protocols/Flex_X_v2_21_plate_reader_no_close_lid.py
Flex_X_v2_21_plate_reader_no_close_lid: Protocol = Protocol(
file_stem="Flex_X_v2_21_plate_reader_no_close_lid",
file_extension="py",
robot="Flex",
)
# analyses-snapshot-testing/files/protocols/Flex_S_v2_21_tc_lids_happy_path.py
Flex_S_v2_21_tc_lids_happy_path: Protocol = Protocol(
file_stem="Flex_S_v2_21_tc_lids_happy_path",
file_extension="py",
robot="Flex",
)
# analyses-snapshot-testing/files/protocols/Flex_X_v2_21_tc_lids_wrong_target.py
Flex_X_v2_21_tc_lids_wrong_target: Protocol = Protocol(
file_stem="Flex_X_v2_21_tc_lids_wrong_target",
file_extension="py",
robot="Flex",
)

OT2_X_v2_18_None_None_duplicateRTPVariableName: Protocol = Protocol(
file_stem="OT2_X_v2_18_None_None_duplicateRTPVariableName",
Expand Down
File renamed without changes.
Loading

0 comments on commit 5077716

Please sign in to comment.