Macros for performing continuous scans taking data with the XIA and/or SIS3820 MCS triggered by the Zebra. The data is saved in NeXus files using XMLConfigServer and TangoDataServer devices. Macros for setting the required spock environment variables are also provided.
#!/bin/env python
"""continuous scan macros"""
# Public macros of this module: the three continuous-scan macros and, for
# each of them, the matching *_senv macro that sets default environment
# variables.
__all__ = ["cscan_zebra_mcs", "cscan_zebra_mcs_xia", "cscan_zebra_xia", "cscan_zebra_mcs_senv", "cscan_zebra_mcs_xia_senv", "cscan_zebra_xia_senv"]
import PyTango
from sardana.macroserver.macro import *
from sardana.macroserver.macro import macro
import json
import time
import pytz
import datetime
import numpy
class cscan_zebra_mcs(Macro):
    """Perform a continuous scan with the zebra triggering the mcs.

    The motor is swept from just before ``start_pos`` to just beyond
    ``final_pos`` with a velocity computed from the scan range and the
    trigger timing.  Once the motor has reached ``start_pos`` the zebra is
    armed.  When the zebra has finished, the MCS counts and the zebra
    encoder values are read back and sent, one record per trigger, to the
    NeXus writer device.

    Environment variables read: ZebraDevice, MCSDevice, NeXusConfigDevice,
    NeXusWriterDevice, CScanID and CScanFileName.
    """

    param_def = [
        ['motor', Type.Motor, None, 'Motor to move'],
        ['start_pos', Type.Float, None, 'Scan start position'],
        ['final_pos', Type.Float, None, 'Scan final position'],
        ['nb_triggers', Type.Integer, None, 'Nb of triggers generated by the zebra'],
        ['trigger_interval', Type.Float, None, 'Time between consecutive triggers'],
        ['nb_mcs_channels', Type.Integer, None, 'Nb of mcs channels to recorder']
    ]

    result_def = [["result", Type.String, None, "the cscan object"]]

    def run(self, motor, start_pos, final_pos, nb_triggers, trigger_interval, nb_mcs_channels):
        """Execute the scan and write the data to a NeXus file.

        Raises ValueError if ``nb_triggers`` is smaller than 2 or
        ``trigger_interval`` is not positive, since the scan velocity is
        computed as range / (trigger_interval * (nb_triggers - 1)).

        Returns the string "None".
        """
        if nb_triggers < 2:
            raise ValueError("nb_triggers must be at least 2")
        if trigger_interval <= 0:
            raise ValueError("trigger_interval must be positive")

        zebra_device_name = self.getEnv('ZebraDevice')
        mcs_device_name = self.getEnv('MCSDevice')
        nexusconfig_device_name = self.getEnv('NeXusConfigDevice')  # class XMLConfigServer
        nexuswriter_device_name = self.getEnv('NeXusWriterDevice')  # class TangoDataServer

        zebra_device = PyTango.DeviceProxy(zebra_device_name)
        mcs_device = PyTango.DeviceProxy(mcs_device_name)
        motor_device = PyTango.DeviceProxy(motor.name)
        self.nexusconfig_device = PyTango.DeviceProxy(nexusconfig_device_name)
        self.nexuswriter_device = PyTango.DeviceProxy(nexuswriter_device_name)

        # Set the zebra device
        zebra_device.NbTriggers = nb_triggers
        zebra_device.TriggerInterval = trigger_interval

        # Set MCS
        mcs_device.NbAcquisitions = 0  # Continuous mode
        mcs_device.NbChannels = nb_mcs_channels
        nb_mcs_taken_triggers = nb_triggers

        # Clear and setup mcs
        mcs_device.ClearSetupMCS()

        # Compute motor slewrate for the continuous scan.
        # NOTE(review): the division by Step_per_unit is kept unchanged;
        # confirm it matches the unit the Velocity attribute expects.
        old_slewrate = motor_device.Velocity
        scan_range = abs((final_pos - start_pos) / motor_device.Step_per_unit)
        scan_slewrate = scan_range / (trigger_interval * (nb_triggers - 1))

        file_opened = False
        entry_opened = False
        try:
            motor_device.Velocity = int(scan_slewrate)
            try:
                self._startMotorSweep(motor_device, start_pos, final_pos)

                # Open the nexus file for saving data
                self._openNxFile()
                file_opened = True
                self._openNxEntry(motor.name, start_pos, final_pos, nb_mcs_channels,
                                  nb_mcs_taken_triggers, trigger_interval)
                entry_opened = True

                # Start zebra triggering
                zebra_device.Arm = 1

                # Check when the triggering is done
                while zebra_device.State() == PyTango.DevState.MOVING:
                    time.sleep(1)
            finally:
                # Reset motor slewrate, also when the scan was interrupted
                motor_device.Velocity = old_slewrate

            # Read MCS
            mcs_device.ReadMCS()
            mcs_data = mcs_device.CountsArray

            # Zebra encoder Data
            zebra_data = zebra_device.EncoderSpectrum

            self._writeRecords(zebra_data, mcs_data, nb_mcs_taken_triggers, nb_mcs_channels)
        finally:
            # Close whatever was opened so the writer is not left with a
            # dangling entry or file after a failure.
            try:
                if entry_opened:
                    self._closeNxEntry()
            finally:
                if file_opened:
                    self._closeNxFile()

        result = "None"
        return result

    def _startMotorSweep(self, motor_device, start_pos, final_pos):
        """Start the sweep and return once the motor has passed start_pos.

        The motor first goes to a point slightly before ``start_pos`` and
        then a move to slightly beyond ``final_pos`` is started without
        waiting for its end (the macro mv can not be used because it
        blocks).
        """
        pos_offset = 0.1
        # Apply the offsets along the scan direction so that a scan with
        # final_pos < start_pos also passes through start_pos instead of
        # waiting forever for a position it never reaches.
        direction = 1.0 if final_pos >= start_pos else -1.0

        # Move motor to start position minus an offset
        motor_device.write_attribute("position", start_pos - direction * pos_offset)
        while motor_device.State() == PyTango.DevState.MOVING:
            time.sleep(1)

        # Start motor movement to final position
        motor_device.write_attribute("position", final_pos + direction * pos_offset)

        # Check when the motor is in the start_position for starting the trigger
        while direction * (motor_device.position - start_pos) < 0:
            time.sleep(0.001)

    def _writeRecords(self, zebra_data, mcs_data, nb_records, nb_mcs_channels):
        """Send one record per trigger to the NeXus writer."""
        record = {}
        payload = {'data': record}
        for i in range(nb_records):
            # numpy data are not json serializable. They have to be
            # transformed to native types (item() replaces numpy.asscalar,
            # which no longer exists in recent numpy releases).
            record["encoder_pos"] = zebra_data[i].item()
            for j in range(nb_mcs_channels):
                record["counts_mcs_ch%02d" % (j + 1)] = mcs_data[i][j].item()
            self._sendRecordToNxWriter(payload)

    def _timestamp(self):
        """Return the current Europe/Amsterdam time as a formatted string."""
        amsterdam = pytz.timezone('Europe/Amsterdam')
        fmt = '%Y-%m-%dT%H:%M:%S.%f%z'
        # now(tz) converts from UTC, so the result does not depend on the
        # time zone of the host and is unambiguous around DST changes.
        return str(datetime.datetime.now(amsterdam).strftime(fmt))

    def _openNxFile(self):
        """Open <CScanFileName>_<CScanID>.h5 in the writer and increment CScanID."""
        cscan_id = self.getEnv('CScanID')
        fileNameNx = self.getEnv('CScanFileName') + "_" + str(cscan_id) + ".h5"
        self.setEnv("CScanID", cscan_id + 1)
        self.nexuswriter_device.Init()
        self.nexuswriter_device.FileName = str(fileNameNx)
        self.nexuswriter_device.OpenFile()
        return 1

    def _openNxEntry(self, motor_name, start_pos, final_pos, nb_mcs_channels, nb_triggers, trigger_interval):
        """Configure the writer from the available components and open an entry.

        Components requested: "zebra_mcs" and "mcs_chNN" for every channel.
        Components missing in the configuration server are reported with
        self.output and left out.
        """
        self._sendGlobalDictionaryBefore(motor_name, start_pos, final_pos, nb_triggers, trigger_interval)

        self.nexusconfig_device.Open()
        cmps = self.nexusconfig_device.AvailableComponents()

        wanted = ["zebra_mcs"]
        wanted.extend("mcs_ch%02d" % i for i in range(1, nb_mcs_channels + 1))

        cmp_list = []
        for name in wanted:
            if name in cmps:
                cmp_list.append(name)
            else:
                self.output("_openNxEntry: %s not in configuration server" % name)

        self.nexusconfig_device.CreateConfiguration(cmp_list)
        xmlconfig = self.nexusconfig_device.XMLString
        self.nexuswriter_device.TheXMLSettings = str(xmlconfig)
        self.nexuswriter_device.OpenEntry()
        return 1

    def _closeNxFile(self):
        """Close the file in the writer device."""
        self.nexuswriter_device.CloseFile()
        return 1

    def _closeNxEntry(self):
        """Send the end-of-scan data and close the entry."""
        self._sendGlobalDictionaryAfter()
        self.nexuswriter_device.CloseEntry()
        return 1

    def _sendGlobalDictionaryBefore(self, motor_name, start_pos, final_pos, nb_triggers, trigger_interval):
        """Send the scan parameters and the start time to the writer."""
        hshSub = {
            'motor_name': str(motor_name),
            'start_pos': start_pos,
            'final_pos': final_pos,
            'nb_triggers': nb_triggers,
            'sample_time': trigger_interval,
            'start_time': self._timestamp(),
        }
        self._setParameter({'data': hshSub})
        return 1

    def _sendGlobalDictionaryAfter(self):
        """Send the end time and a comment to the writer."""
        hshSub = {
            'end_time': self._timestamp(),
            'comments': "some comment",
        }
        self._setParameter({'data': hshSub})
        return 1

    def _setParameter(self, hsh):
        """Write ``hsh`` as JSON to the writer attribute TheJSONRecord."""
        jsondata = json.dumps(hsh)
        self.nexuswriter_device.TheJSONRecord = str(jsondata)
        return 1

    def _sendRecordToNxWriter(self, hsh):
        """Pass ``hsh`` as JSON to the writer command Record."""
        mstr = json.dumps(hsh)
        self.nexuswriter_device.Record(mstr)
        return 1
class cscan_zebra_mcs_senv(Macro):
    """Set default values for the environment variables read by cscan_zebra_mcs."""

    def run(self):
        """Set every variable and report the value it was set to."""
        defaults = (
            ("MCSDevice", "haso107klx:10000/p09/mcs/exp.01"),
            ("ZebraDevice", "haso111n:10000/test/zebra/01"),
            ("CScanFileName", "test_cscan_output"),
            ("NeXusConfigDevice", "haso111tb:10000/test/xmlconfigserver/01"),
            ("NeXusWriterDevice", "haso111tb:10000/test/tangodataserver/01"),
            ("CScanID", 0),
        )
        for name, value in defaults:
            self.setEnv(name, value)
            # Building the message from the value keeps it complete; the
            # MCSDevice message used to end after "to ".
            self.output("Setting %s to %s" % (name, value))
class cscan_zebra_mcs_xia(Macro):
    """Perform a continuous scan with the zebra triggering the mcs and the xia.

    The motor is swept from just before ``start_pos`` to just beyond
    ``final_pos`` with a velocity computed from the scan range and the
    trigger timing.  Once the motor has reached ``start_pos`` the zebra is
    armed.  When the zebra and the XIA have finished, the MCS counts, the
    XIA buffer and the zebra encoder values are read back and sent, one
    record per acquired trigger, to the NeXus writer device.

    Environment variables read: ZebraDevice, MCSDevice, XIADevice,
    NeXusConfigDevice, NeXusWriterDevice, CScanID and CScanFileName.
    """

    param_def = [
        ['motor', Type.Motor, None, 'Motor to move'],
        ['start_pos', Type.Float, None, 'Scan start position'],
        ['final_pos', Type.Float, None, 'Scan final position'],
        ['nb_triggers', Type.Integer, None, 'Nb of triggers generated by the zebra'],
        ['trigger_interval', Type.Float, None, 'Time between consecutive triggers'],
        ['nb_mcs_channels', Type.Integer, None, 'Nb of mcs channels to recorder']
    ]

    result_def = [["result", Type.String, None, "the cscan object"]]

    def run(self, motor, start_pos, final_pos, nb_triggers, trigger_interval, nb_mcs_channels):
        """Execute the scan and write the data to a NeXus file.

        Raises ValueError if ``nb_triggers`` is smaller than 2 or
        ``trigger_interval`` is not positive, since the scan velocity is
        computed as range / (trigger_interval * (nb_triggers - 1)).

        Returns the string "None".
        """
        if nb_triggers < 2:
            raise ValueError("nb_triggers must be at least 2")
        if trigger_interval <= 0:
            raise ValueError("trigger_interval must be positive")

        zebra_device_name = self.getEnv('ZebraDevice')
        mcs_device_name = self.getEnv('MCSDevice')
        xia_device_name = self.getEnv('XIADevice')
        nexusconfig_device_name = self.getEnv('NeXusConfigDevice')  # class XMLConfigServer
        nexuswriter_device_name = self.getEnv('NeXusWriterDevice')  # class TangoDataServer

        zebra_device = PyTango.DeviceProxy(zebra_device_name)
        mcs_device = PyTango.DeviceProxy(mcs_device_name)
        xia_device = PyTango.DeviceProxy(xia_device_name)
        motor_device = PyTango.DeviceProxy(motor.name)
        self.nexusconfig_device = PyTango.DeviceProxy(nexusconfig_device_name)
        self.nexuswriter_device = PyTango.DeviceProxy(nexuswriter_device_name)

        # Set the zebra device
        zebra_device.NbTriggers = nb_triggers
        zebra_device.TriggerInterval = trigger_interval

        # Set MCS
        mcs_device.NbAcquisitions = 0  # Continuous mode
        mcs_device.NbChannels = nb_mcs_channels
        nb_mcs_taken_triggers = nb_triggers

        # Clear and setup mcs
        mcs_device.ClearSetupMCS()

        # Set XIA
        xia_device.MappingMode = 1
        xia_device.GateMaster = 1
        xia_device.NumberMcaChannels = 2048
        xia_device.NumMapPixels = nb_triggers
        xia_device.NumMapPixelsPerBuffer = -1
        xia_device.MaskMapChannels = 1  # change if one wants to work with more XIA channels
        nb_xia_channels = xia_device.NumberMcaChannels
        xia_device.StartMapping()

        # Compute motor slewrate for the continuous scan.
        # NOTE(review): the division by Step_per_unit is kept unchanged;
        # confirm it matches the unit the Velocity attribute expects.
        old_slewrate = motor_device.Velocity
        scan_range = abs((final_pos - start_pos) / motor_device.Step_per_unit)
        scan_slewrate = scan_range / (trigger_interval * (nb_triggers - 1))

        file_opened = False
        entry_opened = False
        try:
            motor_device.Velocity = int(scan_slewrate)
            try:
                self._startMotorSweep(motor_device, start_pos, final_pos)

                # Open the nexus file for saving data
                self._openNxFile()
                file_opened = True
                self._openNxEntry(motor.name, start_pos, final_pos, nb_mcs_channels,
                                  nb_mcs_taken_triggers, trigger_interval, nb_xia_channels)
                entry_opened = True

                # Start zebra triggering
                zebra_device.Arm = 1

                # Check when the triggering is done
                while zebra_device.State() == PyTango.DevState.MOVING:
                    time.sleep(1)

                # Check that the XIA is done
                while xia_device.State() == PyTango.DevState.MOVING:
                    time.sleep(1)
            finally:
                # Reset motor slewrate, also when the scan was interrupted
                motor_device.Velocity = old_slewrate

            # Read MCS
            mcs_device.ReadMCS()

            # MCS Data
            nb_mcs_taken_triggers = mcs_device.AcquiredTriggers
            nb_mcs_channels = mcs_device.NbChannels
            mcs_data = mcs_device.CountsArray

            # XIA data is in the attribute Buffer.
            # NOTE(review): the one second pause is kept as it was;
            # presumably it lets the XIA publish the buffer - confirm.
            time.sleep(1)
            xia_buffer = xia_device.Buffer

            # Zebra encoder Data
            zebra_data = zebra_device.EncoderSpectrum

            # Length of one spectrum inside the flat XIA buffer: the value
            # that was announced to the writer in _openNxEntry.  This name
            # used to be undefined here.
            xia_spec_length = nb_xia_channels

            self._writeRecords(zebra_data, mcs_data, xia_buffer, nb_mcs_taken_triggers,
                               nb_mcs_channels, xia_spec_length)
        finally:
            # Close whatever was opened so the writer is not left with a
            # dangling entry or file after a failure.
            try:
                if entry_opened:
                    self._closeNxEntry()
            finally:
                if file_opened:
                    self._closeNxFile()

        result = "None"
        return result

    def _startMotorSweep(self, motor_device, start_pos, final_pos):
        """Start the sweep and return once the motor has passed start_pos.

        The motor first goes to a point slightly before ``start_pos`` and
        then a move to slightly beyond ``final_pos`` is started without
        waiting for its end (the macro mv can not be used because it
        blocks).
        """
        pos_offset = 0.1
        # Apply the offsets along the scan direction so that a scan with
        # final_pos < start_pos also passes through start_pos instead of
        # waiting forever for a position it never reaches.
        direction = 1.0 if final_pos >= start_pos else -1.0

        # Move motor to start position minus an offset
        motor_device.write_attribute("position", start_pos - direction * pos_offset)
        while motor_device.State() == PyTango.DevState.MOVING:
            time.sleep(1)

        # Start motor movement to final position
        motor_device.write_attribute("position", final_pos + direction * pos_offset)

        # Check when the motor is in the start_position for starting the trigger
        while direction * (motor_device.position - start_pos) < 0:
            time.sleep(0.001)

    def _writeRecords(self, zebra_data, mcs_data, xia_buffer, nb_records, nb_mcs_channels, xia_spec_length):
        """Send one record per trigger to the NeXus writer.

        Record ``i`` holds the encoder value, the counts of every MCS
        channel and the slice of ``xia_buffer`` starting at
        ``i * xia_spec_length``.
        """
        record = {}
        payload = {'data': record}
        for i in range(nb_records):
            # numpy data are not json serializable. They have to be
            # transformed to native types (item() replaces numpy.asscalar,
            # which no longer exists in recent numpy releases).
            record["encoder_pos"] = zebra_data[i].item()
            for j in range(nb_mcs_channels):
                record["counts_mcs_ch%02d" % (j + 1)] = mcs_data[i][j].item()
            first = i * xia_spec_length
            # Element-wise indexing (not slicing) so that a buffer that is
            # too short raises instead of producing a truncated spectrum.
            record["xia"] = [xia_buffer[first + k].item() for k in range(xia_spec_length)]
            self._sendRecordToNxWriter(payload)

    def _timestamp(self):
        """Return the current Europe/Amsterdam time as a formatted string."""
        amsterdam = pytz.timezone('Europe/Amsterdam')
        fmt = '%Y-%m-%dT%H:%M:%S.%f%z'
        # now(tz) converts from UTC, so the result does not depend on the
        # time zone of the host and is unambiguous around DST changes.
        return str(datetime.datetime.now(amsterdam).strftime(fmt))

    def _openNxFile(self):
        """Open <CScanFileName>_<CScanID>.h5 in the writer and increment CScanID."""
        cscan_id = self.getEnv('CScanID')
        fileNameNx = self.getEnv('CScanFileName') + "_" + str(cscan_id) + ".h5"
        self.setEnv("CScanID", cscan_id + 1)
        self.nexuswriter_device.Init()
        self.nexuswriter_device.FileName = str(fileNameNx)
        self.nexuswriter_device.OpenFile()
        return 1

    def _openNxEntry(self, motor_name, start_pos, final_pos, nb_mcs_channels, nb_triggers, trigger_interval, xia_spec_length):
        """Configure the writer from the available components and open an entry.

        Components requested: "zebra", "mcs_chNN" for every channel and
        "xia".  Components missing in the configuration server are reported
        with self.output and left out.
        """
        self._sendGlobalDictionaryBefore(motor_name, start_pos, final_pos, nb_triggers,
                                         trigger_interval, xia_spec_length)

        self.nexusconfig_device.Open()
        cmps = self.nexusconfig_device.AvailableComponents()

        wanted = ["zebra"]
        wanted.extend("mcs_ch%02d" % i for i in range(1, nb_mcs_channels + 1))
        # Add component for the XIA. The Data will be directly readout from
        # the xia tango device when the entry is opened
        wanted.append("xia")

        cmp_list = []
        for name in wanted:
            if name in cmps:
                cmp_list.append(name)
            else:
                self.output("_openNxEntry: %s not in configuration server" % name)

        self.nexusconfig_device.CreateConfiguration(cmp_list)
        xmlconfig = self.nexusconfig_device.XMLString
        self.nexuswriter_device.TheXMLSettings = str(xmlconfig)
        self.nexuswriter_device.OpenEntry()
        return 1

    def _closeNxFile(self):
        """Close the file in the writer device."""
        self.nexuswriter_device.CloseFile()
        return 1

    def _closeNxEntry(self):
        """Send the end-of-scan data and close the entry."""
        self._sendGlobalDictionaryAfter()
        self.nexuswriter_device.CloseEntry()
        return 1

    def _sendGlobalDictionaryBefore(self, motor_name, start_pos, final_pos, nb_triggers, trigger_interval, xia_spec_length):
        """Send the scan parameters, the start time and a dummy XIA spectrum.

        ``xia_spec_length`` is the length of the dummy spectrum.  The
        parameter was missing from the signature although _openNxEntry
        passes it and the body uses it.
        """
        hshSub = {
            'motor_name': str(motor_name),
            'start_pos': start_pos,
            'final_pos': final_pos,
            'nb_triggers': nb_triggers,
            'sample_time': trigger_interval,
            'start_time': self._timestamp(),
        }
        hsh = {'data': hshSub}
        # faking a xia spectrum for setting the size
        # NOTE(review): 'xia' sits next to 'data', not inside it, exactly as
        # before; confirm this is where the writer expects it.
        hsh['xia'] = [0] * int(xia_spec_length)
        self._setParameter(hsh)
        return 1

    def _sendGlobalDictionaryAfter(self):
        """Send the end time and a comment to the writer."""
        hshSub = {
            'end_time': self._timestamp(),
            'comments': "some comment",
        }
        self._setParameter({'data': hshSub})
        return 1

    def _setParameter(self, hsh):
        """Write ``hsh`` as JSON to the writer attribute TheJSONRecord."""
        jsondata = json.dumps(hsh)
        self.nexuswriter_device.TheJSONRecord = str(jsondata)
        return 1

    def _sendRecordToNxWriter(self, hsh):
        """Pass ``hsh`` as JSON to the writer command Record."""
        mstr = json.dumps(hsh)
        self.nexuswriter_device.Record(mstr)
        return 1
class cscan_zebra_mcs_xia_senv(Macro):
    """Set default values for the environment variables read by cscan_zebra_mcs_xia."""

    def run(self):
        """Set every variable and report the value it was set to."""
        defaults = (
            ("MCSDevice", "haso107klx:10000/p09/mcs/exp.01"),
            ("ZebraDevice", "haso111n:10000/test/zebra/01"),
            ("XIADevice", "hasp029rack:10000/test/xia/01"),
            ("CScanFileName", "test_cscan_output"),
            ("NeXusConfigDevice", "haso111tb:10000/test/xmlconfigserver/01"),
            ("NeXusWriterDevice", "haso111tb:10000/test/tangodataserver/01"),
            ("CScanID", 0),
        )
        for name, value in defaults:
            self.setEnv(name, value)
            # Building the message from the value keeps it complete; the
            # MCSDevice message used to end after "to ".
            self.output("Setting %s to %s" % (name, value))
class cscan_zebra_xia(Macro):
    """Perform a continuous scan with the zebra triggering the xia.

    The motor is swept from just before ``start_pos`` to just beyond
    ``final_pos`` with a velocity computed from the scan range and the
    trigger timing.  Once the motor has reached ``start_pos`` the zebra is
    armed.  When the zebra and the XIA have finished, the XIA buffer and
    the zebra encoder values are read back and sent, one record per
    trigger, to the NeXus writer device.

    Environment variables read: ZebraDevice, XIADevice, NeXusConfigDevice,
    NeXusWriterDevice, CScanID and CScanFileName.
    """

    param_def = [
        ['motor', Type.Motor, None, 'Motor to move'],
        ['start_pos', Type.Float, None, 'Scan start position'],
        ['final_pos', Type.Float, None, 'Scan final position'],
        ['nb_triggers', Type.Integer, None, 'Nb of triggers generated by the zebra'],
        ['trigger_interval', Type.Float, None, 'Time between consecutive triggers']
    ]

    result_def = [["result", Type.String, None, "the cscan object"]]

    def run(self, motor, start_pos, final_pos, nb_triggers, trigger_interval):
        """Execute the scan and write the data to a NeXus file.

        Raises ValueError if ``nb_triggers`` is smaller than 2 or
        ``trigger_interval`` is not positive, since the scan velocity is
        computed as range / (trigger_interval * (nb_triggers - 1)).

        Returns the string "None".
        """
        if nb_triggers < 2:
            raise ValueError("nb_triggers must be at least 2")
        if trigger_interval <= 0:
            raise ValueError("trigger_interval must be positive")

        zebra_device_name = self.getEnv('ZebraDevice')
        xia_device_name = self.getEnv('XIADevice')
        nexusconfig_device_name = self.getEnv('NeXusConfigDevice')  # class XMLConfigServer
        nexuswriter_device_name = self.getEnv('NeXusWriterDevice')  # class TangoDataServer

        zebra_device = PyTango.DeviceProxy(zebra_device_name)
        xia_device = PyTango.DeviceProxy(xia_device_name)
        motor_device = PyTango.DeviceProxy(motor.name)
        self.nexusconfig_device = PyTango.DeviceProxy(nexusconfig_device_name)
        self.nexuswriter_device = PyTango.DeviceProxy(nexuswriter_device_name)

        # The file is opened only once, right before the entry is opened.
        # An additional _openNxFile() call at this point used to increment
        # CScanID twice per scan and re-initialise the writer.

        # Set the zebra device
        zebra_device.NbTriggers = nb_triggers
        zebra_device.TriggerInterval = trigger_interval

        # Set XIA
        xia_device.MappingMode = 1
        xia_device.GateMaster = 1
        xia_device.NumberMcaChannels = 2048
        xia_device.NumMapPixels = nb_triggers
        xia_device.NumMapPixelsPerBuffer = -1
        xia_device.MaskMapChannels = 1  # change if one wants to work with more XIA channels
        nb_xia_channels = xia_device.NumberMcaChannels
        xia_device.StartMapping()

        # Compute motor slewrate for the continuous scan.
        # NOTE(review): the division by Step_per_unit is kept unchanged;
        # confirm it matches the unit the Velocity attribute expects.
        old_slewrate = motor_device.Velocity
        scan_range = abs((final_pos - start_pos) / motor_device.Step_per_unit)
        scan_slewrate = scan_range / (trigger_interval * (nb_triggers - 1))

        file_opened = False
        entry_opened = False
        try:
            # NOTE(review): the sibling macros write int(scan_slewrate);
            # the untruncated value is kept here as it was - confirm which
            # form the Velocity attribute needs.
            motor_device.Velocity = scan_slewrate
            try:
                self._startMotorSweep(motor_device, start_pos, final_pos)

                # Open the nexus file for saving data
                self._openNxFile()
                file_opened = True
                self._openNxEntry(motor.name, start_pos, final_pos, nb_triggers,
                                  trigger_interval, nb_xia_channels)
                entry_opened = True

                # Start zebra triggering
                zebra_device.Arm = 1

                # Check when the triggering is done
                while zebra_device.State() == PyTango.DevState.MOVING:
                    time.sleep(1)

                # Check that the XIA is done
                while xia_device.State() == PyTango.DevState.MOVING:
                    time.sleep(1)
            finally:
                # Reset motor slewrate, also when the scan was interrupted
                motor_device.Velocity = old_slewrate

            # XIA data is in the attribute Buffer
            xia_buffer = xia_device.Buffer

            # Zebra encoder Data
            zebra_data = zebra_device.EncoderSpectrum

            xia_spec_length = xia_device.NumberMcaChannels

            self._writeRecords(zebra_data, xia_buffer, nb_triggers, xia_spec_length)
        finally:
            # Close whatever was opened so the writer is not left with a
            # dangling entry or file after a failure.
            try:
                if entry_opened:
                    self._closeNxEntry()
            finally:
                if file_opened:
                    self._closeNxFile()

        result = "None"
        return result

    def _startMotorSweep(self, motor_device, start_pos, final_pos):
        """Start the sweep and return once the motor has passed start_pos.

        The motor first goes to a point slightly before ``start_pos`` and
        then a move to slightly beyond ``final_pos`` is started without
        waiting for its end (the macro mv can not be used because it
        blocks).
        """
        pos_offset = 0.1
        # Apply the offsets along the scan direction so that a scan with
        # final_pos < start_pos also passes through start_pos instead of
        # waiting forever for a position it never reaches.
        direction = 1.0 if final_pos >= start_pos else -1.0

        # Move motor to start position minus an offset
        motor_device.write_attribute("position", start_pos - direction * pos_offset)
        while motor_device.State() == PyTango.DevState.MOVING:
            time.sleep(1)

        # Start motor movement to final position
        motor_device.write_attribute("position", final_pos + direction * pos_offset)

        # Check when the motor is in the start_position for starting the trigger
        while direction * (motor_device.position - start_pos) < 0:
            time.sleep(0.001)

    def _writeRecords(self, zebra_data, xia_buffer, nb_records, xia_spec_length):
        """Send one record per trigger to the NeXus writer.

        Record ``i`` holds the encoder value and the slice of
        ``xia_buffer`` starting at ``i * xia_spec_length``.
        """
        record = {}
        payload = {'data': record}
        for i in range(nb_records):
            # numpy data are not json serializable. They have to be
            # transformed to native types (item() replaces numpy.asscalar,
            # which no longer exists in recent numpy releases).
            record["encoder_pos"] = zebra_data[i].item()
            first = i * xia_spec_length
            # Element-wise indexing (not slicing) so that a buffer that is
            # too short raises instead of producing a truncated spectrum.
            record["xia"] = [xia_buffer[first + k].item() for k in range(xia_spec_length)]
            self._sendRecordToNxWriter(payload)

    def _timestamp(self):
        """Return the current Europe/Amsterdam time as a formatted string."""
        amsterdam = pytz.timezone('Europe/Amsterdam')
        fmt = '%Y-%m-%dT%H:%M:%S.%f%z'
        # now(tz) converts from UTC, so the result does not depend on the
        # time zone of the host and is unambiguous around DST changes.
        return str(datetime.datetime.now(amsterdam).strftime(fmt))

    def _openNxFile(self):
        """Open <CScanFileName>_<CScanID>.h5 in the writer and increment CScanID."""
        cscan_id = self.getEnv('CScanID')
        fileNameNx = self.getEnv('CScanFileName') + "_" + str(cscan_id) + ".h5"
        self.setEnv("CScanID", cscan_id + 1)
        self.nexuswriter_device.Init()
        self.nexuswriter_device.FileName = str(fileNameNx)
        self.nexuswriter_device.OpenFile()
        return 1

    def _openNxEntry(self, motor_name, start_pos, final_pos, nb_triggers, trigger_interval, xia_spec_length):
        """Configure the writer from the available components and open an entry.

        Components requested: "zebra" and "xia".  Components missing in the
        configuration server are reported with self.output and left out.
        """
        self._sendGlobalDictionaryBefore(motor_name, start_pos, final_pos, nb_triggers,
                                         trigger_interval, xia_spec_length)

        self.nexusconfig_device.Open()
        cmps = self.nexusconfig_device.AvailableComponents()

        cmp_list = []
        for name in ("zebra", "xia"):
            if name in cmps:
                cmp_list.append(name)
            else:
                self.output("_openNxEntry: %s not in configuration server" % name)

        self.nexusconfig_device.CreateConfiguration(cmp_list)
        xmlconfig = self.nexusconfig_device.XMLString
        self.nexuswriter_device.TheXMLSettings = str(xmlconfig)
        self.nexuswriter_device.OpenEntry()
        return 1

    def _closeNxFile(self):
        """Close the file in the writer device."""
        self.nexuswriter_device.CloseFile()
        return 1

    def _closeNxEntry(self):
        """Send the end-of-scan data and close the entry."""
        self._sendGlobalDictionaryAfter()
        self.nexuswriter_device.CloseEntry()
        return 1

    def _sendGlobalDictionaryBefore(self, motor_name, start_pos, final_pos, nb_triggers, trigger_interval, xia_spec_length):
        """Send the scan parameters and the start time to the writer.

        ``xia_spec_length`` is accepted but not used by this method.
        """
        hshSub = {
            'motor_name': str(motor_name),
            'start_pos': start_pos,
            'final_pos': final_pos,
            'nb_triggers': nb_triggers,
            'sample_time': trigger_interval,
            'start_time': self._timestamp(),
        }
        self._setParameter({'data': hshSub})
        return 1

    def _sendGlobalDictionaryAfter(self):
        """Send the end time and a comment to the writer."""
        hshSub = {
            'end_time': self._timestamp(),
            'comments': "some comment",
        }
        self._setParameter({'data': hshSub})
        return 1

    def _setParameter(self, hsh):
        """Write ``hsh`` as JSON to the writer attribute TheJSONRecord."""
        jsondata = json.dumps(hsh)
        self.nexuswriter_device.TheJSONRecord = str(jsondata)
        return 1

    def _sendRecordToNxWriter(self, hsh):
        """Pass ``hsh`` as JSON to the writer command Record."""
        mstr = json.dumps(hsh)
        self.nexuswriter_device.Record(mstr)
        return 1
class cscan_zebra_xia_senv(Macro):
    """Set default values for the environment variables read by cscan_zebra_xia."""

    def run(self):
        """Set every variable, in table order, and report each value."""
        # (variable name, default value); each pair is set first and then
        # announced, one after the other.
        table = (
            ("ZebraDevice", "haso111n:10000/test/zebra/01"),
            ("XIADevice", "hasp029rack:10000/test/xia/01"),
            ("CScanFileName", "test_cscan_output"),
            ("NeXusConfigDevice", "haso111tb:10000/test/xmlconfigserver/01"),
            ("NeXusWriterDevice", "haso111tb:10000/test/tangodataserver/01"),
            ("CScanID", 0),
        )
        for variable, default in table:
            self.setEnv(variable, default)
            self.output("Setting %s to %s" % (variable, default))
Note that mot1.write_attribute("position", target) is used to move the motor. This syntax is used instead of mot1.position = target because the latter form was not fully supported.