From b8832f889dc6a5816a33d371c5f5455da036bb2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?F=C3=A9lix=20Boisselier?= Date: Sun, 15 Sep 2024 20:41:50 +0200 Subject: [PATCH] spliting stdata into chunks on the filesystem with all the logic to load them back and reconstruct the .stdata file --- shaketune/commands/axes_map_calibration.py | 1 + shaketune/commands/axes_shaper_calibration.py | 5 +- shaketune/commands/compare_belts_responses.py | 1 + .../commands/create_vibrations_profile.py | 5 + shaketune/commands/excitate_axis_at_freq.py | 1 + shaketune/helpers/accelerometer.py | 154 +++++++++++++----- 6 files changed, 123 insertions(+), 44 deletions(-) diff --git a/shaketune/commands/axes_map_calibration.py b/shaketune/commands/axes_map_calibration.py index ed9c33e..95c4939 100644 --- a/shaketune/commands/axes_map_calibration.py +++ b/shaketune/commands/axes_map_calibration.py @@ -113,5 +113,6 @@ def axes_map_calibration(gcmd, config, st_process: ShakeTuneProcess) -> None: ConsoleOutput.print('This may take some time (1-3min)') creator = st_process.get_graph_creator() creator.configure(accel, SEGMENT_LENGTH) + measurements_manager.wait_for_data_transfers(printer.get_reactor()) st_process.run(measurements_manager) st_process.wait_for_completion() diff --git a/shaketune/commands/axes_shaper_calibration.py b/shaketune/commands/axes_shaper_calibration.py index c4977c8..5aaf807 100644 --- a/shaketune/commands/axes_shaper_calibration.py +++ b/shaketune/commands/axes_shaper_calibration.py @@ -90,14 +90,12 @@ def axes_shaper_calibration(gcmd, config, st_process: ShakeTuneProcess) -> None: else: input_shaper = None - measurements_manager = MeasurementsManager() - # Filter axis configurations based on user input, assuming 'axis_input' can be 'x', 'y', 'all' (that means 'x' and 'y') filtered_config = [ a for a in AXIS_CONFIG if a['axis'] == axis_input or (axis_input == 'all' and a['axis'] in ('x', 'y')) ] for config in filtered_config: - measurements_manager.clear_measurements() # Clear the measurements in each iteration of the loop + measurements_manager = MeasurementsManager() # First we need to find the accelerometer chip suited for the axis accel_chip = Accelerometer.find_axis_accelerometer(printer, config['axis']) @@ -117,6 +115,7 @@ def axes_shaper_calibration(gcmd, config, st_process: ShakeTuneProcess) -> None: # And finally generate the graph for each measured axis ConsoleOutput.print(f'{config["axis"].upper()} axis frequency profile generation...') ConsoleOutput.print('This may take some time (1-3min)') + measurements_manager.wait_for_data_transfers(printer.get_reactor()) st_process.run(measurements_manager) st_process.wait_for_completion() toolhead.dwell(1) diff --git a/shaketune/commands/compare_belts_responses.py b/shaketune/commands/compare_belts_responses.py index 5e27ba2..664649e 100644 --- a/shaketune/commands/compare_belts_responses.py +++ b/shaketune/commands/compare_belts_responses.py @@ -128,5 +128,6 @@ def compare_belts_responses(gcmd, config, st_process: ShakeTuneProcess) -> None: # Run post-processing ConsoleOutput.print('Belts comparative frequency profile generation...') ConsoleOutput.print('This may take some time (1-3min)') + measurements_manager.wait_for_data_transfers(printer.get_reactor()) st_process.run(measurements_manager) st_process.wait_for_completion() diff --git a/shaketune/commands/create_vibrations_profile.py b/shaketune/commands/create_vibrations_profile.py index 83717a4..ccd1e84 100644 --- a/shaketune/commands/create_vibrations_profile.py +++ b/shaketune/commands/create_vibrations_profile.py @@ -137,6 +137,11 @@ def create_vibrations_profile(gcmd, config, st_process: ShakeTuneProcess) -> Non accelerometer.stop_recording() accelerometer.wait_for_samples() + # For this command, we need to wait for the data transfers after finishing each of + # the measurements as there is a high probability to have a lot of measurements in + # the measurement manager and that chunks are written into the /tmp filesystem folder + measurements_manager.wait_for_data_transfers(printer.get_reactor()) + toolhead.dwell(0.3) toolhead.wait_moves() diff --git a/shaketune/commands/excitate_axis_at_freq.py b/shaketune/commands/excitate_axis_at_freq.py index 1392e91..f6d6d45 100644 --- a/shaketune/commands/excitate_axis_at_freq.py +++ b/shaketune/commands/excitate_axis_at_freq.py @@ -106,5 +106,6 @@ def excitate_axis_at_freq(gcmd, config, st_process: ShakeTuneProcess) -> None: creator = st_process.get_graph_creator() creator.configure(freq, duration, accel_per_hz) + measurements_manager.wait_for_data_transfers(printer.get_reactor()) st_process.run(measurements_manager) st_process.wait_for_completion() diff --git a/shaketune/helpers/accelerometer.py b/shaketune/helpers/accelerometer.py index c65fa11..dd1fd3f 100644 --- a/shaketune/helpers/accelerometer.py +++ b/shaketune/helpers/accelerometer.py @@ -14,6 +14,7 @@ import os import pickle import time +import uuid from multiprocessing import Process from pathlib import Path from typing import List, Tuple, TypedDict @@ -26,6 +27,8 @@ Sample = Tuple[float, float, float, float] SamplesList = List[Sample] +CHUNK_SIZE = 15 # Maximum number of measurements to keep in memory at once + class Measurement(TypedDict): name: str @@ -35,14 +38,17 @@ class Measurement(TypedDict): class MeasurementsManager: def __init__(self): self.measurements: List[Measurement] = [] - self._write_process = None - - def add_measurement(self, name: str, samples: SamplesList = None): - samples = samples if samples is not None else [] - self.measurements.append({'name': name, 'samples': samples}) - - def get_measurements(self) -> List[Measurement]: - return self.measurements + self._uuid = str(uuid.uuid4())[:8] + self._temp_dir = Path(f'/tmp/shaketune_{self._uuid}') + self._temp_dir.mkdir(parents=True, exist_ok=True) + self._chunk_files = [] + self._write_processes = [] + + def clear_measurements(self, keep_last: bool = False): + if keep_last: + self.measurements = [self.measurements[-1]] + else: + self.measurements = [] def append_samples_to_last_measurement(self, additional_samples: SamplesList): try: @@ -50,15 +56,55 @@ def append_samples_to_last_measurement(self, additional_samples: SamplesList): except IndexError as err: raise ValueError('no measurements available to append samples to.') from err - def clear_measurements(self): - self.measurements = [] + def add_measurement(self, name: str, samples: SamplesList = None): + samples = samples if samples is not None else [] + self.measurements.append({'name': name, 'samples': samples}) + if len(self.measurements) > CHUNK_SIZE: + self._save_chunk() + + def _save_chunk(self): + # Save the measurements to the chunk file. We keep the last measurement + # in memory to be able to append new samples to it later if needed + chunk_filename = self._temp_dir / f'{self._uuid}_{len(self._chunk_files)}.stchunk' + process = Process(target=self._save_to_file, args=(chunk_filename, self.measurements[:-1].copy())) + process.daemon = False + process.start() + self._write_processes.append(process) + self._chunk_files.append(chunk_filename) + self.clear_measurements(keep_last=True) def save_stdata(self, filename: Path): - self._write_process = Process(target=self._save_to_file, args=(filename,)) - self._write_process.daemon = False - self._write_process.start() + process = Process(target=self._reassemble_chunks, args=(filename,)) + process.daemon = False + process.start() + self._write_processes.append(process) - def _save_to_file(self, filename: Path): + def _reassemble_chunks(self, filename: Path): + try: + os.nice(19) + except Exception: + pass # Ignore errors as it's not critical + try: + all_measurements = [] + for chunk_file in self._chunk_files: + chunk_measurements = self._load_measurements_from_file(chunk_file) + all_measurements.extend(chunk_measurements) + os.remove(chunk_file) # Remove the chunk file after reading + + # Include any remaining measurements in memory + if self.measurements: + all_measurements.extend(self.measurements) + + # Save all measurements to the final .stdata file + self._save_to_file(filename, all_measurements) + + # Clean up + self.clear_measurements() + self._chunk_files = [] + except Exception as e: + ConsoleOutput.print(f'Warning: unable to assemble chunks into {filename}: {e}') + + def _save_to_file(self, filename: Path, measurements: List[Measurement]): try: os.nice(19) except Exception: @@ -67,15 +113,54 @@ def _save_to_file(self, filename: Path): with open(filename, 'wb') as f: cctx = zstd.ZstdCompressor(level=3) with cctx.stream_writer(f) as compressor: - pickle.dump(self.measurements, compressor) + pickle.dump(measurements, compressor) + except Exception as e: + ConsoleOutput.print(f'Warning: unable to save the data to {filename}: {e}') + + def wait_for_data_transfers(self, k_reactor, timeout: int = 30): + if not self._write_processes: + return # No file write is pending + + eventtime = k_reactor.monotonic() + endtime = eventtime + timeout + complete = False + + while eventtime < endtime: + eventtime = k_reactor.pause(eventtime + 0.05) + if all(not p.is_alive() for p in self._write_processes): + complete = True + break + + if not complete: + raise TimeoutError( + 'Shake&Tune was unable to write the accelerometer data on the fylesystem. ' + 'This might be due to a slow, busy or full SD card.' + ) + + self._write_processes = [] + + def _load_measurements_from_file(self, filename: Path) -> List[Measurement]: + try: + with open(filename, 'rb') as f: + dctx = zstd.ZstdDecompressor() + with dctx.stream_reader(f) as decompressor: + measurements = pickle.load(decompressor) + return measurements except Exception as e: - ConsoleOutput.print(f'Warning: unable to save the measurements to {filename}: {e}') + ConsoleOutput.print(f'Warning: unable to load measurements from {filename}: {e}') + return [] + + def get_measurements(self) -> List[Measurement]: + all_measurements = [] + for chunk_file in self._chunk_files: + chunk_measurements = self._load_measurements_from_file(chunk_file) + all_measurements.extend(chunk_measurements) + all_measurements.extend(self.measurements) # Include any remaining measurements in memory + + return all_measurements def load_from_stdata(self, filename: Path) -> List[Measurement]: - with open(filename, 'rb') as f: - dctx = zstd.ZstdDecompressor() - with dctx.stream_reader(f) as decompressor: - self.measurements = pickle.load(decompressor) + self.measurements = self._load_measurements_from_file(filename) return self.measurements def load_from_csvs(self, klipper_CSVs: List[Path]) -> List[Measurement]: @@ -124,27 +209,14 @@ def load_from_csvs(self, klipper_CSVs: List[Path]) -> List[Measurement]: return self.measurements - def wait_for_file_writes(self, k_reactor, timeout: int = 30): - if self._write_process is None: - return # No file write is pending - - eventtime = k_reactor.monotonic() - endtime = eventtime + timeout - complete = False - - while eventtime < endtime: - eventtime = k_reactor.pause(eventtime + 0.05) - if not self._write_process.is_alive(): - complete = True - break - - if not complete: - raise TimeoutError( - 'Shake&Tune was unable to write the accelerometer data into the .STDATA file. ' - 'This might be due to a slow SD card or a busy or full filesystem.' - ) - - self._write_process = None + def __del__(self): + try: + if self._temp_dir.exists(): + for chunk_file in self._temp_dir.glob('*.stchunk'): + chunk_file.unlink() + self._temp_dir.rmdir() + except Exception: + pass # Ignore errors during cleanup class Accelerometer: