184 lines
5.6 KiB
Python
Executable File
184 lines
5.6 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import numpy as np
|
|
import argparse
|
|
import json
|
|
|
|
import pandas as pd
|
|
pd.options.mode.chained_assignment = None
|
|
|
|
|
|
def load_data(data):
|
|
runs = []
|
|
|
|
# read all sets
|
|
lines = open(data).readlines()
|
|
info = json.loads(lines[0])
|
|
|
|
i = 1
|
|
while i < len(lines):
|
|
# first line in each set is a json description
|
|
run_info = json.loads(lines[i])
|
|
|
|
run_data = []
|
|
for j in range(i + 1, len(lines)):
|
|
# read until an empty line (data terminator)
|
|
line = lines[j].rstrip()
|
|
if len(line) == 0:
|
|
break
|
|
|
|
# parse the line
|
|
tokens = list(map(int, line.split(' ')))
|
|
run_data.append(tokens)
|
|
|
|
runs.append([run_info, run_data])
|
|
i = j + 1
|
|
|
|
return info, runs
|
|
|
|
|
|
def check_axis(info, ax_info, data, fine_check):
|
|
tb = info['timebase']
|
|
|
|
# remove duplicate positions (meaning another axis was moved, not the current)
|
|
data = data[data['pos'].diff() != 0].reset_index()
|
|
|
|
# recalculate intervals just for this axis
|
|
data['int'] = data['ts'].diff()
|
|
|
|
# check start/ending position
|
|
assert (data['pos'].iat[0] == 0)
|
|
assert (data['pos'].iat[-1] == ax_info['steps'])
|
|
|
|
# check first null timestamp/interval/position values
|
|
assert ((data['ts'][0:2] == 0).all())
|
|
assert ((data['int'][0:2].dropna() == 0).all())
|
|
|
|
# ensure timestamps and positions are monotonically increasing
|
|
assert ((data['ts'].diff()[2:] > 0).all())
|
|
assert ((data['pos'].diff()[1:] > 0).all())
|
|
|
|
# convert timestamps to seconds
|
|
data['ts_s'] = data['ts'] / tb
|
|
data['int_s'] = data['int'] / tb
|
|
|
|
if fine_check:
|
|
maxdev = 0.1 # 10% rate deviation tolerance
|
|
else:
|
|
maxdev = 0.2 # higher tolerance due to quantization
|
|
|
|
# initial samples to skip
|
|
ramp_smp_skip = 3
|
|
|
|
if fine_check:
|
|
# exact rate
|
|
data['rate'] = 1 / data['int_s']
|
|
|
|
else:
|
|
# reconstruct the rate from 5 samples
|
|
data['rate'] = 1 / data.rolling(5)['int_s'].median()
|
|
data['rate'].bfill(inplace=True)
|
|
|
|
# ensure we never _exceed_ max feedrate
|
|
assert ((data['rate'][2:] <= ax_info['maxrate']).all())
|
|
|
|
# recalculate independently the acceleration parameters
|
|
acc_dist = (ax_info['maxrate']**2 -
|
|
ax_info['jerk']**2) / (2 * ax_info['accel'])
|
|
if acc_dist * 2 > ax_info['steps']:
|
|
# no cruising, calculate intersection (equal start/end speed)
|
|
acc_dist = ax_info['steps'] / 2
|
|
maxrate = np.sqrt(2 * ax_info['accel'] * acc_dist + ax_info['jerk']**2)
|
|
else:
|
|
# cruising possible, get distance
|
|
c_dist = ax_info['steps'] - 2 * acc_dist
|
|
maxrate = ax_info['maxrate']
|
|
|
|
# check cruising speed
|
|
cruise_data = data[(data['pos'] > acc_dist + 2)
|
|
& (data['pos'] < acc_dist + 2 + c_dist)]
|
|
assert ((cruise_data['rate'] - maxrate).abs().max() < 1)
|
|
|
|
# checking acceleration segments require a decent number of samples for good results
|
|
if acc_dist < 20:
|
|
return
|
|
|
|
# TODO: minrate is currently hardcoded in the FW as a function of the timer type (we
|
|
# can't represent infinitely-long intervals, to the slowest speed itself is limited).
|
|
# We recover the minrate here directly from the trace, but perhaps we shouldn't
|
|
startrate = data['rate'].iat[ramp_smp_skip]
|
|
endrate = data['rate'].iat[-1]
|
|
|
|
# check acceleration segment
|
|
acc_data = data[(data['pos'] < acc_dist)][ramp_smp_skip:]
|
|
acc_data['ts_s'] -= acc_data['ts_s'].iat[0]
|
|
acc_time = acc_data['ts_s'].iat[-1]
|
|
acc_data['exp_rate'] = startrate + acc_data['ts_s'] * ax_info['accel']
|
|
assert ((acc_data['exp_rate'] - acc_data['rate']).abs().max() <
|
|
maxrate * maxdev)
|
|
|
|
# check acceleration rate
|
|
acc_acc = (acc_data['rate'].iat[-1] - acc_data['rate'].iat[0]) / acc_time
|
|
assert (abs(acc_acc - ax_info['accel']) / ax_info['accel'] < maxdev)
|
|
|
|
# deceleration segment
|
|
dec_data = data[(data['pos'] >
|
|
(data['pos'].iat[-1] - acc_dist))][ramp_smp_skip:]
|
|
dec_data['ts_s'] -= dec_data['ts_s'].iat[0]
|
|
dec_time = dec_data['ts_s'].iat[-1]
|
|
dec_data['exp_rate'] = dec_data['rate'].iat[
|
|
0] - dec_data['ts_s'] * ax_info['accel']
|
|
|
|
# check deceleration rate
|
|
dec_acc = (dec_data['rate'].iat[0] - dec_data['rate'].iat[-1]) / dec_time
|
|
assert (abs(dec_acc - ax_info['accel']) / ax_info['accel'] < maxdev)
|
|
|
|
|
|
def check_run(info, run):
|
|
# unpack the axis data
|
|
ax_info, data = run
|
|
ax_count = len(ax_info)
|
|
|
|
# first and last interval should always be zero
|
|
assert (data[0][1] == 0)
|
|
assert (data[-1][1] == 0)
|
|
|
|
# ensure no interval is shorter than the specified quantum
|
|
for i in range(1, len(data) - 2):
|
|
interval = data[i][1]
|
|
assert (interval >= info['quantum'])
|
|
|
|
# split axis information
|
|
ax_data = []
|
|
for ax in range(ax_count):
|
|
ax_info[ax]['name'] = ax
|
|
tmp = []
|
|
for i in range(len(data)):
|
|
row = data[i]
|
|
tmp.append([row[0], row[1], row[2 + ax]])
|
|
|
|
ax_data.append(pd.DataFrame(tmp, columns=['ts', 'int', 'pos']))
|
|
|
|
# check each axis independently, but only perform fine-grained checks when a single
|
|
# axis is run due to stepperTimerQuantum introducing discretization noise
|
|
fine_check = ax_count == 1
|
|
for ax in range(ax_count):
|
|
check_axis(info, ax_info[ax], ax_data[ax], fine_check)
|
|
|
|
|
|
def main():
|
|
# parse arguments
|
|
ap = argparse.ArgumentParser()
|
|
ap.add_argument('data')
|
|
args = ap.parse_args()
|
|
|
|
# load data runs
|
|
info, runs = load_data(args.data)
|
|
|
|
# test each set
|
|
for run in runs:
|
|
check_run(info, run)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
exit(main())
|