#!/usr/bin/env python3
from ophyd import Signal, Device, Component as Cpt
from ophyd import DeviceStatus
import threading
import random
class RandDet(Device):
val = Cpt(Signal, name="val", kind="normal")
param = Cpt(Signal, name = "param",kind = "config")
param_ignore = Cpt(Signal, name = "param_ignore", kind = "omitted")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Use this lock to ensure that we only process one "trigger" at a time.
# Generally bluesky should care of this, so this is just an extra
# precaution.
self._acquiring_lock = threading.Lock()
def _capture(self, status):
"This runs on a background thread."
try:
if not self._acquiring_lock.acquire(timeout=0):
raise RuntimeError("Cannot trigger, currently triggering!")
self.val.set(random.random())
except Exception as exc:
status.set_exception(exc)
else:
status.set_finished()
finally:
self._acquiring_lock.release()
def trigger(self):
status = DeviceStatus(self)
# Start a background thread to capture an image and write it to disk.
thread = threading.Thread(target=self._capture, args=(status,))
thread.start()
# Promptly return a status object, which will be marked "done" when the
# capture completes.
return status
def main():
rand1 = RandDet("",name="rand1")
print( "%s" % repr( rand1.read_attrs))
print( "%s" % repr( rand1.configuration_attrs))
status = rand1.trigger()
print( "%s" % repr( status))
status.wait()
print( "After wait\n %s" % repr( status))
print( "%s" % repr( rand1.read()))
return
if __name__ == "__main__":
main()
#
# ['val']
# ['param']
# DeviceStatus(device=rand1, done=False, success=False)
# After wait
# DeviceStatus(device=rand1, done=True, success=True)
# OrderedDict([('rand1_val', {'value': 0.7883604027465972, 'timestamp': 1676908053.312611})])