We can extend the previous script by Dynamic components for Sardana channels selected by Component Selector1.5
#!/usr/bin/env python """ Client for NeXus Writer """ import PyTango import json from datetime import datetime from pytz import timezone import numpy import pprint import HasyUtils class ScanComp(object): ## constructor def __init__(self, selector): # Selector Server self.selector = PyTango.DeviceProxy(selector) # NeXus Writer self.writer = PyTango.DeviceProxy(self.selector.WriterDevice) self.writer.set_timeout_millis(30000) # Selected Channels # datasources = self.selector.DataSources self.appendEntry = self.selector.AppendEntry # user data self.datarecord = self.selector.UserData self.berlin = timezone('Europe/Berlin') self.fmt = '%Y-%m-%dT%H:%M:%S.%f%z' # name of a dynamic component self.dynamicCP = "__dynamic_component__" self.mcl = {"INIT": {}, "STEP": {}, "FINAL": {}} self.globalDct = {} self.components = [] self.proxies = {} self.clientSources = [] self.attributeNames = {} self.configurationXML = "" self.attrsToCheck = ["Value", "Position", "Counts", "Data", "Voltage", "Energy", "SampleTime"] self.tTn = {PyTango.DevLong64: "NX_INT64", PyTango.DevLong: "NX_INT32", PyTango.DevShort: "NX_INT16", PyTango.DevUChar: "NX_UINT8", PyTango.DevULong64: "NX_UINT64", PyTango.DevULong: "NX_UINT32", PyTango.DevUShort: "NX_UINT16", PyTango.DevDouble: "NX_FLOAT64", PyTango.DevFloat: "NX_FLOAT32", PyTango.DevString: "NX_CHAR", PyTango.DevBoolean: "NX_BOOLEAN"} def openFile(self, nxsfile): # open the file self.writer.Init() self.writer.FileName = nxsfile self.writer.OpenFile() def getClientSources(self): return self.clientSources # # mostly "Value", but ... # def findAttributeName(self, proxy): result = None for at in self.attrsToCheck: if hasattr(proxy, at): result = at break return result @classmethod def isTangoDevice(cls, devName): try: dp = PyTango.DeviceProxy(str(devName)) dp.ping() return dp except: print devName, " is not Tango device" return None def getShapeType(self, source): vl = None shp = [] dt = None ap = PyTango.AttributeProxy(source) da = None ac = None try: ac = ap.get_config() if ac.data_format != PyTango.AttrDataFormat.SCALAR: da = ap.read() vl = da.value except Exception: if ac and ac.data_format != PyTango.AttrDataFormat.SCALAR \ and da is None: raise if vl is not None: shp = list(numpy.shape(vl)) elif ac is not None: if ac.data_format != PyTango.AttrDataFormat.SCALAR: if da.dim_x and da.dim_x > 1: shp = [da.dim_y, da.dim_x] \ if da.dim_y \ else [da.dim_x] if ac is not None: dt = self.tTn[ac.data_type] return (shp, dt) def fetchClientSources(self): self.clientSources = json.loads( self.selector.componentClientSources([])) dynClientSources = json.loads( self.selector.componentClientSources([self.dynamicCP])) \ if self.dynamicCP else [] self.clientSources.extend(dynClientSources) for elm in self.clientSources: print pprint.pformat(elm, indent=1) self.mcl = {"INIT": {}, "STEP": {}, "FINAL": {}} shapes = json.loads(self.selector.channelProperties("shape")) types = json.loads(self.selector.channelProperties("data_type")) for cs in self.clientSources: self.mcl[cs["strategy"]][cs["record"]] = cs["dsname"] dp = self.isTangoDevice(cs["record"]) self.proxies[cs["record"]] = dp if dp: self.attributeNames[cs["record"]] = self.findAttributeName(dp) if self.attributeNames[cs["record"]]: source = "%s:%s/%s/%s" % ( dp.get_db_host(), dp.get_db_port(), dp.name(), self.attributeNames[cs["record"]]) shape, nxstype = self.getShapeType(source) print "Source:", source, shape, nxstype if shape is not None: shapes[cs["dsname"]] = shape if nxstype is not None: types[cs["dsname"]] = nxstype self.selector.setChannelProperties(["shape", json.dumps(shapes)]) self.selector.setChannelProperties(["data_type", json.dumps(types)]) def createConfiguration(self, motors=None): # Selected components self.components = list(self.selector.Components or []) # # dynamicCP contains all dynamic data sources # scandatasources = self.selector.selectedDataSources() or [] if motors: self.selector.stepdatasources = json.dumps(motors) scandatasources.extend(motors) self.dynamicCP = str(self.selector.createDynamicComponent([ json.dumps(scandatasources)])) self.fetchClientSources() self.selector.RemoveDynamicComponent(self.dynamicCP) self.dynamicCP = str(self.selector.createDynamicComponent([ json.dumps(scandatasources)])) self.selector.updateConfigVariables() if self.dynamicCP: self.components.append(self.dynamicCP) self.configurationXML = str( self.selector.CreateWriterConfiguration(self.components)) self.selector.RemoveDynamicComponent(self.dynamicCP) return def openEntry(self, data=None, motors=None): self.createConfiguration(motors=motors) self.writer.XMLSettings = self.configurationXML self.globalDct = json.loads(self.datarecord) # get start_time starttime = self.berlin.localize(datetime.now()) self.globalDct["start_time"] = str( starttime.strftime(self.fmt)) if isinstance(data, dict): self.globalDct.update(data) missing = list( set(self.mcl['INIT'].keys()) - set(self.globalDct.keys())) if missing: raise Exception("Missing INIT CLIENT data: %s" % missing) self.writer.JSONRecord = json.dumps({"data": self.globalDct}) self.writer.OpenEntry() def execStep(self, data=None): localDct = data if isinstance(data, dict) else {} missing = list( (set(self.mcl['STEP'].keys()) - set(self.globalDct.keys()) - set(localDct.keys()))) # if the record name is Tango device we can try to read its Value for dv in missing: if self.proxies[str(dv)]: value = self.proxies[str(dv)].read_attribute( self.attributeNames[str(dv)]).value if hasattr(value, "tolist"): value = value.tolist() localDct[dv] = value else: localDct[dv] = "SomeValue" missing = list( (set(self.mcl['STEP'].keys())) - set(localDct.keys()) - set(self.globalDct.keys())) if missing: raise Exception("Missing STEP CLIENT data: %s" % missing) # print("RECORD: %s" % localDct) print pprint.pformat(localDct, indent=1, depth=1) self.writer.Record(json.dumps({"data": localDct})) def closeEntry(self, data=None): endtime = self.berlin.localize(datetime.now()) self.globalDct["end_time"] = str(endtime.strftime(self.fmt)) if isinstance(data, dict): self.globalDct.update(data) missing = list( set(self.mcl['FINAL'].keys()) - set(self.globalDct.keys())) if missing: raise Exception("Missing FINAL CLIENT data: %s" % missing) self.writer.JSONRecord = json.dumps({"data": self.globalDct}) self.writer.CloseEntry() def closeFile(self): self.writer.CloseFile() if __name__ == '__main__': scanmotors = ["exp_mot04", "exp_mot17"] scan = ScanComp("p09/nxsrecselector/hastodt") if scan.appendEntry: scan.openFile("/tmp/hasylab.nxs") else: scan.openFile(HasyUtils.createScanName("/tmp/hasylab") + ".nxs") scan.openEntry(motors=scanmotors) for el in scan.getClientSources(): print "client sources", pprint.pformat(el, indent=1) ## experimental loop for li in range(3): scan.execStep({"point_nb": li}) scan.closeEntry() scan.closeFile()