class _ReadbackSignal(Signal):
    """Read-only signal exposing the position of the parent's Tango proxy.

    Every read goes to ``self.parent.proxy.position``; the value is also
    stored in ``self._readback``.  Any attempt to write through ``put`` or
    ``set`` raises ``ReadOnlyError``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Mark the signal as connected and not writable in its metadata.
        self._metadata.update(connected=True, write_access=False)

    def get(self, **kwargs):
        """Read the current position from the parent's proxy and return it.

        Keyword arguments are accepted so that callers which forward
        options to ``get`` do not fail; they are ignored because the
        proxy attribute read takes no options.
        """
        self._readback = self.parent.proxy.position
        return self._readback

    @property
    def timestamp(self):
        """Timestamp of the readback value (the time of this call)."""
        return ttime.time()

    def _readonly_error(self):
        """Build the exception raised on any write attempt."""
        return ReadOnlyError("The signal {} is readonly.".format(self.name))

    def put(self, value, *, timestamp=None, force=False):
        """Always raise ``ReadOnlyError``: this signal cannot be written."""
        raise self._readonly_error()

    def set(self, value, *, timestamp=None, force=False):
        """Always raise ``ReadOnlyError``: this signal cannot be written."""
        raise self._readonly_error()
class motorTango(Device):
    """
    Ophyd device driving a motor through a Tango ``DeviceProxy``.

    Called with e.g. ``motorTango('eh_mot65')``.  A proxy is opened for
    ``name``, its ``TangoDevice`` attribute is read, and a second proxy is
    opened for the device named there; that second proxy is kept as
    ``self.proxy`` and is used for every position read and write.
    """

    position_readback = Cpt(_ReadbackSignal, value=0, kind="hinted")

    # Seconds to sleep between two state polls while waiting for a move.
    _POLL_INTERVAL = 0.1

    def __init__(self, name, parent=None, labels=None, kind=None, **kwargs):
        # The proxy is created before the base initialiser runs so that it
        # already exists when the components are instantiated.
        entry = tango.DeviceProxy(name)
        self.proxy = tango.DeviceProxy(entry.TangoDevice)
        super().__init__(
            name=name, parent=parent, labels=labels, kind=kind, **kwargs
        )

    def set(self, value):
        """Write ``value`` to the proxy position and return a status object.

        The write and the subsequent wait run in a daemon thread, so this
        method returns immediately.  The returned ``DeviceStatus`` is
        finished once the proxy state equals ``tango.DevState.ON``.  If the
        write or a state poll raises, the exception is stored on the status
        instead of leaving it pending forever.

        NOTE(review): a device that settles in a state other than ``ON``
        keeps this loop polling indefinitely -- confirm whether such states
        should end the wait with an error.
        """
        status = DeviceStatus(device=self)

        def write_and_wait():
            try:
                self.proxy.position = value
                while self.proxy.state() != tango.DevState.ON:
                    ttime.sleep(self._POLL_INTERVAL)
            except Exception as exc:
                # Thread boundary: report the failure through the status so
                # that whoever waits on it is released with the error.
                status.set_exception(exc)
            else:
                status.set_finished()

        threading.Thread(target=write_and_wait, daemon=True).start()
        return status

    @property
    def position(self):
        """Current value of the proxy's ``position`` attribute."""
        return self.proxy.position