init
This commit is contained in:
90
frontend/serial_reader.py
Normal file
90
frontend/serial_reader.py
Normal file
@@ -0,0 +1,90 @@
|
||||
import time
|
||||
|
||||
from statistics import mean
|
||||
|
||||
from serial import Serial
|
||||
from serial.tools import list_ports
|
||||
|
||||
from .config import DEFAULT_CALIB, DEFAULT_CALIB_WEIGHT, MOV_AVG_DEFAULTS
|
||||
|
||||
class WeightReader:
|
||||
@property
|
||||
def value(self):
|
||||
return (self.current_raw_weight - self._tare) * self.calib_factor
|
||||
|
||||
@property
|
||||
def calib_factor(self):
|
||||
return self._calib_factor
|
||||
|
||||
@calib_factor.setter
|
||||
def calib_factor(self, value):
|
||||
self.calib_factor = value
|
||||
self._raw_reset_threshold = self.reset_threshold / value
|
||||
|
||||
def __init__(self):
|
||||
self.running = True
|
||||
self.ports = [d.device for d in list_ports.grep('usbmodem')]
|
||||
|
||||
self.serial = None
|
||||
|
||||
self._calib_factor = DEFAULT_CALIB_WEIGHT / DEFAULT_CALIB
|
||||
self.window_size = MOV_AVG_DEFAULTS['window_size']
|
||||
self.reset_threshold = MOV_AVG_DEFAULTS['reset_threshold']
|
||||
self._raw_reset_threshold = MOV_AVG_DEFAULTS['reset_threshold'] / self._calib_factor
|
||||
self.ignore_samples = MOV_AVG_DEFAULTS['ignore_samples']
|
||||
|
||||
self._tare = 0.0
|
||||
|
||||
self.window = []
|
||||
self.current_raw_weight = 0
|
||||
self.ignored_samples = 0
|
||||
|
||||
|
||||
def scan_devices(self):
|
||||
self.ports = [d.device for d in list_ports.grep('usbmodem')]
|
||||
|
||||
def connect(self, port, baudrate=115200):
|
||||
self.serial = Serial(port, baudrate)
|
||||
|
||||
def disconnect(self):
|
||||
self.serial.close()
|
||||
self.serial = None
|
||||
|
||||
def read_weights(self):
|
||||
while self.running:
|
||||
if self.serial is not None:
|
||||
try:
|
||||
line = self.serial.readline().decode('utf-8')
|
||||
raw_weight = int(line.split(',')[1])
|
||||
|
||||
self.filter(raw_weight)
|
||||
except:
|
||||
pass
|
||||
else:
|
||||
time.sleep(1)
|
||||
|
||||
def stop(self):
|
||||
self.running = False
|
||||
|
||||
def filter(self, raw_weight):
|
||||
if len(self.window) < self.window_size:
|
||||
self.window.append(raw_weight)
|
||||
self.current_raw_weight = mean(self.window)
|
||||
else:
|
||||
out_of_threshold = abs(self.current_raw_weight - raw_weight) > self._raw_reset_threshold
|
||||
if out_of_threshold and\
|
||||
self.ignored_samples < self.ignore_samples:
|
||||
self.ignored_samples += 1
|
||||
|
||||
elif out_of_threshold:
|
||||
self.ignored_samples = 0
|
||||
self.window = [raw_weight]
|
||||
self.current_raw_weight = raw_weight
|
||||
|
||||
else:
|
||||
self.ignored_samples = 0
|
||||
self.window.append(raw_weight)
|
||||
self.current_raw_weight = mean(self.window)
|
||||
|
||||
def tare(self):
|
||||
self._tare = self.current_raw_weight
|
||||
Reference in New Issue
Block a user