docCallback

Below you find the code of the docCallback function. It is called from the RunEngine

#!/usr/bin/env python3

from bluesky.callbacks import CallbackBase
from pprint import pprint
import PySpectra
from . import env
import PySpectra

from datetime import datetime
from io import StringIO
from pprint import pformat
import sys
import time
from warnings import warn
import numpy as np

from bluesky.callbacks import LiveTable
from bluesky.callbacks.mpl_plotting import QtAwareCallback

class docCallback(QtAwareCallback):
    """
    derived from /usr/lib/python3/dist-packages/bluesky/callbacks/best_effort.py
    """
    def __init__(self, *, fig_factory=None, table_enabled=True,
                 calc_derivative_and_stats=False, **kwargs):
        super().__init__(**kwargs)
        # internal state
        self._start_doc = None
        self._descriptors = {}
        self._table = None
        self._heading_enabled = True
        self._table_enabled = table_enabled
        self._baseline_enabled = True
        self._plots_enabled = True
        # axes supplied from outside
        self._fig_factory = fig_factory
        # maps descriptor uid to dict which maps data key to LivePlot instance
        self._cleanup_motor_heuristic = False
        self._stream_names_seen = set()

        # public options
        self.noplot_streams = ['baseline']
        self.omit_single_point_plot = True

        # hack to handle the bottom border of the table
        self._buffer = StringIO()
        self._baseline_toggle = True

    def enable_heading(self):
        "Print timestamp and IDs at the top of a run."
        self._heading_enabled = True

    def disable_heading(self):
        "Opposite of enable_heading()"
        self._heading_enabled = False

    def enable_table(self):
        "Print hinted readings from the 'primary' stream in a LiveTable."
        self._table_enabled = True

    def disable_table(self):
        "Opposite of enable_table()"
        self._table_enabled = False

    def enable_baseline(self):
        "Print hinted fields from the 'baseline' stream."
        self._baseline_enabled = True

    def disable_baseline(self):
        "Opposite of enable_baseline()"
        self._baseline_enabled = False

    def enable_plots(self):
        "Plot hinted fields from all streams not in “noplot_streams“."
        self._plots_enabled = True

    def disable_plots(self):
        "Do not plot anything."
        self._plots_enabled = False

    def __call__(self, name, doc, *args, **kwargs):
        if not (self._table_enabled or self._baseline_enabled or
                self._plots_enabled):
            return
        super().__call__(name, doc, *args, **kwargs)

    def start(self, doc):
        print( "\n docHandler.start\n -----\n") 
        pprint( doc)
        self.clear()
        self._start_doc = doc
        self.plan_hints = doc.get('hints', {})

        # Prepare a guess about the dimensions (independent variables) in case
        # we need it.
        # motors ('eh_mot70',) 
        self.motors = self._start_doc.get('motors')
        if self.motors is not None:
            GUESS = [([motor], 'primary') for motor in self.motors]
        else:
            GUESS = [(['time'], 'primary')]

        # Ues the guess if there is not hint about dimensions.
        dimensions = self.plan_hints.get('dimensions')
        # dimensions [(['eh_mot70_position_readback'], 'primary')]
        if dimensions is None:
            self._cleanup_motor_heuristic = True
            dimensions = GUESS

        # We can only cope with all the dimensions belonging to the same
        # stream unless we resample. We are not doing to handle that yet.
        if len(set(d[1] for d in dimensions)) != 1:
            self._cleanup_motor_heuristic = True
            dimensions = GUESS  # Fall back on our GUESS.
            warn("We are ignoring the dimensions hinted because we cannot "
                 "combine streams.")

        # for each dimension, choose one field only
        # the plan can supply a list of fields. It's assumed the first
        # of the list is always the one plotted against
        #
        # dim_fields ['eh_mot70_position_readback']
        self.dim_fields = [fields[0]
                           for fields, stream_name in dimensions]

        # make distinction between flattened fields and plotted fields
        # motivation for this is that when plotting, we find dependent variable
        # by finding elements that are not independent variables
        #
        # all_dim_fields ['eh_mot70_position_readback']
        self.all_dim_fields = [field
                               for fields, stream_name in dimensions
                               for field in fields]

        _, self.dim_stream = dimensions[0]

        # Print heading.
        tt = datetime.fromtimestamp(self._start_doc['time']).utctimetuple()
        if self._heading_enabled:
            print("\n\nTransient Scan ID: {0}     Time: {1}".format(
                self._start_doc.get('scan_id', ”),
                time.strftime("%Y-%m-%d %H:%M:%S", tt)))
            print("Persistent Unique Scan ID: '{0}'".format(
                self._start_doc['uid']))

    def _set_up_plots(self, doc, stream_name):
        """Using the descriptor doc"""
        plot_data = True

        print( "+++docHandler.set_up_plots, stream %s, columns %s" % (repr( stream_name), repr( self.columns)))
        
        if not self._plots_enabled:
            plot_data = False
        if stream_name in self.noplot_streams: # e.g. baseline
            plot_data = False
        if not self.columns:
            plot_data = False
        if ((self._start_doc.get('num_points') == 1) and
                (stream_name == self.dim_stream) and
                self.omit_single_point_plot):
            plot_data = False

        if not plot_data:
            return 

        # This is a heuristic approach until we think of how to hint this in a
        # generalizable way.
        if stream_name == self.dim_stream:
            dim_fields = self.dim_fields
        else:
            dim_fields = ['time']  # 'time' once LivePlot can do that

        ndims = len(dim_fields)
        if not 0 < ndims < 3:
            # we need 1 or 2 dims to do anything, do not make empty figures
            warn("Plots are only made for 1 or 2 dimensions. "
                 "Adjust the metadata hints field for BestEffortCallback to produce plots.")
            return

        if ndims != 1:
            raise NotImplementedError( "docHandler.set_up_plot: ndim3 == %d" % ndims)

        ret = PySpectra.toPyspViewer( { 'command': ["cls", 
                                                     "delete"]})
        if ret[ 'result'].upper() != 'DONE':
            print( "*** error from pyspMonitor-1 %s" % ret[ 'result'])
            self.x_key = None
            return

        xMin = self._start_doc.get('plan_args')[ 'args'][1]
        xMax = self._start_doc.get('plan_args')[ 'args'][2]
        self.num_points = self._start_doc.get('num_points')
        self.plan_name = self._start_doc.get('plan_name')
        ret = PySpectra.toPyspViewer( { 'command': ["setTitle \"%s %s %g %g %g\"" % \
                                                    (self.plan_name, \
                                                     repr( self._start_doc.get( 'motors')), \
                                                     xMin, \
                                                     xMax, \
                                                     env.getSampleTime())]})
        if ret[ 'result'].upper() != 'DONE':
            print( "*** error from pyspMonitor %s" % ret[ 'result'])
            self.x_key = None
            return
        #
        # dim_fiels ['eh_mot70_position_readback']
        #
        self.x_key, = dim_fields
        #
        # columns ['MG1_eh_c01', 'MG1_eh_c02', 'MG1_eh_c03']
        #
        for y_key in self.columns:
            #print( "docHandler.set_up_plot: %s %s" % ( repr( self.x_key), repr( y_key)))
            dtype = doc['data_keys'][y_key]['dtype']
            if dtype not in ('number', 'integer'):
                warn("Omitting {} from plot because dtype is {}"
                     "".format(y_key, dtype))
                continue
            #
            # MCAs have a non-zeor shape
            #
            if len( doc[ 'data_keys'][ y_key]['shape']) != 0: 
                continue
            ret = PySpectra.toPyspViewer( {'Scan': { 'name': y_key,
                                                      'xMin': xMin, 
                                                      'xMax': xMax,
                                                      'xLabel': self.x_key,
                                                      'autoscaleX': False,
                                                      'nPts': self.num_points}})
            PySpectra.toPyspViewer( { 'command': ['setCurrentIndex %s 0' % y_key]})


    def descriptor(self, doc):
        self._descriptors[doc['uid']] = doc
        stream_name = doc.get('name', 'primary')  # fall back for old docs

        if stream_name not in self._stream_names_seen:
            self._stream_names_seen.add(stream_name)
            if self._table_enabled:
                print("docHandler.New stream: {!r}".format(stream_name))

        self.columns = hinted_fields( doc)
    
        print( "+++docHandler.descriptor: columns %s" % repr( self.columns))
        # ## This deals with old documents. ## #
        if stream_name == 'primary' and self._cleanup_motor_heuristic:
            # We stashed object names in self.dim_fields, which we now need to
            # look up the actual fields for.
            self._cleanup_motor_heuristic = False
            fixed_dim_fields = []
            for obj_name in self.dim_fields:
                # Special case: 'time' can be a dim_field, but it's not an
                # object name. Just add it directly to the list of fields.
                if obj_name == 'time':
                    fixed_dim_fields.append('time')
                    continue
                try:
                    fields = doc.get('hints', {}).get(obj_name, {})['fields']
                except KeyError:
                    fields = doc['object_keys'][obj_name]
                fixed_dim_fields.extend(fields)
            self.dim_fields = fixed_dim_fields

        # Ensure that no independent variables ('dimensions') are
        # duplicated here.
        self.columns = [c for c in self.columns if c not in self.all_dim_fields]

        # ## DECIDE WHICH KIND OF PLOT CAN BE USED ## #
        self._set_up_plots(doc, stream_name)

        # ## TABLE ## #
        if stream_name == self.dim_stream:
            if self._table_enabled:
                # plot everything, independent or dependent variables
                self._table = LiveTable(list(self.all_dim_fields) + self.columns, separator_lines=False)
                self._table('start', self._start_doc)
                self._table('descriptor', doc)

    def event(self, doc):
        print( "\n docHandler.event\n -----\n") 
        pprint( doc)
        descriptor = self._descriptors[doc['descriptor']]
        if descriptor.get('name') == 'primary':
            if self._table is not None:
                self._table('event', doc)

        # Show the baseline readings.
        if descriptor.get('name') == 'baseline':
            columns = hinted_fields(descriptor)
            self._baseline_toggle = not self._baseline_toggle
            if self._baseline_toggle:
                file = self._buffer
                subject = 'End-of-run'
            else:
                file = sys.stdout
                subject = 'Start-of-run'
            if self._baseline_enabled:
                print('{} baseline readings:'.format(subject), file=file)
                border = '+' + '-' * 32 + '+' + '-' * 32 + '+'
                print(border, file=file)
                for k, v in doc['data'].items():
                    if k not in columns:
                        continue
                    print('| {:>30} | {:<30} |'.format(k, v), file=file)
                print(border, file=file)
            return
        #
        # no pyspMonitor running
        #
        if self.x_key is None: 
            print( "+++docHandler.event: no pyspMonitor running") 
            return 

        for y_key in self.columns:
            index = doc[ 'seq_num'] - 1
            x = doc[ 'data'][ self.x_key]
            y = doc[ 'data'][ y_key]
            if hasattr( y, "__len__"): # list or nu,py arrays
                x = np.linspace( 0, len( y) - 1, num = len( y))
                y = np.array( y)
                hsh = {'Scan': {'name': y_key, 'flagMCA': True, 'lineColor': 'blue', 
                                'y': y,
                                'x': x}} 
                PySpectra.toPyspViewer( hsh)
            else: 
                PySpectra.toPyspViewer( { 'command': ["setXY %s %d %g %g" % ( y_key, index, x, y)]})
        PySpectra.toPyspViewer( { 'command': ["display"]})

        return 

    def stop(self, doc):
        if self._table is not None:
            self._table('stop', doc)

        if self._baseline_enabled:
            # Print baseline below bottom border of table.
            self._buffer.seek(0)
            print(self._buffer.read())
            print('\n')

    def clear(self):
        self._start_doc = None
        self._descriptors.clear()
        self._stream_names_seen.clear()
        self._table = None
        self._buffer = StringIO()
        self._baseline_toggle = True

def hinted_fields(descriptor):
    # Figure out which columns to put in the table.
    obj_names = list(descriptor['object_keys'])
    # We will see if these objects hint at whether
    # a subset of their data keys ('fields') are interesting. If they
    # did, we'll use those. If these didn't, we know that the RunEngine
    # *always* records their complete list of fields, so we can use
    # them all unselectively.
    columns = []
    for obj_name in obj_names:
        try:
            fields = descriptor.get('hints', {}).get(obj_name, {})['fields']
        except KeyError:
            fields = descriptor['object_keys'][obj_name]
        columns.extend(fields)
    
    return columns