#!/usr/bin/env python3

import numpy as np
import argparse
import json

import pandas as pd

pd.options.mode.chained_assignment = None

def load_data(data):
    """Load a trace file and split it into runs.

    The first line of the file is a JSON header with global settings.
    Each following run consists of one JSON description line, then
    space-separated integer data lines, terminated by an empty line.

    Args:
        data: path of the trace file to read.

    Returns:
        (info, runs) where ``info`` is the decoded header and ``runs`` is a
        list of ``[run_info, run_data]`` pairs (``run_data`` being a list of
        integer token lists, one per sample line).
    """
    runs = []

    # read all sets; use a context manager so the file handle is closed
    with open(data) as fd:
        lines = fd.readlines()

    info = json.loads(lines[0])

    i = 1
    while i < len(lines):
        # first line in each set is a json description
        run_info = json.loads(lines[i])

        run_data = []
        # initialize j so that a description line at EOF (empty range below)
        # cannot leave j stale/undefined and stall the outer loop
        j = i
        for j in range(i + 1, len(lines)):
            # read until an empty line (data terminator)
            line = lines[j].rstrip()
            if len(line) == 0:
                break

            # parse the line
            tokens = list(map(int, line.split(' ')))
            run_data.append(tokens)

        runs.append([run_info, run_data])
        i = j + 1

    return info, runs


def check_axis(info, ax_info, data, fine_check):
    """Validate the motion trace of one axis against a trapezoidal profile.

    Args:
        info: global settings; uses 'timebase' (timer ticks per second).
        ax_info: axis settings; uses 'steps', 'maxrate', 'accel', 'jerk'.
        data: DataFrame with 'ts' (ticks), 'int' (interval) and 'pos'
            (absolute step position) columns for this axis.
        fine_check: perform strict per-sample checks; only meaningful for
            single-axis runs (multi-axis runs add discretization noise).

    Raises:
        AssertionError: when the trace deviates from the expected profile.
    """
    tb = info['timebase']

    # remove duplicate positions (meaning another axis was moved, not the
    # current); copy so the column assignments below work on an owned frame
    # instead of a view (chained assignment is a no-op under copy-on-write)
    data = data[data['pos'].diff() != 0].copy()

    # recalculate intervals just for this axis
    data['int'] = data['ts'].diff()

    # check start/ending position
    assert (data['pos'].iat[0] == 0)
    assert (data['pos'].iat[-1] == ax_info['steps'])

    # check first null timestamp/interval/position values
    assert ((data['ts'][0:2] == 0).all())
    assert ((data['int'][0:2].dropna() == 0).all())

    # ensure timestamps and positions are monotonically increasing
    assert ((data['ts'].diff()[2:] > 0).all())
    assert ((data['pos'].diff()[1:] > 0).all())

    # convert timestamps to seconds
    data['ts_s'] = data['ts'] / tb
    data['int_s'] = data['int'] / tb

    maxdev_fine = 20  # absolute maximum deviation
    maxdev_acc = 0.05  # 5% acceleration deviation
    maxdev_coarse = 0.1  # 10% speed deviation
    ramp_smp_skip = 3  # skip initial null values

    if fine_check:
        # exact rate
        data['rate'] = 1 / data['int_s']
    else:
        # reconstruct the rate from 3 samples; assign the filled result back
        # (Series.bfill(inplace=True) on a column selection does not
        # propagate to the frame under pandas copy-on-write)
        data['rate'] = 1 / data.rolling(3)['int_s'].median()
        data['rate'] = data['rate'].bfill()

    # ensure we never _exceed_ max feedrate
    assert ((data['rate'][2:] <= ax_info['maxrate']).all())

    # recalculate independently the acceleration parameters
    acc_dist = (ax_info['maxrate']**2 -
                ax_info['jerk']**2) / (2 * ax_info['accel'])
    if acc_dist * 2 > ax_info['steps']:
        # no cruising, calculate intersection (equal start/end speed);
        # c_dist must be set here too (it was previously undefined in this
        # branch, causing a NameError on triangular profiles)
        acc_dist = ax_info['steps'] / 2
        maxrate = np.sqrt(2 * ax_info['accel'] * acc_dist + ax_info['jerk']**2)
        c_dist = 0
    else:
        # cruising possible, get distance
        c_dist = ax_info['steps'] - 2 * acc_dist
        maxrate = ax_info['maxrate']

    # check cruising speed (skip when there is no cruise segment at all,
    # since max() of an empty selection is NaN and never compares true)
    cruise_data = data[(data['pos'] > acc_dist + 2)
                       & (data['pos'] < acc_dist + 2 + c_dist)]
    if len(cruise_data):
        cruise_maxdev = 1 if fine_check else maxrate * maxdev_coarse
        assert ((cruise_data['rate'] - maxrate).abs().max() < cruise_maxdev)

    # checking acceleration segments require a decent number of samples for good results
    if acc_dist < 10:
        return

    # TODO: minrate is currently hardcoded in the FW as a function of the timer type (we
    # can't represent infinitely-long intervals, to the slowest speed itself is limited).
    # We recover the minrate here directly from the trace, but perhaps we shouldn't
    startrate = data['rate'].iat[ramp_smp_skip]
    endrate = data['rate'].iat[-1]

    # check acceleration segment (coarse); expected rate is a linear ramp
    # from startrate to maxrate over the measured acceleration time
    acc_data = data[(data['pos'] < acc_dist)][ramp_smp_skip:].copy()
    acc_data['ts_s'] -= acc_data['ts_s'].iat[0]
    acc_time = acc_data['ts_s'].iat[-1]
    acc_data['exp_rate'] = startrate + acc_data['ts_s'] \
        / acc_time * (maxrate - startrate)
    assert ((acc_data['exp_rate'] - acc_data['rate']).abs().max() <
            maxrate * maxdev_coarse)

    # acceleration (fine)
    acc_data['exp_fine'] = acc_data['rate'].iat[0] + acc_data['ts_s'] \
        / acc_time * (acc_data['rate'].iat[-1] - startrate)
    if fine_check:
        assert ((acc_data['exp_fine'] - acc_data['rate']).abs().max() <
                maxdev_fine)

    # check effective acceleration rate
    acc_vel = (acc_data['rate'].iat[-1] - acc_data['rate'].iat[0]) / acc_time
    if fine_check:
        assert (abs(acc_vel - ax_info['accel']) / ax_info['accel'] <
                maxdev_acc)

    # deceleration (coarse); mirror of the acceleration ramp check
    dec_data = data[(data['pos'] > (data['pos'].iat[-1] - acc_dist))][2:].copy()
    dec_data['ts_s'] -= dec_data['ts_s'].iat[0]
    dec_time = dec_data['ts_s'].iat[-1]
    dec_data['exp_rate'] = maxrate - dec_data['ts_s'] \
        / dec_time * (maxrate - endrate)
    assert ((dec_data['exp_rate'] - dec_data['rate']).abs().max() <
            maxrate * maxdev_coarse)

    # deceleration (fine)
    dec_data['exp_fine'] = dec_data['rate'].iat[0] - dec_data['ts_s'] \
        / dec_time * (dec_data['rate'].iat[0] - endrate)
    if fine_check:
        assert ((dec_data['exp_fine'] - dec_data['rate']).abs().max() <
                maxdev_fine)

    # check effective deceleration rate
    dec_vel = (dec_data['rate'].iat[0] - dec_data['rate'].iat[-1]) / dec_time
    if fine_check:
        # TODO: deceleration rate is not as accurate as acceleration!
        assert (abs(dec_vel - ax_info['accel']) / ax_info['accel'] < 0.15)


def check_run(info, run):
    """Validate every axis of a single run."""
    # split the combined run into per-axis descriptions and sample rows
    run_info, samples = run
    axes = len(run_info)

    # build one frame per axis: shared timestamp/interval columns plus the
    # position column belonging to that axis
    frames = []
    for idx in range(axes):
        run_info[idx]['name'] = idx
        rows = [[smp[0], smp[1], smp[2 + idx]] for smp in samples]
        frames.append(pd.DataFrame(rows, columns=['ts', 'int', 'pos']))

    # check each axis independently, but only perform fine-grained checks when a single
    # axis is run due to stepperTimerQuantum introducing discretization noise
    fine = axes == 1
    for idx in range(axes):
        check_axis(info, run_info[idx], frames[idx], fine)


def main():
    """Entry point: parse the trace path, load it and check every run."""
    # parse arguments
    parser = argparse.ArgumentParser()
    parser.add_argument('data')
    opts = parser.parse_args()

    # load data runs
    info, runs = load_data(opts.data)

    # test each set
    for run in runs:
        check_run(info, run)


if __name__ == '__main__':
    # main() returns None, so this exits with status 0 on success
    raise SystemExit(main())