import time from statistics import mean from serial import Serial from serial.tools import list_ports from .config import DEFAULT_CALIB, DEFAULT_CALIB_WEIGHT, MOV_AVG_DEFAULTS class SerialReader: @property def value(self): return (self.current_raw_weight - self._tare) * self.calib_factor @property def calib_factor(self): return self._calib_factor @calib_factor.setter def calib_factor(self, value): self.calib_factor = value self._raw_reset_threshold = self.reset_threshold / value @property def calibrating(self): return self._calibrating @calibrating.setter def calibrating(self, value): if value: self.calib_window = [] self._calibrating = value def __init__(self): self.running = True self._calibrating = False self.ports = [d.device for d in list_ports.grep('usbmodem')] self.serial = None self._calib_factor = DEFAULT_CALIB_WEIGHT / DEFAULT_CALIB self.window_size = MOV_AVG_DEFAULTS['window_size'] self.reset_threshold = MOV_AVG_DEFAULTS['reset_threshold'] self._raw_reset_threshold = MOV_AVG_DEFAULTS['reset_threshold'] / self._calib_factor self.ignore_samples = MOV_AVG_DEFAULTS['ignore_samples'] self._tare = 0.0 self.window = [] self.calib_window = [] self.current_raw_weight = 0 self.ignored_samples = 0 def scan_devices(self): self.ports = [d.device for d in list_ports.grep('usbmodem')] def connect(self, port, baudrate=115200): self.serial = Serial(port, baudrate) def disconnect(self): self.serial.close() self.serial = None def reset(self): if self.serial is not None: self.serial.write('reset'.encode()) def read_weights(self): while self.running: if self.serial is not None: try: line = self.serial.readline().decode('utf-8') if line.startswith('calibrated'): print('calibrated') continue raw_weight = int(line.split(',')[1]) if self._calibrating: self.calib_window.append(raw_weight) self.filter(raw_weight) except: pass else: time.sleep(1) def stop(self): self.running = False def filter(self, raw_weight): if len(self.window) < self.window_size: self.window.append(raw_weight) self.current_raw_weight = mean(self.window) else: out_of_threshold = abs(self.current_raw_weight - raw_weight) > self._raw_reset_threshold if out_of_threshold and\ self.ignored_samples < self.ignore_samples: self.ignored_samples += 1 elif out_of_threshold: self.ignored_samples = 0 self.window = [raw_weight] self.current_raw_weight = raw_weight else: self.ignored_samples = 0 self.window.append(raw_weight) self.current_raw_weight = mean(self.window) def tare(self): self._tare = self.current_raw_weight