# Source code for NetFT

import socket
import struct
from threading import Thread
from time import sleep

class Sensor:
    '''The class interface for an ATI Force/Torque sensor.

    This class contains all the functions necessary to communicate with
    an ATI Force/Torque sensor with a Net F/T interface using RDT.

    The instance can be used as a context manager, in which case the
    socket is closed when the ``with`` block exits.
    '''

    # UDP port the requests are sent to.
    _PORT = 49152
    # First 16-bit field of every request.
    _HEADER = 0x1234
    # Command sent by `stopStreaming`.
    _CMD_STOP = 0
    # Command sent by `getMeasurements` to request samples.
    _CMD_REQUEST = 2
    # Request layout: header, command (both unsigned 16-bit) and the
    # sample count (unsigned 32-bit), network byte order.
    _REQUEST_FORMAT = '!HHI'
    # Response layout: three unsigned 32-bit fields, which are discarded,
    # followed by the six signed 32-bit force/torque values.
    _RESPONSE_FORMAT = '!IIIiiiiii'

    def __init__(self, ip):
        '''Start the sensor interface.

        This function initializes the class and opens the socket for the
        sensor.

        Args:
            ip (str): The IP address of the Net F/T box.
        '''
        self.ip = ip
        self.port = self._PORT
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.connect((ip, self.port))
        self.mean = [0] * 6
        # Defined up front so `measurement`, `force` and `torque` return
        # zeros instead of raising AttributeError before the first sample.
        self.data = [0] * 6
        self.stream = False
        self.thread = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def close(self):
        '''Stop the streaming handler if it is running and close the socket.'''
        if self.stream:
            self.stopStreaming()
        self.sock.close()

    def send(self, command, count=0):
        '''Send a given command to the Net F/T box with specified sample count.

        This function sends the given RDT command to the Net F/T box,
        along with the specified sample count, if needed.

        Args:
            command (int): The RDT command.
            count (int, optional): The sample count to send. Defaults to 0.
        '''
        message = struct.pack(
            self._REQUEST_FORMAT, self._HEADER, command, count)
        self.sock.send(message)

    def receive(self):
        '''Receive and unpack a response from the Net F/T box.

        This function receives and unpacks an RDT response from the
        Net F/T box and saves it to the ``data`` attribute, after
        subtracting the mean stored by `tare`.

        Returns:
            list: The force and torque values received. The first three
            values are the forces recorded, and the last three are the
            measured torques.

        Raises:
            struct.error: If the packet does not match the expected layout.
        '''
        rawdata = self.sock.recv(1024)
        values = struct.unpack(self._RESPONSE_FORMAT, rawdata)[3:]
        # Build the whole list first and assign once, so a reader on
        # another thread never sees a half-updated sample.
        self.data = [value - offset
                     for value, offset in zip(values, self.mean)]
        return self.data

    def tare(self, n=10):
        '''Tare the sensor.

        This function takes a given number of readings from the sensor
        and averages them. This mean is then stored and subtracted from
        all future measurements.

        NOTE(review): this reads from the socket directly, so it should
        presumably not be called while the streaming handler started by
        `startStreaming` is running, as both would consume packets.

        Args:
            n (int, optional): The number of samples to use in the mean.
                Defaults to 10.

        Returns:
            list of float: The mean values calculated.

        Raises:
            ValueError: If ``n`` is less than 1. A count of 0 is the
                request `startStreaming` uses for continuous output, so
                it must not be sent from here.
        '''
        if n < 1:
            raise ValueError('n must be at least 1')
        # Clear any previous offset so the readings below are raw values.
        self.mean = [0] * 6
        self.getMeasurements(n=n)
        totals = [0.0] * 6
        for _ in range(n):
            sample = self.receive()
            for axis in range(6):
                totals[axis] += sample[axis]
        mean = [total / n for total in totals]
        self.mean = mean
        return mean

    def zero(self):
        '''Remove the mean found with `tare` to start receiving raw sensor values.'''
        self.mean = [0] * 6

    def receiveHandler(self):
        '''A handler to receive and store data.

        Runs until the ``stream`` flag is cleared by `stopStreaming`.
        '''
        while self.stream:
            self.receive()

    def getMeasurement(self):
        '''Get a single measurement from the sensor.

        Request a single measurement from the sensor and return it. If
        the sensor is currently streaming, started by running
        `startStreaming`, then this function will simply return the most
        recently received value.

        Returns:
            list: The force and torque values received. The first three
            values are the forces recorded, and the last three are the
            measured torques.
        '''
        if self.stream:
            # The handler thread owns the socket while streaming; sending
            # a one-sample request here would compete with it for packets.
            return self.data
        self.getMeasurements(1)
        return self.receive()

    def measurement(self):
        '''Get the most recent force/torque measurement.

        Returns:
            list: The force and torque values received. The first three
            values are the forces recorded, and the last three are the
            measured torques. All zeros if nothing has been received yet.
        '''
        return self.data

    def getForce(self):
        '''Get a single force measurement from the sensor.

        Request a single measurement from the sensor and return it.

        Returns:
            list: The force values received.
        '''
        return self.getMeasurement()[:3]

    def force(self):
        '''Get the most recent force measurement.

        Returns:
            list: The force values received.
        '''
        return self.measurement()[:3]

    def getTorque(self):
        '''Get a single torque measurement from the sensor.

        Request a single measurement from the sensor and return it.

        Returns:
            list: The torque values received.
        '''
        return self.getMeasurement()[3:]

    def torque(self):
        '''Get the most recent torque measurement.

        Returns:
            list: The torque values received.
        '''
        return self.measurement()[3:]

    def startStreaming(self, handler=True):
        '''Start streaming data continuously.

        This function commands the Net F/T box to start sending data
        continuously. By default this also starts a new thread with a
        handler to save all data points coming in. These data points can
        still be accessed with `measurement`, `force`, and `torque`.
        This handler can also be disabled and measurements can be
        received manually using the `receive` function.

        Args:
            handler (bool, optional): If True start the handler which
                saves data to be used with `measurement`, `force`, and
                `torque`. If False the measurements must be received
                manually. Defaults to True.
        '''
        self.getMeasurements(0)
        # Do not start a second handler on the same socket if one is
        # already running.
        if handler and not self.stream:
            self.stream = True
            self.thread = Thread(target=self.receiveHandler)
            self.thread.daemon = True
            self.thread.start()

    def getMeasurements(self, n):
        '''Request a given number of samples from the sensor.

        This function requests a given number of samples from the
        sensor. These measurements must be received manually using the
        `receive` function.

        Args:
            n (int): The number of samples to request.
        '''
        self.send(self._CMD_REQUEST, count=n)

    def stopStreaming(self):
        '''Stop streaming data continuously.

        This function stops the sensor from streaming continuously as
        started using `startStreaming`.
        '''
        self.stream = False
        worker = self.thread
        if worker is not None:
            # Let the handler finish its current receive while packets
            # are still arriving; give up after 0.1 s rather than hang.
            if worker.is_alive():
                worker.join(0.1)
            self.thread = None
        self.send(self._CMD_STOP)