Using the Selector Server with Sardana in a user scan macro

It is much easier to use advantages of Sardana and write NeXus files with NXS_FileRecorder using scan macros like ascan, dscan or extend them if it is needed to add new features or write a simple Sardana scan macro, e.g.

#!/bin/env python
""" NeXus recorder macros """

import time
import os
import datetime

from sardana.macroserver.macro import Macro, Type
from sardana.macroserver.msexception import UnknownEnv
from sardana.macroserver.scan.recorder import DataHandler
from sardana.macroserver.scan.scandata import \
    ScanData, ScanDataEnvironment, ColumnDesc


# test user data of ch1, ch2 and ch3 channels
data = {
    'ch1': [10., 23.4, 12., 234., 16.],
    'ch2': [20., 21., 22., 23., 24.],
    'ch3': [32., 33., 34., 35., 36.]
}


class nxsscan(Macro):
    """ Two motor scan with a direct Recorder usage
    """

    param_def = [
        ['motor1', Type.String, ”, 'first motor name'],
        ['motor2', Type.String, ”, 'second motor name'],
    ]

    def prepare(self, motor1, motor2):

        # get output file name from the environment
        scan_dir = self.getEnv("ScanDir")
        scan_file = self.getEnv("ScanFile")
        if isinstance(scan_file, list):
            scan_file = scan_file[0]
        file_name = os.path.join(scan_dir, scan_file)

        # construct NXS_FileRecorder
        rec_manager = self.getMacroServer().recorder_manager
        nx_recorder = rec_manager.getRecorderClass(
             "NXS_FileRecorder")(file_name, macro=self)

        # construct data handler
        data_handler = DataHandler()
        data_handler.addRecorder(nx_recorder)

        # increase the scanID
        try:
            serialno = self.getEnv("ScanID") + 1
        except UnknownEnv:
            serialno = 1
        self.setEnv("ScanID", serialno)

        # prepare scan data environment
        env = ScanDataEnvironment(
            {'serialno': self.getEnv("ScanID"),
             'title': "NeXus scan example"}
        )

        env['ScanDir'] = scan_dir
        env['ScanFile'] = scan_file
        env['starttime'] = datetime.datetime.fromtimestamp(time.time())

        # add motors
        ref_moveables = []
        if motor1:
            ref_moveables.append(motor1)
        if motor2:
            ref_moveables.append(motor2)
        env['ref_moveables'] = ref_moveables

        # add channel description
        data_desc = []
        data_desc.append(
            ColumnDesc(name='point_nb', label='#Pt No', dtype='int64'))
        data_desc.append(
            ColumnDesc(name='timestamp', label='dt', dtype='float64'))

        # add user channels
        for i, col in enumerate(data.keys()):
            data_desc.append(
                ColumnDesc(name=col, label=col, dtype='float64'))

        env['datadesc'] = data_desc
        self._env = env

        # construct ScanData object
        self._data = ScanData(environment=env,
                              data_handler=data_handler,
                              apply_interpolation=False)
        self._data.initial_data = {}

    def run(self, motor1, motor2):
        #  write data in the INIT mode
        self._data.start()

        # STEP scan loop
        for i in range(5):
            self.output("Point No: %s" % self._data.recordno)
            self._data.initial_data[i] = {"timestamp": time.time()}
            for name, v in data.items():
                self._data.addData(
                    {"index": [i], "label": name, "value": [v[i]]})

                time.sleep(0.1)

        # write data in the FINAL mode
        self._env['endtime'] = datetime.datetime.fromtimestamp(time.time())
        self._data.end()

The above macro works for sardana $>=$ 3.0 (debian 10).