Skip to content

Commit

Permalink
spliting stdata into chunks on the filesystem
Browse files Browse the repository at this point in the history
with all the logic to load them back and reconstruct the .stdata file
  • Loading branch information
Frix-x committed Sep 15, 2024
1 parent eed67f1 commit b9651bd
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 84 deletions.
1 change: 1 addition & 0 deletions shaketune/commands/axes_map_calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,5 +113,6 @@ def axes_map_calibration(gcmd, config, st_process: ShakeTuneProcess) -> None:
ConsoleOutput.print('This may take some time (1-3min)')
creator = st_process.get_graph_creator()
creator.configure(accel, SEGMENT_LENGTH)
measurements_manager.wait_for_data_transfers(printer.get_reactor())
st_process.run(measurements_manager)
st_process.wait_for_completion()
5 changes: 2 additions & 3 deletions shaketune/commands/axes_shaper_calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,12 @@ def axes_shaper_calibration(gcmd, config, st_process: ShakeTuneProcess) -> None:
else:
input_shaper = None

measurements_manager = MeasurementsManager()

# Filter axis configurations based on user input, assuming 'axis_input' can be 'x', 'y', 'all' (that means 'x' and 'y')
filtered_config = [
a for a in AXIS_CONFIG if a['axis'] == axis_input or (axis_input == 'all' and a['axis'] in ('x', 'y'))
]
for config in filtered_config:
measurements_manager.clear_measurements() # Clear the measurements in each iteration of the loop
measurements_manager = MeasurementsManager()

# First we need to find the accelerometer chip suited for the axis
accel_chip = Accelerometer.find_axis_accelerometer(printer, config['axis'])
Expand All @@ -117,6 +115,7 @@ def axes_shaper_calibration(gcmd, config, st_process: ShakeTuneProcess) -> None:
# And finally generate the graph for each measured axis
ConsoleOutput.print(f'{config["axis"].upper()} axis frequency profile generation...')
ConsoleOutput.print('This may take some time (1-3min)')
measurements_manager.wait_for_data_transfers(printer.get_reactor())
st_process.run(measurements_manager)
st_process.wait_for_completion()
toolhead.dwell(1)
Expand Down
1 change: 1 addition & 0 deletions shaketune/commands/compare_belts_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,5 +128,6 @@ def compare_belts_responses(gcmd, config, st_process: ShakeTuneProcess) -> None:
# Run post-processing
ConsoleOutput.print('Belts comparative frequency profile generation...')
ConsoleOutput.print('This may take some time (1-3min)')
measurements_manager.wait_for_data_transfers(printer.get_reactor())
st_process.run(measurements_manager)
st_process.wait_for_completion()
5 changes: 5 additions & 0 deletions shaketune/commands/create_vibrations_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ def create_vibrations_profile(gcmd, config, st_process: ShakeTuneProcess) -> Non
accelerometer.stop_recording()
accelerometer.wait_for_samples()

# For this command, we need to wait for the data transfers after finishing each of
# the measurements as there is a high probability to have a lot of measurements in
# the measurement manager and that chunks are written into the /tmp filesystem folder
measurements_manager.wait_for_data_transfers(printer.get_reactor())

toolhead.dwell(0.3)
toolhead.wait_moves()

Expand Down
1 change: 1 addition & 0 deletions shaketune/commands/excitate_axis_at_freq.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,6 @@ def excitate_axis_at_freq(gcmd, config, st_process: ShakeTuneProcess) -> None:

creator = st_process.get_graph_creator()
creator.configure(freq, duration, accel_per_hz)
measurements_manager.wait_for_data_transfers(printer.get_reactor())
st_process.run(measurements_manager)
st_process.wait_for_completion()
225 changes: 144 additions & 81 deletions shaketune/helpers/accelerometer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import os
import pickle
import time
import uuid
from multiprocessing import Process
from pathlib import Path
from typing import List, Tuple, TypedDict
Expand All @@ -26,6 +27,8 @@
Sample = Tuple[float, float, float, float]
SamplesList = List[Sample]

CHUNK_SIZE = 15 # Maximum number of measurements to keep in memory at once


class Measurement(TypedDict):
name: str
Expand All @@ -35,30 +38,75 @@ class Measurement(TypedDict):
class MeasurementsManager:
def __init__(self):
self.measurements: List[Measurement] = []
self._write_process = None
self._uuid = str(uuid.uuid4())[:8]
self._temp_dir = Path(f'/tmp/shaketune_{self._uuid}')
self._temp_dir.mkdir(parents=True, exist_ok=True)
self._chunk_files: List[Path] = []
self._write_processes: List[Process] = []

def clear_measurements(self, keep_last: bool = False) -> None:
if keep_last and self.measurements:
self.measurements = [self.measurements[-1]]
else:
self.measurements = []

def add_measurement(self, name: str, samples: SamplesList = None):
samples = samples if samples is not None else []
self.measurements.append({'name': name, 'samples': samples})
def append_samples_to_last_measurement(self, additional_samples: SamplesList) -> None:
if not self.measurements:
raise ValueError('No measurements available to append samples to.')
self.measurements[-1]['samples'].extend(additional_samples)

def get_measurements(self) -> List[Measurement]:
return self.measurements

def append_samples_to_last_measurement(self, additional_samples: SamplesList):
def add_measurement(self, name: str, samples: SamplesList = None) -> None:
if samples is None:
samples = []
self.measurements.append({'name': name, 'samples': samples})
if len(self.measurements) > CHUNK_SIZE:
self._split_and_save_chunk()

def _split_and_save_chunk(self) -> None:
chunk_filename = self._temp_dir / f'{self._uuid}_{len(self._chunk_files)}.stchunk'
process = Process(target=self._save_to_file, args=(chunk_filename, self.measurements[:-1].copy()))
process.daemon = False
process.start()
self._write_processes.append(process)
self._chunk_files.append(chunk_filename)

# Clear measurements but keep the last one in memory to be able to append the last set
# of samples that can be still in recording (and that will be the first item of the next chunk)
self.clear_measurements(keep_last=True)

def save_stdata(self, filename: Path) -> None:
process = Process(target=self._reassemble_chunks, args=(filename,))
process.daemon = False
process.start()
self._write_processes.append(process)

def _reassemble_chunks(self, filename: Path) -> None:
try:
self.measurements[-1]['samples'].extend(additional_samples)
except IndexError as err:
raise ValueError('no measurements available to append samples to.') from err

def clear_measurements(self):
self.measurements = []

def save_stdata(self, filename: Path):
self._write_process = Process(target=self._save_to_file, args=(filename,))
self._write_process.daemon = False
self._write_process.start()
os.nice(19)
except Exception:
pass # Ignore errors as it's not critical
try:
all_measurements: List[Measurement] = []
# Load all chunk files to reconstruct the measurements array
for chunk_file in self._chunk_files:
chunk_measurements = self._load_measurements_from_file(chunk_file)
all_measurements.extend(chunk_measurements)
chunk_file.unlink(missing_ok=True) # Remove the chunk file after loading it

# Include the remaining measurements still in memory (that were not saved to a chunk file)
if self.measurements:
all_measurements.extend(self.measurements)

# And save the whole array into the final .stdata file (in the user results folder)
self._save_to_file(filename, all_measurements)

# Clean up the temporary files and memory
self.clear_measurements()
self._chunk_files = []
except Exception as e:
ConsoleOutput.print(f'Warning: unable to reconstruct data chunks into {filename}: {e}')

def _save_to_file(self, filename: Path):
def _save_to_file(self, filename: Path, measurements: List[Measurement]) -> None:
try:
os.nice(19)
except Exception:
Expand All @@ -67,84 +115,99 @@ def _save_to_file(self, filename: Path):
with open(filename, 'wb') as f:
cctx = zstd.ZstdCompressor(level=3)
with cctx.stream_writer(f) as compressor:
pickle.dump(self.measurements, compressor)
pickle.dump(measurements, compressor)
except Exception as e:
ConsoleOutput.print(f'Warning: unable to save the measurements to {filename}: {e}')
ConsoleOutput.print(f'Warning: unable to save the data to {filename}: {e}')

def wait_for_data_transfers(self, k_reactor, timeout: int = 30) -> None:
if not self._write_processes:
return # No file write is pending

eventtime = k_reactor.monotonic()
endtime = eventtime + timeout

while eventtime < endtime:
eventtime = k_reactor.pause(eventtime + 0.05)
if all(not p.is_alive() for p in self._write_processes):
break
else:
raise TimeoutError(
'Shake&Tune was unable to write the accelerometer data on the filesystem. '
'This might be due to a slow, busy, or full SD card.'
)

self._write_processes = []

def _load_measurements_from_file(self, filename: Path) -> List[Measurement]:
try:
with open(filename, 'rb') as f:
dctx = zstd.ZstdDecompressor()
with dctx.stream_reader(f) as decompressor:
measurements = pickle.load(decompressor)
return measurements
except Exception as e:
ConsoleOutput.print(f'Warning: unable to load measurements from {filename}: {e}')
return []

def get_measurements(self) -> List[Measurement]:
all_measurements: List[Measurement] = []
for chunk_file in self._chunk_files:
chunk_measurements = self._load_measurements_from_file(chunk_file)
all_measurements.extend(chunk_measurements)
all_measurements.extend(self.measurements) # Include any remaining measurements from memory
return all_measurements

def load_from_stdata(self, filename: Path) -> List[Measurement]:
with open(filename, 'rb') as f:
dctx = zstd.ZstdDecompressor()
with dctx.stream_reader(f) as decompressor:
self.measurements = pickle.load(decompressor)
self.measurements = self._load_measurements_from_file(filename)
return self.measurements

def load_from_csvs(self, klipper_CSVs: List[Path]) -> List[Measurement]:
for logname in klipper_CSVs:
def load_from_csvs(self, klipper_csvs: List[Path]) -> List[Measurement]:
for logname in klipper_csvs:
try:
if logname.suffix != '.csv':
if logname.suffix.lower() != '.csv':
ConsoleOutput.print(f'Warning: {logname} is not a CSV file. It will be ignored by Shake&Tune!')
continue
with open(logname) as f:
header = None
for line in f:
cleaned_line = line.strip()
# Check for a PSD file generated by Klipper and raise a warning
if cleaned_line.startswith('#freq,psd_x,psd_y,psd_z,psd_xyz'):
ConsoleOutput.print(
f'Warning: {logname} does not contain raw Klipper accelerometer data. '
'Please use the official Klipper script to process it instead. '
'Please use the official Klipper script to process it instead.'
)
continue
# Check for the expected legacy header used in Shake&Tune (raw accelerometer data from Klipper)
break
elif cleaned_line.startswith('#time,accel_x,accel_y,accel_z'):
header = cleaned_line
break
if not header:
ConsoleOutput.print(
f"Warning: file {logname} doesn't seem to be a Klipper raw accelerometer data file. "
f"Expected '#time,accel_x,accel_y,accel_z', but got '{header.strip()}'. "
'This file will be ignored by Shake&Tune!'
"Expected header '#time,accel_x,accel_y,accel_z'. This file will be ignored by Shake&Tune!"
)
continue
# If we have the correct raw data header, proceed to load the data
data = np.loadtxt(logname, comments='#', delimiter=',', skiprows=1)
if data.ndim == 1 or data.shape[1] != 4:
ConsoleOutput.print(
f'Warning: {logname} does not have the correct data format; expected 4 columns. '
'It will be ignored by Shake&Tune!'
)
continue

# Add the parsed klipper raw accelerometer data to Shake&Tune measurements object
samples = [tuple(row) for row in data]
self.add_measurement(name=logname.stem, samples=samples)
except Exception as err:
ConsoleOutput.print(f'Error while reading {logname}: {err}. It will be ignored by Shake&Tune!')
continue

return self.measurements

def wait_for_file_writes(self, k_reactor, timeout: int = 30):
if self._write_process is None:
return # No file write is pending

eventtime = k_reactor.monotonic()
endtime = eventtime + timeout
complete = False

while eventtime < endtime:
eventtime = k_reactor.pause(eventtime + 0.05)
if not self._write_process.is_alive():
complete = True
break

if not complete:
raise TimeoutError(
'Shake&Tune was unable to write the accelerometer data into the .STDATA file. '
'This might be due to a slow SD card or a busy or full filesystem.'
)

self._write_process = None
def __del__(self):
try:
if self._temp_dir.exists():
for chunk_file in self._temp_dir.glob('*.stchunk'):
chunk_file.unlink(missing_ok=True)
self._temp_dir.rmdir()
except Exception as e:
ConsoleOutput.print(f'Warning: error during temporary data chunks cleanup: {e}')


class Accelerometer:
Expand All @@ -157,7 +220,7 @@ def __init__(self, klipper_accelerometer, k_reactor):
self._sample_error = None

@staticmethod
def find_axis_accelerometer(printer, axis: str = 'xy'):
def find_axis_accelerometer(printer, axis: str = 'xy') -> str:
accel_chip_names = printer.lookup_object('resonance_tester').accel_chip_names
for chip_axis, chip_name in accel_chip_names:
if axis in {'x', 'y'} and chip_axis == 'xy':
Expand All @@ -166,36 +229,37 @@ def find_axis_accelerometer(printer, axis: str = 'xy'):
return chip_name
return None

def start_recording(self, measurements_manager: MeasurementsManager, name: str = None, append_time: bool = True):
if self._bg_client is None:
self._bg_client = self._k_accelerometer.start_internal_client()
def start_recording(
self, measurements_manager: MeasurementsManager, name: str = None, append_time: bool = True
) -> None:
if self._bg_client is not None:
raise ValueError('Recording already started!')

timestamp = time.strftime('%Y%m%d_%H%M%S')
if name is None:
name = timestamp
elif append_time:
name += f'_{timestamp}'
self._bg_client = self._k_accelerometer.start_internal_client()
timestamp = time.strftime('%Y%m%d_%H%M%S')
if name is None:
name = timestamp
elif append_time:
name += f'_{timestamp}'

if not name.replace('-', '').replace('_', '').isalnum():
raise ValueError('invalid measurement name!')
if not name.replace('-', '').replace('_', '').isalnum():
raise ValueError('Invalid measurement name!')

self._measurements_manager = measurements_manager
self._measurements_manager.add_measurement(name=name)
else:
raise ValueError('Recording already started!')
self._measurements_manager = measurements_manager
self._measurements_manager.add_measurement(name=name)

def stop_recording(self) -> MeasurementsManager:
if self._bg_client is None:
ConsoleOutput.print('Warning: no recording to stop!')
return None

# Register a callback in Klipper's reactor to finish the measurements and get the
# samples when Klipper is ready to process them (and without blocking its main thread)
# Register a callback in Klipper's reactor to finish measurements and get the samples. This is needed
# to schedule it when Klipper is ready to process them (and without blocking its main thread)
self._k_reactor.register_callback(self._finish_and_get_samples)

return self._measurements_manager

def _finish_and_get_samples(self, bg_client):
def _finish_and_get_samples(self, bg_client) -> None:
try:
self._bg_client.finish_measurements()
samples = self._bg_client.samples or self._bg_client.get_samples()
Expand All @@ -207,7 +271,7 @@ def _finish_and_get_samples(self, bg_client):
finally:
self._bg_client = None

def wait_for_samples(self, timeout: int = 60):
def wait_for_samples(self, timeout: int = 60) -> None:
eventtime = self._k_reactor.monotonic()
endtime = eventtime + timeout

Expand All @@ -217,8 +281,7 @@ def wait_for_samples(self, timeout: int = 60):
break
if self._sample_error:
raise self._sample_error

if not self._samples_ready:
else:
raise TimeoutError(
'Shake&Tune was unable to retrieve accelerometer data in time. '
'This might be due to slow hardware or a busy system.'
Expand Down

0 comments on commit b9651bd

Please sign in to comment.