Absorber scans using general hooks and conditions

The following code in an implementation of absorber scans using general hooks and conditions. The procedure is controlled by the environment variables SignalCounter and absInfo and by the hooks selector absorber.

#!/usr/bin/env python
"""
The general hooks and condition function for controlling an absorber. 
The output of the SignalCounter has to be between countsMin and countsMax. 
Otherwise the absorber is moved by delta, in the range between absMin and absMax.

Environment variables: 
  SignalCounter
  absInfo, e.g.: senv absInfo 'absorber:d1_mot01,absDelta:0.1,absMin:0,absMax:10,countsMin:1,countsMax:200'

The file is imported by handleGeneralFunctions.py
"""

import PyTango
import HasyUtils

def prepare_absorber_scan( pmacro):
    #
    # this index is incremented during the scan
    #
    pmacro.output( "prepare_absorber_scan: ")
    pmacro.indexScan = 1
    pmacro.signalCounter = pmacro.getEnv( 'SignalCounter')
    if not pmacro.signalCounter:
        pmacro.output( "prepare_absorber_scan: no SignalCounter")
        return 0
    pmacro.signalCounterDevName = HasyUtils.getDeviceNameByAlias( pmacro.signalCounter)
    absInfo = pmacro.getEnv( 'absInfo')
    pmacro.absDct = {}
    for pair in absInfo.split(','):
        k,v = pair.split( ':')
        pmacro.absDct[k] = v

    for i in ['absorber', 'absDelta', 'countsMin', 'countsMax', 'absMin', 'absMax']:
        if i not in pmacro.absDct.keys():
            pmacro.output( "prepare_absorber_scan: absInfo does not contain %s" % i)
            return 0

    for i in ['absDelta', 'countsMin', 'countsMax', 'absMin', 'absMax']:
        pmacro.absDct[ i] = float( pmacro.absDct[ i])

    try:
        pmacro.absorberProxy = PyTango.DeviceProxy( pmacro.absDct[ 'absorber'])
    except:
        pmacro.output( "prepare_absorber_scan: failed to create a proxy to %s " % pmacro.absDct[ 'absorber'])
        return 0

def prepare_scan( pmacro):
    #
    # this is how the selector is used
    #
    if __builtins__.has_key( 'gh_selector'):
        if __builtins__[ 'gh_selector'] == "absorber":    
            prepare_absorber_scan( pmacro)
        else:
            pass

def getSignal( pmacro):
    """
    extract the current value of SignalCounter
    """
    #
    # if signalCounter hasn't been found in prepare_absorber_scan, return 0
    #
    if not pmacro.signalCounter:
        return 0
    lst = pmacro.data.records
    if len(lst) == 0:
        pmacro.output( "getSignal: no data")
        return None
    flag = 0
    for k in lst[-1].data.keys():
        if k.find( pmacro.signalCounterDevName) >= 0: 
            return float( lst[-1].data[k])
    pmacro.output( "getSignal: failed to find data for %s" % pmacro.signalCounterDevName)
    return None

def check_condition_absorber( pmacro):
    signal = getSignal( pmacro)
    if signal is None:
        pmacro.output("check_condition_absorber: no data")
        return 0
    #
    # signal is too big, move absorber into the beam
    #
    if signal > pmacro.absDct[ 'countsMax']:
        pmacro.output( "signal %g > max %g" % (signal, pmacro.absDct[ 'countsMax']))
        if (pmacro.absorberProxy.Position + pmacro.absDct[ 'absDelta']) > pmacro.absDct[ 'absMax']:
            pmacro.output( "check_condition: absorber at the limit %g " % pmacro.absDct[ 'absMax'])
            return None
        pmacro.output( "check_condition: move %s to %g" % 
                     (pmacro.absDct[ 'absorber'], pmacro.absorberProxy.Position + pmacro.absDct[ 'absDelta']))
        pmacro.mvr( pmacro.absDct[ 'absorber'], pmacro.absDct[ 'absDelta']) 
        argout = 1
    #
    # signal is too small, move absorber out of the beam
    #
    elif signal < pmacro.absDct[ 'countsMin']:
        pmacro.output( "signal %g < min %g" % (signal, pmacro.absDct[ 'countsMin']))
        if (pmacro.absorberProxy.Position - pmacro.absDct[ 'absDelta']) < pmacro.absDct[ 'absMin']:
            pmacro.output( "check_condition: absorber at the limit %g " % pmacro.absDct[ 'absMin'])
            return None
        pmacro.output( "check_condition: move %s to %g" % 
                     (pmacro.absDct[ 'absorber'], pmacro.absorberProxy.Position - pmacro.absDct[ 'absDelta']))
        pmacro.mvr( pmacro.absDct[ 'absorber'], -pmacro.absDct[ 'absDelta']) 
        argout = 1
    #
    # signal is ok
    #
    else:
        pmacro.output( "signal %g [%g, %g] ok" % (signal, pmacro.absDct[ 'countsMin'], pmacro.absDct[ 'countsMax']))
        argout = 0

    return argout
#
#--- 
#
# the hooks
#
def gh_pre_scan(pmacro):
    pmacro.output( "general-pre-scan hook ")
    if not hasattr( pmacro, 'indexScan'):
        prepare_scan(pmacro)

def gh_pre_move(pmacro):
    pmacro.indexScan = pmacro.indexScan + 1
    pmacro.output( "Scan pre move, indexScan %d" % pmacro.indexScan)

def gh_post_move(pmacro):
    pmacro.output( "Scan post move")

def gh_pre_acq(pmacro):
    pmacro.output( "Scan pre acq")

def gh_post_acq(pmacro):
    pmacro.output( "Scan post acq")

def gh_post_scan(pmacro):
    pmacro.output( "Scan post scan")
#
# the condition function
#
def check_condition(pmacro):
    pmacro.output("Scan check_condition")
    #
    # indexScan is checked to see whether everything has been prepared
    #
    if not hasattr( pmacro, 'indexScan'):
        prepare_scan(pmacro)

    if  __builtins__.has_key( 'gc_selector') and \
            __builtins__[ 'gc_selector'] == "absorber":    
        return check_condition_absorber( pmacro)
    return 0
#
# the general stop function
#
def general_on_stop(pmacro):

    if __builtins__.has_key( 'gs_selector'):
        #
        # == 'scan' if we are in a scan, prepared in gscan.py
        #
        if __builtins__[ 'gs_selector'] == "scan":
            pmacro.output( "general_on_stop for %s" % __builtins__[ 'gs_selector'])
        #
        # == 'general' for all other macros, mv, wa, etc.
        #
        elif __builtins__[ 'gs_selector'] == "general":
            pmacro.output( "general on_stop for %s" % __builtins__[ 'gs_selector'])
        else:
            pmacro.output( "general on_stop, %s" % __builtins__[ 'gs_selector'] )
    else:
        pmacro.output("General on_stop is called without selector")



2019-11-13