#!/bin/env python """sweep scan p02""" __all__ = ["sweep_p02","sweep_senv"] import PyTango from sardana.macroserver.macro import * from sardana.macroserver.macro import macro import json import time import pytz import datetime import numpy import math angleOffsets = { "polar_angle_channel_1" : 0., "polar_angle_channel_2" : 1., "polar_angle_channel_3" : 2., "polar_angle_channel_4" : 3., "polar_angle_channel_5" : 4., "polar_angle_channel_6" : 5., "polar_angle_channel_7" : 6., "polar_angle_channel_8" : 7., "polar_angle_channel_9" : 8., "polar_angle_channel_10" : 9. } class sweep_p02(Macro): """Perfoms a sweep scan. Counters are read during the sweep (they are always active) """ param_def = [ ['motor', Type.Motor, None, 'Sweep motor'], ['sweep_start_pos', Type.Float, None, 'Sweep start position'], ['sweep_distance', Type.Float, None, 'Sweep distance'], ['sweep_offset', Type.Float, None, 'Sweep offset'], ['nb_intervals', Type.Integer, None, 'Sweep number of intervals'], ['sample_time', Type.Float, None, 'Sweep sample time'] ] def run(self, motor, sweep_start_pos, sweep_distance, sweep_offset, nb_intervals, sample_time): sweep_counters_prefix = self.getEnv('SweepCountersPrefix') sweep_counters = [] for i in range(0,9): counter_name = sweep_counters_prefix + "0" + str(i+1) sweep_counters.append(counter_name) counter_name = sweep_counters_prefix + "10" sweep_counters.append(counter_name) counter_name = sweep_counters_prefix + "32" clock_name = counter_name sweep_counters.append(counter_name) db = PyTango.Database() hshCounters = {} for counter in sweep_counters: hshCounters[counter] = {} hshCounters[counter]['name'] = counter full_sardana_name = db.get_device_alias(counter) # The properties can not be found using the alias name b = db.get_device_property( full_sardana_name, ['__SubDevices']) dName = b['__SubDevices'][0] # The device to the Tango device and not to the Sardana device is need for # writing in the Counts attribute hshCounters[counter]['proxy'] = PyTango.DeviceProxy(dName) nexusconfig_device_name = self.getEnv('NeXusConfigDevice') # class XMLConfigServer nexuswriter_device_name = self.getEnv('NeXusWriterDevice') # class TangoDataServer self.nexusconfig_device = PyTango.DeviceProxy(nexusconfig_device_name) self.nexuswriter_device = PyTango.DeviceProxy(nexuswriter_device_name) motor_device = PyTango.DeviceProxy(motor.name) # For the timer one has to use the corresponding tango device and not the device in the pool # because it can not be started outside a measurement group timer_device_name = self.getEnv('SweepTimerTangoDevice') timer_device = PyTango.DeviceProxy(timer_device_name) slew_orig = motor_device.Velocity self.sweep_motor = motor.name self.sweep_start = sweep_start_pos self.sweep_distance = sweep_distance self.sweep_offset = sweep_offset self.nb_intervals = nb_intervals self.sample_time = sample_time # Compute sweep velocity sweep_time = nb_intervals*sample_time sweep_full_distance = sweep_distance + 2*sweep_offset move_time_orig = math.fabs( (sweep_full_distance * motor_device.Step_per_unit)/slew_orig) self.output("sweep_time %g move_time_orig %g " % (sweep_time, move_time_orig)) slew_sweep = int(sweep_full_distance/sweep_time) if slew_sweep > slew_orig or slew_sweep < motor_device.Base_rate: raise ValueError("slew_sweep %d out of range [%d,%d]" % (slew_sweep, slew_orig, motor_device.Base_rate)) self._openNxFile() self._openNxEntry( "ten_channel_detector") # # Move the motor to the start position # self.output("p02sweep: moving %s to start position %g" % (motor.name, (sweep_start_pos - sweep_offset))) motor_device.write_attribute( "position", (sweep_start_pos - sweep_offset)) while motor_device.State() == PyTango.DevState.MOVING: time.sleep(1) # # Reset the counters # for cnt in hshCounters.keys(): hshCounters[cnt]['proxy'].Counts = 0 # # Start the timer # timer_device.SampleTime = 2000 timer_device.Start() time_start_timer = time.time() # # Start the sweep # sweep_final_pos = sweep_start_pos + sweep_distance + sweep_offset self.output("Sweeping from %g to %g" % (motor_device.position, sweep_final_pos)) motor_device.write_attribute( "position", sweep_final_pos) time_overhead = 0. hshCounterReadings = {} for cnt in hshCounters.keys(): hshCounterReadings[cnt] = {} hshCounterReadings[cnt]['counts'] = 0 hshCounterReadings[cnt]['counts_old'] = 0 pos_old = motor_device.position hshMain = {} hshRecord = {} hshMain['data'] = hshRecord count = 1 time_cycle_start = time.time() time_total_start = time.time() while( motor_device.state() == PyTango.DevState.MOVING): if sample_time > time_overhead: time.sleep(sample_time - time_overhead) time_overhead_start = time.time() pos_before = motor_device.position # # read all counters # for cnt in hshCounters.keys(): self.output(cnt) hshCounterReadings[cnt]['counts'] = hshCounters[cnt]['proxy'].Counts self.output(hshCounterReadings[cnt]['counts']) # # clear the clock because we might run out of 32 bit # hshCounters[clock_name]['proxy'].Counts = 0 hshCounterReadings[clock_name]['counts_old'] = 0 # # the position is the mean value - before and after the counter reading # pos = (pos_before + motor_device.position)/2. if hshCounterReadings[clock_name]['counts'] == hshCounterReadings[clock_name]['counts_old']: self.output("No clock counts") break corr = 1000000.*sample_time/(hshCounterReadings[clock_name]["counts"] - hshCounterReadings[clock_name]["counts_old"]) for mot in sorted( angleOffsets.keys()): hshRecord[mot] = pos + angleOffsets[mot] for cnt in sorted(hshCounters.keys()): temp = corr*(hshCounterReadings[cnt]["counts"] - hshCounterReadings[cnt]["counts_old"]) if temp < 0: temp = 0 hshRecord[hshCounters[cnt]['name']] = temp hshCounterReadings[cnt]['counts_old'] = hshCounterReadings[cnt]['counts'] hshRecord['correction'] = corr hshRecord['delta_position'] = pos - pos_old pos_old = pos self._sendRecordToNxWriter( hshMain) time_cycle = time.time() - time_cycle_start time_cycle_start = time.time() self.output("%d Ovrhd %g, timeCycle %g, corr %g" % (count, time_overhead, time_cycle, corr)) count += 1 if (count % 20) == 0: self.output(" %d motor at %g " % (count, motor_device.position)) # # make sure that the timer does not expire # if (time.time() - time_start_timer) > 1900: timer_device.Stop() timer_device.SampleTime = 2000 timer_device.Start() time_start_timer = time.time() time_overhead = time.time() - time_overhead_start self.output("sweep ended, total time %gs (%gs) " % ((time.time() - time_total_start), nb_intervals*sample_time)) self._closeNxEntry() self._closeNxFile() def _openNxFile(self): sweep_id = self.getEnv('SweepID') fileNameNx = self.getEnv('SweepFileName') + "_" + str(sweep_id) + ".h5" self.setEnv("SweepID", sweep_id + 1) self.nexuswriter_device.Init() self.output(fileNameNx) self.nexuswriter_device.FileName = str(fileNameNx) self.nexuswriter_device.OpenFile() return 1 def _openNxEntry(self, componentName): self._sendGlobalDictionaryBefore() self.nexusconfig_device.Open() cmps = self.nexusconfig_device.AvailableComponents() cmp_list = [] if componentName not in cmps: self.output("_openNxEntry: %s not in configuration server", componentName) else: cmp_list.append(componentName) self.nexusconfig_device.CreateConfiguration(cmp_list) xmlconfig = self.nexusconfig_device.XMLString self.nexuswriter_device.XMLSettings = str(xmlconfig) try: self.nexuswriter_device.OpenEntry() except: pass return 1 def _closeNxFile(self): self.nexuswriter_device.CloseFile() return 1 def _closeNxEntry(self): self._sendGlobalDictionaryAfter() self.nexuswriter_device.CloseEntry() return 1 def _sendGlobalDictionaryBefore(self): hsh = {} hshSub = {} hshSub['sweep_motor'] = self.sweep_motor hshSub['sweep_start'] = self.sweep_start hshSub['sweep_distance'] = self.sweep_distance hshSub['sweep_offset'] = self.sweep_offset hshSub['interval'] = self.nb_intervals hshSub['sample_time'] = self.sample_time amsterdam = pytz.timezone('Europe/Amsterdam') fmt = '%Y-%m-%dT%H:%M:%S.%f%z' starttime = amsterdam.localize(datetime.datetime.now()) hshSub['start_time'] = str(starttime.strftime(fmt)) try: title = self.getEnv('SweepTitle') except: title = "" hshSub['title'] = title try: sample_name = self.getEnv('SweepSampleName') except: sample_name = "" hshSub['sample_name'] = sample_name try: chemical_formula = self.getEnv('SweepChemicalFormula') except: chemical_formula = "" hshSub['chemical_formula'] = chemical_formula try: beamtime_id = self.getEnv('SweepBeamtimeId') except: beamtime_id = "" hshSub['beamtime_id'] = beamtime_id hsh['data'] = hshSub self._setParameter( hsh) return 1 def _sendGlobalDictionaryAfter(self): hsh = {} hshSub = {} amsterdam = pytz.timezone('Europe/Amsterdam') fmt = '%Y-%m-%dT%H:%M:%S.%f%z' starttime = amsterdam.localize(datetime.datetime.now()) hshSub['end_time'] = str(starttime.strftime(fmt)) hshSub['comments'] = "some comment" hsh['data'] = hshSub self._setParameter( hsh) return 1 def _setParameter(self, hsh): jsondata = json.dumps( hsh) self.nexuswriter_device.JSONRecord = str(jsondata) return 1 def _sendRecordToNxWriter(self, hsh): mstr = json.dumps( hsh) self.nexuswriter_device.Record(mstr) return 1 class sweep_senv(Macro): """ Sets default environment variables """ def run(self): file_name = "/home/tnunez/sweep_files/test_sweep_output" self.setEnv("SweepFileName", file_name) self.output("Setting SweepFileName to %s" % file_name) nexus_config_device = "haso111tb:10000/test/xmlconfigserver/01" self.setEnv("NeXusConfigDevice", nexus_config_device) self.output("Setting NeXusConfigDevice to %s" % nexus_config_device) nexus_writer_device = "haso111tb:10000/test/tangodataserver/01" self.setEnv("NeXusWriterDevice", nexus_writer_device) self.output("Setting NeXusWriterDevice to %s" % nexus_writer_device) self.setEnv("SweepID", 0) self.output("Setting SweepID to 0") sweep_counters_prefix = "exp_c" self.setEnv("SweepCountersPrefix", sweep_counters_prefix) self.output("Setting SweepCountersPrefix to %s" % sweep_counters_prefix) sweep_timer_tango_device = "haso111tb:10000/p09/dgg2/exp.01" self.setEnv("SweepTimerTangoDevice", sweep_timer_tango_device) self.output("Setting SweepTimerTangoDevice to %s" % sweep_timer_tango_device)
Notice that mot1.write_attribute( "position", target) has been used to move the motor. We use this syntax and not mot1.position = target because the latter form was partially not supported.