Files
frontend-dev/frontend/serial_reader.py
2025-05-05 22:52:16 +02:00

112 lines
3.3 KiB
Python

import time
from statistics import mean
from serial import Serial
from serial.tools import list_ports
from .config import DEFAULT_CALIB, DEFAULT_CALIB_WEIGHT, MOV_AVG_DEFAULTS
class SerialReader:
@property
def value(self):
return (self.current_raw_weight - self._tare) * self.calib_factor
@property
def calib_factor(self):
return self._calib_factor
@calib_factor.setter
def calib_factor(self, value):
self.calib_factor = value
self._raw_reset_threshold = self.reset_threshold / value
@property
def calibrating(self):
return self._calibrating
@calibrating.setter
def calibrating(self, value):
if value:
self.calib_window = []
self._calibrating = value
def __init__(self):
self.running = True
self._calibrating = False
self.ports = [d.device for d in list_ports.grep('usbmodem')]
self.serial = None
self._calib_factor = DEFAULT_CALIB_WEIGHT / DEFAULT_CALIB
self.window_size = MOV_AVG_DEFAULTS['window_size']
self.reset_threshold = MOV_AVG_DEFAULTS['reset_threshold']
self._raw_reset_threshold = MOV_AVG_DEFAULTS['reset_threshold'] / self._calib_factor
self.ignore_samples = MOV_AVG_DEFAULTS['ignore_samples']
self._tare = 0.0
self.window = []
self.calib_window = []
self.current_raw_weight = 0
self.ignored_samples = 0
def scan_devices(self):
self.ports = [d.device for d in list_ports.grep('usbmodem')]
def connect(self, port, baudrate=115200):
self.serial = Serial(port, baudrate)
def disconnect(self):
self.serial.close()
self.serial = None
def reset(self):
if self.serial is not None:
self.serial.write('reset'.encode())
def read_weights(self):
while self.running:
if self.serial is not None:
try:
line = self.serial.readline().decode('utf-8')
if line.startswith('calibrated'):
print('calibrated')
continue
raw_weight = int(line.split(',')[1])
if self._calibrating:
self.calib_window.append(raw_weight)
self.filter(raw_weight)
except:
pass
else:
time.sleep(1)
def stop(self):
self.running = False
def filter(self, raw_weight):
if len(self.window) < self.window_size:
self.window.append(raw_weight)
self.current_raw_weight = mean(self.window)
else:
out_of_threshold = abs(self.current_raw_weight - raw_weight) > self._raw_reset_threshold
if out_of_threshold and\
self.ignored_samples < self.ignore_samples:
self.ignored_samples += 1
elif out_of_threshold:
self.ignored_samples = 0
self.window = [raw_weight]
self.current_raw_weight = raw_weight
else:
self.ignored_samples = 0
self.window.append(raw_weight)
self.current_raw_weight = mean(self.window)
def tare(self):
self._tare = self.current_raw_weight