class _ReadbackSignal(Signal):
    """Read-only signal that reports the parent's Tango motor position.

    Every ``get()`` reads ``parent.proxy.position``; nothing is cached
    between reads apart from storing the latest value in ``_readback``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Flag the signal as connected and not writable in its metadata.
        self._metadata.update(
            connected=True,
            write_access=False,
        )

    def get(self, **kwargs):
        """Read the position from the parent's proxy and return it.

        Extra keyword arguments are accepted and ignored so that callers
        passing options to ``get()`` do not fail on this override.
        """
        self._readback = self.parent.proxy.position
        return self._readback

    def describe(self):
        """Return the description produced by the base class, unchanged."""
        return super().describe()

    @property
    def timestamp(self):
        """Timestamp of the readback value.

        This is ``ttime.time()`` evaluated at the moment of access, not a
        timestamp supplied by the Tango device.
        """
        return ttime.time()

    def put(self, value, *, timestamp=None, force=False, **kwargs):
        """Always raise ``ReadOnlyError``; this signal cannot be written."""
        raise ReadOnlyError(f"The signal {self.name} is readonly.")

    def set(self, value, *, timestamp=None, force=False, **kwargs):
        """Always raise ``ReadOnlyError``; this signal cannot be written."""
        raise ReadOnlyError(f"The signal {self.name} is readonly.")


class motorTango(Device):
    """Device wrapping a Tango motor, called with e.g. ``'eh_mot65'``.

    ``name`` is used to open a first ``tango.DeviceProxy``; the value of
    its ``TangoDevice`` attribute is then used to open the proxy stored in
    ``self.proxy``, which all position reads and writes go through.
    """

    position_readback = Cpt(_ReadbackSignal, value=0, kind="hinted")

    def __init__(self, name, parent=None, labels=None, kind=None, **kwargs):
        self.name = name
        # The proxy is created before super().__init__() runs.
        # NOTE(review): presumably so components created by the base
        # constructor can already reach ``self.proxy`` - confirm.
        front_proxy = tango.DeviceProxy(name)
        self.proxy = tango.DeviceProxy(front_proxy.TangoDevice)
        super().__init__(
            name=name, parent=parent, labels=labels, kind=kind, **kwargs
        )

    def set(self, value):
        """Start a move to ``value`` and return a status tracking it.

        The write and the subsequent polling run in a daemon thread, so
        this method returns immediately.

        Returns
        -------
        DeviceStatus
            Marked finished once the proxy state is ``tango.DevState.ON``
            after the write, or failed with the raised exception if the
            write or a state poll raises.
        """
        st = DeviceStatus(device=self)

        def write_and_wait():
            try:
                self.proxy.position = value
                # NOTE(review): there is no timeout here; a device that
                # never returns to ON keeps this thread polling forever.
                while self.proxy.state() != tango.DevState.ON:
                    ttime.sleep(0.1)
            except Exception as exc:
                # Without this, a Tango error would kill the thread and
                # leave the status pending forever.
                st.set_exception(exc)
            else:
                st.set_finished()

        threading.Thread(target=write_and_wait, daemon=True).start()
        return st

    @property
    def position(self):
        """Current position read directly from ``self.proxy``."""
        return self.proxy.position