Skeleton is up. Most table params are estimated. movec sucks, we may need to hack around it (linearize using movep)
This commit is contained in:
commit
e0639b2663
288
apitesting.py
Executable file
288
apitesting.py
Executable file
@ -0,0 +1,288 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import sys, urx, math, time, math3d
|
||||
|
||||
IP = "129.241.187.119"
|
||||
#JOINTS_HOME = [5*math.pi/4, -3*math.pi/4, -3*math.pi/4, -math.pi/2, 3*math.pi/2, math.pi]
|
||||
#JOINTS_HOME = [math.pi/4, -3*math.pi/4, -3*math.pi/4, -math.pi/2, 3*math.pi/2, math.pi]
|
||||
JOINTS_HOME = [0, -3*math.pi/4, -3*math.pi/4, -math.pi/2, -math.pi/2, math.pi]
|
||||
|
||||
|
||||
|
||||
|
||||
class RobotInitException(Exception):
|
||||
pass
|
||||
|
||||
class UR5TestController(object):
|
||||
|
||||
def __init__(self, ip, joint_init=None):
|
||||
self.vel = 0.1
|
||||
self.acc = 1.0
|
||||
|
||||
try:
|
||||
self.robot = urx.Robot(ip, useRTInterface=False)
|
||||
if joint_init:
|
||||
self.robot.movej(joint_init, acc=0.1, vel=0.5, radius=0, wait=True, relative=False)
|
||||
self.pose = self.robot.getl()
|
||||
except Exception, e:
|
||||
raise RobotInitException('Robot initiation failed: %s' % str(e))
|
||||
|
||||
def test_csys(self, quadrant):
|
||||
theta_c = math.pi*quadrant/2
|
||||
pose = self.robot.getl()
|
||||
|
||||
csys = math3d.Transform()
|
||||
csys.orient.rotate_zb(theta_c)
|
||||
self.robot.set_csys("csys", csys)
|
||||
self.robot.movel(pose, acc=0.1, vel=0.5, radius=0, wait=True, relative=False)
|
||||
|
||||
trans_base = self.robot.get_transform()
|
||||
|
||||
|
||||
trans = math3d.Transform(trans_base) #deep copy
|
||||
pos = pose[0:3]
|
||||
trans.pos = math3d.Vector(pos[0]+0.1, pos[1], pos[2])
|
||||
trans.orient.rotate_zb(math.pi/4)
|
||||
|
||||
self.robot.movel(trans.pose_vector.tolist(), acc=0.1, vel=0.5, radius=0, wait=True, relative=False)
|
||||
|
||||
|
||||
def table_quadrant_test(self, quadrant):
|
||||
# quadrant = {0..3}
|
||||
theta_c = math.pi*quadrant/2
|
||||
r = 0.6
|
||||
|
||||
joints0 = [theta_c, -3*math.pi/4, -3*math.pi/4, -math.pi/2, -math.pi/2, math.pi]
|
||||
self.robot.movej(joints0, acc=0.1, vel=0.5, radius=0, wait=True, relative=False)
|
||||
pose = self.robot.getl()
|
||||
pose[0] = r*math.cos(theta_c)
|
||||
pose[1] = r*math.sin(theta_c)
|
||||
pose[2] = 0
|
||||
self.robot.movel(pose, acc=0.1, vel=0.5, radius=0, wait=True, relative=False)
|
||||
print ('q %.1f: ' % quadrant) + str([('%.3f' % i) for i in self.robot.getl()])
|
||||
time.sleep(1)
|
||||
|
||||
def table_end_poses(self):
|
||||
joints0 = [0, -3*math.pi/4, -3*math.pi/4, -math.pi/2, -math.pi/2, math.pi]
|
||||
self.robot.movej(joints0, acc=0.1, vel=0.5, radius=0, wait=True, relative=False)
|
||||
print 'pose: ' + str(self.robot.getl())
|
||||
time.sleep(1)
|
||||
joints1 = [33*math.pi/16, -3*math.pi/4, -3*math.pi/4, -math.pi/2, 27*math.pi/16, math.pi]
|
||||
self.robot.movej(joints1, acc=0.1, vel=0.5, radius=0, wait=True, relative=False)
|
||||
print 'pose: ' + str(self.robot.getl())
|
||||
|
||||
def table_quadrant_test2(self, quadrant):
|
||||
# quadrant = {0..3}
|
||||
theta_c = math.pi*quadrant/2
|
||||
r = 0.6
|
||||
|
||||
pose = self.robot.getl()
|
||||
pose[0] = r*math.cos(theta_c)
|
||||
pose[1] = r*math.sin(theta_c)
|
||||
self.robot.movel(pose, acc=0.1, vel=0.5, radius=0, wait=True, relative=False)
|
||||
print 'pose: ' + str(self.robot.getl())
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
def test_circ_pose(self):
|
||||
theta_via = math.pi*2.75/2
|
||||
pose0 = [-0.424, -0.424, -0.097, -0.729, -1.761, 1.760]
|
||||
posevia = [0.6*math.cos(theta_via), 0.6*math.sin(theta_via), -0.097, 0, 0, 0]
|
||||
pose1 = [0.000, -0.600, -0.097, 0.000, 2.221, -2.221]
|
||||
#pose1 = [0.424, -0.424, -0.097, -0.729, 1.760, -1.759]
|
||||
|
||||
self.robot.movel(pose0, acc=0.1, vel=0.5, radius=0, wait=True, relative=False)
|
||||
print 'Starting circular move...'
|
||||
self.robot.movec(posevia, pose1, acc=0.1, vel=0.1, radius=0.1, wait=True)
|
||||
time.sleep(1)
|
||||
self.robot.movel(pose1, acc=0.1, vel=0.1, radius=0, wait=True)
|
||||
|
||||
|
||||
def test_circ_pose2(self):
|
||||
pose0 = [-0.424, -0.424, -0.097, -0.729, -1.761, 1.760]
|
||||
posevia = [0.424, -0.424, -0.097, -0.729, 1.760, -1.759]
|
||||
#posevia = [0.000, -0.600, -0.097, 0.000, 2.221, -2.221]
|
||||
pose1 = [0.424, 0.424, -0.097, -1.482, 0.614, -0.613]
|
||||
#pose1 = [0.424, -0.424, -0.097, -0.729, 1.760, -1.759]
|
||||
|
||||
self.robot.movel(pose0, acc=0.1, vel=0.5, radius=0, wait=True, relative=False)
|
||||
print 'Starting circular move...'
|
||||
self.robot.movec(posevia, pose1, acc=0.1, vel=0.1, radius=0, wait=True)
|
||||
time.sleep(1)
|
||||
self.robot.movel(pose1, acc=0.1, vel=0.1, radius=0, wait=True)
|
||||
|
||||
def test_axes(self, axis):
|
||||
dist = 0.6
|
||||
pose = self.robot.getl()
|
||||
pose[0] = dist
|
||||
pose[1] = dist
|
||||
|
||||
if axis == 'x':
|
||||
pose[1] = 0
|
||||
#pose[2] = 0
|
||||
|
||||
_theta = math.pi/2
|
||||
pose[3] = math.pi/2
|
||||
pose[4] = math.pi
|
||||
pose[5] = 0
|
||||
#pose:[0.5921354182317952, 0.00589012470721734, -0.0973002219692428, -1.4823549412900907, 0.614936708121804, -0.6137840711781131]
|
||||
|
||||
elif axis == 'y':
|
||||
pose[0] = 0
|
||||
self.robot.movel(pose, acc=0.1, vel=0.1, wait=True)
|
||||
print 'pose: ' + str(self.robot.getl())
|
||||
time.sleep(2)
|
||||
|
||||
|
||||
def test_circ(self):
|
||||
pose0 = self.robot.getl()
|
||||
pose1 = [i for i in pose0]
|
||||
posevia = [i for i in pose0]
|
||||
|
||||
r = 0.4
|
||||
t0 = 9*math.pi/8
|
||||
t1 = 13*math.pi/8
|
||||
tvia = t0 + (t1-t0) / 2
|
||||
|
||||
pose0[0] = r*math.cos(t0)
|
||||
pose0[1] = r*math.sin(t0)
|
||||
self.robot.movel(pose0, acc=0.1, vel=0.1, wait=True)
|
||||
|
||||
print 'pose: ' + str(pose0)
|
||||
print 'starting circular move'
|
||||
posevia[0] = r*math.cos(tvia)
|
||||
posevia[1] = r*math.sin(tvia)
|
||||
pose1[0] = r*math.cos(t1)
|
||||
pose1[1] = r*math.sin(t1)
|
||||
self.robot.movec(posevia, pose1, acc=0.1, vel=0.1, wait=True)
|
||||
print 'pose: ' + str(pose1)
|
||||
|
||||
|
||||
def test_speedl(self):
|
||||
|
||||
n = 100
|
||||
vel = 0.08
|
||||
dtheta = 4*math.pi/n
|
||||
vels = [(-vel*math.sin(i*dtheta), vel*math.cos(i*dtheta), 0, 0, 0, 0)
|
||||
for i in range(n)]
|
||||
|
||||
dt = 0.3
|
||||
for v in vels:
|
||||
self.robot.speedl(v, 0.7, dt)
|
||||
time.sleep(dt)
|
||||
|
||||
def test_movel(self):
|
||||
n = 10
|
||||
dt = 0.5
|
||||
|
||||
pose = self.robot.getl()
|
||||
pose[0] -= 0.1*math.sin(0)
|
||||
pose[1] += 0.1*math.cos(1)
|
||||
self.robot.movel(pose, acc=0.1, vel=0.1, radius=0, wait=False)
|
||||
time.sleep(dt)
|
||||
for i in range(1, n):
|
||||
pose = self.robot.getl()
|
||||
pose[0] -= math.sin(math.pi*i/n)
|
||||
pose[1] += math.cos(math.pi*i/n)
|
||||
self.robot.movel(pose, acc=1, vel=0.1, radius=0.1, wait=False)
|
||||
time.sleep(dt)
|
||||
self.robot.stopl(0.2)
|
||||
|
||||
|
||||
def test_servoj(self):
|
||||
j0 = self.robot.getj()
|
||||
dtheta = math.pi/4
|
||||
j_incr = [dtheta]*6
|
||||
|
||||
j = [i+math.pi/3 for i in j0]
|
||||
prog = "servoj([{},{},{},{},{},{}])".format(*j)
|
||||
self.robot.send_program(prog)
|
||||
time.sleep(2)
|
||||
return
|
||||
|
||||
j = j0
|
||||
for i in range(10):
|
||||
j = map(sum, zip(j, j_incr))
|
||||
prog = "servoj([{},{},{},{},{},{}])".format(*j)
|
||||
self.robot.send_program(prog)
|
||||
time.sleep(0.5)
|
||||
|
||||
|
||||
def velocity_control(self):
|
||||
if vels == None:
|
||||
n = 20
|
||||
vel = 0.05
|
||||
dtheta = math.pi/n
|
||||
vels = [(-vel*math.sin(i*dtheta), vel*math.cos(i*dtheta), 0, 0, 0, 0)
|
||||
for i in range(n)]
|
||||
|
||||
def test_circle(self):
|
||||
pass
|
||||
|
||||
|
||||
def follow_path(self, path):
|
||||
pass
|
||||
|
||||
|
||||
def move(self, vec):
|
||||
# check internal states
|
||||
# update pose etc
|
||||
self.pose = self.robot.getl()
|
||||
self.pose[0] += vec[0]
|
||||
self.pose[1] += vec[1]
|
||||
self.pose[2] += vec[2]
|
||||
|
||||
if sum(map(abs, vec)) > 0:
|
||||
self.robot.movel(self.pose, acc=self.acc, vel=self.vel, radius=0.1, wait=False)
|
||||
else:
|
||||
self.robot.stopl(self.stop_acc)
|
||||
|
||||
def get_tcp_force(self):
|
||||
return self.robot.get_tcp_force(wait=False)
|
||||
|
||||
def get_2dvec_out(self):
|
||||
# Vector outwards from the robot in (x, y) coordinates
|
||||
theta_d = 0.02
|
||||
j1 = self.robot.getj()
|
||||
j2 = [theta+theta_d if i==1 else theta for i,theta in enumerate(j1)]
|
||||
p1 = self.robot.getl()
|
||||
|
||||
# Make a small move outwards and inwards
|
||||
self.robot.movej(j2, acc=0.1, vel=0.5, radius=0, wait=True)
|
||||
p2 = self.robot.getl()
|
||||
self.robot.movej(j1, acc=0.1, vel=0.5, radius=0, wait=True)
|
||||
return (p1[0]-p2[0], p1[1]-p2[1])
|
||||
|
||||
def cleanup(self):
|
||||
self.robot.cleanup()
|
||||
|
||||
if __name__ == '__main__':
|
||||
controller = None
|
||||
try:
|
||||
controller = UR5TestController(IP, joint_init=JOINTS_HOME)
|
||||
controller.test_csys(1)
|
||||
#controller.test_circ_pose()
|
||||
#controller.test_circ_pose2()
|
||||
#controller.table_end_poses()
|
||||
#controller.table_quadrant_test(0)
|
||||
#controller.table_quadrant_test(0)
|
||||
#controller.table_quadrant_test(0.5)
|
||||
#controller.table_quadrant_test(1)
|
||||
#controller.table_quadrant_test(1.5)
|
||||
#controller.table_quadrant_test(2)
|
||||
#controller.table_quadrant_test(2.5)
|
||||
#controller.table_quadrant_test(3)
|
||||
#controller.table_quadrant_test(3.5)
|
||||
#controller.test_circ()
|
||||
#controller.test_axes('x')
|
||||
#controller.test_axes('dummy')
|
||||
#controller.test_axes('y')
|
||||
#controller.test_speedl()
|
||||
#controller.test_servoj()
|
||||
#controller.test_movel()
|
||||
except Exception, e:
|
||||
print e
|
||||
|
||||
if controller:
|
||||
controller.cleanup()
|
||||
43
config.py
Normal file
43
config.py
Normal file
@ -0,0 +1,43 @@
|
||||
import math
|
||||
|
||||
DEBUG = True
|
||||
|
||||
# IP of the robot.
|
||||
UR5_IP = "129.241.187.119"
|
||||
|
||||
# Enables force constraints on the tool. Note: Setting this to "True" involves
|
||||
# polling the UR5 through a real-time monitor that runs at 125Hz. If the controller
|
||||
# runs slow, try disabling this first.
|
||||
USE_FORCE_MONITOR = True
|
||||
|
||||
# Control parameters.
|
||||
VELOCITY = 0.1
|
||||
ACCELERATION = 0.5
|
||||
DECELERATION = 0.5
|
||||
|
||||
# Block parameters
|
||||
BLOCK_DIM = 0.1 #TODO measure
|
||||
|
||||
# Table polar parameters
|
||||
TABLE_QUADRANT = 3 # In range [0..4). Quadrant 0 is along the x-axis of the robot.
|
||||
TABLE_ORIGO_OFFSET = -0.25 # TODO measure
|
||||
TABLE_EDGE_LEFT = -math.pi/4
|
||||
TABLE_EDGE_RIGHT = math.pi/4
|
||||
R_MIN = 0.6 # relative to TABLE center
|
||||
R_MAX = 0.9 # relative to TABLE center
|
||||
|
||||
# Table height parameters (TODO CHECK THESE)
|
||||
Z_TABLE = -0.337
|
||||
Z_MIN = Z_TABLE + BLOCK_DIM/2
|
||||
Z_MAX = Z_MIN + 5*BLOCK_DIM
|
||||
|
||||
|
||||
# Home quadrant joint configuration.
|
||||
# We need this expressed in joint space to ensure that the robot does not
|
||||
# collide with itself when moving to the base configuration.
|
||||
QUADRANT_JOINT_CONFIG = [TABLE_QUADRANT*math.pi/2,
|
||||
-3*math.pi/4,
|
||||
-3*math.pi/4,
|
||||
-math.pi/2,
|
||||
-math.pi/2,
|
||||
math.pi]
|
||||
BIN
config.pyc
Normal file
BIN
config.pyc
Normal file
Binary file not shown.
8
quadrants.txt
Normal file
8
quadrants.txt
Normal file
@ -0,0 +1,8 @@
|
||||
q 0.0: ['0.600', '0.000', '-0.097', '-1.209', '1.209', '-1.209']
|
||||
q 0.5: ['0.424', '0.424', '-0.097', '-1.482', '0.614', '-0.613']
|
||||
q 1.0: ['-0.000', '0.600', '-0.097', '-1.571', '-0.000', '0.000']
|
||||
q 1.5: ['-0.424', '0.424', '-0.097', '-1.483', '-0.614', '0.614']
|
||||
q 2.0: ['-0.600', '-0.000', '-0.097', '-1.210', '-1.209', '1.209']
|
||||
q 2.5: ['-0.424', '-0.424', '-0.097', '-0.729', '-1.761', '1.760']
|
||||
q 3.0: ['0.000', '-0.600', '-0.097', '0.000', '2.221', '-2.221']
|
||||
q 3.5: ['0.424', '-0.424', '-0.097', '-0.729', '1.760', '-1.759']
|
||||
66
testcontroller.py
Executable file
66
testcontroller.py
Executable file
@ -0,0 +1,66 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import sys, config, math, time
|
||||
from ur5controller import DemoController
|
||||
|
||||
|
||||
def check_edges(controller):
|
||||
controller.movel(config.R_MIN, config.TABLE_EDGE_RIGHT, config.Z_MIN)
|
||||
print 'Right edge.'
|
||||
dummy = raw_input('Press any key to continue...')
|
||||
|
||||
controller.movel(config.R_MIN, 0, config.Z_MIN)
|
||||
print 'Middle.'
|
||||
time.sleep(1)
|
||||
dummy = raw_input('Press any key to continue...')
|
||||
|
||||
controller.movel(config.R_MIN, config.TABLE_EDGE_LEFT, config.Z_MIN)
|
||||
print 'Left edge.'
|
||||
time.sleep(1)
|
||||
dummy = raw_input('Press any key to continue...')
|
||||
|
||||
def test_movec(controller):
|
||||
controller.movec(config.R_MIN, config.TABLE_EDGE_RIGHT, config.Z_MIN)
|
||||
print 'Right edge.'
|
||||
dummy = raw_input('Press any key to continue...')
|
||||
|
||||
controller.movec(config.R_MIN, 0, config.Z_MIN)
|
||||
print 'Middle.'
|
||||
time.sleep(1)
|
||||
dummy = raw_input('Press any key to continue...')
|
||||
|
||||
controller.movec(config.R_MIN, config.TABLE_EDGE_LEFT, config.Z_MIN)
|
||||
print 'Left edge.'
|
||||
time.sleep(1)
|
||||
dummy = raw_input('Press any key to continue...')
|
||||
|
||||
def test_movec_hax(controller):
|
||||
controller.movec_hax(config.R_MIN, config.TABLE_EDGE_RIGHT, config.Z_MIN)
|
||||
print 'Right edge.'
|
||||
dummy = raw_input('Press any key to continue...')
|
||||
|
||||
controller.movec_hax(config.R_MIN, 0, config.Z_MIN)
|
||||
print 'Middle.'
|
||||
time.sleep(1)
|
||||
dummy = raw_input('Press any key to continue...')
|
||||
|
||||
controller.movec_hax(config.R_MIN, config.TABLE_EDGE_LEFT, config.Z_MIN)
|
||||
print 'Left edge.'
|
||||
time.sleep(1)
|
||||
dummy = raw_input('Press any key to continue...')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
controller = None
|
||||
try:
|
||||
controller = DemoController(config)
|
||||
#check_edges(controller)
|
||||
test_movec_hax(controller)
|
||||
except Exception, e:
|
||||
print e
|
||||
|
||||
time.sleep(1)
|
||||
if controller:
|
||||
controller.cleanup()
|
||||
sys.exit(1)
|
||||
127
ur5controller.py
Normal file
127
ur5controller.py
Normal file
@ -0,0 +1,127 @@
|
||||
import urx, math, math3d
|
||||
|
||||
class UR5Exception(Exception):
|
||||
pass
|
||||
|
||||
class DemoController(object):
|
||||
"""Implements control of the UR5 in cylinder coordinates (corresponding to attached table).
|
||||
-Checks bounds of the arc section
|
||||
-Methods for placing blocks
|
||||
"""
|
||||
|
||||
def __init__(self, config):
|
||||
"""Initializes the controller with parameters specified in config"""
|
||||
self.debug = config.DEBUG
|
||||
try:
|
||||
self.robot = urx.Robot(config.UR5_IP, useRTInterface=config.USE_FORCE_MONITOR) #TODO or hostname
|
||||
except Exception, e:
|
||||
raise UR5Exception('Robot initiation failed: %s' % str(e))
|
||||
|
||||
# Set cylinder parameters
|
||||
self.quadrant = config.TABLE_QUADRANT
|
||||
self.phi = self.quadrant*math.pi/2
|
||||
self.cyl_offset = config.TABLE_ORIGO_OFFSET
|
||||
self.cyl_ox = self.cyl_offset*math.cos(self.phi)
|
||||
self.cyl_oy = self.cyl_offset*math.sin(self.phi)
|
||||
|
||||
self.r_min = config.R_MIN
|
||||
self.r_max = config.R_MAX
|
||||
self.theta_min = config.TABLE_EDGE_LEFT
|
||||
self.theta_max = config.TABLE_EDGE_RIGHT
|
||||
self.z_min = config.Z_MIN
|
||||
self.z_max = config.Z_MAX
|
||||
|
||||
self.j_home = config.QUADRANT_JOINT_CONFIG
|
||||
|
||||
self.vel = config.VELOCITY
|
||||
self.acc = config.ACCELERATION
|
||||
self.dec = config.DECELERATION
|
||||
|
||||
# Move to reference coordinate system base
|
||||
self.set_pose_home()
|
||||
|
||||
# Set reference coordinate system parameters
|
||||
csys = math3d.Transform()
|
||||
csys.orient.rotate_zb(self.phi)
|
||||
self.robot.set_csys("csys", csys)
|
||||
self.trans_base = self.robot.get_transform()
|
||||
self.cyl_ox = self.cyl_offset
|
||||
self.cyl_oy = 0
|
||||
|
||||
|
||||
|
||||
def set_pose_home(self):
|
||||
"""Move to "home" configuration. The resulting pose ensures correct orientation."""
|
||||
try:
|
||||
self.robot.movej(self.j_home, acc=0.1, vel=0.5, radius=0, wait=True, relative=False)
|
||||
pose = self.robot.getl()
|
||||
pose[0] = self.r_min*math.cos(self.phi) + self.cyl_ox
|
||||
pose[1] = self.r_min*math.sin(self.phi) + self.cyl_oy
|
||||
pose[2] = self.z_min
|
||||
self.robot.movel(pose, acc=0.1, vel=0.2, radius=0, wait=True, relative=False)
|
||||
self.current_cyl = (self.r_min, 0, self.z_min)
|
||||
except Exception, e:
|
||||
raise UR5Exception('Move to home configuration failed: %s' % str(e))
|
||||
|
||||
|
||||
def cylinder2pose(self, r, theta, z):
|
||||
"""Translate table cylinder coordinates to robot cartesian pose.
|
||||
(r, theta) is given in TABLE coordinates.
|
||||
"""
|
||||
trans = math3d.Transform(self.trans_base) #deep copy
|
||||
trans.orient.rotate_zb(theta)
|
||||
trans.pos = math3d.Vector(r*math.cos(theta) + self.cyl_ox,
|
||||
r*math.sin(theta) + self.cyl_oy,
|
||||
z)
|
||||
return trans.pose_vector.tolist()
|
||||
|
||||
|
||||
def movel(self, r, theta, z):
|
||||
pose = self.cylinder2pose(r, theta, z)
|
||||
self.robot.movel(pose, acc=self.acc, vel=self.vel, radius=0, wait=True, relative=False)
|
||||
self.current_cyl = (r, theta, z)
|
||||
|
||||
|
||||
def movec(self, r, theta, z):
|
||||
#Note: r should be the same.
|
||||
curr_theta = self.current_cyl[1]
|
||||
pose_via = self.cylinder2pose(r, curr_theta+(theta-curr_theta)/2, z)
|
||||
pose = self.cylinder2pose(r, theta, z)
|
||||
self.robot.movec(pose_via, pose, acc=self.acc, vel=self.vel, radius=0, wait=True)
|
||||
self.current_cyl = (r, theta, z)
|
||||
|
||||
def movec_hax(self, r, theta, z):
|
||||
"""Since movec is unreliable in reaching the end pose: linearize the arc."""
|
||||
curr_r, curr_theta, curr_z = self.current_cyl
|
||||
dtheta = theta - curr_theta
|
||||
if dtheta > 0:
|
||||
theta_incr = math.pi/41
|
||||
else:
|
||||
theta_incr = -math.pi/41
|
||||
if abs(dtheta) < abs(theta_incr):
|
||||
self.movel(r, theta, z)
|
||||
return
|
||||
|
||||
pose_list = []
|
||||
segments = int(math.floor(dtheta/theta_incr))
|
||||
r_incr = (r-curr_r)/(segments+1)
|
||||
z_incr = (z-curr_z)/(segments+1)
|
||||
for i in range(1, segments+1):
|
||||
pose_list.append(self.cylinder2pose(curr_r + i*r_incr,
|
||||
curr_theta + i*theta_incr,
|
||||
curr_z + i*z_incr))
|
||||
pose_list.append(self.cylinder2pose(r, theta, z))
|
||||
|
||||
#debug
|
||||
if self.debug:
|
||||
print "pose list for movec_hax"
|
||||
for p in pose_list:
|
||||
print [('%.3f' % i) for i in p]
|
||||
|
||||
self.robot.movels(pose_list, acc=self.acc, vel=self.vel, radius=0.05, wait=True)
|
||||
self.current_cyl = (r, theta, z)
|
||||
|
||||
|
||||
|
||||
def cleanup(self):
|
||||
self.robot.cleanup()
|
||||
BIN
ur5controller.pyc
Normal file
BIN
ur5controller.pyc
Normal file
Binary file not shown.
9
urx/__init__.py
Normal file
9
urx/__init__.py
Normal file
@ -0,0 +1,9 @@
|
||||
"""
|
||||
Python library to control an UR robot through its TCP/IP interface
|
||||
"""
|
||||
__version__ = "0.8"
|
||||
|
||||
|
||||
|
||||
from .robot import Robot, RobotException, URRobot
|
||||
|
||||
BIN
urx/__init__.pyc
Normal file
BIN
urx/__init__.pyc
Normal file
Binary file not shown.
651
urx/robot.py
Normal file
651
urx/robot.py
Normal file
@ -0,0 +1,651 @@
|
||||
"""
|
||||
Python library to control an UR robot through its TCP/IP interface
|
||||
DOC LINK
|
||||
http://support.universal-robots.com/URRobot/RemoteAccess
|
||||
"""
|
||||
from __future__ import absolute_import # necessary for import tricks to work with python2
|
||||
|
||||
__author__ = "Olivier Roulet-Dubonnet"
|
||||
__copyright__ = "Copyright 2011-2013, Sintef Raufoss Manufacturing"
|
||||
__license__ = "GPLv3"
|
||||
|
||||
import logging
|
||||
|
||||
|
||||
MATH3D = True
|
||||
try:
|
||||
import math3d as m3d
|
||||
import numpy as np
|
||||
except ImportError:
|
||||
MATH3D = False
|
||||
print 'asdfasdf'
|
||||
print("python-math3d library could not be found on this computer, disabling use of matrices and path blending")
|
||||
|
||||
from urx import urrtmon
|
||||
from urx import ursecmon
|
||||
|
||||
|
||||
class RobotException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class URRobot(object):
|
||||
"""
|
||||
Python interface to socket interface of UR robot.
|
||||
programs are send to port 3002
|
||||
data is read from secondary interface(10Hz?) and real-time interface(125Hz) (called Matlab interface in documentation)
|
||||
Since parsing the RT interface uses som CPU, and does not support all robots versions, it is disabled by default
|
||||
The RT interfaces is only used for the get_force related methods
|
||||
Rmq: A program sent to the robot i executed immendiatly and any running program is stopped
|
||||
"""
|
||||
def __init__(self, host, useRTInterface=False, logLevel=logging.WARN, parserLogLevel=logging.WARN):
|
||||
self.logger = logging.getLogger(self.__class__.__name__)
|
||||
if len(logging.root.handlers) == 0: #dirty hack
|
||||
logging.basicConfig()
|
||||
self.logger.setLevel(logLevel)
|
||||
self.host = host
|
||||
self.csys = None
|
||||
|
||||
self.logger.info("Opening secondary monitor socket")
|
||||
self.secmon = ursecmon.SecondaryMonitor(self.host, logLevel=logLevel, parserLogLevel=parserLogLevel) #data from robot at 10Hz
|
||||
|
||||
self.rtmon = None
|
||||
if useRTInterface:
|
||||
self.rtmon = self.get_realtime_monitor()
|
||||
#the next 3 values must be conservative! otherwise we may wait forever
|
||||
self.joinEpsilon = 0.01 # precision of joint movem used to wait for move completion
|
||||
# It seems URScript is limited in the character length of floats it accepts
|
||||
self.max_float_length = 6 # FIXME: check max length!!!
|
||||
|
||||
self.secmon.wait() # make sure we get data from robot before letting clients access our methods
|
||||
|
||||
|
||||
def __repr__(self):
|
||||
return "Robot Object (IP=%s, state=%s)" % (self.host, self.secmon.get_all_data()["RobotModeData"])
|
||||
|
||||
def __str__(self):
|
||||
return self.__repr__()
|
||||
|
||||
def is_running(self): # legacy
|
||||
return self.secmon.running
|
||||
|
||||
def is_program_running(self):
|
||||
return self.secmon.is_program_running()
|
||||
|
||||
def send_program(self, prog):
|
||||
self.logger.info("Sending program: " + prog)
|
||||
self.secmon.send_program(prog)
|
||||
|
||||
def get_tcp_force(self, wait=True):
|
||||
"""
|
||||
return measured force in TCP
|
||||
if wait==True, waits for next packet before returning
|
||||
"""
|
||||
return self.rtmon.getTCFForce(wait)
|
||||
|
||||
def get_force(self, wait=True):
|
||||
"""
|
||||
length of force vector returned by get_tcp_force
|
||||
if wait==True, waits for next packet before returning
|
||||
"""
|
||||
tcpf = self.get_tcp_force( wait)
|
||||
force = 0
|
||||
for i in tcpf:
|
||||
force += i**2
|
||||
return force**0.5
|
||||
|
||||
def set_tcp(self, tcp):
|
||||
"""
|
||||
set robot flange to tool tip transformation
|
||||
"""
|
||||
prog = "set_tcp(p[{}, {}, {}, {}, {}, {}])".format(*tcp)
|
||||
self.send_program(prog)
|
||||
|
||||
def set_payload(self, weight, cog=None):
|
||||
"""
|
||||
set payload in Kg
|
||||
cog is a vector x,y,z
|
||||
if cog is not specified, then tool center point is used
|
||||
"""
|
||||
if cog:
|
||||
cog = list(cog)
|
||||
cog.insert(0, weight)
|
||||
prog = "set_payload({}, ({},{},{}))".format(*cog)
|
||||
else:
|
||||
prog = "set_payload(%s)" % weight
|
||||
self.send_program(prog)
|
||||
|
||||
def set_gravity(self, vector):
|
||||
"""
|
||||
set direction of gravity
|
||||
"""
|
||||
prog = "set_gravity(%s)" % list(vector)
|
||||
self.send_program(prog)
|
||||
|
||||
def send_message(self, msg):
|
||||
"""
|
||||
send message to the GUI log tab on the robot controller
|
||||
"""
|
||||
prog = "textmsg(%s)" % msg
|
||||
self.send_program(prog)
|
||||
|
||||
def set_digital_out(self, output, val):
|
||||
"""
|
||||
set digital output. val is a bool
|
||||
"""
|
||||
if val in (True, 1):
|
||||
val = "True"
|
||||
else:
|
||||
val = "False"
|
||||
self.send_program('digital_out[%s]=%s' % (output, val))
|
||||
|
||||
def get_analog_inputs(self):
|
||||
"""
|
||||
get analog input
|
||||
"""
|
||||
return self.secmon.get_analog_inputs()
|
||||
|
||||
def get_analog_in(self, nb, wait=False):
|
||||
"""
|
||||
get analog input
|
||||
"""
|
||||
return self.secmon.get_analog_in(nb, wait=wait)
|
||||
|
||||
def get_digital_in_bits(self):
|
||||
"""
|
||||
get digital output
|
||||
"""
|
||||
return self.secmon.get_digital_in_bits()
|
||||
|
||||
def get_digital_in(self, nb, wait=False):
|
||||
"""
|
||||
get digital output
|
||||
"""
|
||||
return self.secmon.get_digital_in(nb, wait)
|
||||
|
||||
def get_digital_out(self, val, wait=False):
|
||||
"""
|
||||
get digital output
|
||||
"""
|
||||
return self.secmon.get_digital_out(val , wait=wait)
|
||||
|
||||
def set_analog_out(self, output, val):
|
||||
"""
|
||||
set analog output, val is a float
|
||||
"""
|
||||
prog = "set_analog_output(%s, %s)" % (output, val)
|
||||
self.send_program(prog)
|
||||
|
||||
def set_tool_voltage(self, val):
|
||||
"""
|
||||
set voltage to be delivered to the tool, val is 0, 12 or 24
|
||||
"""
|
||||
prog = "set_tool_voltage(%s)" % (val)
|
||||
self.send_program(prog)
|
||||
|
||||
def wait_for_move(self, radius=0, target=None):
|
||||
"""
|
||||
wait until a move is completed
|
||||
if radius is not 0, returns when tcp is radius closed to target(using mathd3d dist method)
|
||||
"""
|
||||
self.logger.debug("Waiting for move completion")
|
||||
# it is necessary to wait since robot may takes a while to get into running state,
|
||||
# for a physical move 0.5s is very short
|
||||
for _ in range(3):
|
||||
self.secmon.wait()
|
||||
if not radius or not MATH3D:
|
||||
return self._wait_for_move()
|
||||
else:
|
||||
return self._wait_for_move_radius(radius, target)
|
||||
|
||||
def _wait_for_move(self):
|
||||
while True:
|
||||
if not self.is_running():
|
||||
raise RobotException("Robot stopped")
|
||||
jts = self.secmon.get_joint_data(wait=True)
|
||||
finished = True
|
||||
for i in range(0, 6):
|
||||
#Rmq: q_target is an interpolated target we have no control over
|
||||
if abs(jts["q_actual%s"%i] - jts["q_target%s"%i]) > self.joinEpsilon:
|
||||
self.logger.debug("Waiting for end move, q_actual is {}, q_target is {}, diff is {}, epsilon is {}".format( jts["q_actual%s"%i], jts["q_target%s"%i] , jts["q_actual%s"%i] - jts["q_target%s"%i], self.joinEpsilon))
|
||||
finished = False
|
||||
break
|
||||
if finished and not self.secmon.is_program_running():
|
||||
self.logger.debug("move has ended")
|
||||
return
|
||||
|
||||
def _wait_for_move_radius(self, radius, target):
|
||||
print("Wait move with radius")
|
||||
target = m3d.Transform(target)
|
||||
while True:
|
||||
if not self.is_running():
|
||||
raise RobotException("Robot stopped")
|
||||
pose = self.get_pose(wait=True)
|
||||
dist = pose.dist(target)
|
||||
print("dist is ", dist, radius)
|
||||
if (dist < radius) or not self.secmon.is_program_running():
|
||||
self.logger.debug("move has ended")
|
||||
return
|
||||
|
||||
def dist(self, poseA, poseB):
|
||||
"""
|
||||
Return the metric distance between two poses as unweighted combined linear
|
||||
and angular distance.
|
||||
This would be better not to rely on math3d just for that method...
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def getj(self, wait=False):
|
||||
"""
|
||||
get joints position
|
||||
"""
|
||||
jts = self.secmon.get_joint_data(wait)
|
||||
return [jts["q_actual0"], jts["q_actual1"], jts["q_actual2"], jts["q_actual3"], jts["q_actual4"], jts["q_actual5"]]
|
||||
|
||||
def speedl(self, velocities, acc, min_time):
|
||||
"""
|
||||
move at given velocities until minimum min_time seconds
|
||||
"""
|
||||
vels = [round(i, self.max_float_length) for i in velocities]
|
||||
vels.append(acc)
|
||||
vels.append(min_time)
|
||||
prog = "speedl([{},{},{},{},{},{}], a={}, t_min={})".format(*vels)
|
||||
self.send_program(prog)
|
||||
|
||||
def speedj(self, velocities, acc, min_time):
|
||||
"""
|
||||
move at given joint velocities until minimum min_time seconds
|
||||
"""
|
||||
vels = [round(i, self.max_float_length) for i in velocities]
|
||||
vels.append(acc)
|
||||
vels.append(min_time)
|
||||
prog = "speedj([{},{},{},{},{},{}], a={}, t_min={})".format(*vels)
|
||||
self.send_program(prog)
|
||||
|
||||
def movej(self, joints, acc=0.1, vel=0.05, radius=0, wait=True, relative=False):
|
||||
"""
|
||||
move in joint space
|
||||
"""
|
||||
if relative:
|
||||
l = self.getj()
|
||||
joints = [v + l[i] for i, v in enumerate(joints)]
|
||||
joints = [round(j, self.max_float_length) for j in joints]
|
||||
joints.append(acc)
|
||||
joints.append(vel)
|
||||
joints.append(radius)
|
||||
prog = "movej([{},{},{},{},{},{}], a={}, v={}, r={})".format(*joints)
|
||||
self.send_program(prog)
|
||||
if not wait:
|
||||
return None
|
||||
else:
|
||||
self.wait_for_move(radius, joints[:6])
|
||||
return self.getj()
|
||||
|
||||
def movel(self, tpose, acc=0.01, vel=0.01, radius=0, wait=True, relative=False):
|
||||
"""
|
||||
linear move
|
||||
"""
|
||||
if relative:
|
||||
l = self.getl()
|
||||
tpose = [v + l[i] for i, v in enumerate(tpose)]
|
||||
tpose = [round(i, self.max_float_length) for i in tpose]
|
||||
#prog = "movel(p%s, a=%s, v=%s)" % (tpose, acc, vel)
|
||||
tpose.append(acc)
|
||||
tpose.append(vel)
|
||||
tpose.append(radius)
|
||||
prog = "movel(p[{},{},{},{},{},{}], a={}, v={}, r={})".format(*tpose)
|
||||
self.send_program(prog)
|
||||
if not wait:
|
||||
return None
|
||||
else:
|
||||
self.wait_for_move(radius, tpose[:6])
|
||||
return self.getl()
|
||||
|
||||
def movep(self, tpose, acc=0.01, vel=0.01, radius=0, wait=True, relative=False):
|
||||
"""
|
||||
Send a movep command to the robot. See URScript documentation.
|
||||
From URX the main advantage of movep is that it allows for path blending if
|
||||
math3d is installed (BROKEN!)
|
||||
"""
|
||||
if relative:
|
||||
l = self.getl()
|
||||
tpose = [v + l[i] for i, v in enumerate(tpose)]
|
||||
tpose = [round(i, self.max_float_length) for i in tpose]
|
||||
#prog = "movel(p%s, a=%s, v=%s)" % (tpose, acc, vel)
|
||||
tpose.append(acc)
|
||||
tpose.append(vel)
|
||||
tpose.append(radius)
|
||||
prog = "movep(p[{},{},{},{},{},{}], a={}, v={}, r={})".format(*tpose)
|
||||
self.send_program(prog)
|
||||
if not wait:
|
||||
return None
|
||||
else:
|
||||
self.wait_for_move(radius, tpose[:6])
|
||||
return self.getl()
|
||||
|
||||
def getl(self, wait=False):
|
||||
"""
|
||||
get TCP position
|
||||
"""
|
||||
pose = self.secmon.get_cartesian_info(wait)
|
||||
if pose:
|
||||
pose = [pose["X"], pose["Y"], pose["Z"], pose["Rx"], pose["Ry"], pose["Rz"]]
|
||||
self.logger.debug("Current pose from robot: " + str(pose))
|
||||
return pose
|
||||
|
||||
def get_pose(self, wait=False):
|
||||
return self.getl(wait)
|
||||
|
||||
def movec(self, pose_via, pose_to, acc=0.01, vel=0.01, radius=0, wait=True):
|
||||
"""
|
||||
Move Circular: Move to position (circular in tool-space)
|
||||
see UR documentation
|
||||
"""
|
||||
pose_via = [round(i, self.max_float_length) for i in pose_via]
|
||||
pose_to = [round(i, self.max_float_length) for i in pose_to]
|
||||
prog = "movec(p%s, p%s, a=%s, v=%s, r=%s)" % (pose_via, pose_to, acc, vel, radius)
|
||||
self.send_program(prog)
|
||||
if not wait:
|
||||
return None
|
||||
else:
|
||||
self.wait_for_move(radius, pose_to)
|
||||
return self.getl()
|
||||
|
||||
def movels(self, pose_list, acc=0.01, vel=0.01 , radius=0.01, wait=True):
|
||||
"""
|
||||
Concatenate several movel commands and applies a blending radius
|
||||
pose_list is a list of pose.
|
||||
"""
|
||||
header = "def myProg():\n"
|
||||
end = "end\n"
|
||||
template = "movel(p[{},{},{},{},{},{}], a={}, v={}, r={})\n"
|
||||
prog = header
|
||||
for idx, pose in enumerate(pose_list):
|
||||
pose.append(acc)
|
||||
pose.append(vel)
|
||||
if idx != (len(pose_list) -1 ):
|
||||
pose.append(radius)
|
||||
else:
|
||||
pose.append(0)
|
||||
prog += template.format(*pose)
|
||||
prog += end
|
||||
self.send_program(prog)
|
||||
if not wait:
|
||||
return None
|
||||
else:
|
||||
self.wait_for_move()
|
||||
return self.getl()
|
||||
|
||||
def stopl(self, acc = 0.5):
|
||||
self.send_program("stopl(%s)" % acc)
|
||||
|
||||
def stopj(self, acc = 1.5):
|
||||
self.send_program("stopj(%s)" % acc)
|
||||
|
||||
def stop(self):
|
||||
self.stopj()
|
||||
|
||||
def cleanup(self):
|
||||
self.logger.info("Closing sockets to robot")
|
||||
self.secmon.cleanup()
|
||||
if self.rtmon:
|
||||
self.rtmon.stop()
|
||||
shutdown = cleanup #this might be wrong since we could also shutdown the robot hardware from this script
|
||||
|
||||
def set_freedrive(self, val):
|
||||
if val:
|
||||
self.send_program("set robotmode freedrive")
|
||||
else:
|
||||
self.send_program("set robotmode run")
|
||||
|
||||
def set_simulation(self, val):
|
||||
if val:
|
||||
self.send_program("set sim")
|
||||
else:
|
||||
self.send_program("set real")
|
||||
|
||||
def get_realtime_monitor(self):
|
||||
"""
|
||||
return a pointer to the realtime monitor object
|
||||
usefull to track robot position for example
|
||||
"""
|
||||
if not self.rtmon:
|
||||
self.logger.info("Opening real-time monitor socket")
|
||||
self.rtmon = urrtmon.URRTMonitor(self.host)# som information is only available on rt interface
|
||||
self.rtmon.start()
|
||||
self.rtmon.set_csys(self.csys)
|
||||
return self.rtmon
|
||||
|
||||
|
||||
|
||||
|
||||
class Robot(URRobot):
|
||||
"""
|
||||
Generic Python interface to an industrial robot.
|
||||
Compared to the URRobot class, this class adds the possibilty to work directly with matrices
|
||||
and includes support for calibrating the robot coordinate system
|
||||
"""
|
||||
def __init__(self, host, useRTInterface=False, logLevel = logging.WARN, parserLogLevel=logging.WARN):
|
||||
URRobot.__init__(self, host, useRTInterface, logLevel=logLevel, parserLogLevel=parserLogLevel)
|
||||
self.default_linear_acceleration = 0.01
|
||||
self.default_linear_velocity = 0.01
|
||||
self.csys_dict = {}
|
||||
self.csys = None
|
||||
self.csys_inv = None
|
||||
self.set_csys("Robot", m3d.Transform()) #identity
|
||||
|
||||
def set_tcp(self, tcp):
|
||||
if type(tcp) == m3d.Transform:
|
||||
tcp = tcp.pose_vector
|
||||
URRobot.set_tcp(self, tcp)
|
||||
|
||||
def add_csys(self, name, matrix):
|
||||
self.csys_dict[name] = matrix
|
||||
|
||||
def get_csys_list(self):
|
||||
return self.csys_dict
|
||||
|
||||
def set_csys(self, name, matrix=None):
|
||||
"""
|
||||
Set reference corrdinate system to use
|
||||
if matrix != None then a new csys is created
|
||||
"""
|
||||
if matrix:
|
||||
self.add_csys(name, matrix)
|
||||
self.csys = self.csys_dict[name]
|
||||
self.csys_inv = self.csys.inverse()
|
||||
|
||||
def orient(self, orient, acc=None, vel=None, radius=0, wait=True):
|
||||
if type(orient) is not m3d.Orientation:
|
||||
orient = m3d.Orientation(orient)
|
||||
trans = self.get_transform()
|
||||
trans.orient = orient
|
||||
self.apply_transform(trans, acc, vel, radius, wait=wait)
|
||||
|
||||
set_orientation = orient
|
||||
|
||||
def translate(self, vect, acc=None, vel=None, radius=0, wait=True):
|
||||
"""
|
||||
move tool in base coordinate, keeping orientation
|
||||
"""
|
||||
t = m3d.Transform()
|
||||
if not type(vect) is m3d.Vector:
|
||||
vect = m3d.Vector(vect)
|
||||
t.pos += m3d.Vector(vect)
|
||||
return self.add_transform_base(t, acc, vel, radius, wait=wait)
|
||||
|
||||
def translate_tool(self, vect, acc=None, vel=None, radius=0, wait=True):
|
||||
"""
|
||||
move tool in tool coordinate, keeping orientation
|
||||
"""
|
||||
t = m3d.Transform()
|
||||
if not type(vect) is m3d.Vector:
|
||||
vect = m3d.Vector(vect)
|
||||
t.pos += vect
|
||||
return self.add_transform_tool(t, acc, vel, radius, wait=wait)
|
||||
|
||||
def set_pos(self, vect, acc=None, vel=None, radius=0, wait=True):
|
||||
"""
|
||||
set tool to given pos, keeping constant orientation
|
||||
"""
|
||||
if not type(vect) is m3d.Vector:
|
||||
vect = m3d.Vector(vect)
|
||||
trans = m3d.Transform(self.get_orientation(), m3d.Vector(vect))
|
||||
return self.apply_transform(trans, acc, vel, radius, wait=wait)
|
||||
|
||||
def apply_transform(self, trans, acc=None, vel=None, radius=0, wait=True, process=False):
|
||||
"""
|
||||
move tcp to point and orientation defined by a transformation
|
||||
if process is True, movep is used instead of movel
|
||||
if radius is not 0 and wait is True, the method will return when tcp
|
||||
is radius close to the target. It is then possible to send another command
|
||||
and the controller will blend the path. Blending only works with process(movep). (BROKEN!)
|
||||
"""
|
||||
if not acc:
|
||||
acc = self.default_linear_acceleration
|
||||
if not vel:
|
||||
vel = self.default_linear_velocity
|
||||
t = self.csys * trans
|
||||
if process:
|
||||
pose = URRobot.movep(self, t.pose_vector, acc=acc, vel=vel, wait=wait, radius=radius)
|
||||
else:
|
||||
pose = URRobot.movel(self, t.pose_vector, acc=acc, vel=vel, wait=wait, radius=radius)
|
||||
if pose != None : #movel does not return anything when wait is False
|
||||
return self.csys_inv * m3d.Transform(pose)
|
||||
|
||||
set_transform = apply_transform #since we have set_pos, set_orient, we should be consistent and use set for tranform to ..
|
||||
|
||||
def add_transform_base(self, trans, acc=None, vel=None, radius=0, wait=True, process=False):
|
||||
"""
|
||||
Add transform expressed in base coordinate
|
||||
"""
|
||||
pose = self.get_transform()
|
||||
return self.apply_transform(trans * pose, acc, vel, radius, wait=wait, process=process)
|
||||
|
||||
def add_transform_tool(self, trans, acc=None, vel=None, radius=0, wait=True, process=False):
|
||||
"""
|
||||
Add transform expressed in tool coordinate
|
||||
"""
|
||||
pose = self.get_transform()
|
||||
return self.apply_transform(pose * trans, acc, vel, radius, wait=wait, process=process)
|
||||
|
||||
def get_transform(self, wait=False):
|
||||
"""
|
||||
get current transform from base to to tcp
|
||||
"""
|
||||
pose = URRobot.getl(self, wait)
|
||||
trans = self.csys_inv * m3d.Transform(pose)
|
||||
return trans
|
||||
|
||||
def get_pose(self, wait=False):
|
||||
"""
|
||||
get current transform from base to to tcp
|
||||
"""
|
||||
return self.get_transform(wait)
|
||||
|
||||
def get_orientation(self, wait=False):
|
||||
trans = self.get_transform(wait)
|
||||
return trans.orient
|
||||
|
||||
def get_pos(self, wait=False):
|
||||
trans = self.get_transform(wait)
|
||||
return trans.pos
|
||||
|
||||
def speedl(self, velocities, acc, min_time):
|
||||
"""
|
||||
move at given velocities until minimum min_time seconds
|
||||
"""
|
||||
v = self.csys.orient * m3d.Vector(velocities[:3])
|
||||
w = self.csys.orient * m3d.Vector(velocities[3:])
|
||||
URRobot.speedl(self, np.concatenate((v.data, w.data)), acc, min_time)
|
||||
|
||||
def speedl_tool(self, velocities, acc, min_time):
|
||||
"""
|
||||
move in tool coordinate at given velocities until minimum min_time seconds
|
||||
"""
|
||||
pose = self.get_transform()
|
||||
v = self.csys.orient * pose.orient * m3d.Vector(velocities[:3])
|
||||
w = self.csys.orient * pose.orient * m3d.Vector(velocities[3:])
|
||||
URRobot.speedl(self, np.concatenate((v.data, w.data)), acc, min_time)
|
||||
|
||||
def movel(self, pose, acc=None, vel=None, wait=True, relative=False, radius=0.01):
|
||||
"""
|
||||
move linear to given pose in current csys
|
||||
if pose is a list of poses then movels is called
|
||||
"""
|
||||
t = m3d.Transform(pose)
|
||||
if relative:
|
||||
return self.add_transform_base(t, acc, vel, wait=wait, process=False)
|
||||
else:
|
||||
return self.apply_transform(t, acc, vel, radius, wait=wait, process=False)
|
||||
|
||||
def movec(self, pose_via, pose_to, acc=0.01, vel=0.01, radius=0, wait=True):
|
||||
"""
|
||||
Move Circular: Move to position (circular in tool-space)
|
||||
see UR documentation
|
||||
"""
|
||||
via = self.csys * m3d.Transform(pose_via)
|
||||
to = self.csys * m3d.Transform(pose_to)
|
||||
return URRobot.movec(self, via.pose_vector, to.pose_vector, acc=acc, vel=vel, radius=radius, wait=wait)
|
||||
|
||||
def movel_tool(self, pose, acc=None, vel=None, wait=True):
|
||||
"""
|
||||
move linear to given pose in tool coordinate
|
||||
"""
|
||||
t = m3d.Transform(pose)
|
||||
self.add_transform_tool(t, acc, vel, wait=wait, process=False)
|
||||
|
||||
def movep(self, pose, acc=None, vel=None, radius=0, wait=True, relative=False):
|
||||
pose = m3d.Transform(pose)
|
||||
if relative:
|
||||
return self.add_transform_base(pose, acc, vel, wait=wait, process=True, radius=radius)
|
||||
else:
|
||||
return self.apply_transform(pose, acc, vel, wait=wait, process=True, radius=radius)
|
||||
|
||||
def getl(self, wait=False):
|
||||
"""
|
||||
return current transformation from tcp to current csys
|
||||
"""
|
||||
t = self.get_transform(wait)
|
||||
return t.pose_vector.tolist()
|
||||
|
||||
def movels(self, pose_list, acc=0.01, vel=0.01 , radius=0.01, wait=True):
|
||||
"""
|
||||
Concatenate several movep commands and applies a blending radius
|
||||
pose_list is a list of pose.
|
||||
"""
|
||||
new_poses = []
|
||||
for pose in pose_list:
|
||||
t = self.csys * m3d.Transform(pose)
|
||||
pose = t.pose_vector
|
||||
pose = [round(i, self.max_float_length) for i in pose]
|
||||
new_poses.append(pose)
|
||||
return URRobot.movels(self, new_poses, acc, vel, radius, wait=wait)
|
||||
|
||||
def set_gravity(self, vector):
|
||||
if type(vector) == m3d.Vector:
|
||||
vector = vector.list
|
||||
return URRobot.set_gravity(self, vector)
|
||||
|
||||
|
||||
if not MATH3D:
|
||||
Robot = URRobot
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.INFO) #enable logging
|
||||
try:
|
||||
#robot = Robot( '192.168.1.6')
|
||||
robot = Robot( '192.168.1.5')
|
||||
r = robot
|
||||
|
||||
from IPython.frontend.terminal.embed import InteractiveShellEmbed
|
||||
ipshell = InteractiveShellEmbed( banner1="\n\n robot object is available \n\n")
|
||||
ipshell(local_ns=locals())
|
||||
|
||||
finally:
|
||||
if "robot" in dir():
|
||||
robot.cleanup()
|
||||
|
||||
|
||||
|
||||
|
||||
BIN
urx/robot.pyc
Normal file
BIN
urx/robot.pyc
Normal file
Binary file not shown.
267
urx/urrtmon.py
Normal file
267
urx/urrtmon.py
Normal file
@ -0,0 +1,267 @@
|
||||
'''
|
||||
Module for implementing a UR controller real-time monitor over socket port 30003.
|
||||
Confer http://support.universal-robots.com/Technical/RealTimeClientInterface
|
||||
Note: The packet lenght given in the web-page is 740. What is actually received from the controller is 692. It is assumed that the motor currents, the last group of 48 bytes, are not send.
|
||||
Originally Written by Morten Lind
|
||||
'''
|
||||
|
||||
__author__ = "Morten Lind, Olivier Roulet-Dubonnet"
|
||||
__copyright__ = "Copyright 2011, NTNU/SINTEF Raufoss Manufacturing AS"
|
||||
__credits__ = ["Morten Lind, Olivier Roulet-Dubonnet"]
|
||||
__license__ = "GPLv3"
|
||||
|
||||
|
||||
|
||||
import logging
|
||||
import socket
|
||||
import struct
|
||||
import time
|
||||
import threading
|
||||
from copy import deepcopy
|
||||
|
||||
import numpy as np
|
||||
|
||||
import math3d as m3d
|
||||
|
||||
class URRTMonitor(threading.Thread):
|
||||
|
||||
## Struct for revision of the UR controller giving 692 bytes
|
||||
rtstruct692 = struct.Struct('>d6d6d6d6d6d6d6d6d18d6d6d6dQ')
|
||||
|
||||
## for revision of the UR controller giving 540 byte. Here TCP
|
||||
## pose is not included!
|
||||
rtstruct540 = struct.Struct('>d6d6d6d6d6d6d6d6d18d')
|
||||
|
||||
def __init__(self, urHost, loglevel=logging.WARNING):
|
||||
threading.Thread.__init__(self)
|
||||
self.logger = logging.getLogger(self.__class__.__name__)
|
||||
if len(logging.root.handlers) == 0: #dirty hack
|
||||
logging.basicConfig()
|
||||
self.logger.setLevel(loglevel)
|
||||
self.daemon = True
|
||||
self._stop_event = True
|
||||
self._dataEvent = threading.Condition()
|
||||
self._dataAccess = threading.Lock()
|
||||
self._rtSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self._rtSock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||||
self._urHost = urHost
|
||||
## Package data variables
|
||||
self._timestamp = None
|
||||
self._ctrlTimestamp = None
|
||||
self._qActual = None
|
||||
self._qTarget = None
|
||||
self._tcp = None
|
||||
self._tcp_force = None
|
||||
self.__recvTime = 0
|
||||
self._last_ctrl_ts = 0
|
||||
#self._last_ts = 0
|
||||
self._buffering = False
|
||||
self._buffer_lock = threading.Lock()
|
||||
self._buffer = []
|
||||
self._csys = None
|
||||
self._csys_lock = threading.Lock()
|
||||
|
||||
def set_csys(self, csys):
|
||||
with self._csys_lock:
|
||||
self._csys = csys
|
||||
|
||||
def __recv_bytes(self, nBytes):
|
||||
''' Facility method for receiving exactly "nBytes" bytes from
|
||||
the robot connector socket.'''
|
||||
## Record the time of arrival of the first of the stream block
|
||||
recvTime = 0
|
||||
pkg = b''
|
||||
while len(pkg) < nBytes:
|
||||
pkg += self._rtSock.recv(nBytes - len(pkg))
|
||||
if recvTime == 0:
|
||||
recvTime = time.time()
|
||||
self.__recvTime = recvTime
|
||||
return pkg
|
||||
|
||||
def wait(self):
|
||||
with self._dataEvent:
|
||||
self._dataEvent.wait()
|
||||
|
||||
def q_actual(self, wait=False, timestamp=False):
|
||||
""" Get the actual joint position vector."""
|
||||
if wait:
|
||||
self.wait()
|
||||
with self._dataAccess:
|
||||
if timestamp:
|
||||
return self._timestamp, self._qActual
|
||||
else:
|
||||
return self._qActual
|
||||
getActual = q_actual
|
||||
|
||||
def q_target(self, wait=False, timestamp=False):
|
||||
""" Get the target joint position vector."""
|
||||
if wait:
|
||||
self.wait()
|
||||
with self._dataAccess:
|
||||
if timestamp:
|
||||
return self._timestamp, self._qTarget
|
||||
else:
|
||||
return self._qTarget
|
||||
getTarget = q_target
|
||||
|
||||
def tcf_pose(self, wait=False, timestamp=False, ctrlTimestamp=False):
|
||||
""" Return the tool pose values."""
|
||||
if wait:
|
||||
self.wait()
|
||||
with self._dataAccess:
|
||||
tcf = self._tcp
|
||||
if ctrlTimestamp or timestamp:
|
||||
ret = [tcf]
|
||||
if timestamp:
|
||||
ret.insert(-1, self._timestamp)
|
||||
if ctrlTimestamp:
|
||||
ret.insert(-1, self._ctrlTimestamp)
|
||||
return ret
|
||||
else:
|
||||
return tcf
|
||||
getTCF = tcf_pose
|
||||
|
||||
def tcf_force(self, wait=False, timestamp=False):
|
||||
""" Get the tool force. The returned tool force is a
|
||||
six-vector of three forces and three moments."""
|
||||
if wait:
|
||||
self.wait()
|
||||
with self._dataAccess:
|
||||
# tcf = self._fwkin(self._qActual)
|
||||
tcf_force = self._tcp_force
|
||||
if timestamp:
|
||||
return self._timestamp, tcf_force
|
||||
else:
|
||||
return tcf_force
|
||||
getTCFForce = tcf_force
|
||||
|
||||
def __recv_rt_data(self):
|
||||
head = self.__recv_bytes(4)
|
||||
## Record the timestamp for this logical package
|
||||
timestamp = self.__recvTime
|
||||
pkgsize = struct.unpack('>i', head)[0]
|
||||
self.logger.debug('Received header telling that package is %d bytes long'%pkgsize)
|
||||
payload = self.__recv_bytes(pkgsize-4)
|
||||
if pkgsize >= 692:
|
||||
unp = self.rtstruct692.unpack(payload[:self.rtstruct692.size])
|
||||
elif pkgsize >= 540:
|
||||
unp = self.rtstruct540.unpack(payload[:self.rtstruct540.size])
|
||||
else:
|
||||
self.logger.warning('Error, Received packet of length smaller than 540: ', pkgsize)
|
||||
return
|
||||
|
||||
with self._dataAccess:
|
||||
self._timestamp = timestamp
|
||||
# it seems that packet often arrives packed as two... maybe TCP_NODELAY is not set on UR controller??
|
||||
#if (self._timestamp - self._last_ts) > 0.010:
|
||||
#self.logger.warning("Error the we did not receive a packet for {}s ".format( self._timestamp - self._last_ts))
|
||||
#self._last_ts = self._timestamp
|
||||
self._ctrlTimestamp = np.array(unp[0])
|
||||
if self._last_ctrl_ts != 0 and (self._ctrlTimestamp - self._last_ctrl_ts) > 0.010:
|
||||
self.logger.warning("Error the controller failed to send us a packet: time since last packet {}s ".format( self._ctrlTimestamp - self._last_ctrl_ts))
|
||||
self._last_ctrl_ts = self._ctrlTimestamp
|
||||
self._qActual = np.array(unp[31:37])
|
||||
self._qTarget = np.array(unp[1:7])
|
||||
self._tcp_force = np.array(unp[67:73])
|
||||
self._tcp = np.array(unp[73:79])
|
||||
|
||||
if self._csys:
|
||||
with self._csys_lock:
|
||||
tcp = self._csys * m3d.Transform(self._tcp)#might be a godd idea to remove dependancy on m3d
|
||||
self._tcp = tcp.pose_vector
|
||||
if self._buffering:
|
||||
with self._buffer_lock:
|
||||
self._buffer.append((self._timestamp, self._ctrlTimestamp, self._tcp, self._qActual))#FIXME use named arrays of allow to configure what data to buffer
|
||||
|
||||
with self._dataEvent:
|
||||
self._dataEvent.notifyAll()
|
||||
|
||||
def start_buffering(self):
|
||||
"""
|
||||
start buffering all data from controller
|
||||
"""
|
||||
self._buffer = []
|
||||
self._buffering = True
|
||||
|
||||
def stop_buffering(self):
|
||||
self._buffering = False
|
||||
|
||||
def try_pop_buffer(self):
|
||||
"""
|
||||
return oldest value in buffer
|
||||
"""
|
||||
with self._buffer_lock:
|
||||
if len(self._buffer) > 0:
|
||||
return self._buffer.pop(0)
|
||||
else:
|
||||
return None
|
||||
|
||||
def pop_buffer(self):
|
||||
"""
|
||||
return oldest value in buffer
|
||||
"""
|
||||
while True:
|
||||
with self._buffer_lock:
|
||||
if len(self._buffer) > 0:
|
||||
return self._buffer.pop(0)
|
||||
time.sleep(0.001)
|
||||
|
||||
|
||||
def get_buffer(self):
|
||||
"""
|
||||
return a copy of the entire buffer
|
||||
"""
|
||||
with self._buffer_lock:
|
||||
return deepcopy(self._buffer)
|
||||
|
||||
def get_all_data(self, wait=True):
|
||||
"""
|
||||
return all data parsed from robot as a dict
|
||||
"""
|
||||
if wait:
|
||||
self.wait()
|
||||
with self._dataAccess:
|
||||
return dict(timestamp=self._timestamp, ctrltimestamp=self._ctrlTimestamp, qActual=self._qActual, qTarget=self._qTarget, tcp=self._tcp, tcp_force=self._tcp_force)
|
||||
|
||||
def stop(self):
|
||||
#print(self.__class__.__name__+': Stopping')
|
||||
self._stop_event = True
|
||||
|
||||
def cleanup(self):
|
||||
self.stop()
|
||||
self.join()
|
||||
|
||||
def run(self):
|
||||
self._stop_event = False
|
||||
self._rtSock.connect((self._urHost, 30003))
|
||||
while not self._stop_event:
|
||||
self.__recv_rt_data()
|
||||
self._rtSock.close()
|
||||
|
||||
|
||||
|
||||
|
||||
def startupInteractive():
|
||||
from optparse import OptionParser
|
||||
from IPython import embed
|
||||
## Require the urhost arg.
|
||||
parser = OptionParser()
|
||||
parser.add_option('--debug', action='store_true', default=False, dest='debug')
|
||||
parser.add_option('--start', action='store_true', default=False, dest='start')
|
||||
opts, args = parser.parse_args()
|
||||
if len(args) != 1:
|
||||
raise Exception('Must have argument with ur-host name or ip!')
|
||||
urHost = args[0]
|
||||
print('Connecting to UR real-time socket inteface on "%s"'%urHost)
|
||||
# # Start the connectors
|
||||
urRTMon = URRTMonitor(urHost, debug=opts.debug)
|
||||
# # Register for hard shutdown
|
||||
try:
|
||||
if opts.start:
|
||||
urRTMon.start()
|
||||
embed()
|
||||
finally:
|
||||
urRTMon.stop()
|
||||
|
||||
if __name__ == '__main__':
|
||||
startupInteractive()
|
||||
BIN
urx/urrtmon.pyc
Normal file
BIN
urx/urrtmon.pyc
Normal file
Binary file not shown.
390
urx/ursecmon.py
Normal file
390
urx/ursecmon.py
Normal file
@ -0,0 +1,390 @@
|
||||
"""
|
||||
This file contains 2 classes:
|
||||
- ParseUtils containing utilies to parse data from UR robot
|
||||
- SecondaryMonitor, a class opening a socket to the robot and with methods to access data and send programs to the robot
|
||||
Both use data from the secondary port of the URRobot.
|
||||
Only the last connected socket on 3001 is the primary client !!!! So do not rely on it unless you know no other client is running (Hint the UR java interface is a client...)
|
||||
http://support.universal-robots.com/Technical/PrimaryAndSecondaryClientInterface
|
||||
"""
|
||||
|
||||
__author__ = "Olivier Roulet-Dubonnet"
|
||||
__copyright__ = "Copyright 2011-2013, Sintef Raufoss Manufacturing"
|
||||
__credits__ = ["Olivier Roulet-Dubonnet"]
|
||||
__license__ = "GPLv3"
|
||||
|
||||
from threading import Thread, Condition, Lock
|
||||
import logging
|
||||
import struct
|
||||
import socket
|
||||
from copy import copy
|
||||
import time
|
||||
|
||||
|
||||
|
||||
class ParsingException(Exception):
|
||||
def __init__(self, *args):
|
||||
Exception.__init__(self, *args)
|
||||
|
||||
class TimeoutException(Exception):
|
||||
def __init__(self, *args):
|
||||
Exception.__init__(self, *args)
|
||||
|
||||
|
||||
|
||||
|
||||
class ParserUtils(object):
|
||||
def __init__(self, logLevel=logging.WARN):
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self.logger.setLevel(logLevel)
|
||||
|
||||
def parse(self, data):
|
||||
"""
|
||||
parse a packet from the UR socket and return a dictionary with the data
|
||||
"""
|
||||
allData = {}
|
||||
#print "Total size ", len(data)
|
||||
while data:
|
||||
psize, ptype, pdata, data = self.analyze_header(data)
|
||||
#print "We got packet with size %i and type %s" % (psize, ptype)
|
||||
if ptype == 16:
|
||||
allData["SecondaryClientData"] = self._get_data(pdata, "!iB", ("size", "type"))
|
||||
data = (pdata + data)[5:] # This is the total size so we resend data to parser
|
||||
elif ptype == 0:
|
||||
allData["RobotModeData"] = self._get_data(pdata, "!iBQ???????Bd", ("size", "type", "timestamp", "isRobotConnected", "isRealRobotEnabled", "isPowerOnRobot", "isEmergencyStopped", "isSecurityStopped", "isProgramRunning", "isProgramPaused", "robotMode", "speedFraction"))
|
||||
elif ptype == 1:
|
||||
tmpstr = ["size", "type"]
|
||||
for i in range(0, 6):
|
||||
tmpstr += ["q_actual%s" % i, "q_target%s" % i, "qd_actual%s" % i, "I_actual%s" % i, "V_actual%s" % i, "T_motor%s" % i ,"T_micro%s" % i, "jointMode%s" % i]
|
||||
|
||||
allData["JointData"] = self._get_data(pdata, "!iB dddffffB dddffffB dddffffB dddffffB dddffffB dddffffB", tmpstr)
|
||||
|
||||
elif ptype == 4:
|
||||
allData["CartesianInfo"] = self._get_data(pdata, "iBdddddd", ("size", "type", "X", "Y", "Z", "Rx", "Ry", "Rz"))
|
||||
elif ptype == 5:
|
||||
allData["LaserPointer(OBSOLETE)"] = self._get_data(pdata, "iBddd" , ("size", "type"))
|
||||
elif ptype == 3:
|
||||
allData["MasterBoardData"] = self._get_data(pdata, "iBhhbbddbbddffffBBb" , ("size", "type", "digitalInputBits", "digitalOutputBits", "analogInputRange0", "analogInputRange1", "analogInput0", "analogInput1", "analogInputDomain0", "analogInputDomain1", "analogOutput0", "analogOutput1", "masterBoardTemperature", "robotVoltage48V", "robotCurrent", "masterIOCurrent"))#, "masterSafetyState" ,"masterOnOffState", "euromap67InterfaceInstalled" ))
|
||||
elif ptype == 2:
|
||||
allData["ToolData"] = self._get_data(pdata, "iBbbddfBffB" , ("size", "type", "analoginputRange2", "analoginputRange3", "analogInput2", "analogInput3", "toolVoltage48V", "toolOutputVoltage", "toolCurrent", "toolTemperature", "toolMode" ))
|
||||
|
||||
#elif ptype == 8:
|
||||
#allData["varMessage"] = self._get_data(pdata, "!iBQbb iiBAcAc", ("size", "type", "timestamp", "source", "robotMessageType", "code", "argument", "titleSize", "messageTitle", "messageText"))
|
||||
#elif ptype == 7:
|
||||
#allData["keyMessage"] = self._get_data(pdata, "!iBQbb iiBAcAc", ("size", "type", "timestamp", "source", "robotMessageType", "code", "argument", "titleSize", "messageTitle", "messageText"))
|
||||
|
||||
elif ptype == 20:
|
||||
tmp = self._get_data(pdata, "!iB Qbb", ("size", "type", "timestamp", "source", "robotMessageType"))
|
||||
if tmp["robotMessageType"] == 3:
|
||||
allData["VersionMessage"] = self._get_data(pdata, "!iBQbb bAbBBiAb", ("size", "type", "timestamp", "source", "robotMessageType", "projectNameSize", "projectName", "majorVersion", "minorVersion", "svnRevision", "buildDate"))
|
||||
elif tmp["robotMessageType"] == 6:
|
||||
allData["robotCommMessage"] = self._get_data(pdata, "!iBQbb iiAc", ("size", "type", "timestamp", "source", "robotMessageType", "code", "argument", "messageText"))
|
||||
elif tmp["robotMessageType"] == 1:
|
||||
allData["labelMessage"] = self._get_data(pdata, "!iBQbb iAc", ("size", "type", "timestamp", "source", "robotMessageType", "id", "messageText"))
|
||||
elif tmp["robotMessageType"] == 2:
|
||||
allData["popupMessage"] = self._get_data(pdata, "!iBQbb ??BAcAc", ("size", "type", "timestamp", "source", "robotMessageType", "warning", "error", "titleSize", "messageTitle", "messageText"))
|
||||
elif tmp["robotMessageType"] == 0:
|
||||
allData["messageText"] = self._get_data(pdata, "!iBQbb Ac", ("size", "type", "timestamp", "source", "robotMessageType", "messageText"))
|
||||
elif tmp["robotMessageType"] == 8:
|
||||
allData["varMessage"] = self._get_data(pdata, "!iBQbb iiBAcAc", ("size", "type", "timestamp", "source", "robotMessageType", "code", "argument", "titleSize", "messageTitle", "messageText"))
|
||||
elif tmp["robotMessageType"] == 7:
|
||||
allData["keyMessage"] = self._get_data(pdata, "!iBQbb iiBAcAc", ("size", "type", "timestamp", "source", "robotMessageType", "code", "argument", "titleSize", "messageTitle", "messageText"))
|
||||
elif tmp["robotMessageType"] == 5:
|
||||
allData["keyMessage"] = self._get_data(pdata, "!iBQbb iiAc", ("size", "type", "timestamp", "source", "robotMessageType", "code", "argument", "messageText"))
|
||||
else:
|
||||
self.logger.debug("Message type parser not implemented ", tmp)
|
||||
else:
|
||||
self.logger.debug("Unknown packet type %s with size %s" % (ptype, psize))
|
||||
|
||||
return allData
|
||||
|
||||
def _get_data(self, data, fmt, names):
|
||||
"""
|
||||
fill data into a dictionary
|
||||
data is data from robot packet
|
||||
fmt is struct format, but with added A for arrays and no support for numerical in fmt
|
||||
names args are strings used to store values
|
||||
"""
|
||||
tmpdata = copy(data)
|
||||
fmt = fmt.strip() # space may confuse us
|
||||
d = dict()
|
||||
i = 0
|
||||
j = 0
|
||||
while j < len(fmt) and i < len(names):
|
||||
f = fmt[j]
|
||||
if f in (" ", "!", ">", "<"):
|
||||
j += 1
|
||||
elif f == "A": #we got an array
|
||||
# first we need to find its size
|
||||
if j == len(fmt) - 2: # we are last element, size is the rest of data in packet
|
||||
arraysize = len(tmpdata)
|
||||
else: # size should be given in last element
|
||||
asn = names[i-1]
|
||||
if not asn.endswith("Size"):
|
||||
raise ParsingException("Error, array without size ! %s %s" % (asn, i))
|
||||
else:
|
||||
arraysize = d[asn]
|
||||
d[names[i]] = tmpdata[0:arraysize]
|
||||
#print "Array is ", names[i], d[names[i]]
|
||||
tmpdata = tmpdata[arraysize:]
|
||||
j += 2
|
||||
i += 1
|
||||
else:
|
||||
fmtsize = struct.calcsize(fmt[j])
|
||||
#print "reading ", f , i, j, fmtsize, len(tmpdata)
|
||||
if len(tmpdata) < fmtsize: #seems to happen on windows
|
||||
raise ParsingException("Error, length of data smaller than advertized: ", len(tmpdata), fmtsize, "for names ", names, f, i, j)
|
||||
d[names[i]] = struct.unpack("!" + f, tmpdata[0:fmtsize])[0]
|
||||
#print names[i], d[names[i]]
|
||||
tmpdata = tmpdata[fmtsize:]
|
||||
j += 1
|
||||
i += 1
|
||||
return d
|
||||
|
||||
def get_header(self, data):
|
||||
return struct.unpack("!iB", data[0:5])
|
||||
|
||||
def analyze_header(self, data):
|
||||
"""
|
||||
read first 5 bytes and return complete packet
|
||||
"""
|
||||
if not len(data) >= 5:
|
||||
raise ParsingException("Packet size %s smaller than header size (5 bytes)" % len(data))
|
||||
else:
|
||||
psize, ptype = self.get_header(data)
|
||||
if psize < 5:
|
||||
raise ParsingException("Error, declared length of data smaller than its own header(5): ", psize)
|
||||
elif psize > len(data):
|
||||
raise ParsingException("Error, length of data smaller (%s) than declared (%s)" %( len(data), psize))
|
||||
return psize, ptype, data[:psize], data[psize:]
|
||||
|
||||
def find_first_packet(self, data):
|
||||
"""
|
||||
find the first complete packet in a string
|
||||
returns None if none found
|
||||
"""
|
||||
counter = 0
|
||||
limit = 10
|
||||
while True:
|
||||
if len(data) >= 5:
|
||||
psize, ptype = self.get_header(data)
|
||||
if psize < 5 or psize > 2000 or ptype != 16:
|
||||
data = data[1:]
|
||||
counter += 1
|
||||
if counter > limit:
|
||||
self.logger.warn("tried {} times to find a packet in data, advertised packet size: {}, type: {}".format(counter, psize, ptype))
|
||||
self.logger.warn("Data length: {}".format(len(data)))
|
||||
limit = limit * 10
|
||||
elif len(data) >= psize:
|
||||
self.logger.debug("Got packet with size {0} and type {1}".format(psize, ptype))
|
||||
if counter:
|
||||
self.logger.info("Remove {0} bytes of garbage at begining of packet".format(counter))
|
||||
#ok we we have somehting which looks like a packet"
|
||||
return (data[:psize], data[psize:])
|
||||
else:
|
||||
#packet is not complete
|
||||
self.logger.debug("Packet is not complete, advertised size is {0}, received size is {1}, type is {2}".format(psize, len(data), ptype))
|
||||
return None
|
||||
else:
|
||||
#self.logger.debug("data smaller than 5 bytes")
|
||||
return None
|
||||
|
||||
|
||||
|
||||
class SecondaryMonitor(Thread):
|
||||
"""
|
||||
Monitor data from secondary port and send programs to robot
|
||||
"""
|
||||
def __init__(self, host, logLevel=logging.WARN, parserLogLevel=logging.WARN):
|
||||
Thread.__init__(self)
|
||||
self.logger = logging.getLogger(self.__class__.__name__)
|
||||
self.logger.setLevel(logLevel)
|
||||
self._parser = ParserUtils(parserLogLevel)
|
||||
self._dict = {}
|
||||
self._dictLock = Lock()
|
||||
self.host = host
|
||||
secondary_port = 30002 # Secondary client interface on Universal Robots
|
||||
self._s_secondary = socket.create_connection((self.host, secondary_port), timeout=0.5)
|
||||
self._prog_queue = []
|
||||
self._prog_queue_lock = Lock()
|
||||
self._dataqueue = bytes()
|
||||
self._trystop = False # to stop thread
|
||||
self.running = False #True when robot is on and listening
|
||||
self._dataEvent = Condition()
|
||||
self.lastpacket_timestamp = 0
|
||||
|
||||
self.start()
|
||||
self.wait()# make sure we got some data before someone calls us
|
||||
|
||||
def send_program(self, prog):
|
||||
"""
|
||||
send program to robot in URRobot format
|
||||
If another program is send while a program is running the first program is aborded.
|
||||
"""
|
||||
prog.strip()
|
||||
self.logger.debug("Sending program: " + prog)
|
||||
if type(prog) != bytes:
|
||||
prog = prog.encode()
|
||||
with self._prog_queue_lock:
|
||||
self._prog_queue.append(prog + b"\n")
|
||||
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
check program execution status in the secondary client data packet we get from the robot
|
||||
This interface uses only data from the secondary client interface (see UR doc)
|
||||
Only the last connected client is the primary client,
|
||||
so this is not guaranted and we cannot rely on information to the primary client.
|
||||
"""
|
||||
|
||||
while not self._trystop:
|
||||
with self._prog_queue_lock:
|
||||
if len(self._prog_queue) > 0:
|
||||
prog = self._prog_queue.pop(0)
|
||||
self._s_secondary.send(prog)
|
||||
|
||||
data = self._get_data()
|
||||
try:
|
||||
tmpdict = self._parser.parse(data)
|
||||
with self._dictLock:
|
||||
self._dict = tmpdict
|
||||
except ParsingException as ex:
|
||||
self.logger.warn("Error parsing one packet from urrobot: " + str(ex) )
|
||||
continue
|
||||
|
||||
if "RobotModeData" not in self._dict:
|
||||
self.logger.warn( "Got a packet from robot without RobotModeData, strange ...")
|
||||
continue
|
||||
|
||||
self.lastpacket_timestamp = time.time()
|
||||
|
||||
if self._dict["RobotModeData"]["robotMode"] == 0 \
|
||||
and self._dict["RobotModeData"]["isRealRobotEnabled"] == True \
|
||||
and self._dict["RobotModeData"]["isEmergencyStopped"] == False \
|
||||
and self._dict["RobotModeData"]["isSecurityStopped"] == False \
|
||||
and self._dict["RobotModeData"]["isRobotConnected"] == True \
|
||||
and self._dict["RobotModeData"]["isPowerOnRobot"] == True:
|
||||
self.running = True
|
||||
else:
|
||||
if self.running == True:
|
||||
self.logger.error("Robot not running: " + str( self._dict["RobotModeData"]))
|
||||
self.running = False
|
||||
with self._dataEvent:
|
||||
#print("X: new data")
|
||||
self._dataEvent.notifyAll()
|
||||
|
||||
def _get_data(self):
|
||||
"""
|
||||
returns something that looks like a packet, nothing is guaranted
|
||||
"""
|
||||
while True:
|
||||
#self.logger.debug("data queue size is: {}".format(len(self._dataqueue)))
|
||||
ans = self._parser.find_first_packet(self._dataqueue[:])
|
||||
if ans:
|
||||
self._dataqueue = ans[1]
|
||||
#self.logger.debug("found packet of size {}".format(len(ans[0])))
|
||||
return ans[0]
|
||||
else:
|
||||
#self.logger.debug("Could not find packet in received data")
|
||||
tmp = self._s_secondary.recv(1024)
|
||||
self._dataqueue += tmp
|
||||
|
||||
def wait(self, timeout=0.5):
|
||||
"""
|
||||
wait for next data packet from robot
|
||||
"""
|
||||
tstamp = self.lastpacket_timestamp
|
||||
with self._dataEvent:
|
||||
self._dataEvent.wait(timeout)
|
||||
if tstamp == self.lastpacket_timestamp:
|
||||
raise TimeoutException("Did not receive a valid data packet from robot in {}".format(timeout) )
|
||||
|
||||
def get_cartesian_info(self, wait=False):
|
||||
if wait:
|
||||
self.wait()
|
||||
with self._dictLock:
|
||||
if "CartesianInfo" in self._dict:
|
||||
return self._dict["CartesianInfo"]
|
||||
else:
|
||||
return None
|
||||
|
||||
def get_all_data(self, wait=False):
|
||||
"""
|
||||
return last data obtained from robot in dictionnary format
|
||||
"""
|
||||
if wait:
|
||||
self.wait()
|
||||
with self._dictLock:
|
||||
return self._dict.copy()
|
||||
|
||||
def get_joint_data(self, wait=False):
|
||||
if wait:
|
||||
self.wait()
|
||||
with self._dictLock:
|
||||
if "JointData" in self._dict:
|
||||
return self._dict["JointData"]
|
||||
else:
|
||||
return None
|
||||
|
||||
def get_digital_out(self, nb, wait=False):
|
||||
if wait:
|
||||
self.wait()
|
||||
with self._dictLock:
|
||||
output = self._dict["MasterBoardData"]["digitalOutputBits"]
|
||||
mask = 1 << nb
|
||||
if (output & mask):
|
||||
return 1
|
||||
else:
|
||||
return 0
|
||||
|
||||
def get_digital_in(self, nb, wait=False):
|
||||
if wait:
|
||||
self.wait()
|
||||
with self._dictLock:
|
||||
output = self._dict["MasterBoardData"]["digitalInputBits"]
|
||||
mask = 1 << nb
|
||||
if (output & mask):
|
||||
return 1
|
||||
else:
|
||||
return 0
|
||||
|
||||
def get_analog_in(self, nb, wait=False):
|
||||
if wait:
|
||||
self.wait()
|
||||
with self._dictLock:
|
||||
return self._dict["MasterBoardData"]["analogInput" + str(nb)]
|
||||
|
||||
def get_digital_in_bits(self, wait=False):
|
||||
if wait:
|
||||
self.wait()
|
||||
with self._dictLock:
|
||||
return self._dict["MasterBoardData"]["digitalInputBits"]
|
||||
|
||||
def get_analog_inputs(self, wait=False):
|
||||
if wait:
|
||||
self.wait()
|
||||
with self._dictLock:
|
||||
return self._dict["MasterBoardData"]["analogInput0"], self._dict["MasterBoardData"]["analogInput1"]
|
||||
|
||||
def is_program_running(self, wait=False):
|
||||
"""
|
||||
return True if robot is executing a program
|
||||
Rmq: The refresh rate is only 10Hz so the information may be outdated
|
||||
"""
|
||||
if wait:
|
||||
self.wait()
|
||||
with self._dictLock:
|
||||
return self._dict["RobotModeData"]["isProgramRunning"]
|
||||
|
||||
|
||||
def cleanup(self):
|
||||
self._trystop = True
|
||||
self.join()
|
||||
#with self._dataEvent: #wake up any thread that may be waiting for data before we close. Should we do that?
|
||||
#self._dataEvent.notifyAll()
|
||||
if self._s_secondary:
|
||||
with self._prog_queue_lock:
|
||||
self._s_secondary.close()
|
||||
|
||||
|
||||
|
||||
|
||||
BIN
urx/ursecmon.pyc
Normal file
BIN
urx/ursecmon.pyc
Normal file
Binary file not shown.
Loading…
x
Reference in New Issue
Block a user