Files
evaluation/evaluation/calibration/frame.py
2025-10-13 21:56:40 +02:00

369 lines
16 KiB
Python

import time
import statistics
import threading
import pandas as pd
import os
from datetime import datetime
from tkinter import messagebox, ttk
class CalibrationFrame(ttk.Frame):
    """Tk frame that runs a reference-weight calibration against a serial reader.

    The frame drives the ``serial_reader`` object it is given through these
    attributes: it reads ``is_connected`` and ``calib_window``, and writes
    ``calibrating`` and ``calib_factor``.  Results are shown in the frame's
    labels, appended to ``calib_logs.csv`` and written per-sample to a CSV
    file under ``./logs``.
    """

    # Length of the data-collection window, in seconds.
    MEASUREMENT_DURATION_S = 30.0
    # Mass of the reference weight placed on the scale, in grams.
    REFERENCE_WEIGHT_G = 100.0
    # Pause between progress-bar refreshes while collecting, in seconds.
    _POLL_INTERVAL_S = 0.05

    def __init__(self, master, serial_reader, **kwargs):
        """Build the calibration widgets.

        Args:
            master: Parent Tk widget.
            serial_reader: Object providing the readings to calibrate against.
            **kwargs: Forwarded unchanged to ``ttk.Frame``.
        """
        super().__init__(master, **kwargs)
        self.serial_reader = serial_reader
        self.calibration_in_progress = False

        # Status / result labels
        self.status_label = ttk.Label(self, text="Ready for calibration")
        self.status_label.pack(pady=10)
        self.calibration_factor_label = ttk.Label(self, text="Calibration Factor: Not set")
        self.calibration_factor_label.pack(pady=5)
        self.std_label = ttk.Label(self, text="Standard Deviation: Not calculated")
        self.std_label.pack(pady=5)
        self.drift_label = ttk.Label(self, text="Drift: Not calculated")
        self.drift_label.pack(pady=5)

        # Calibration name input
        self.name_frame = ttk.Frame(self)
        self.name_frame.pack(pady=10)
        self.name_label = ttk.Label(self.name_frame, text="Calibration Name:")
        self.name_label.pack(side='left', padx=(0, 5))
        self.name_entry = ttk.Entry(self.name_frame, width=25)
        self.name_entry.pack(side='left', padx=(0, 5))
        self.name_entry.insert(0, self.generate_default_name())  # Default name
        self.auto_name_button = ttk.Button(
            self.name_frame, text="Auto", command=self.generate_auto_name, width=6
        )
        self.auto_name_button.pack(side='left')

        # Action buttons and progress display
        self.calibrate_button = ttk.Button(
            self, text="Start Calibration", command=self.start_calibration
        )
        self.calibrate_button.pack(pady=10)
        self.reset_button = ttk.Button(
            self, text="Reset Calibration", command=self.reset_calibration
        )
        self.reset_button.pack(pady=5)
        self.progress_bar = ttk.Progressbar(self, length=300, mode='determinate')
        self.progress_bar.pack(pady=5)

    def start_calibration(self):
        """Run the full calibration procedure.

        Prompts the user to empty the scale and then to place the reference
        weight, collects readings for ``MEASUREMENT_DURATION_S`` seconds,
        computes mean / standard deviation / drift, stores the resulting
        factor on ``serial_reader.calib_factor`` and saves both CSV logs.

        The whole procedure runs synchronously in the calling thread; the
        settle delay and the collection window both block until they finish.
        """
        if self.calibration_in_progress:
            return

        if not self.serial_reader.is_connected:
            messagebox.showerror("Error", "Please connect to device first!")
            return

        calibration_name = self.name_entry.get().strip()
        if not calibration_name:
            calibration_name = "Unnamed Calibration"

        duration = self.MEASUREMENT_DURATION_S
        weight = self.REFERENCE_WEIGHT_G

        self.calibration_in_progress = True
        self.calibrate_button.config(state='disabled')
        self.progress_bar['value'] = 0

        # Step 1: empty scale
        result = messagebox.askokcancel(
            f"Calibration Procedure - {calibration_name}",
            "Step 1: Remove all weights from the scale and press OK to continue."
        )
        if not result:
            self.calibration_in_progress = False
            self.calibrate_button.config(state='normal')
            return

        # Give the scale a moment to settle
        self.status_label.config(text="Waiting for scale to settle...")
        self.update_idletasks()
        time.sleep(2)

        # Step 2: reference weight
        result = messagebox.askokcancel(
            f"Calibration Procedure - {calibration_name}",
            f"Step 2: Place the {weight:g}g calibration weight on the scale "
            "and press OK to start measurement."
        )
        if not result:
            self.calibration_in_progress = False
            self.calibrate_button.config(state='normal')
            self.status_label.config(text=f"Calibration '{calibration_name}' cancelled")
            return

        try:
            self.status_label.config(
                text=f"Collecting data for '{calibration_name}' ({duration:g} seconds)..."
            )
            self.serial_reader.calibrating = True

            # Monotonic clock: immune to wall-clock adjustments mid-measurement.
            start_time = time.monotonic()
            while True:
                elapsed = time.monotonic() - start_time
                if elapsed >= duration:
                    break
                self.progress_bar['value'] = (elapsed / duration) * 100
                self.update_idletasks()
                # Sleep instead of spinning so this loop does not hog the CPU
                # while the readings are being gathered.
                time.sleep(min(self._POLL_INTERVAL_S, duration - elapsed))
            self.progress_bar['value'] = 100

            # Snapshot the window so later changes to it cannot alter the
            # statistics computed below.
            calibration_readings = list(self.serial_reader.calib_window)
            self.serial_reader.calibrating = False

            if not calibration_readings:
                messagebox.showerror("Error", "No readings collected. Check device connection.")
                self.status_label.config(text=f"Calibration '{calibration_name}' failed")
                return

            # Statistics over the raw readings
            mean_reading = statistics.mean(calibration_readings)
            std_reading = (
                statistics.stdev(calibration_readings)
                if len(calibration_readings) > 1 else 0
            )
            drift = self.calculate_drift(calibration_readings, measurement_duration=duration)
            overall_drift = drift['overall_drift']
            drift_rate = drift['drift_rate']

            if mean_reading == 0:
                # A zero mean cannot be scaled to the reference weight.
                messagebox.showerror(
                    "Error",
                    "Mean reading is zero; cannot compute a calibration factor. "
                    "Check that the calibration weight is on the scale."
                )
                self.status_label.config(text=f"Calibration '{calibration_name}' failed")
                return

            # Calibration factor = reference weight / mean raw reading
            calibration_factor = weight / mean_reading
            self.serial_reader.calib_factor = calibration_factor

            # Persist summary and per-sample data
            self.save_calibration_log(
                calibration_name, calibration_readings, mean_reading,
                std_reading, overall_drift, drift_rate, calibration_factor,
                measurement_duration=duration,
            )
            readings_file = self.save_individual_readings(
                calibration_name, calibration_readings, measurement_duration=duration
            )

            # Update UI
            self.status_label.config(text=f"Calibration '{calibration_name}' complete!")
            self.calibration_factor_label.config(
                text=f"Calibration Factor: {calibration_factor:.2f}"
            )
            self.std_label.config(text=f"Standard Deviation: {std_reading:.2f} (raw units)")
            self.drift_label.config(
                text=f"Drift: {overall_drift:.2f} units ({drift_rate:.2f} units/sec)"
            )

            # Show results
            if readings_file:
                readings_file_msg = (
                    f"Individual readings saved to: {os.path.basename(readings_file)}"
                )
            else:
                readings_file_msg = "Individual readings save failed"
            messagebox.showinfo(
                f"Calibration Complete - {calibration_name}",
                f"Calibration '{calibration_name}' successful!\n\n"
                f"Samples collected: {len(calibration_readings)}\n"
                f"Mean reading: {mean_reading:.2f} (raw units)\n"
                f"Standard deviation: {std_reading:.2f} (raw units)\n"
                f"Overall drift: {overall_drift:.2f} units ({drift_rate:.2f} units/sec)\n"
                f"Calibration factor: {calibration_factor:.2f}\n\n"
                f"The scale is now calibrated for {weight:g}g.\n\n"
                f"Data saved to:\n"
                f"• Summary: calib_logs.csv\n"
                f"{readings_file_msg}"
            )
        except Exception as e:
            messagebox.showerror("Error", f"Calibration '{calibration_name}' failed: {str(e)}")
            self.status_label.config(text=f"Calibration '{calibration_name}' failed")
        finally:
            # Always leave the reader and the UI in the idle state, even when
            # the procedure failed part-way through.
            self.serial_reader.calibrating = False
            self.calibration_in_progress = False
            self.calibrate_button.config(state='normal')
            self.progress_bar['value'] = 0

    def calculate_drift(self, readings, measurement_duration=MEASUREMENT_DURATION_S):
        """Estimate the drift of ``readings`` over the measurement period.

        A least-squares line is fitted to the readings against their sample
        index; the drift is the change of that line from the first to the
        last sample.

        Args:
            readings: Sequence of numeric readings in acquisition order.
            measurement_duration: Length of the measurement in seconds, used
                to convert the overall drift into a per-second rate.

        Returns:
            dict with keys ``overall_drift`` (fitted end minus fitted start),
            ``drift_rate`` (overall drift per second), ``simple_drift``
            (last reading minus first reading) and ``slope`` (fitted change
            per sample).  All values are 0.0 for fewer than two readings.

        Raises:
            ValueError: If ``measurement_duration`` is not positive.
        """
        if measurement_duration <= 0:
            raise ValueError("measurement_duration must be positive")

        n = len(readings)
        if n < 2:
            # Same keys as the normal return so callers can index either result.
            return {
                'overall_drift': 0.0,
                'drift_rate': 0.0,
                'simple_drift': 0.0,
                'slope': 0.0,
            }

        # Simple linear regression y = m*x + b with x = 0..n-1.
        x_mean = (n - 1) / 2
        y_mean = statistics.mean(readings)
        numerator = sum((x - x_mean) * (y - y_mean) for x, y in enumerate(readings))
        # Strictly positive for n >= 2, so the division below is safe.
        denominator = sum((x - x_mean) ** 2 for x in range(n))
        slope = numerator / denominator

        overall_drift = slope * (n - 1)
        return {
            'overall_drift': overall_drift,
            'drift_rate': overall_drift / measurement_duration,
            'simple_drift': readings[-1] - readings[0],
            'slope': slope,
        }

    def save_calibration_log(self, calibration_name, calibration_readings, mean_reading,
                             std_reading, overall_drift, drift_rate, calibration_factor,
                             measurement_duration=MEASUREMENT_DURATION_S):
        """Append a one-row summary of a calibration run to ``calib_logs.csv``.

        Failures are reported on stdout and in a warning dialog; they are not
        raised to the caller.

        Args:
            calibration_name: Name of the calibration run.
            calibration_readings: List of all raw readings.
            mean_reading: Mean of the readings.
            std_reading: Standard deviation of the readings.
            overall_drift: Overall drift during measurement.
            drift_rate: Drift rate per second.
            calibration_factor: Calculated calibration factor.
            measurement_duration: Length of the measurement in seconds.
        """
        try:
            timestamp = datetime.now()
            calibration_record = {
                'timestamp': timestamp.strftime("%Y-%m-%d %H:%M:%S"),
                'calibration_name': calibration_name,
                'sample_count': len(calibration_readings),
                'mean_reading': round(mean_reading, 4),
                'std_deviation': round(std_reading, 4),
                'overall_drift': round(overall_drift, 4),
                'drift_rate_per_sec': round(drift_rate, 6),
                'calibration_factor': round(calibration_factor, 4),
                'min_reading': round(min(calibration_readings), 4),
                'max_reading': round(max(calibration_readings), 4),
                'measurement_duration_sec': measurement_duration,
            }
            new_record_df = pd.DataFrame([calibration_record])

            csv_file = 'calib_logs.csv'
            # Append mode creates the file when missing; the header is only
            # written for a brand-new file.
            new_record_df.to_csv(
                csv_file, mode='a', header=not os.path.exists(csv_file), index=False
            )
            print(f"Calibration data saved to {csv_file}")
        except Exception as e:
            print(f"Error saving calibration log: {str(e)}")
            messagebox.showwarning("Warning", f"Could not save calibration log: {str(e)}")

    def save_individual_readings(self, calibration_name, calibration_readings,
                                 measurement_duration=MEASUREMENT_DURATION_S):
        """Write every raw reading of a run to its own CSV file under ``./logs``.

        Args:
            calibration_name: Name of the calibration run.
            calibration_readings: List of all raw readings.
            measurement_duration: Length of the measurement in seconds; sample
                times are spread evenly across it.

        Returns:
            Path of the written file, or ``None`` if saving failed (the
            failure is reported on stdout and in a warning dialog).
        """
        try:
            logs_dir = './logs'
            os.makedirs(logs_dir, exist_ok=True)

            timestamp = datetime.now()
            date_str = timestamp.strftime("%Y%m%d")
            time_str = timestamp.strftime("%H%M%S")

            # Keep only characters that are safe in a file name.
            clean_name = "".join(
                c for c in calibration_name if c.isalnum() or c in (' ', '-', '_')
            ).rstrip()
            clean_name = clean_name.replace(' ', '_')
            filename = f"{date_str}_{clean_name}_{time_str}_readings.csv"
            filepath = os.path.join(logs_dir, filename)

            # Per-sample times are not recorded, so they are approximated by
            # spacing the samples evenly over the measurement duration.
            sample_count = len(calibration_readings)
            step = measurement_duration / sample_count if sample_count else 0.0
            timestamp_str = timestamp.strftime("%Y-%m-%d %H:%M:%S")
            readings_data = [
                {
                    'sample_number': i + 1,
                    'time_seconds': round(i * step, 3),
                    'raw_reading': reading,
                    'calibration_name': calibration_name,
                    'timestamp': timestamp_str,
                }
                for i, reading in enumerate(calibration_readings)
            ]

            pd.DataFrame(readings_data).to_csv(filepath, index=False)
            print(f"Individual readings saved to {filepath}")
            return filepath
        except Exception as e:
            print(f"Error saving individual readings: {str(e)}")
            messagebox.showwarning("Warning", f"Could not save individual readings: {str(e)}")
            return None

    def generate_default_name(self):
        """Return a default calibration name of the form ``Calib_YYYYMMDD_HHMMSS``."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        return f"Calib_{timestamp}"

    def generate_auto_name(self):
        """Replace the name entry's content with a fresh default name."""
        self.name_entry.delete(0, 'end')
        self.name_entry.insert(0, self.generate_default_name())

    def reset_calibration(self):
        """Reset the reader's calibration factor to 1.0 and restore the UI."""
        reader = self.serial_reader

        # Support the alternative reader APIs this frame has been used with.
        if hasattr(reader, 'set_calibration_factor'):
            reader.set_calibration_factor(1.0)
        elif hasattr(reader, 'calibration_factor'):
            reader.calibration_factor = 1.0
        elif hasattr(reader, '_calibration_factor'):
            reader._calibration_factor = 1.0
        # start_calibration() writes `calib_factor`, so always reset it when
        # present, whichever of the alternatives above also matched.
        if hasattr(reader, 'calib_factor'):
            reader.calib_factor = 1.0

        # Reset UI
        self.status_label.config(text="Ready for calibration")
        self.calibration_factor_label.config(text="Calibration Factor: Not set")
        self.std_label.config(text="Standard Deviation: Not calculated")
        self.drift_label.config(text="Drift: Not calculated")
        self.progress_bar['value'] = 0

        # Reset name to a new auto-generated name
        self.generate_auto_name()