#!/bin/env python
#
# This VmExecutor moves the monochromator to some energy and
# optimizes another motor (TILT) using a counter and a timer.
# The movement and the optimization are performed in a thread.
# The motor stays MOVING until the optimization is done.
#
import PyTango
import time, os
import HasyUtils
from threading import Thread
import numpy as np
#
# Tango device names of the devices that are used
#
NAME_MONO = "hasep99oh:10000/p99/dcmener/oh.01"
#NAME_PITCH = "exp_dmy01"
NAME_PITCH = "hasep99oh:10000/p99/motor/oh.02"
NAME_TIMER = "hasep99oh:10000/p99/dgg2/eh.01"
NAME_COUNTER = "hasep99oh:10000/p99/counter/eh.01"
#
# Module-wide flags shared between the VM class and the worker thread
#
# flagTTY:           True if diagnostic output should be printed,
#                    VM.__init__() sets it from os.isatty( 1)
# flagStabilizing:   True while a MostabThread is busy,
#                    dev_state() reports MOVING during that time
# flagStopRequested: set by StopMove(), sensed by MostabThread.moveTo()
#
# The first flag has to be spelled flagTTY because this is the name
# which is read throughout the file.
#
flagTTY = False
flagStabilizing = False
flagStopRequested = False
#
# Defaults for scanRange, sampleTime and nPoints
#
SCAN_RANGE = 0.006
SAMPLE_TIME = 0.1
N_POINTS = 30
PITCH_OFFSET = 0.
class MostabThread( Thread):
    '''
    Worker thread: moves the mono to a target position and then
    scans the pitch motor to find the position of the highest signal.

    The module-level flag flagStabilizing is True as long as run()
    is active, flagStopRequested is sensed while motors are moving.
    '''
    def __init__( self, nameMono, target, namePitch, nameTimer, nameCounter,
                  sampleTime, scanRange, nPoints, pitchOffset):
        '''
        nameMono, namePitch, nameTimer, nameCounter: Tango device names
        target:      the position the mono is moved to
        sampleTime:  the sample time per scan point
        scanRange:   the full width of the pitch scan, centered
                     at the current pitch position
        nPoints:     the number of scan points
        pitchOffset: added to the peak position before the final move
        '''
        Thread.__init__( self)
        self.proxyMono = PyTango.DeviceProxy( nameMono)
        self.proxyPitch = PyTango.DeviceProxy( namePitch)
        self.proxyTimer = PyTango.DeviceProxy( nameTimer)
        self.proxyCounter = PyTango.DeviceProxy( nameCounter)
        self.target = target
        self.pitchOffset = pitchOffset
        self.sampleTime = sampleTime
        self.scanRange = scanRange
        self.nPoints = nPoints
    #
    def __del__( self):
        '''
        make sure that the busy flag is cleared when the thread object goes away
        '''
        global flagStabilizing
        flagStabilizing = False
    #
    def moveTo( self, proxy, pos):
        '''
        move proxy to pos, sensing flagStopRequested

        returns True, if the state of proxy became ON,
        returns False, if the move was stopped because of flagStopRequested
        '''
        if flagTTY:
            print( "moveTo %s %g " % ( proxy.name(), pos))
        proxy.position = pos
        while proxy.state() != PyTango.DevState.ON:
            if flagStopRequested:
                proxy.command_inout( "StopMove")
                if flagTTY:
                    print( "moveTo: aborting")
                return False
            time.sleep( 0.1)
        return True
    #
    def getSignal( self):
        '''
        use proxyCounter and proxyTimer to measure a signal,
        return the counts divided by the sample time
        '''
        self.proxyCounter.command_inout( "Reset")
        self.proxyTimer.sampleTime = self.sampleTime
        self.proxyTimer.command_inout( "Start")
        #
        # the timer is busy until its state is ON again
        #
        while self.proxyTimer.state() != PyTango.DevState.ON:
            time.sleep( 0.01)
        return self.proxyCounter.Counts/self.sampleTime
    #
    def getPosMax( self):
        '''
        use SSA to calculate the maximum from self.pos and self.sig

        returns the 'midpoint' of the SSA result, if 'status' is 1,
        otherwise None
        '''
        hsh = HasyUtils.ssa( np.array( self.pos), np.array( self.sig))
        print( "ssa results %s" % repr( hsh))
        if hsh['status'] == 1:
            return hsh['midpoint']
        return None
    #
    def stabilizeMono( self):
        '''
        make a relative scan (scanRange, nPoints, sampleTime)
        to find the highest signal and move the motor there

        returns True, if a peak was found and the pitch motor
        reached peak + pitchOffset.
        returns False, if a move was stopped or if no peak was found.
        In the latter case the pitch motor is sent back to
        the position it had before the scan.
        '''
        if flagTTY:
            print( "stabilizeMono: range %g, np %d, st %g" % (self.scanRange, self.nPoints, self.sampleTime))
        posOld = self.proxyPitch.Position
        delta = self.scanRange/(self.nPoints - 1.)
        posStart = posOld - self.scanRange/2.
        if not self.moveTo( self.proxyPitch, posStart):
            return False
        self.pos = []
        self.sig = []
        for i in range( self.nPoints):
            posNew = posStart + delta*i
            if not self.moveTo( self.proxyPitch, posNew):
                return False
            sigTemp = self.getSignal()
            print( "pos %g, signal %g " % (posNew, sigTemp))
            self.pos.append( posNew)
            self.sig.append( sigTemp)
        posMax = self.getPosMax()
        #
        # posMax has to be tested before it is formatted: '%g' % None raises
        #
        if posMax is None:
            if flagTTY:
                print( "no peak found")
            self.moveTo( self.proxyPitch, posOld)
            return False
        if flagTTY:
            print( "peak at %g" % posMax)
        return self.moveTo( self.proxyPitch, posMax + self.pitchOffset)
    #
    def run( self):
        '''
        move the mono to the target and stabilize it

        PyTango.Except.throw_exception() is called, if the mono move
        or the stabilization fails. If the stabilization fails, the
        mono is sent back to its previous position before.
        '''
        global flagStabilizing
        flagStabilizing = True
        #
        # try/finally: the flag must be cleared on every exit, otherwise
        # dev_state() reports MOVING forever after a failure.
        # The flag stays set while the mono is sent back.
        #
        try:
            monoOld = self.proxyMono.position
            #
            # move the mono
            #
            if flagTTY:
                print( "mono to %g" % self.target)
            if not self.moveTo( self.proxyMono, self.target):
                PyTango.Except.throw_exception( "mostab", "failed to move mono to %g" % self.target, "VmExecutor")
            #
            # find the maximum for proxyPitch
            #
            if not self.stabilizeMono():
                self.moveTo( self.proxyMono, monoOld)
                PyTango.Except.throw_exception( "mostab", "stabilizing failed, mono back to %g" % monoOld, "VmExecutor")
            if flagTTY:
                print( "mostab: stabilization successful")
        finally:
            flagStabilizing = False
        return
class VM:
    '''
    The class that is loaded by the VmExecutor.

    A position write starts a MostabThread which moves the mono and
    optimizes the pitch motor. The state is MOVING as long as
    the module-level flag flagStabilizing is set.
    '''
    #
    # the dynamic attributes and the types they are converted to when written
    #
    _DYNAMIC_ATTRS = { 'sampleTime': float,
                       'pitchOffset': float,
                       'scanRange': float,
                       'nPoints': int}
    #
    # init_device
    #
    def __init__( self):
        '''
        create the device proxies, load the scan defaults, reset the flags
        '''
        #
        # flagStabilizing has to be declared global, otherwise
        # the assignment below creates a local variable only
        #
        global flagTTY, flagStabilizing
        self.ResultSim = None
        self.PositionSim = None
        self.proxyMono = PyTango.DeviceProxy( NAME_MONO)
        self.proxyPitch = PyTango.DeviceProxy( NAME_PITCH)
        self.proxyCounter = PyTango.DeviceProxy( NAME_COUNTER)
        self.proxyTimer = PyTango.DeviceProxy( NAME_TIMER)
        self.pitchOffset = PITCH_OFFSET
        self.sampleTime = SAMPLE_TIME
        self.scanRange = SCAN_RANGE
        self.nPoints = N_POINTS
        flagStabilizing = False
        flagTTY = os.isatty( 1)
        return
    #
    # dynamic attributes
    #
    def read_DynamicAttr( self, name):
        '''
        return sampleTime, pitchOffset, scanRange or nPoints,
        None for any other name
        '''
        if name in self._DYNAMIC_ATTRS:
            return getattr( self, name)
        return None
    #
    def write_DynamicAttr( self, name, value):
        '''
        store value as sampleTime, pitchOffset, scanRange (float)
        or nPoints (int), other names are ignored
        '''
        convert = self._DYNAMIC_ATTRS.get( name)
        if convert is not None:
            setattr( self, name, convert( value))
    #
    # dev_state
    #
    def dev_state( self):
        '''
        MOVING while a stabilization is active, otherwise the state of the pitch motor
        '''
        if flagStabilizing:
            return PyTango.DevState.MOVING
        return self.proxyPitch.state()
    #
    # read position
    #
    def read_Position( self):
        '''
        return the position of the mono
        '''
        return self.proxyMono.Position
    #
    # write position
    #
    def write_Position( self, argin):
        '''
        check argin, sampleTime and nPoints, then start a MostabThread
        which moves the mono to argin and optimizes the pitch.

        returns 1 after the thread has been started, does not wait
        for the thread. An exception is thrown, if argin is outside
        the mono limits, sampleTime is not in [0.001, 1.] or
        nPoints is not in [5, 100].
        '''
        global flagStopRequested, flagStabilizing
        flagStopRequested = False
        #
        # check mono limits, each limit is read only once
        #
        limitMin = self.proxyMono.UnitLimitMin
        limitMax = self.proxyMono.UnitLimitMax
        if argin < limitMin or argin > limitMax:
            PyTango.Except.throw_exception(
                "mostab",
                "requested position outside limits %g %g " % (limitMin, limitMax),
                "VmExecutor")
        if self.sampleTime < 0.001 or self.sampleTime > 1.:
            PyTango.Except.throw_exception( "mostab", "sampleTime < 0.001 or > 1. %g " % ( self.sampleTime), "VmExecutor")
        if self.nPoints < 5 or self.nPoints > 100:
            PyTango.Except.throw_exception( "mostab", "nPoints < 5 or > 100 %d " % ( self.nPoints), "VmExecutor")
        mvThread = MostabThread( self.proxyMono.name(), argin,
                                 self.proxyPitch.name(), self.proxyTimer.name(), self.proxyCounter.name(),
                                 self.sampleTime, self.scanRange, self.nPoints, self.pitchOffset)
        #
        # set the busy flag before the thread runs: a client that reads
        # the state right after the write must not see a non-MOVING state
        #
        flagStabilizing = True
        try:
            mvThread.start()
        except Exception:
            flagStabilizing = False
            raise
        return 1
    #
    # UnitLimitMax
    #
    def read_UnitLimitMax( self):
        '''
        return the upper limit of the mono
        '''
        return self.proxyMono.UnitLimitMax
    def write_UnitLimitMax( self, argin):
        '''
        not allowed, always throws
        '''
        PyTango.Except.throw_exception( "mostab", "write_UnitLimitMax: not allowed", "VmExecutor")
    #
    # UnitLimitMin
    #
    def read_UnitLimitMin( self):
        '''
        return the lower limit of the mono
        '''
        return self.proxyMono.UnitLimitMin
    def write_UnitLimitMin( self, argin):
        '''
        not allowed, always throws
        '''
        PyTango.Except.throw_exception( "mostab", "write_UnitLimitMin: not allowed", "VmExecutor")
    #
    # CwLimit, CcwLimit
    #
    def read_CwLimit( self):
        '''
        always 0
        '''
        return 0
    def read_CcwLimit( self):
        '''
        always 0
        '''
        return 0
    #
    # PositionSim, ResultSim: not supported, always throw
    #
    def read_PositionSim( self):
        PyTango.Except.throw_exception( "mostab", "PositionSim does not apply", "VmExecutor")
    def write_PositionSim( self, argin):
        PyTango.Except.throw_exception( "mostab", "PositionSim does not apply", "VmExecutor")
    def read_ResultSim( self):
        PyTango.Except.throw_exception( "mostab", "ResultSim does not apply", "VmExecutor")
    #
    # StopMove
    #
    def StopMove( self):
        '''
        request the worker thread to stop and stop the mono,
        returns the result of the StopMove command of the mono
        '''
        global flagStopRequested
        flagStopRequested = True
        return self.proxyMono.StopMove()