147 lines
4.3 KiB
Python
147 lines
4.3 KiB
Python
import tkinter as tk
|
|
import matplotlib.pyplot as plt
|
|
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2Tk
|
|
|
|
import asyncio
|
|
|
|
from .filter import *
|
|
from .gui.device import Device
|
|
from .gui.toolbar import RecordForm, FilterForm, DataStats
|
|
|
|
class FilterDevApp(tk.Tk):
|
|
def __init__(self, loop: asyncio.EventLoop):
|
|
super().__init__()
|
|
self.loop = loop
|
|
self.protocol("WM_DELETE_WINDOW", self.close)
|
|
self.tasks = []
|
|
self.tasks.append(loop.create_task(self.updater(1./100)))
|
|
|
|
self.filter = None
|
|
|
|
self.title("JannTers Filter Evaluation Tool")
|
|
|
|
# Create a frame for sliders
|
|
self.toolbar = tk.Frame(self, width=200, padx=10)
|
|
self.toolbar.pack(side=tk.LEFT)
|
|
|
|
# Device Settings
|
|
self.record_form = RecordForm(self.toolbar, self.record_data)
|
|
self.record_form.pack(pady=10)
|
|
|
|
self.device = Device(self.record_form.device_name)
|
|
|
|
# Filter Settings
|
|
self.filter_form = FilterForm(self.toolbar, self.update_filter)
|
|
self.filter = MovAvg(self.device, self.toolbar, self.update_plot)
|
|
|
|
self.data_stats = DataStats(self.toolbar, self.reset)
|
|
|
|
# Create a figure for plotting
|
|
self.frame = tk.Frame(self)
|
|
self.frame.pack(side=tk.RIGHT)
|
|
|
|
self.fig, self.ax = plt.subplots()
|
|
self.ax2 = self.ax.twinx()
|
|
self.canvas = FigureCanvasTkAgg(self.fig, master=self.frame)
|
|
self.canvas.get_tk_widget().pack(side=tk.TOP, fill=tk.BOTH)
|
|
NavigationToolbar2Tk(self.canvas, self.frame)
|
|
|
|
self.focus_force()
|
|
|
|
def update_plot(self):
|
|
if self.filter is None:
|
|
return
|
|
|
|
# Clear the current plot
|
|
self.ax.clear()
|
|
self.ax2.clear()
|
|
|
|
# Get current values from sliders
|
|
df = self.filter()
|
|
self.data_stats.update_stats(df)
|
|
|
|
# Generate data
|
|
x = df['timestamps']
|
|
y1 = df['weights']
|
|
y2 = df['filtered']
|
|
y2_g = df['filtered_calib']
|
|
|
|
# Plot the data
|
|
self.ax.plot(x, y1, label="raw")
|
|
self.ax.plot(x, y2, label="filtered")
|
|
|
|
# self.ax2.plot(x, y1_g)
|
|
self.ax2.plot(x, y2_g, label="filtered", color='green')
|
|
self.ax2.set_ylabel("Weight", color="green")
|
|
|
|
self.ax.set_xlabel("Time in ms")
|
|
self.ax.set_ylabel("Raw Weight")
|
|
self.ax.grid()
|
|
self.ax.legend(loc='upper left')
|
|
self.ax2.yaxis.set_label_position("right")
|
|
self.ax2.yaxis.tick_right()
|
|
self.ax2.legend(loc='lower right')
|
|
|
|
self.fig.tight_layout()
|
|
self.canvas.get_tk_widget().pack(side=tk.TOP, fill=tk.BOTH)
|
|
|
|
# Draw the updated plot
|
|
self.canvas.draw()
|
|
|
|
def update_filter(self):
|
|
option = self.filter_form.filter_type_combobox.get()
|
|
if option == 'MovAvg' and not isinstance(self.filter, MovAvg):
|
|
self.filter = MovAvg(self.device, self.toolbar, self.update_plot)
|
|
|
|
self.update_plot()
|
|
|
|
def record_data(self):
|
|
if self.device.is_connected:
|
|
self.record_form.pack(pady=10)
|
|
|
|
self.device.disconnect()
|
|
|
|
else:
|
|
record_duration = self.record_form.record_time.get_value()
|
|
task = self.loop.create_task(self.device.read_values(record_duration))
|
|
task.add_done_callback(self.data_recorded)
|
|
|
|
|
|
def data_recorded(self, *args):
|
|
if self.device.is_connected:
|
|
self.record_form.pack_forget()
|
|
|
|
self.filter_form.pack(pady=10)
|
|
self.filter.pack(pady=10)
|
|
self.data_stats.pack(pady=10)
|
|
|
|
self.update_filter()
|
|
|
|
async def updater(self, interval):
|
|
while await asyncio.sleep(interval, True):
|
|
self.update()
|
|
|
|
def reset(self):
|
|
self.device.disconnect()
|
|
self.device.clear_data()
|
|
|
|
self.filter_form.pack_forget()
|
|
self.filter.pack_forget()
|
|
self.data_stats.pack_forget()
|
|
|
|
self.record_form.pack()
|
|
self.update_plot()
|
|
|
|
def close(self):
|
|
for task in self.tasks:
|
|
task.cancel()
|
|
self.loop.stop()
|
|
self.destroy()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
loop = asyncio.new_event_loop()
|
|
app = FilterDevApp(loop)
|
|
loop.run_forever()
|
|
loop.close()
|