Macros for performing continuos scans taking data with the XIA and/or SIS3820 MCS triggered by the Zebra. The data is saved in NeXus files using XMLConfigServer und TangoDataServer devices. Macros for setting the requiered spock enviroment variables are also provided.
#!/bin/env python """continuous scan macros""" __all__ = ["cscan_zebra_mcs", "cscan_zebra_mcs_xia", "cscan_zebra_xia", "cscan_zebra_mcs_senv", "cscan_zebra_mcs_xia_senv", "cscan_zebra_xia_senv"] import PyTango from sardana.macroserver.macro import * from sardana.macroserver.macro import macro import json import time import pytz import datetime import numpy class cscan_zebra_mcs(Macro): """Perfoms a continuous scan with the zebra triggering the mcs""" param_def = [ ['motor', Type.Motor, None, 'Motor to move'], ['start_pos', Type.Float, None, 'Scan start position'], ['final_pos', Type.Float, None, 'Scan final position'], ['nb_triggers', Type.Integer, None, 'Nb of triggers generated by the zebra'], ['trigger_interval', Type.Float, None, 'Time between consecutive triggers'], ['nb_mcs_channels', Type.Integer, None, 'Nb of mcs channels to recorder'] ] result_def = [ [ "result", Type.String, None, "the cscan object" ]] def run(self, motor, start_pos, final_pos, nb_triggers, trigger_interval, nb_mcs_channels): zebra_device_name = self.getEnv('ZebraDevice') mcs_device_name = self.getEnv('MCSDevice') nexusconfig_device_name = self.getEnv('NeXusConfigDevice') # class XMLConfigServer nexuswriter_device_name = self.getEnv('NeXusWriterDevice') # class TangoDataServer zebra_device = PyTango.DeviceProxy(zebra_device_name) mcs_device = PyTango.DeviceProxy(mcs_device_name) motor_device = PyTango.DeviceProxy(motor.name) self.nexusconfig_device = PyTango.DeviceProxy(nexusconfig_device_name) self.nexuswriter_device = PyTango.DeviceProxy(nexuswriter_device_name) # Set the zebra device zebra_device.NbTriggers = nb_triggers zebra_device.TriggerInterval = trigger_interval # Set MCS mcs_device.NbAcquisitions = 0 # Continuous mode mcs_device.NbChannels = nb_mcs_channels nb_mcs_taken_triggers = nb_triggers # Clear and setup mcs mcs_device.ClearSetupMCS() # Compute motor slewrate for the continuous scan old_slewrate = motor_device.Velocity scan_slewrate = abs((final_pos - start_pos)/motor_device.Step_per_unit)/(trigger_interval * (nb_triggers - 1)) motor_device.Velocity = int(scan_slewrate) # Move motor to start position minus an offset pos_offset = 0.1 motor_device.write_attribute( "position", (start_pos - pos_offset)) while motor_device.State() == PyTango.DevState.MOVING: time.sleep(1) # Start motor movement to final position (the macro mv can not be used because it blocks) motor_device.write_attribute( "position", (final_pos + pos_offset)) # Check when the motor is in the start_position for starting the trigger while(motor_device.position < start_pos): time.sleep(0.001) # Open the nexus file for saving data self._openNxFile() self._openNxEntry(motor.name, start_pos, final_pos, nb_mcs_channels, nb_mcs_taken_triggers, trigger_interval) # Start zebra triggering zebra_device.Arm = 1 # Check when the triggering is done while zebra_device.State() == PyTango.DevState.MOVING: time.sleep(1) # Reset motor slewrate motor_device.Velocity = old_slewrate # Read MCS mcs_device.ReadMCS() # MCS Data mcs_data = mcs_device.CountsArray # Zebra encoder Data zebra_data = zebra_device.EncoderSpectrum # Prepare and write the data to the NeXus File hshMain = {} hshRecord = {} hshMain['data'] = hshRecord for i in range(0, nb_mcs_taken_triggers): # numpy data are not json serializable. They have to be # transformed to native types hshRecord["encoder_pos"] = numpy.asscalar(zebra_data[i]) for j in range(0, nb_mcs_channels): if j < 9: data_name = "counts_mcs_ch0" + str(j+1) else: data_name = "counts_mcs_ch" + str(j+1) hshRecord[data_name] = numpy.asscalar(mcs_data[i][j]) self._sendRecordToNxWriter(hshMain) # Close NeXus file self._closeNxEntry() self._closeNxFile() result = "None" return result def _openNxFile(self): cscan_id = self.getEnv('CScanID') fileNameNx = self.getEnv('CScanFileName') + "_" + str(cscan_id) + ".h5" self.setEnv("CScanID", cscan_id + 1) self.nexuswriter_device.Init() self.nexuswriter_device.FileName = str(fileNameNx) self.nexuswriter_device.OpenFile() return 1 def _openNxEntry(self, motor_name, start_pos, final_pos, nb_mcs_channels, nb_triggers, trigger_interval): self._sendGlobalDictionaryBefore(motor_name, start_pos, final_pos, nb_triggers, trigger_interval) self.nexusconfig_device.Open() cmps = self.nexusconfig_device.AvailableComponents() cmp_list = [] if "zebra_mcs" not in cmps: self.output("_openNxEntry: zebra_mcs not in configuration server") else: cmp_list.append("zebra_mcs") for i in range (1,nb_mcs_channels + 1): if i < 10: cmp_ch = "mcs_ch0" + str(i) else: cmp_ch = "mcs_ch" + str(i) if cmp_ch not in cmps: self.output("_openNxEntry: %s not in configuration server" % cmp_ch) else: cmp_list.append(cmp_ch) self.nexusconfig_device.CreateConfiguration(cmp_list) xmlconfig = self.nexusconfig_device.XMLString self.nexuswriter_device.TheXMLSettings = str(xmlconfig) self.nexuswriter_device.OpenEntry() return 1 def _closeNxFile(self): self.nexuswriter_device.CloseFile() return 1 def _closeNxEntry(self): self._sendGlobalDictionaryAfter() self.nexuswriter_device.CloseEntry() return 1 def _sendGlobalDictionaryBefore(self, motor_name, start_pos, final_pos, nb_triggers, trigger_interval): hsh = {} hshSub = {} hshSub['motor_name'] = str(motor_name) hshSub['start_pos'] = start_pos hshSub['final_pos'] = final_pos hshSub['nb_triggers'] = nb_triggers hshSub['sample_time'] = trigger_interval amsterdam = pytz.timezone('Europe/Amsterdam') fmt = '%Y-%m-%dT%H:%M:%S.%f%z' starttime = amsterdam.localize(datetime.datetime.now()) hshSub['start_time'] = str(starttime.strftime(fmt)) hsh['data'] = hshSub self._setParameter( hsh) return 1 def _sendGlobalDictionaryAfter(self): hsh = {} hshSub = {} amsterdam = pytz.timezone('Europe/Amsterdam') fmt = '%Y-%m-%dT%H:%M:%S.%f%z' starttime = amsterdam.localize(datetime.datetime.now()) hshSub['end_time'] = str(starttime.strftime(fmt)) hshSub['comments'] = "some comment" hsh['data'] = hshSub self._setParameter( hsh) return 1 def _setParameter(self, hsh): jsondata = json.dumps( hsh) self.nexuswriter_device.TheJSONRecord = str(jsondata) return 1 def _sendRecordToNxWriter(self, hsh): mstr = json.dumps( hsh) self.nexuswriter_device.Record(mstr) return 1 class cscan_zebra_mcs_senv(Macro): """ Sets default environment variables """ def run(self): self.setEnv("MCSDevice", "haso107klx:10000/p09/mcs/exp.01") self.output("Setting MCSDevice to ") self.setEnv("ZebraDevice", "haso111n:10000/test/zebra/01") self.output("Setting ZebraDevice to haso111n:10000/test/zebra/01") self.setEnv("CScanFileName", "test_cscan_output") self.output("Setting CScanFileName to test_cscan_output") self.setEnv("NeXusConfigDevice", "haso111tb:10000/test/xmlconfigserver/01") self.output("Setting NeXusConfigDevice to haso111tb:10000/test/xmlconfigserver/01") self.setEnv("NeXusWriterDevice", "haso111tb:10000/test/tangodataserver/01") self.output("Setting NeXusWriterDevice to haso111tb:10000/test/tangodataserver/01") self.setEnv("CScanID", 0) self.output("Setting CScanID to 0") class cscan_zebra_mcs_xia(Macro): """Perfoms a continuous scan with the zebra triggering the mcs and the xia""" param_def = [ ['motor', Type.Motor, None, 'Motor to move'], ['start_pos', Type.Float, None, 'Scan start position'], ['final_pos', Type.Float, None, 'Scan final position'], ['nb_triggers', Type.Integer, None, 'Nb of triggers generated by the zebra'], ['trigger_interval', Type.Float, None, 'Time between consecutive triggers'], ['nb_mcs_channels', Type.Integer, None, 'Nb of mcs channels to recorder'] ] result_def = [ [ "result", Type.String, None, "the cscan object" ]] def run(self, motor, start_pos, final_pos, nb_triggers, trigger_interval, nb_mcs_channels): zebra_device_name = self.getEnv('ZebraDevice') mcs_device_name = self.getEnv('MCSDevice') xia_device_name = self.getEnv('XIADevice') nexusconfig_device_name = self.getEnv('NeXusConfigDevice') # class XMLConfigServer nexuswriter_device_name = self.getEnv('NeXusWriterDevice') # class TangoDataServer zebra_device = PyTango.DeviceProxy(zebra_device_name) mcs_device = PyTango.DeviceProxy(mcs_device_name) xia_device = PyTango.DeviceProxy(xia_device_name) motor_device = PyTango.DeviceProxy(motor.name) self.nexusconfig_device = PyTango.DeviceProxy(nexusconfig_device_name) self.nexuswriter_device = PyTango.DeviceProxy(nexuswriter_device_name) # Set the zebra device zebra_device.NbTriggers = nb_triggers zebra_device.TriggerInterval = trigger_interval # Set MCS mcs_device.NbAcquisitions = 0 # Continuous mode mcs_device.NbChannels = nb_mcs_channels nb_mcs_taken_triggers = nb_triggers # Clear and setup mcs mcs_device.ClearSetupMCS() # Set XIA xia_device.MappingMode = 1 xia_device.GateMaster = 1 xia_device.NumberMcaChannels = 2048 xia_device.NumMapPixels = nb_triggers xia_device.NumMapPixelsPerBuffer = -1 xia_device.MaskMapChannels = 1 # change if one wants to work with more XIA channels nb_xia_channels = xia_device.NumberMcaChannels xia_device.StartMapping() # Compute motor slewrate for the continuous scan old_slewrate = motor_device.Velocity scan_slewrate = abs((final_pos - start_pos)/motor_device.Step_per_unit)/(trigger_interval * (nb_triggers - 1)) motor_device.Velocity = int(scan_slewrate) # Move motor to start position minus an offset pos_offset = 0.1 motor_device.write_attribute( "position", (start_pos - pos_offset)) while motor_device.State() == PyTango.DevState.MOVING: time.sleep(1) # Start motor movement to final position (the macro mv can not be used because it blocks) motor_device.write_attribute( "position", (final_pos + pos_offset)) # Check when the motor is in the start_position for starting the trigger while(motor_device.position < start_pos): time.sleep(0.001) # Open the nexus file for saving data self._openNxFile() self._openNxEntry(motor.name, start_pos, final_pos, nb_mcs_channels, nb_mcs_taken_triggers, trigger_interval, nb_xia_channels) # Start zebra triggering zebra_device.Arm = 1 # Check when the triggering is done while zebra_device.State() == PyTango.DevState.MOVING: time.sleep(1) # Check that the XIA is done while xia_device.State() == PyTango.DevState.MOVING: time.sleep(1) # Reset motor slewrate motor_device.Velocity = old_slewrate # Read MCS mcs_device.ReadMCS() # MCS Data nb_mcs_taken_triggers = mcs_device.AcquiredTriggers nb_mcs_channels = mcs_device.NbChannels mcs_data = mcs_device.CountsArray # XIA data is in the attribute Buffer time.sleep(1) xia_buffer = xia_device.Buffer # Zebra encoder Data zebra_data = zebra_device.EncoderSpectrum # Prepare and write the data to the NeXus File hshMain = {} hshRecord = {} hshMain['data'] = hshRecord for i in range(0, nb_mcs_taken_triggers): # numpy data are not json serializable. They have to be # transformed to native types hshRecord["encoder_pos"] = numpy.asscalar(zebra_data[i]) for j in range(0, nb_mcs_channels): if j < 9: data_name = "counts_mcs_ch0" + str(j+1) else: data_name = "counts_mcs_ch" + str(j+1) hshRecord[data_name] = numpy.asscalar(mcs_data[i][j]) xia_spec = [] for j in range (0, xia_spec_length): xia_spec.append(numpy.asscalar(xia_buffer[i * xia_spec_length + j])) hshRecord["xia"] = xia_spec self._sendRecordToNxWriter(hshMain) # Close NeXus file self._closeNxEntry() self._closeNxFile() result = "None" return result def _openNxFile(self): cscan_id = self.getEnv('CScanID') fileNameNx = self.getEnv('CScanFileName') + "_" + str(cscan_id) + ".h5" self.setEnv("CScanID", cscan_id + 1) self.nexuswriter_device.Init() self.nexuswriter_device.FileName = str(fileNameNx) self.nexuswriter_device.OpenFile() return 1 def _openNxEntry(self, motor_name, start_pos, final_pos, nb_mcs_channels, nb_triggers, trigger_interval, xia_spec_length): self._sendGlobalDictionaryBefore(motor_name, start_pos, final_pos, nb_triggers, trigger_interval, xia_spec_length) self.nexusconfig_device.Open() cmps = self.nexusconfig_device.AvailableComponents() cmp_list = [] if "zebra" not in cmps: self.output("_openNxEntry: zebra not in configuration server") else: cmp_list.append("zebra") for i in range (1,nb_mcs_channels + 1): if i < 10: cmp_ch = "mcs_ch0" + str(i) else: cmp_ch = "mcs_ch" + str(i) if cmp_ch not in cmps: self.output("_openNxEntry: %s not in configuration server" % cmp_ch) else: cmp_list.append(cmp_ch) # Add component for the XIA. The Data will be directly readout from the xia tango device when the entry is opened if "xia" not in cmps: self.output("_openNxEntry: xia not in configuration server") else: cmp_list.append("xia") self.nexusconfig_device.CreateConfiguration(cmp_list) xmlconfig = self.nexusconfig_device.XMLString self.nexuswriter_device.TheXMLSettings = str(xmlconfig) self.nexuswriter_device.OpenEntry() return 1 def _closeNxFile(self): self.nexuswriter_device.CloseFile() return 1 def _closeNxEntry(self): self._sendGlobalDictionaryAfter() self.nexuswriter_device.CloseEntry() return 1 def _sendGlobalDictionaryBefore(self, motor_name, start_pos, final_pos, nb_triggers, trigger_interval): hsh = {} hshSub = {} hshSub['motor_name'] = str(motor_name) hshSub['start_pos'] = start_pos hshSub['final_pos'] = final_pos hshSub['nb_triggers'] = nb_triggers hshSub['sample_time'] = trigger_interval amsterdam = pytz.timezone('Europe/Amsterdam') fmt = '%Y-%m-%dT%H:%M:%S.%f%z' starttime = amsterdam.localize(datetime.datetime.now()) hshSub['start_time'] = str(starttime.strftime(fmt)) hsh['data'] = hshSub xia_fake_spec = [0] * xia_spec_length # faking a xia spectrum for setting the size hsh['xia'] = xia_fake_spec self._setParameter( hsh) return 1 def _sendGlobalDictionaryAfter(self): hsh = {} hshSub = {} amsterdam = pytz.timezone('Europe/Amsterdam') fmt = '%Y-%m-%dT%H:%M:%S.%f%z' starttime = amsterdam.localize(datetime.datetime.now()) hshSub['end_time'] = str(starttime.strftime(fmt)) hshSub['comments'] = "some comment" hsh['data'] = hshSub self._setParameter( hsh) return 1 def _setParameter(self, hsh): jsondata = json.dumps( hsh) self.nexuswriter_device.TheJSONRecord = str(jsondata) return 1 def _sendRecordToNxWriter(self, hsh): mstr = json.dumps( hsh) self.nexuswriter_device.Record(mstr) return 1 class cscan_zebra_mcs_xia_senv(Macro): """ Sets default environment variables """ def run(self): self.setEnv("MCSDevice", "haso107klx:10000/p09/mcs/exp.01") self.output("Setting MCSDevice to ") self.setEnv("ZebraDevice", "haso111n:10000/test/zebra/01") self.output("Setting ZebraDevice to haso111n:10000/test/zebra/01") self.setEnv("XIADevice", "hasp029rack:10000/test/xia/01") self.output("Setting XIADevice to hasp029rack:10000/test/xia/01") self.setEnv("CScanFileName", "test_cscan_output") self.output("Setting CScanFileName to test_cscan_output") self.setEnv("NeXusConfigDevice", "haso111tb:10000/test/xmlconfigserver/01") self.output("Setting NeXusConfigDevice to haso111tb:10000/test/xmlconfigserver/01") self.setEnv("NeXusWriterDevice", "haso111tb:10000/test/tangodataserver/01") self.output("Setting NeXusWriterDevice to haso111tb:10000/test/tangodataserver/01") self.setEnv("CScanID", 0) self.output("Setting CScanID to 0") class cscan_zebra_xia(Macro): """Perfoms a continuous scan with the zebra triggering the xia""" param_def = [ ['motor', Type.Motor, None, 'Motor to move'], ['start_pos', Type.Float, None, 'Scan start position'], ['final_pos', Type.Float, None, 'Scan final position'], ['nb_triggers', Type.Integer, None, 'Nb of triggers generated by the zebra'], ['trigger_interval', Type.Float, None, 'Time between consecutive triggers'] ] result_def = [ [ "result", Type.String, None, "the cscan object" ]] def run(self, motor, start_pos, final_pos, nb_triggers, trigger_interval): zebra_device_name = self.getEnv('ZebraDevice') xia_device_name = self.getEnv('XIADevice') nexusconfig_device_name = self.getEnv('NeXusConfigDevice') # class XMLConfigServer nexuswriter_device_name = self.getEnv('NeXusWriterDevice') # class TangoDataServer zebra_device = PyTango.DeviceProxy(zebra_device_name) xia_device = PyTango.DeviceProxy(xia_device_name) motor_device = PyTango.DeviceProxy(motor.name) self.nexusconfig_device = PyTango.DeviceProxy(nexusconfig_device_name) self.nexuswriter_device = PyTango.DeviceProxy(nexuswriter_device_name) self._openNxFile() # Set the zebra device zebra_device.NbTriggers = nb_triggers zebra_device.TriggerInterval = trigger_interval # Set XIA xia_device.MappingMode = 1 xia_device.GateMaster = 1 xia_device.NumberMcaChannels = 2048 xia_device.NumMapPixels = nb_triggers xia_device.NumMapPixelsPerBuffer = -1 xia_device.MaskMapChannels = 1 # change if one wants to work with more XIA channels nb_xia_channels = xia_device.NumberMcaChannels xia_device.StartMapping() # Compute motor slewrate for the continuous scan old_slewrate = motor_device.Velocity scan_slewrate = abs((final_pos - start_pos)/motor_device.Step_per_unit)/(trigger_interval * (nb_triggers - 1)) motor_device.Velocity = scan_slewrate # Move motor to start position minus an offset pos_offset = 0.1 motor_device.write_attribute( "position", (start_pos - pos_offset)) while motor_device.State() == PyTango.DevState.MOVING: time.sleep(1) # Start motor movement to final position (the macro mv can not be used because it blocks) motor_device.write_attribute( "position", (final_pos + pos_offset)) # Check when the motor is in the start_position for starting the trigger while(motor_device.position < start_pos): time.sleep(0.001) # Open the nexus file for saving data self._openNxFile() self._openNxEntry(motor.name, start_pos, final_pos, nb_triggers, trigger_interval, nb_xia_channels) # Start zebra triggering zebra_device.Arm = 1 # Check when the triggering is done while zebra_device.State() == PyTango.DevState.MOVING: time.sleep(1) # Check that the XIA is done while xia_device.State() == PyTango.DevState.MOVING: time.sleep(1) # Reset motor slewrate motor_device.Velocity = old_slewrate # XIA data is in the attribute Buffer xia_buffer = xia_device.Buffer # Zebra encoder Data zebra_data = zebra_device.EncoderSpectrum # Prepare and write the data to the NeXus File hshMain = {} hshRecord = {} hshMain['data'] = hshRecord xia_spec_length = xia_device.NumberMcaChannels for i in range(0, nb_triggers): # numpy data are not json serializable. They have to be # transformed to native types hshRecord["encoder_pos"] = numpy.asscalar(zebra_data[i]) xia_spec = [] for j in range (0, xia_spec_length): xia_spec.append(numpy.asscalar(xia_buffer[i * xia_spec_length + j])) hshRecord["xia"] = xia_spec self._sendRecordToNxWriter(hshMain) # Close NeXus file self._closeNxEntry() self._closeNxFile() result = "None" return result def _openNxFile(self): cscan_id = self.getEnv('CScanID') fileNameNx = self.getEnv('CScanFileName') + "_" + str(cscan_id) + ".h5" self.setEnv("CScanID", cscan_id + 1) self.nexuswriter_device.Init() self.nexuswriter_device.FileName = str(fileNameNx) self.nexuswriter_device.OpenFile() return 1 def _openNxEntry(self, motor_name, start_pos, final_pos, nb_triggers, trigger_interval, xia_spec_length): self._sendGlobalDictionaryBefore(motor_name, start_pos, final_pos, nb_triggers, trigger_interval, xia_spec_length) self.nexusconfig_device.Open() cmps = self.nexusconfig_device.AvailableComponents() cmp_list = [] if "zebra" not in cmps: self.output("_openNxEntry: zebra not in configuration server") else: cmp_list.append("zebra") # Add component for the XIA. if "xia" not in cmps: self.output("_openNxEntry: xia not in configuration server") else: cmp_list.append("xia") self.nexusconfig_device.CreateConfiguration(cmp_list) xmlconfig = self.nexusconfig_device.XMLString self.nexuswriter_device.TheXMLSettings = str(xmlconfig) self.nexuswriter_device.OpenEntry() return 1 def _closeNxFile(self): self.nexuswriter_device.CloseFile() return 1 def _closeNxEntry(self): self._sendGlobalDictionaryAfter() self.nexuswriter_device.CloseEntry() return 1 def _sendGlobalDictionaryBefore(self, motor_name, start_pos, final_pos, nb_triggers, trigger_interval, xia_spec_length): hsh = {} hshSub = {} hshSub['motor_name'] = str(motor_name) hshSub['start_pos'] = start_pos hshSub['final_pos'] = final_pos hshSub['nb_triggers'] = nb_triggers hshSub['sample_time'] = trigger_interval amsterdam = pytz.timezone('Europe/Amsterdam') fmt = '%Y-%m-%dT%H:%M:%S.%f%z' starttime = amsterdam.localize(datetime.datetime.now()) hshSub['start_time'] = str(starttime.strftime(fmt)) hsh['data'] = hshSub self._setParameter( hsh) return 1 def _sendGlobalDictionaryAfter(self): hsh = {} hshSub = {} amsterdam = pytz.timezone('Europe/Amsterdam') fmt = '%Y-%m-%dT%H:%M:%S.%f%z' starttime = amsterdam.localize(datetime.datetime.now()) hshSub['end_time'] = str(starttime.strftime(fmt)) hshSub['comments'] = "some comment" hsh['data'] = hshSub self._setParameter( hsh) return 1 def _setParameter(self, hsh): jsondata = json.dumps( hsh) self.nexuswriter_device.TheJSONRecord = str(jsondata) return 1 def _sendRecordToNxWriter(self, hsh): mstr = json.dumps( hsh) self.nexuswriter_device.Record(mstr) return 1 class cscan_zebra_xia_senv(Macro): """ Sets default environment variables """ def run(self): self.setEnv("ZebraDevice", "haso111n:10000/test/zebra/01") self.output("Setting ZebraDevice to haso111n:10000/test/zebra/01") self.setEnv("XIADevice", "hasp029rack:10000/test/xia/01") self.output("Setting XIADevice to hasp029rack:10000/test/xia/01") self.setEnv("CScanFileName", "test_cscan_output") self.output("Setting CScanFileName to test_cscan_output") self.setEnv("NeXusConfigDevice", "haso111tb:10000/test/xmlconfigserver/01") self.output("Setting NeXusConfigDevice to haso111tb:10000/test/xmlconfigserver/01") self.setEnv("NeXusWriterDevice", "haso111tb:10000/test/tangodataserver/01") self.output("Setting NeXusWriterDevice to haso111tb:10000/test/tangodataserver/01") self.setEnv("CScanID", 0) self.output("Setting CScanID to 0")
Notice that mot1.write_attribute( "position", target) has been used to move the motor. We use this syntax and not mot1.position = target because the latter form was partially not supported.