add python_toolkit
This commit is contained in:
@@ -2,16 +2,21 @@ import tkinter as tk
|
||||
from tkinter import ttk
|
||||
from tkinter.messagebox import showinfo
|
||||
import threading
|
||||
from time import sleep
|
||||
from time import sleep, time
|
||||
from statistics import mean
|
||||
import pandas as pd
|
||||
|
||||
from .serial_reader import SerialReader
|
||||
from .config import DEFAULT_CALIB, DISPLAY_TYPES
|
||||
from python_toolkit.serial_reader import SerialReader
|
||||
from python_toolkit.gui.connect import ConnectFrame
|
||||
from .config import DEFAULT_CALIB_WEIGHT, DEFAULT_CALIB, DISPLAY_TYPES, MOV_AVG_DEFAULTS
|
||||
from .views import *
|
||||
|
||||
class WeightApp(tk.Tk):
|
||||
def __init__(self, weight_reader: SerialReader):
|
||||
super().__init__()
|
||||
self.recording = False
|
||||
self.record_start = None
|
||||
self.record_window = []
|
||||
self.weight_reader = weight_reader
|
||||
self.view = None
|
||||
|
||||
@@ -19,17 +24,10 @@ class WeightApp(tk.Tk):
|
||||
self.toolbar.pack(side=tk.LEFT)
|
||||
|
||||
#### Connection Settings ####
|
||||
self.connection_settings = tk.Frame(self.toolbar, pady=10)
|
||||
self.scan_ports_button = ttk.Button(self.connection_settings, text="Scan Devices", command=self.update_devices)
|
||||
self.scan_ports_button.pack()
|
||||
self.port_label = ttk.Label(self.connection_settings, text="Port:")
|
||||
self.port_label.pack(side=tk.LEFT)
|
||||
self.port = ttk.Combobox(self.connection_settings, values=[])
|
||||
self.update_devices()
|
||||
self.port.pack(side=tk.LEFT)
|
||||
self.connection_settings.pack()
|
||||
self.connect_button = ttk.Button(self.toolbar, text="Connect", command=self.connect)
|
||||
self.connect_button.pack()
|
||||
self.connection_settings = ConnectFrame(self.toolbar,
|
||||
self.weight_reader,
|
||||
self._on_connect,
|
||||
pady=30)
|
||||
|
||||
#### Weight Reader Settings ####
|
||||
self.reader_settings = tk.Frame(self.toolbar, pady=30)
|
||||
@@ -58,6 +56,11 @@ class WeightApp(tk.Tk):
|
||||
self.view_type_update = ttk.Button(self.view_type, text="Refresh", command=self.update_view)
|
||||
self.view_type_update.pack()
|
||||
|
||||
self.recording_frame = tk.Frame(self.toolbar, pady=20)
|
||||
self.recording_frame.pack()
|
||||
self.record_button = ttk.Button(self.recording_frame, text="Record", command=self.trigger_record)
|
||||
self.record_button.pack()
|
||||
|
||||
#### Display ####
|
||||
self.update_view()
|
||||
|
||||
@@ -69,36 +72,35 @@ class WeightApp(tk.Tk):
|
||||
|
||||
|
||||
def show_device_components(self):
|
||||
self.connect_button.pack_forget()
|
||||
self.connection_settings.pack_forget()
|
||||
self.reader_settings.pack_forget()
|
||||
self.view_type.pack_forget()
|
||||
self.recording_frame.pack_forget()
|
||||
self.view.pack_forget()
|
||||
|
||||
self.connect_button.pack()
|
||||
self.reader_settings.pack()
|
||||
self.view_type.pack()
|
||||
self.recording_frame.pack()
|
||||
self.view.pack()
|
||||
|
||||
def hide_device_components(self):
|
||||
self.connect_button.pack_forget()
|
||||
self.connection_settings.pack_forget()
|
||||
self.reader_settings.pack_forget()
|
||||
self.view_type.pack_forget()
|
||||
self.recording_frame.pack_forget()
|
||||
self.view.pack_forget()
|
||||
self.connection_settings.pack()
|
||||
self.connect_button.pack()
|
||||
|
||||
|
||||
def connect(self):
|
||||
if self.weight_reader.serial is None:
|
||||
port = self.port.get()
|
||||
self.weight_reader.connect(port)
|
||||
self.connect_button.config(text="Disconnect")
|
||||
def _on_connect(self, connected):
|
||||
if connected:
|
||||
# port = self.port.get()
|
||||
# self.weight_reader.connect(port)
|
||||
# self.connect_button.config(text="Disconnect")
|
||||
self.show_device_components()
|
||||
else:
|
||||
self.weight_reader.disconnect()
|
||||
self.connect_button.config(text="Connect")
|
||||
# self.weight_reader.disconnect()
|
||||
# self.connect_button.config(text="Connect")
|
||||
self.hide_device_components()
|
||||
|
||||
def update_devices(self):
|
||||
@@ -108,7 +110,8 @@ class WeightApp(tk.Tk):
|
||||
self.port.set(self.weight_reader.ports[0])
|
||||
|
||||
def update_calib(self):
|
||||
self.weight_reader.calib_factor = self.calib_weight.get() / float(self.calib_measurements.get())
|
||||
print(self.calib_weight.get(), self.calib_measurements.get())
|
||||
self.weight_reader.calib_factor = float(self.calib_weight.get()) / float(self.calib_measurements.get())
|
||||
|
||||
def update_view(self):
|
||||
selected_view = self.view_type_select.get()
|
||||
@@ -134,8 +137,10 @@ class WeightApp(tk.Tk):
|
||||
|
||||
def update_weight_display(self):
|
||||
weight = self.weight_reader.value
|
||||
if self.recording:
|
||||
self.record_window.append((time() - self.record_start, weight))
|
||||
self.view.update_weight(weight)
|
||||
self.after(100, self.update_weight_display)
|
||||
self.after(20, self.update_weight_display)
|
||||
|
||||
def calibrate(self):
|
||||
showinfo("Calibration", "Remove all weights.")
|
||||
@@ -148,11 +153,37 @@ class WeightApp(tk.Tk):
|
||||
print(self.weight_reader.calib_window)
|
||||
self.calib_measurements.delete(0, 'end')
|
||||
self.calib_measurements.insert(0, mean(self.weight_reader.calib_window))
|
||||
|
||||
def trigger_record(self):
|
||||
if self.recording:
|
||||
self.record_button.config(text="Record")
|
||||
self.recording = False
|
||||
file_path = tk.filedialog.asksaveasfilename(
|
||||
defaultextension=".csv",
|
||||
filetypes=[("CSV files", "*.csv"), ("All files", "*.*")],
|
||||
title="Save Recorded Weights"
|
||||
)
|
||||
if file_path:
|
||||
pd.DataFrame({
|
||||
'Timestamp': [t for t, _ in self.record_window],
|
||||
'Weight': [w for _, w in self.record_window],
|
||||
}).to_csv(file_path, index=False)
|
||||
self.record_window = []
|
||||
else:
|
||||
self.record_button.config(text="Stop")
|
||||
self.recording = True
|
||||
self.record_window = []
|
||||
self.record_start = time()
|
||||
|
||||
|
||||
|
||||
|
||||
def main():
|
||||
weight_reader = SerialReader()
|
||||
weight_reader = SerialReader(DEFAULT_CALIB_WEIGHT,
|
||||
DEFAULT_CALIB,
|
||||
window_size=MOV_AVG_DEFAULTS['window_size'],
|
||||
reset_threshold=MOV_AVG_DEFAULTS['reset_threshold'],
|
||||
ignore_samples=MOV_AVG_DEFAULTS['ignore_samples'])
|
||||
threading.Thread(target=weight_reader.read_weights, daemon=True).start()
|
||||
|
||||
app = WeightApp(weight_reader)
|
||||
@@ -1,112 +0,0 @@
|
||||
import time
|
||||
|
||||
from statistics import mean
|
||||
|
||||
from serial import Serial
|
||||
from serial.tools import list_ports
|
||||
|
||||
from .config import DEFAULT_CALIB, DEFAULT_CALIB_WEIGHT, MOV_AVG_DEFAULTS
|
||||
|
||||
class SerialReader:
|
||||
@property
|
||||
def value(self):
|
||||
return (self.current_raw_weight - self._tare) * self.calib_factor
|
||||
|
||||
@property
|
||||
def calib_factor(self):
|
||||
return self._calib_factor
|
||||
|
||||
@calib_factor.setter
|
||||
def calib_factor(self, value):
|
||||
self.calib_factor = value
|
||||
self._raw_reset_threshold = self.reset_threshold / value
|
||||
|
||||
@property
|
||||
def calibrating(self):
|
||||
return self._calibrating
|
||||
|
||||
@calibrating.setter
|
||||
def calibrating(self, value):
|
||||
if value:
|
||||
self.calib_window = []
|
||||
self._calibrating = value
|
||||
|
||||
def __init__(self):
|
||||
self.running = True
|
||||
self._calibrating = False
|
||||
self.ports = [d.device for d in list_ports.grep('usbmodem')]
|
||||
|
||||
self.serial = None
|
||||
|
||||
self._calib_factor = DEFAULT_CALIB_WEIGHT / DEFAULT_CALIB
|
||||
self.window_size = MOV_AVG_DEFAULTS['window_size']
|
||||
self.reset_threshold = MOV_AVG_DEFAULTS['reset_threshold']
|
||||
self._raw_reset_threshold = MOV_AVG_DEFAULTS['reset_threshold'] / self._calib_factor
|
||||
self.ignore_samples = MOV_AVG_DEFAULTS['ignore_samples']
|
||||
|
||||
self._tare = 0.0
|
||||
|
||||
self.window = []
|
||||
self.calib_window = []
|
||||
self.current_raw_weight = 0
|
||||
self.ignored_samples = 0
|
||||
|
||||
|
||||
def scan_devices(self):
|
||||
self.ports = [d.device for d in list_ports.grep('usbmodem')]
|
||||
|
||||
def connect(self, port, baudrate=115200):
|
||||
self.serial = Serial(port, baudrate)
|
||||
|
||||
def disconnect(self):
|
||||
self.serial.close()
|
||||
self.serial = None
|
||||
|
||||
def reset(self):
|
||||
if self.serial is not None:
|
||||
self.serial.write('reset'.encode())
|
||||
|
||||
def read_weights(self):
|
||||
while self.running:
|
||||
if self.serial is not None:
|
||||
try:
|
||||
line = self.serial.readline().decode('utf-8')
|
||||
if line.startswith('calibrated'):
|
||||
print('calibrated')
|
||||
continue
|
||||
raw_weight = int(line.split(',')[1])
|
||||
|
||||
if self._calibrating:
|
||||
self.calib_window.append(raw_weight)
|
||||
|
||||
self.filter(raw_weight)
|
||||
except:
|
||||
pass
|
||||
else:
|
||||
time.sleep(1)
|
||||
|
||||
def stop(self):
|
||||
self.running = False
|
||||
|
||||
def filter(self, raw_weight):
|
||||
if len(self.window) < self.window_size:
|
||||
self.window.append(raw_weight)
|
||||
self.current_raw_weight = mean(self.window)
|
||||
else:
|
||||
out_of_threshold = abs(self.current_raw_weight - raw_weight) > self._raw_reset_threshold
|
||||
if out_of_threshold and\
|
||||
self.ignored_samples < self.ignore_samples:
|
||||
self.ignored_samples += 1
|
||||
|
||||
elif out_of_threshold:
|
||||
self.ignored_samples = 0
|
||||
self.window = [raw_weight]
|
||||
self.current_raw_weight = raw_weight
|
||||
|
||||
else:
|
||||
self.ignored_samples = 0
|
||||
self.window.append(raw_weight)
|
||||
self.current_raw_weight = mean(self.window)
|
||||
|
||||
def tare(self):
|
||||
self._tare = self.current_raw_weight
|
||||
Reference in New Issue
Block a user