Plans can be a mixture of “stubs” (simple plans) and pre-assembled plans.
The following plan executes 2 runs (aka scans) with 3 events each.
#!/usr/bin/env python3
"""
ipython3 --matplotlib=qt5 planEventsRuns.py
multi_runs_multi_events is a plan that executes 2 runs with 3 events each
(each run has a unique ScanId and corresponds to a scan).
"""
from bluesky.plans import scan
import blueskyDESY
from bluesky import RunEngine
from bluesky.callbacks.best_effort import BestEffortCallback
import bluesky.plan_stubs as bps
def one_run_multi_events(detectors, num):
    """Plan: perform ``num`` trigger-and-read cycles inside a single run.

    The run is opened once, ``bps.trigger_and_read`` is invoked ``num``
    times on ``detectors``, and the run is closed afterwards.

    Parameters
    ----------
    detectors : list
        Devices passed unchanged to ``bps.trigger_and_read``.
    num : int
        Number of trigger-and-read cycles to perform within the run.

    Yields
    ------
    The messages produced by the ``bluesky.plan_stubs`` helpers, in order.
    """
    yield from bps.open_run()
    # The loop index is irrelevant; only the repetition count matters.
    for _ in range(num):
        yield from bps.trigger_and_read(detectors)
    yield from bps.close_run()
def multi_runs_multi_events(detectors, num, num_runs):
    """Plan: execute ``one_run_multi_events`` ``num_runs`` times in a row.

    Parameters
    ----------
    detectors : list
        Devices forwarded to ``one_run_multi_events``.
    num : int
        Number of trigger-and-read cycles per run.
    num_runs : int
        Number of consecutive runs; a value of 0 (or less) yields nothing.

    Yields
    ------
    The messages of each ``one_run_multi_events`` plan, run after run.
    """
    # Each pass delegates to the single-run plan, so every pass opens and
    # closes its own run.
    for _ in range(num_runs):
        yield from one_run_multi_events(detectors, num)
def main():
    """Run ``multi_runs_multi_events`` (3 events per run, 2 runs).

    Builds a ``blueskyDESY.Experiment`` from the counters of the active
    measurement group, subscribes a ``BestEffortCallback`` to a fresh
    ``RunEngine`` and executes the plan with it.
    """
    bec = BestEffortCallback()
    active_mg = blueskyDESY.getActiveMntGrp()
    print("testMG.main: AMG %s " % repr(active_mg))
    mg = blueskyDESY.Experiment(read_attrs=active_mg['counters'])
    # NOTE(review): eh_mot65 is not used by the plan below. It is kept
    # because constructing the motor object may have side effects that
    # cannot be judged from this file -- confirm before removing.
    eh_mot65 = blueskyDESY.motorTango(name='eh_mot65')
    RE = RunEngine()
    #
    # matplotlib and printout
    #
    RE.subscribe(bec)
    RE(multi_runs_multi_events([mg], num=3, num_runs=2))


if __name__ == "__main__":
    main()
The following plan demonstrates the usage of preprocessor decorators:
#!/usr/bin/env python3
"""
ipython3 --matplotlib=qt5 customPlan.py
"""
from bluesky.plans import scan
import blueskyDESY
from bluesky import RunEngine
from bluesky.callbacks.best_effort import BestEffortCallback
import bluesky.plan_stubs as bps
import bluesky.preprocessors as bpp
def one_run_one_event(detectors):
    """Plan: one run with one trigger-and-read, staging done by hand.

    Every device in ``detectors`` is staged, a run is opened, a single
    ``bps.trigger_and_read`` is performed, the run is closed and every
    device is unstaged again. All steps are spelled out explicitly with
    plan stubs; no preprocessors are involved.

    Parameters
    ----------
    detectors : list
        Devices to stage, read once and unstage.

    Yields
    ------
    The messages produced by the ``bluesky.plan_stubs`` helpers, in order.
    """
    # 'Stage' every device.
    for det in detectors:
        yield from bps.stage(det)

    yield from bps.open_run()
    yield from bps.trigger_and_read(detectors)
    yield from bps.close_run()

    # 'Unstage' every device.
    for det in detectors:
        yield from bps.unstage(det)
def one_run_one_eventPrePro(detectors):
    """Plan: one trigger-and-read, wrapped by preprocessor decorators.

    The inner plan only performs ``bps.trigger_and_read``; it is wrapped
    with ``bpp.run_decorator()`` and ``bpp.stage_decorator(detectors)``
    instead of issuing run and stage messages by hand.
    NOTE(review): presumably the decorators supply the open/close-run and
    stage/unstage steps -- see the bluesky preprocessors documentation.

    Parameters
    ----------
    detectors : list
        Devices given to ``bpp.stage_decorator`` and read once.

    Returns
    -------
    Whatever the decorated inner plan returns.
    """
    # Decorators apply bottom-up: run_decorator wraps the bare plan first,
    # stage_decorator then wraps the result.
    @bpp.stage_decorator(detectors)
    @bpp.run_decorator()
    def inner():
        yield from bps.trigger_and_read(detectors)

    return (yield from inner())
def main():
    """Run the hand-written plan and its preprocessor-based counterpart.

    Builds a ``blueskyDESY.Experiment`` from the counters of the active
    measurement group, subscribes a ``BestEffortCallback`` to a fresh
    ``RunEngine`` and then executes ``one_run_one_event`` followed by
    ``one_run_one_eventPrePro`` with the same device.
    """
    bec = BestEffortCallback()
    active_mg = blueskyDESY.getActiveMntGrp()
    print("testMG.main: AMG %s " % repr(active_mg))
    mg = blueskyDESY.Experiment(read_attrs=active_mg['counters'])
    # NOTE(review): eh_mot65 is not used by the plans below. It is kept
    # because constructing the motor object may have side effects that
    # cannot be judged from this file -- confirm before removing.
    eh_mot65 = blueskyDESY.motorTango(name='eh_mot65')
    RE = RunEngine()
    #
    # matplotlib and printout
    #
    RE.subscribe(bec)

    print("without prepro")
    RE(one_run_one_event([mg]))

    print("with prepro")
    RE(one_run_one_eventPrePro([mg]))


if __name__ == "__main__":
    main()