Sweep Scans (p02)

#!/bin/env python

"""sweep scan p02"""

__all__ = ["sweep_p02","sweep_senv"]

import PyTango
from sardana.macroserver.macro import *
from sardana.macroserver.macro import macro
import json
import time
import pytz
import datetime
import numpy
import math

angleOffsets = {
"polar_angle_channel_1" : 0., 
"polar_angle_channel_2" : 1., 
"polar_angle_channel_3" : 2., 
"polar_angle_channel_4" : 3., 
"polar_angle_channel_5" : 4., 
"polar_angle_channel_6" : 5., 
"polar_angle_channel_7" : 6., 
"polar_angle_channel_8" : 7., 
"polar_angle_channel_9" : 8., 
"polar_angle_channel_10" : 9.
} 


class sweep_p02(Macro):
    """Perfoms a sweep scan. Counters are read during the sweep (they are always active) """


    param_def = [
       ['motor',           Type.Motor,      None, 'Sweep motor'],
       ['sweep_start_pos', Type.Float,      None, 'Sweep start position'],
       ['sweep_distance',  Type.Float,      None, 'Sweep distance'],
       ['sweep_offset',    Type.Float,      None, 'Sweep offset'],
       ['nb_intervals',    Type.Integer,    None, 'Sweep number of intervals'],
       ['sample_time',     Type.Float,      None, 'Sweep sample time']
    ]

    def run(self, motor, sweep_start_pos, sweep_distance, sweep_offset, nb_intervals, sample_time):

        sweep_counters_prefix = self.getEnv('SweepCountersPrefix')
        sweep_counters = []
        for i in range(0,9):
            counter_name = sweep_counters_prefix + "0" + str(i+1)
            sweep_counters.append(counter_name)
        counter_name = sweep_counters_prefix + "10"
        sweep_counters.append(counter_name)
        counter_name = sweep_counters_prefix + "32"
        clock_name = counter_name
        sweep_counters.append(counter_name)

        db = PyTango.Database()

        hshCounters = {}
        for counter in sweep_counters:
                hshCounters[counter] = {}
                hshCounters[counter]['name'] = counter
                full_sardana_name = db.get_device_alias(counter)
                # The properties can not be found using the alias name
                b = db.get_device_property( full_sardana_name, ['__SubDevices'])
                dName = b['__SubDevices'][0]
                # The device to the Tango device and not to the Sardana device is need for 
                # writing in the Counts attribute
                hshCounters[counter]['proxy'] = PyTango.DeviceProxy(dName)
                

        nexusconfig_device_name = self.getEnv('NeXusConfigDevice')   # class XMLConfigServer
        nexuswriter_device_name = self.getEnv('NeXusWriterDevice') # class TangoDataServer

        self.nexusconfig_device = PyTango.DeviceProxy(nexusconfig_device_name)
        self.nexuswriter_device = PyTango.DeviceProxy(nexuswriter_device_name)


        motor_device   = PyTango.DeviceProxy(motor.name)

        # For the timer one has to use the corresponding tango device and not the device in the pool
        # because it can not be started outside a measurement group
        
        timer_device_name = self.getEnv('SweepTimerTangoDevice')
        timer_device   = PyTango.DeviceProxy(timer_device_name)

    
        slew_orig = motor_device.Velocity


        self.sweep_motor = motor.name
        self.sweep_start = sweep_start_pos
        self.sweep_distance = sweep_distance
        self.sweep_offset = sweep_offset
        self.nb_intervals = nb_intervals
        self.sample_time = sample_time

       # Compute sweep velocity

        sweep_time = nb_intervals*sample_time
        sweep_full_distance = sweep_distance + 2*sweep_offset
        move_time_orig = math.fabs( (sweep_full_distance *  motor_device.Step_per_unit)/slew_orig)
        self.output("sweep_time %g move_time_orig %g " % (sweep_time, move_time_orig))

        slew_sweep = int(sweep_full_distance/sweep_time)

        if slew_sweep > slew_orig or slew_sweep < motor_device.Base_rate:
            raise ValueError("slew_sweep %d out of range [%d,%d]" % (slew_sweep, slew_orig, motor_device.Base_rate))

  
        self._openNxFile()
        self._openNxEntry( "ten_channel_detector")
         
    
        #
        # Move the motor to the start position
        #

        self.output("p02sweep: moving %s to start position %g" % (motor.name, (sweep_start_pos - sweep_offset)))
        
        motor_device.write_attribute( "position", (sweep_start_pos - sweep_offset))

        while motor_device.State() == PyTango.DevState.MOVING:
            time.sleep(1)

        #
        # Reset the counters
        #

        for cnt in hshCounters.keys():
            hshCounters[cnt]['proxy'].Counts = 0

        #
        # Start the timer
        #
 
        timer_device.SampleTime = 2000
        timer_device.Start()
        time_start_timer = time.time()      
   
        #
        # Start the sweep
        #

        sweep_final_pos = sweep_start_pos + sweep_distance + sweep_offset

        self.output("Sweeping from %g to %g" % (motor_device.position, sweep_final_pos))

        motor_device.write_attribute( "position", sweep_final_pos)

        time_overhead = 0.
        hshCounterReadings = {}
        for cnt in hshCounters.keys():
            hshCounterReadings[cnt] = {}
            hshCounterReadings[cnt]['counts'] = 0
            hshCounterReadings[cnt]['counts_old'] = 0

        pos_old = motor_device.position

        hshMain = {}
        hshRecord = {}
        hshMain['data'] = hshRecord
        
        count = 1
        
        time_cycle_start = time.time()
        time_total_start = time.time()

        while( motor_device.state() == PyTango.DevState.MOVING):
            if sample_time > time_overhead:
                time.sleep(sample_time - time_overhead)
            time_overhead_start = time.time()
            pos_before = motor_device.position
            #
            # read all counters
            #
            for cnt in hshCounters.keys():
                self.output(cnt)
                hshCounterReadings[cnt]['counts'] = hshCounters[cnt]['proxy'].Counts
                self.output(hshCounterReadings[cnt]['counts'])
            #
            # clear the clock because we might run out of 32 bit
            #
            hshCounters[clock_name]['proxy'].Counts = 0
            hshCounterReadings[clock_name]['counts_old'] = 0 
            #
            # the position is the mean value - before and after the counter reading
            #
            pos = (pos_before + motor_device.position)/2.
            
            if hshCounterReadings[clock_name]['counts'] == hshCounterReadings[clock_name]['counts_old']:
                self.output("No clock counts")
                break

            corr = 1000000.*sample_time/(hshCounterReadings[clock_name]["counts"] -  hshCounterReadings[clock_name]["counts_old"])

            for mot in sorted( angleOffsets.keys()):
                hshRecord[mot] = pos + angleOffsets[mot]
        
            for cnt in sorted(hshCounters.keys()):
                temp = corr*(hshCounterReadings[cnt]["counts"] - hshCounterReadings[cnt]["counts_old"])
                if temp < 0:
                    temp = 0
                hshRecord[hshCounters[cnt]['name']] = temp
                hshCounterReadings[cnt]['counts_old'] = hshCounterReadings[cnt]['counts'] 

            hshRecord['correction'] = corr
            hshRecord['delta_position'] = pos - pos_old
        
            pos_old = pos

            self._sendRecordToNxWriter( hshMain)

            time_cycle = time.time() - time_cycle_start
            time_cycle_start = time.time()
            self.output("%d Ovrhd %g, timeCycle %g, corr %g" % (count, time_overhead, time_cycle, corr))
            
            count += 1
            if (count % 20) == 0:
                self.output(" %d motor at %g " % (count, motor_device.position))
            
            #
            # make sure that the timer does not expire
            #
            if (time.time() - time_start_timer) > 1900:
                timer_device.Stop()
                timer_device.SampleTime = 2000
                timer_device.Start()
                time_start_timer = time.time()

            time_overhead = time.time() - time_overhead_start

        self.output("sweep ended, total time %gs (%gs) " % ((time.time() - time_total_start), nb_intervals*sample_time))

        self._closeNxEntry()
        self._closeNxFile()

    def _openNxFile(self):
        sweep_id = self.getEnv('SweepID')
        fileNameNx = self.getEnv('SweepFileName') + "_" + str(sweep_id) + ".h5"
        self.setEnv("SweepID", sweep_id + 1)
        self.nexuswriter_device.Init()
        self.output(fileNameNx)
        self.nexuswriter_device.FileName = str(fileNameNx)
        self.nexuswriter_device.OpenFile()
        return 1

    def _openNxEntry(self, componentName):
        
        self._sendGlobalDictionaryBefore()
        
        self.nexusconfig_device.Open()

        cmps = self.nexusconfig_device.AvailableComponents()
        cmp_list = []
        if componentName not in cmps:
            self.output("_openNxEntry: %s not in configuration server", componentName)
        else:
            cmp_list.append(componentName)


        self.nexusconfig_device.CreateConfiguration(cmp_list)
        xmlconfig = self.nexusconfig_device.XMLString
        self.nexuswriter_device.XMLSettings = str(xmlconfig)
        try:
            self.nexuswriter_device.OpenEntry()
        except:
            pass

        return 1


    def _closeNxFile(self):
        self.nexuswriter_device.CloseFile()
        return 1


    def _closeNxEntry(self):
        self._sendGlobalDictionaryAfter()
        self.nexuswriter_device.CloseEntry()
        return 1

    def _sendGlobalDictionaryBefore(self):
        hsh = {}
        hshSub = {}
        hshSub['sweep_motor'] =  self.sweep_motor
        hshSub['sweep_start'] = self.sweep_start
        hshSub['sweep_distance'] = self.sweep_distance
        hshSub['sweep_offset'] = self.sweep_offset
        hshSub['interval'] = self.nb_intervals
        hshSub['sample_time'] = self.sample_time
        amsterdam = pytz.timezone('Europe/Amsterdam')
        fmt = '%Y-%m-%dT%H:%M:%S.%f%z'
        starttime = amsterdam.localize(datetime.datetime.now())
        hshSub['start_time'] = str(starttime.strftime(fmt))
        try:
            title = self.getEnv('SweepTitle')
        except:
            title = ""
        hshSub['title'] = title
        try:
            sample_name = self.getEnv('SweepSampleName')
        except:
            sample_name = ""
        hshSub['sample_name'] = sample_name
        try:
            chemical_formula = self.getEnv('SweepChemicalFormula')
        except:
            chemical_formula = ""
        hshSub['chemical_formula'] = chemical_formula
        try:
            beamtime_id = self.getEnv('SweepBeamtimeId')
        except:
            beamtime_id = ""
        hshSub['beamtime_id'] = beamtime_id
        hsh['data'] = hshSub
        self._setParameter( hsh)
        return 1

    def _sendGlobalDictionaryAfter(self):
        hsh = {}
        hshSub = {}
        amsterdam = pytz.timezone('Europe/Amsterdam')
        fmt = '%Y-%m-%dT%H:%M:%S.%f%z'
        starttime = amsterdam.localize(datetime.datetime.now())
        hshSub['end_time'] = str(starttime.strftime(fmt))
        hshSub['comments'] = "some comment"
        hsh['data'] = hshSub
        self._setParameter( hsh)
        return 1

    def _setParameter(self, hsh):
        jsondata = json.dumps( hsh)
        self.nexuswriter_device.JSONRecord = str(jsondata)
        return 1

    def _sendRecordToNxWriter(self, hsh):
        mstr = json.dumps( hsh)
        self.nexuswriter_device.Record(mstr)
        return 1

 

class sweep_senv(Macro):
    """ Sets default environment variables """

    def run(self):
        file_name = "/home/tnunez/sweep_files/test_sweep_output"
        self.setEnv("SweepFileName", file_name)
        self.output("Setting SweepFileName to %s" % file_name)
        nexus_config_device = "haso111tb:10000/test/xmlconfigserver/01"
        self.setEnv("NeXusConfigDevice",  nexus_config_device)
        self.output("Setting NeXusConfigDevice to  %s" % nexus_config_device)
        nexus_writer_device = "haso111tb:10000/test/tangodataserver/01"
        self.setEnv("NeXusWriterDevice", nexus_writer_device)
        self.output("Setting NeXusWriterDevice to %s" % nexus_writer_device)
        self.setEnv("SweepID", 0)
        self.output("Setting SweepID to 0")
        sweep_counters_prefix = "exp_c"
        self.setEnv("SweepCountersPrefix",  sweep_counters_prefix)
        self.output("Setting SweepCountersPrefix to %s" % sweep_counters_prefix)
        sweep_timer_tango_device = "haso111tb:10000/p09/dgg2/exp.01"
        self.setEnv("SweepTimerTangoDevice", sweep_timer_tango_device)
        self.output("Setting SweepTimerTangoDevice to %s" % sweep_timer_tango_device)

Notice that mot1.write_attribute( "position", target) has been used to move the motor. We use this syntax and not mot1.position = target because the latter form was partially not supported.



2019-11-13