#!/usr/bin/env python3
"""Validate recorded per-axis step traces against the expected motion profile.

The input file starts with one JSON line of global information (it must
provide ``timebase``), followed by any number of data sets.  Each set is a
JSON line describing the axes, then one line of whitespace-separated
integers per sample (``ts int pos0 pos1``), terminated by an empty line.
"""
import argparse
import json
import sys

import numpy as np
import pandas as pd

try:
    # Silence SettingWithCopyWarning where the option exists.  The checks
    # below work on explicit copies, so the option is not required.
    pd.options.mode.chained_assignment = None
except AttributeError:
    pass

# number of axes recorded in every sample row
AXIS_COUNT = 2


def _check(condition, message):
    """Raise AssertionError(message) unless *condition* is true.

    Used instead of the ``assert`` statement so that the checks still run
    when the interpreter is started with ``-O``.
    """
    if not condition:
        raise AssertionError(message)


def load_data(data):
    """Read a trace file and return ``(info, runs)``.

    ``info`` is the decoded JSON object of the first line.  ``runs`` is a
    list of ``[run_info, run_data]`` pairs, where ``run_info`` is the decoded
    JSON description of the set and ``run_data`` is a list of integer rows.
    Blank lines between sets are skipped.
    """
    with open(data) as stream:
        lines = stream.readlines()

    info = json.loads(lines[0])
    runs = []
    total = len(lines)
    i = 1
    while i < total:
        if not lines[i].strip():
            # tolerate extra blank lines between/after sets
            i += 1
            continue

        # first line in each set is a json description
        run_info = json.loads(lines[i])
        i += 1

        # read until an empty line (data terminator) or the end of the file
        run_data = []
        while i < total:
            line = lines[i].rstrip()
            i += 1
            if not line:
                break
            run_data.append([int(token) for token in line.split()])

        runs.append([run_info, run_data])

    return info, runs


def _check_ramp(label, segment, rate_from, rate_to, accel, maxdev_coarse,
                maxdev_fine, maxdev_acc):
    """Check one linear speed ramp (acceleration or deceleration).

    *segment* needs the ``ts_s`` and ``rate`` columns.  The nominal ramp goes
    from *rate_from* to *rate_to*; *accel* is the signed expected slope
    (negative for a deceleration).
    """
    elapsed = segment['ts_s'] - segment['ts_s'].iat[0]
    duration = elapsed.iat[-1]
    progress = elapsed / duration
    rate = segment['rate']

    # coarse: compare against the nominal ramp
    expected = rate_from + progress * (rate_to - rate_from)
    _check((expected - rate).abs().max() < maxdev_coarse,
           f'{label}: rate deviates too much from the nominal ramp')

    # fine: compare against the straight line between the measured end points
    first = rate.iat[0]
    last = rate.iat[-1]
    expected_fine = first + progress * (last - first)
    _check((expected_fine - rate).abs().max() < maxdev_fine,
           f'{label}: rate is not linear enough')

    # effective slope of the ramp, sign included
    effective = (last - first) / duration
    _check(abs(effective - accel) / abs(accel) < maxdev_acc,
           f'{label}: effective rate of change {effective} does not match'
           f' the expected {accel}')


def check_axis(info, ax_info, data):
    """Check the trace of a single axis; raise AssertionError on a mismatch.

    *info* provides ``timebase`` (timestamp units per second), *ax_info*
    provides ``steps``, ``maxrate``, ``jerk`` and ``accel``, and *data* is a
    DataFrame with the ``ts``, ``int`` and ``pos`` columns.  Returns None.
    """
    tb = info['timebase']
    steps = ax_info['steps']
    accel = ax_info['accel']
    jerk = ax_info['jerk']

    # remove duplicate positions (meaning another axis was moved, not the
    # current one) and work on a private copy
    data = data[data['pos'].diff() != 0].copy()

    # recalculate intervals just for this axis
    data['int'] = data['ts'].diff()

    # check start/ending position
    _check(data['pos'].iat[0] == 0, 'trace does not start at position 0')
    _check(data['pos'].iat[-1] == steps,
           'trace does not end at the requested step count')

    # check first null timestamp/interval/position values
    _check((data['ts'].iloc[0:2] == 0).all(),
           'the first two timestamps must be zero')
    _check((data['int'].iloc[0:2].dropna() == 0).all(),
           'the first interval must be zero')

    # ensure timestamps and positions are monotonically increasing
    _check((data['ts'].diff().iloc[2:] > 0).all(),
           'timestamps are not strictly increasing')
    _check((data['pos'].diff().iloc[1:] > 0).all(),
           'positions are not strictly increasing')

    # convert timestamps to seconds
    data['ts_s'] = data['ts'] / tb
    data['int_s'] = data['int'] / tb
    data['rate'] = 1 / data['int_s']

    # ensure we never _exceed_ max feedrate
    _check((data['rate'].iloc[2:] <= ax_info['maxrate']).all(),
           'maximum feedrate exceeded')

    # recalculate independently the acceleration parameters
    acc_dist = (ax_info['maxrate']**2 - jerk**2) / (2 * accel)
    if acc_dist * 2 > steps:
        # no cruising, calculate intersection (equal start/end speed)
        acc_dist = steps / 2
        maxrate = np.sqrt(2 * accel * acc_dist + jerk**2)
        c_dist = 0
    else:
        # cruising possible, get distance
        c_dist = steps - 2 * acc_dist
        maxrate = ax_info['maxrate']

    # check cruising speed; there is nothing to compare when no sample falls
    # inside the cruising window (max() of an empty selection is NaN)
    cruise_data = data[(data['pos'] > acc_dist + 2)
                       & (data['pos'] < acc_dist + 2 + c_dist)]
    if not cruise_data.empty:
        _check((cruise_data['rate'] - maxrate).abs().max() < 1,
               'cruising speed deviates from the expected rate')

    # checking acceleration segments require a decent number of samples for
    # good results
    if acc_dist < 10:
        return

    # TODO: minrate is currently hardcoded in the FW as a function of the
    # timer type (we can't represent infinitely-long intervals, so the slowest
    # speed itself is limited). We recover the minrate here directly from the
    # trace, but perhaps we shouldn't
    startrate = data['rate'].iat[2]  # skip first two null values
    endrate = data['rate'].iat[-1]
    maxdev_coarse = (maxrate - startrate) / 20  # 5% speed deviation
    maxdev_fine = 20  # absolute maximum deviation
    maxdev_acc = 0.05  # 5% acceleration deviation

    # acceleration segment, without the two leading null samples
    acc_data = data[data['pos'] < acc_dist].iloc[2:]
    _check_ramp('acceleration', acc_data, startrate, maxrate, accel,
                maxdev_coarse, maxdev_fine, maxdev_acc)

    # deceleration segment: the rate drops, so the expected slope is negative
    dec_data = data[data['pos'] > (data['pos'].iat[-1] - acc_dist)].iloc[2:]
    _check_ramp('deceleration', dec_data, maxrate, endrate, -accel,
                maxdev_coarse, maxdev_fine, maxdev_acc)


def check_run(info, run):
    """Check every axis of one data set as returned by load_data().

    *run* is a ``[ax_info, data]`` pair; each row of *data* is
    ``[ts, int, pos0, pos1]``.  The axis index is stored under the ``name``
    key of each axis description.
    """
    # unpack the axis data
    ax_info, data = run

    # split axis information
    ax_data = []
    for ax in range(AXIS_COUNT):
        ax_info[ax]['name'] = ax
        rows = [[row[0], row[1], row[2 + ax]] for row in data]
        ax_data.append(pd.DataFrame(rows, columns=['ts', 'int', 'pos']))

    # check each axis independently
    for ax in range(AXIS_COUNT):
        check_axis(info, ax_info[ax], ax_data[ax])


def main():
    """Parse the command line and check every data set of the trace file."""
    # parse arguments
    ap = argparse.ArgumentParser(
        description='Check recorded step traces against the motion profile')
    ap.add_argument('data', help='trace file to check')
    args = ap.parse_args()

    # load data runs
    info, runs = load_data(args.data)

    # test each set
    for run in runs:
        check_run(info, run)


if __name__ == '__main__':
    sys.exit(main())