Below is the code of the docCallback class. It is called from the RunEngine.
#!/usr/bin/env python3
from bluesky.callbacks import CallbackBase
from pprint import pprint
import PySpectra
from . import env
import PySpectra
from datetime import datetime
from io import StringIO
from pprint import pformat
import sys
import time
from warnings import warn
import numpy as np
from bluesky.callbacks import LiveTable
from bluesky.callbacks.mpl_plotting import QtAwareCallback
class docCallback(QtAwareCallback):
    """
    Print run information and forward hinted data to PySpectra.

    derived from /usr/lib/python3/dist-packages/bluesky/callbacks/best_effort.py

    For every run the callback can print a heading, show the hinted
    readings of the dimension stream in a LiveTable, print the hinted
    'baseline' readings and send the hinted columns of a one-dimensional
    scan to the viewer through ``PySpectra.toPyspViewer()``.
    """

    def __init__(self, *, fig_factory=None, table_enabled=True,
                 calc_derivative_and_stats=False, **kwargs):
        """
        Set up the internal state; all output kinds start enabled.

        Parameters
        ----------
        fig_factory :
            Stored as ``self._fig_factory``; not used by the code in
            this class.
        table_enabled : bool
            Initial state of the LiveTable output.
        calc_derivative_and_stats : bool
            Accepted for interface compatibility; not used by the code
            in this class.
        **kwargs :
            Forwarded to ``QtAwareCallback.__init__``.
        """
        super().__init__(**kwargs)
        # internal state
        self._start_doc = None
        self._descriptors = {}
        self._table = None
        self._heading_enabled = True
        self._table_enabled = table_enabled
        self._baseline_enabled = True
        self._plots_enabled = True
        # axes supplied from outside
        self._fig_factory = fig_factory
        self._cleanup_motor_heuristic = False
        self._stream_names_seen = set()
        # Plotting state.  x_key stays None until _set_up_plots() has
        # prepared the viewer; event() relies on the attribute existing
        # even when no plot was ever set up.
        self.x_key = None
        self.columns = []
        # public options
        self.noplot_streams = ['baseline']
        self.omit_single_point_plot = True
        # hack to handle the bottom border of the table
        self._buffer = StringIO()
        self._baseline_toggle = True

    def enable_heading(self):
        "Print timestamp and IDs at the top of a run."
        self._heading_enabled = True

    def disable_heading(self):
        "Opposite of enable_heading()"
        self._heading_enabled = False

    def enable_table(self):
        "Print hinted readings from the 'primary' stream in a LiveTable."
        self._table_enabled = True

    def disable_table(self):
        "Opposite of enable_table()"
        self._table_enabled = False

    def enable_baseline(self):
        "Print hinted fields from the 'baseline' stream."
        self._baseline_enabled = True

    def disable_baseline(self):
        "Opposite of enable_baseline()"
        self._baseline_enabled = False

    def enable_plots(self):
        "Plot hinted fields from all streams not in ``noplot_streams``."
        self._plots_enabled = True

    def disable_plots(self):
        "Do not plot anything."
        self._plots_enabled = False

    def __call__(self, name, doc, *args, **kwargs):
        """Dispatch a document unless table, baseline and plots are all off."""
        if not (self._table_enabled or self._baseline_enabled or
                self._plots_enabled):
            return
        super().__call__(name, doc, *args, **kwargs)

    def start(self, doc):
        """
        Handle a 'start' document.

        Resets the per-run state, works out the dimensions (independent
        variables) of the run and prints the heading if it is enabled.
        """
        print( "\n docHandler.start\n -----\n")
        pprint( doc)
        self.clear()
        self._start_doc = doc
        self.plan_hints = doc.get('hints', {})
        # Prepare a guess about the dimensions (independent variables) in case
        # we need it.
        # motors ('eh_mot70',)
        self.motors = self._start_doc.get('motors')
        if self.motors is not None:
            guess = [([motor], 'primary') for motor in self.motors]
        else:
            guess = [(['time'], 'primary')]
        # Use the guess if there is no hint about dimensions.
        dimensions = self.plan_hints.get('dimensions')
        # dimensions [(['eh_mot70_position_readback'], 'primary')]
        if dimensions is None:
            self._cleanup_motor_heuristic = True
            dimensions = guess
        # We can only cope with all the dimensions belonging to the same
        # stream unless we resample. We are not going to handle that yet.
        if len(set(d[1] for d in dimensions)) != 1:
            self._cleanup_motor_heuristic = True
            dimensions = guess  # Fall back on our guess.
            warn("We are ignoring the dimensions hinted because we cannot "
                 "combine streams.")
        # for each dimension, choose one field only
        # the plan can supply a list of fields. It's assumed the first
        # of the list is always the one plotted against
        #
        # dim_fields ['eh_mot70_position_readback']
        self.dim_fields = [fields[0]
                           for fields, stream_name in dimensions]
        # make distinction between flattened fields and plotted fields
        # motivation for this is that when plotting, we find dependent variable
        # by finding elements that are not independent variables
        #
        # all_dim_fields ['eh_mot70_position_readback']
        self.all_dim_fields = [field
                               for fields, stream_name in dimensions
                               for field in fields]
        _, self.dim_stream = dimensions[0]
        # Print heading.
        tt = datetime.fromtimestamp(self._start_doc['time']).utctimetuple()
        if self._heading_enabled:
            print("\n\nTransient Scan ID: {0} Time: {1}".format(
                self._start_doc.get('scan_id', ''),
                time.strftime("%Y-%m-%d %H:%M:%S", tt)))
            print("Persistent Unique Scan ID: '{0}'".format(
                self._start_doc['uid']))

    def _set_up_plots(self, doc, stream_name):
        """
        Prepare the viewer for the stream described by descriptor ``doc``.

        Nothing is done when plots are disabled, the stream is listed in
        ``noplot_streams``, there are no columns, or a single-point run
        is to be omitted.  On success ``self.x_key`` holds the field
        plotted along x; it is set to None when the viewer does not
        answer 'DONE'.

        Raises
        ------
        NotImplementedError
            If the stream has two dimensions.
        """
        print( "+++docHandler.set_up_plots, stream %s, columns %s" % (repr( stream_name), repr( self.columns)))
        if not self._plots_enabled:
            return
        if stream_name in self.noplot_streams:  # e.g. baseline
            return
        if not self.columns:
            return
        if ((self._start_doc.get('num_points') == 1) and
                (stream_name == self.dim_stream) and
                self.omit_single_point_plot):
            return
        # This is a heuristic approach until we think of how to hint this in a
        # generalizable way.
        if stream_name == self.dim_stream:
            dim_fields = self.dim_fields
        else:
            dim_fields = ['time']  # 'time' once LivePlot can do that
        ndims = len(dim_fields)
        if not 0 < ndims < 3:
            # we need 1 or 2 dims to do anything, do not make empty figures
            warn("Plots are only made for 1 or 2 dimensions. "
                 "Adjust the metadata hints field for BestEffortCallback to produce plots.")
            return
        if ndims != 1:
            raise NotImplementedError( "docHandler.set_up_plot: ndim3 == %d" % ndims)
        # Clear the viewer and delete the scans of the previous run.
        ret = PySpectra.toPyspViewer( { 'command': ["cls",
                                                    "delete"]})
        if ret[ 'result'].upper() != 'DONE':
            print( "*** error from pyspMonitor-1 %s" % ret[ 'result'])
            self.x_key = None
            return
        # NOTE(review): assumes the plan arguments are laid out as
        # (motor, start, stop, ...) -- confirm for plans other than scan.
        plan_args = self._start_doc.get('plan_args')[ 'args']
        x_min = plan_args[1]
        x_max = plan_args[2]
        self.num_points = self._start_doc.get('num_points')
        self.plan_name = self._start_doc.get('plan_name')
        title = "setTitle \"%s %s %g %g %g\"" % (
            self.plan_name,
            repr( self._start_doc.get( 'motors')),
            x_min,
            x_max,
            env.getSampleTime())
        ret = PySpectra.toPyspViewer( { 'command': [title]})
        if ret[ 'result'].upper() != 'DONE':
            print( "*** error from pyspMonitor %s" % ret[ 'result'])
            self.x_key = None
            return
        #
        # dim_fields ['eh_mot70_position_readback']
        #
        self.x_key, = dim_fields
        #
        # columns ['MG1_eh_c01', 'MG1_eh_c02', 'MG1_eh_c03']
        #
        for y_key in self.columns:
            data_key = doc['data_keys'][y_key]
            dtype = data_key['dtype']
            if dtype not in ('number', 'integer'):
                warn("Omitting {} from plot because dtype is {}"
                     "".format(y_key, dtype))
                continue
            #
            # MCAs have a non-zero shape; no scan is pre-created for
            # them here, event() sends them as complete spectra.
            #
            if len( data_key['shape']) != 0:
                continue
            PySpectra.toPyspViewer( {'Scan': { 'name': y_key,
                                               'xMin': x_min,
                                               'xMax': x_max,
                                               'xLabel': self.x_key,
                                               'autoscaleX': False,
                                               'nPts': self.num_points}})
            PySpectra.toPyspViewer( { 'command': ['setCurrentIndex %s 0' % y_key]})

    def descriptor(self, doc):
        """
        Handle a 'descriptor' document.

        Remembers the descriptor, determines the hinted columns of the
        stream, prepares the plots and, for the dimension stream,
        creates the LiveTable if tables are enabled.
        """
        self._descriptors[doc['uid']] = doc
        stream_name = doc.get('name', 'primary')  # fall back for old docs
        if stream_name not in self._stream_names_seen:
            self._stream_names_seen.add(stream_name)
            if self._table_enabled:
                print("docHandler.New stream: {!r}".format(stream_name))
        self.columns = hinted_fields( doc)
        print( "+++docHandler.descriptor: columns %s" % repr( self.columns))
        # ## This deals with old documents. ## #
        if stream_name == 'primary' and self._cleanup_motor_heuristic:
            # We stashed object names in self.dim_fields, which we now need to
            # look up the actual fields for.
            self._cleanup_motor_heuristic = False
            fixed_dim_fields = []
            for obj_name in self.dim_fields:
                # Special case: 'time' can be a dim_field, but it's not an
                # object name. Just add it directly to the list of fields.
                if obj_name == 'time':
                    fixed_dim_fields.append('time')
                    continue
                try:
                    fields = doc.get('hints', {}).get(obj_name, {})['fields']
                except KeyError:
                    fields = doc['object_keys'][obj_name]
                fixed_dim_fields.extend(fields)
            self.dim_fields = fixed_dim_fields
        # Ensure that no independent variables ('dimensions') are
        # duplicated here.
        self.columns = [c for c in self.columns if c not in self.all_dim_fields]
        # ## DECIDE WHICH KIND OF PLOT CAN BE USED ## #
        self._set_up_plots(doc, stream_name)
        # ## TABLE ## #
        if stream_name == self.dim_stream:
            if self._table_enabled:
                # plot everything, independent or dependent variables
                self._table = LiveTable(list(self.all_dim_fields) + self.columns, separator_lines=False)
                self._table('start', self._start_doc)
                self._table('descriptor', doc)

    def event(self, doc):
        """
        Handle an 'event' document.

        'primary' events are passed to the LiveTable.  'baseline' events
        are printed (alternately to stdout and to the end-of-run buffer)
        and never plotted.  All other events are sent to the viewer,
        provided a plot has been set up.
        """
        print( "\n docHandler.event\n -----\n")
        pprint( doc)
        descriptor = self._descriptors[doc['descriptor']]
        if descriptor.get('name') == 'primary':
            if self._table is not None:
                self._table('event', doc)
        # Show the baseline readings.
        if descriptor.get('name') == 'baseline':
            columns = hinted_fields(descriptor)
            # The first baseline event goes to stdout, the second one is
            # buffered and printed by stop() below the table.
            self._baseline_toggle = not self._baseline_toggle
            if self._baseline_toggle:
                file = self._buffer
                subject = 'End-of-run'
            else:
                file = sys.stdout
                subject = 'Start-of-run'
            if self._baseline_enabled:
                print('{} baseline readings:'.format(subject), file=file)
                border = '+' + '-' * 32 + '+' + '-' * 32 + '+'
                print(border, file=file)
                for k, v in doc['data'].items():
                    if k not in columns:
                        continue
                    print('| {:>30} | {:<30} |'.format(k, v), file=file)
                print(border, file=file)
            return
        #
        # no pyspMonitor running (or no plot has been set up)
        #
        if self.x_key is None:
            print( "+++docHandler.event: no pyspMonitor running")
            return
        index = doc[ 'seq_num'] - 1
        for y_key in self.columns:
            x = doc[ 'data'][ self.x_key]
            y = doc[ 'data'][ y_key]
            if hasattr( y, "__len__"):  # list or numpy arrays
                # MCA-like data: send the whole spectrum against the
                # channel index.
                x = np.linspace( 0, len( y) - 1, num = len( y))
                y = np.array( y)
                hsh = {'Scan': {'name': y_key, 'flagMCA': True, 'lineColor': 'blue',
                                'y': y,
                                'x': x}}
                PySpectra.toPyspViewer( hsh)
            else:
                PySpectra.toPyspViewer( { 'command': ["setXY %s %d %g %g" % ( y_key, index, x, y)]})
        PySpectra.toPyspViewer( { 'command': ["display"]})
        return

    def stop(self, doc):
        """Handle a 'stop' document: close the table, print the buffer."""
        if self._table is not None:
            self._table('stop', doc)
        if self._baseline_enabled:
            # Print baseline below bottom border of table.
            self._buffer.seek(0)
            print(self._buffer.read())
            print('\n')

    def clear(self):
        """Reset the per-run state; called at the beginning of start()."""
        self._start_doc = None
        self._descriptors.clear()
        self._stream_names_seen.clear()
        self._table = None
        # Forget the plot of the previous run, so that event() does not
        # send data to a viewer that has not been set up for this run.
        self.x_key = None
        self.columns = []
        self._buffer = StringIO()
        self._baseline_toggle = True
def hinted_fields(descriptor):
    """
    Return the data keys of a descriptor that go into table and plots.

    For every object in ``descriptor['object_keys']`` the hinted
    'fields' are taken if the descriptor carries such a hint for the
    object; otherwise all data keys recorded for the object are used.
    The result keeps the order of the objects.
    """
    object_keys = descriptor['object_keys']
    hints = descriptor.get('hints', {})
    selected = []
    for name in object_keys:
        hint = hints.get(name, {})
        if 'fields' in hint:
            # The object tells us which of its fields are interesting.
            selected += hint['fields']
        else:
            # No hint: fall back on the complete list of fields.
            selected += object_keys[name]
    return selected