#!/bin/env python
#
# This VmExecutor moves the monochromator to some energy and
# optimizes another motor (TILT) using a counter and a timer.
# The movement and the optimization are performed in a thread.
# The motor stays MOVING until the optimization is done.
#
import PyTango
import time, os
import HasyUtils
from threading import Thread
import numpy as np

NAME_MONO = "hasep99oh:10000/p99/dcmener/oh.01"
#NAME_PITCH = "exp_dmy01"
NAME_PITCH = "hasep99oh:10000/p99/motor/oh.02"
NAME_TIMER = "hasep99oh:10000/p99/dgg2/eh.01"
NAME_COUNTER = "hasep99oh:10000/p99/counter/eh.01"

# Module-level flags shared between the VM object and the worker thread.
# flagTTY:           print diagnostics (set from os.isatty( 1) in VM.__init__)
# flagStabilizing:   True while a MostabThread is running; makes dev_state()
#                    report MOVING
# flagStopRequested: set by VM.StopMove(), polled by MostabThread.moveTo()
flagTTY = False
flagStabilizing = False
flagStopRequested = False
#
# Defaults for scanRange, sampleTime and nPoints
#
SCAN_RANGE = 0.006
SAMPLE_TIME = 0.1
N_POINTS = 30
PITCH_OFFSET = 0.


class MostabThread( Thread):
    '''
    Worker thread: moves the mono to target, then scans the pitch motor
    around its current position and moves it to the signal maximum.
    '''

    def __init__( self, nameMono, target, namePitch, nameTimer, nameCounter,
                  sampleTime, scanRange, nPoints, pitchOffset):
        '''
        nameMono, namePitch, nameTimer, nameCounter: Tango device names,
            each one is opened with its own DeviceProxy
        target:      position the mono is moved to
        sampleTime:  written to the timer before each measurement
        scanRange:   total width of the pitch scan, centered on the
                     current pitch position
        nPoints:     number of scan points
        pitchOffset: added to the detected maximum before the final move
        '''
        Thread.__init__( self)
        self.proxyMono = PyTango.DeviceProxy( nameMono)
        self.proxyPitch = PyTango.DeviceProxy( namePitch)
        self.proxyTimer = PyTango.DeviceProxy( nameTimer)
        self.proxyCounter = PyTango.DeviceProxy( nameCounter)
        self.target = target
        self.pitchOffset = pitchOffset
        self.sampleTime = sampleTime
        self.scanRange = scanRange
        self.nPoints = nPoints

    #
    def __del__( self):
        # safety net: never leave the device in MOVING when the thread
        # object goes away
        global flagStabilizing
        flagStabilizing = False

    #
    def moveTo( self, proxy, pos):
        '''
        move proxy to pos, sensing flagStopRequested

        Polls proxy.state() every 0.1 s until it is ON. If
        flagStopRequested is set meanwhile, StopMove is sent to the
        proxy and False is returned. Returns True otherwise.
        '''
        if flagTTY:
            print( "moveTo %s %g " % ( proxy.name(), pos))
        proxy.position = pos
        while proxy.state() != PyTango.DevState.ON:
            if flagStopRequested:
                proxy.command_inout( "StopMove")
                if flagTTY:
                    print( "moveTo: aborting")
                return False
            time.sleep( 0.1)
        return True

    #
    def getSignal( self):
        '''
        use proxyCounter and proxyTimer to measure a signal, return normalized

        The counter is reset, the timer is started with self.sampleTime
        and polled until its state is ON. Returns Counts/sampleTime.
        '''
        self.proxyCounter.command_inout( "Reset")
        self.proxyTimer.sampleTime = self.sampleTime
        self.proxyTimer.command_inout( "Start")
        while self.proxyTimer.state() != PyTango.DevState.ON:
            time.sleep( 0.01)
        res = self.proxyCounter.Counts/self.sampleTime
        return res

    #
    def getPosMax( self):
        '''
        use SSA to calculate the maximum

        Feeds self.pos and self.sig to HasyUtils.ssa(). Returns the
        'midpoint' entry of the result if its 'status' is 1, else None.
        '''
        hsh = HasyUtils.ssa( np.array( self.pos), np.array( self.sig))
        print( "ssa results %s" % repr( hsh))
        if hsh['status'] == 1:
            return hsh['midpoint']
        return None

    #
    def stabilizeMono( self):
        '''
        make a relative scan (scanRange, nPoints, sampleTime) to find
        the highest signal and move the motor there

        Returns True if the pitch motor was moved to the maximum (plus
        pitchOffset). Returns False if a move was aborted or no peak was
        found; in the latter case the motor is sent back to the position
        it had before the scan.
        '''
        if flagTTY:
            print( "stabilizeMono: range %g, np %d, st %g" %
                   (self.scanRange, self.nPoints, self.sampleTime))

        posOld = self.proxyPitch.Position
        # nPoints points span scanRange, hence nPoints - 1 intervals
        delta = self.scanRange/(self.nPoints - 1.)
        posStart = posOld - self.scanRange/2.

        if not self.moveTo( self.proxyPitch, posStart):
            return False

        self.pos = []
        self.sig = []
        for i in range( self.nPoints):
            posNew = posStart + delta*i
            if not self.moveTo( self.proxyPitch, posNew):
                return False
            sigTemp = self.getSignal()
            print( "pos %g, signal %g " % (posNew, sigTemp))
            self.pos.append( posNew)
            self.sig.append( sigTemp)

        posMax = self.getPosMax()

        if posMax is None:
            if flagTTY:
                print( "no peak found")
            # restore the starting position; the scan failed either way
            self.moveTo( self.proxyPitch, posOld)
            return False

        # posMax is only formatted after the None check, '%g' % None
        # would raise a TypeError
        if flagTTY:
            print( "peak at %g" % posMax)
        return self.moveTo( self.proxyPitch, posMax + self.pitchOffset)

    def run( self):
        '''
        Thread body: move the mono to self.target, then stabilize.

        flagStabilizing is True while this runs and is always cleared on
        exit, also if a move fails or an exception is raised. Failures
        are reported through PyTango.Except.throw_exception(); if the
        stabilization fails, the mono is sent back to its old position
        first.
        '''
        global flagStabilizing
        flagStabilizing = True
        try:
            monoOld = self.proxyMono.position
            #
            # move the mono
            #
            if flagTTY:
                print( "mono to %g" % self.target)
            if not self.moveTo( self.proxyMono, self.target):
                PyTango.Except.throw_exception(
                    "mostab",
                    "failed to move mono to %g" % self.target,
                    "VmExecutor")
            #
            # find the maximum for proxyPitch
            #
            if not self.stabilizeMono():
                flagStabilizing = False
                self.moveTo( self.proxyMono, monoOld)
                PyTango.Except.throw_exception(
                    "mostab",
                    "stabilizing failed, mono back to %g" % monoOld,
                    "VmExecutor")

            if flagTTY:
                print( "mostab: stabilization successful")
        finally:
            # without this the device would stay MOVING after a failed
            # mono move until the thread object is garbage collected
            flagStabilizing = False
        return


class VM:
    '''
    VmExecutor class: position reads/writes act on the mono, a position
    write additionally starts a MostabThread which optimizes the pitch.
    '''

    #
    # init_device
    #
    def __init__( self):
        # both flags are module globals; without the declaration the
        # assignments below would only create locals
        global flagTTY, flagStabilizing
        self.ResultSim = None
        self.PositionSim = None
        self.proxyMono = PyTango.DeviceProxy( NAME_MONO)
        self.proxyPitch = PyTango.DeviceProxy( NAME_PITCH)
        self.proxyCounter = PyTango.DeviceProxy( NAME_COUNTER)
        self.proxyTimer = PyTango.DeviceProxy( NAME_TIMER)
        self.pitchOffset = PITCH_OFFSET
        self.sampleTime = SAMPLE_TIME
        self.scanRange = SCAN_RANGE
        self.nPoints = N_POINTS
        flagStabilizing = False
        flagTTY = os.isatty( 1)
        return

    #
    # dynamic attributes: sampleTime, pitchOffset, scanRange, nPoints
    #
    def read_DynamicAttr( self, name):
        '''
        Return the value of the named scan parameter; None is returned
        implicitly for any other name.
        '''
        if name == 'sampleTime':
            return self.sampleTime
        elif name == 'pitchOffset':
            return self.pitchOffset
        elif name == 'scanRange':
            return self.scanRange
        elif name == 'nPoints':
            return self.nPoints

    def write_DynamicAttr( self, name, value):
        '''
        Store value in the named scan parameter (float, nPoints as int);
        other names are ignored.
        '''
        if name == 'sampleTime':
            self.sampleTime = float( value)
        elif name == 'pitchOffset':
            self.pitchOffset = float( value)
        elif name == 'scanRange':
            self.scanRange = float( value)
        elif name == 'nPoints':
            self.nPoints = int( value)

    #
    # dev_state
    #
    def dev_state( self):
        '''
        MOVING while a MostabThread is active, else the pitch motor state.
        '''
        if flagStabilizing:
            return PyTango.DevState.MOVING
        return self.proxyPitch.state()

    #
    # read position
    #
    def read_Position( self):
        pos = self.proxyMono.Position
        return pos

    #
    # write position
    #
    def write_Position( self, argin):
        '''
        Check argin against the mono unit limits and the scan parameters
        against their allowed ranges, then start a MostabThread that
        performs the move and the optimization. Returns 1.

        Violations are reported through PyTango.Except.throw_exception().
        '''
        global flagStopRequested
        # a new move clears a pending stop request
        flagStopRequested = False
        #
        # check mono limits
        #
        if( argin < self.proxyMono.UnitLimitMin or
            argin > self.proxyMono.UnitLimitMax):
            PyTango.Except.throw_exception(
                "mostab",
                "requested position outside limits %g %g " %
                (self.proxyMono.UnitLimitMin, self.proxyMono.UnitLimitMax),
                "VmExecutor")

        if self.sampleTime < 0.001 or self.sampleTime > 1.:
            PyTango.Except.throw_exception(
                "mostab",
                "sampleTime < 0.001 or > 1. %g " % ( self.sampleTime),
                "VmExecutor")

        if self.nPoints < 5 or self.nPoints > 100:
            PyTango.Except.throw_exception(
                "mostab",
                "nPoints < 5 or > 100 %d " % ( self.nPoints),
                "VmExecutor")

        mvThread = MostabThread( self.proxyMono.name(),
                                 argin,
                                 self.proxyPitch.name(),
                                 self.proxyTimer.name(),
                                 self.proxyCounter.name(),
                                 self.sampleTime,
                                 self.scanRange,
                                 self.nPoints,
                                 self.pitchOffset)
        mvThread.start()

        return 1

    #
    # UnitLimitMax
    #
    def read_UnitLimitMax( self):
        return self.proxyMono.UnitLimitMax

    def write_UnitLimitMax( self, argin):
        PyTango.Except.throw_exception(
            "mostab",
            "write_UnitLimitMax: not allowed",
            "VmExecutor")

    #
    # UnitLimitMin
    #
    def read_UnitLimitMin( self):
        return self.proxyMono.UnitLimitMin

    def write_UnitLimitMin( self, argin):
        PyTango.Except.throw_exception(
            "mostab",
            "write_UnitLimitMin: not allowed",
            "VmExecutor")

    #
    # CwLimit, CcwLimit
    #
    def read_CwLimit( self):
        return 0

    def read_CcwLimit( self):
        return 0

    #
    # PositionSim
    #
    def read_PositionSim( self):
        PyTango.Except.throw_exception(
            "mostab",
            "PositionSim does not apply",
            "VmExecutor")

    def write_PositionSim( self, argin):
        PyTango.Except.throw_exception(
            "mostab",
            "PositionSim does not apply",
            "VmExecutor")

    def read_ResultSim( self):
        PyTango.Except.throw_exception(
            "mostab",
            "ResultSim does not apply",
            "VmExecutor")

    def StopMove( self):
        '''
        Request the worker thread to abort (flagStopRequested) and stop
        the mono; returns the result of the mono's StopMove.
        '''
        global flagStopRequested
        flagStopRequested = True
        return self.proxyMono.StopMove()