#!/usr/bin/env python3
"""Minimal ophyd device example: a detector whose reading is a random number.

Running the script builds one ``RandDet``, triggers it once, waits for the
returned status and prints the resulting reading.
"""
from ophyd import Signal, Device, Component as Cpt
from ophyd import DeviceStatus
import threading
import random


class RandDet(Device):
    """Device with one read signal that is filled with a random float on trigger.

    Components:
        val: kind "normal"; receives ``random.random()`` on each trigger.
        param: kind "config".
        param_ignore: kind "omitted".
    """

    val = Cpt(Signal, name="val", kind="normal")
    param = Cpt(Signal, name="param", kind="config")
    param_ignore = Cpt(Signal, name="param_ignore", kind="omitted")

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``Device`` and create the trigger lock."""
        super().__init__(*args, **kwargs)
        # Use this lock to ensure that we only process one "trigger" at a time.
        # Generally bluesky should take care of this, so this is just an extra
        # precaution.
        self._acquiring_lock = threading.Lock()

    def _capture(self, status):
        """Set ``val`` to a new random number and resolve *status*.

        This runs on a background thread started by ``trigger``.

        Args:
            status: The ``DeviceStatus`` handed out by ``trigger``. It is
                marked finished on success, or given the exception on failure
                (including the case where another capture is in progress).
        """
        # Try to take the lock *before* entering the try/finally below, so a
        # failed attempt never releases a lock that another thread is holding.
        if not self._acquiring_lock.acquire(blocking=False):
            status.set_exception(
                RuntimeError("Cannot trigger, currently triggering!")
            )
            return

        failure = None
        try:
            # NOTE(review): Signal.set() returns its own status object which is
            # not waited on here -- confirm the value is guaranteed to be
            # applied by the time the trigger status is marked finished.
            self.val.set(random.random())
        except Exception as exc:
            failure = exc
        finally:
            self._acquiring_lock.release()

        # Resolve the status only after the lock has been released, so that a
        # caller who waits on the status and immediately triggers again does
        # not find the lock still held.
        if failure is None:
            status.set_finished()
        else:
            status.set_exception(failure)

    def trigger(self):
        """Start one capture in the background and return its status.

        Returns:
            A ``DeviceStatus`` for this device that ``_capture`` resolves.
        """
        status = DeviceStatus(self)
        # Start a background thread that stores a new random value in ``val``.
        thread = threading.Thread(target=self._capture, args=(status,))
        thread.start()
        # Promptly return a status object, which will be marked "done" when the
        # capture completes.
        return status


def main():
    """Create a ``RandDet``, trigger it once and print what happened."""
    rand1 = RandDet("", name="rand1")
    print(repr(rand1.read_attrs))
    print(repr(rand1.configuration_attrs))
    status = rand1.trigger()
    print(repr(status))
    status.wait()
    print(f"After wait\n {status!r}")
    print(repr(rand1.read()))


if __name__ == "__main__":
    main()

# Example output:
#
# ['val']
# ['param']
# DeviceStatus(device=rand1, done=False, success=False)
# After wait
#  DeviceStatus(device=rand1, done=True, success=True)
# OrderedDict([('rand1_val', {'value': 0.7883604027465972, 'timestamp': 1676908053.312611})])