We can extend the previous script with dynamic components for the Sardana channels selected by Component Selector 1.5.
#!/usr/bin/env python
""" Client for NeXus Writer """
import PyTango
import json
from datetime import datetime
from pytz import timezone
import numpy
import pprint
import HasyUtils
class ScanComp(object):
    """Scan client that records data through a NeXus Writer Tango device.

    The writer device and the recording configuration (components, data
    sources, user data) are obtained from a Selector Server device.  The
    expected calling sequence is ``openFile`` -> ``openEntry`` ->
    ``execStep`` (repeated) -> ``closeEntry`` -> ``closeFile``.
    """

    def __init__(self, selector):
        """Connect to the selector and writer devices and set defaults.

        :param selector: Tango device name of the Selector Server
        """
        # Selector Server
        self.selector = PyTango.DeviceProxy(selector)
        # NeXus Writer: its device name is published by the selector
        self.writer = PyTango.DeviceProxy(self.selector.WriterDevice)
        self.writer.set_timeout_millis(30000)
        # value of the selector AppendEntry attribute, read once here
        self.appendEntry = self.selector.AppendEntry
        # user data: a JSON string, decoded in openEntry()
        self.datarecord = self.selector.UserData
        self.berlin = timezone('Europe/Berlin')
        self.fmt = '%Y-%m-%dT%H:%M:%S.%f%z'
        # name of a dynamic component
        self.dynamicCP = "__dynamic_component__"
        # client record name -> data source name, grouped by strategy
        self.mcl = {"INIT": {}, "STEP": {}, "FINAL": {}}
        # data sent to the writer when an entry is opened/closed
        self.globalDct = {}
        self.components = []
        # client record name -> DeviceProxy (None if not a Tango device)
        self.proxies = {}
        self.clientSources = []
        # client record name -> attribute to read (None if none matched)
        self.attributeNames = {}
        self.configurationXML = ""
        # candidate attribute names, checked in this order
        self.attrsToCheck = ["Value", "Position", "Counts", "Data",
                             "Voltage", "Energy", "SampleTime"]
        # Tango data type -> NeXus type name
        self.tTn = {PyTango.DevLong64: "NX_INT64",
                    PyTango.DevLong: "NX_INT32",
                    PyTango.DevShort: "NX_INT16",
                    PyTango.DevUChar: "NX_UINT8",
                    PyTango.DevULong64: "NX_UINT64",
                    PyTango.DevULong: "NX_UINT32",
                    PyTango.DevUShort: "NX_UINT16",
                    PyTango.DevDouble: "NX_FLOAT64",
                    PyTango.DevFloat: "NX_FLOAT32",
                    PyTango.DevString: "NX_CHAR",
                    PyTango.DevBoolean: "NX_BOOLEAN"}

    def openFile(self, nxsfile):
        """Re-initialise the writer and open ``nxsfile`` with it.

        :param nxsfile: name of the NeXus file
        """
        self.writer.Init()
        self.writer.FileName = nxsfile
        self.writer.OpenFile()

    def getClientSources(self):
        """Return the client sources collected by ``fetchClientSources``."""
        return self.clientSources

    def findAttributeName(self, proxy):
        """Return the first name from ``attrsToCheck`` that ``proxy`` has.

        Mostly this is "Value", but other devices expose e.g. "Position".

        :param proxy: object to inspect (normally a device proxy)
        :returns: the attribute name, or None when no candidate matches
        """
        for name in self.attrsToCheck:
            if hasattr(proxy, name):
                return name
        return None

    @classmethod
    def isTangoDevice(cls, devName):
        """Return a proxy for ``devName`` if it answers a ping.

        :param devName: candidate Tango device name
        :returns: the DeviceProxy, or None (after printing a note) when
                  the proxy cannot be created or pinged
        """
        try:
            dp = PyTango.DeviceProxy(str(devName))
            dp.ping()
            return dp
        except Exception:
            # best effort: a record name need not be a Tango device.
            # 'Exception' (not a bare except) lets Ctrl-C propagate.
            print("%s  is not Tango device" % (devName,))
            return None

    def getShapeType(self, source):
        """Determine the shape and the NeXus type of a Tango attribute.

        :param source: full attribute name passed to AttributeProxy
        :returns: tuple ``(shape, nxstype)``; ``shape`` is a list (empty
                  for scalars or when nothing could be determined) and
                  ``nxstype`` is a value of ``tTn`` or None when the
                  configuration is unavailable or the type is not mapped
        :raises: the read error, when a non-scalar attribute has a
                 configuration but cannot be read
        """
        shape = []
        value = None
        attrData = None
        attrConfig = None
        proxy = PyTango.AttributeProxy(source)
        try:
            attrConfig = proxy.get_config()
            # only non-scalar attributes need to be read to get a shape
            if attrConfig.data_format != PyTango.AttrDataFormat.SCALAR:
                attrData = proxy.read()
                value = attrData.value
        except Exception:
            # a missing configuration is tolerated; a failed read of a
            # non-scalar attribute is not, as its shape stays unknown
            if attrConfig is not None and attrData is None and \
                    attrConfig.data_format != PyTango.AttrDataFormat.SCALAR:
                raise
        if attrConfig is None:
            return (shape, None)

        if value is not None:
            shape = list(numpy.shape(value))
        elif attrData is not None and attrData.dim_x and attrData.dim_x > 1:
            # no value available: fall back to the reported dimensions
            if attrData.dim_y:
                shape = [attrData.dim_y, attrData.dim_x]
            else:
                shape = [attrData.dim_x]
        # .get(): an unmapped Tango type yields None, which the caller
        # treats as "leave the configured type untouched"
        return (shape, self.tTn.get(attrConfig.data_type))

    def fetchClientSources(self):
        """Collect the client sources and update the channel properties.

        Fills ``clientSources``, ``mcl``, ``proxies`` and
        ``attributeNames``.  For every record that is a reachable Tango
        device with a known attribute, the shape and type found by
        ``getShapeType`` are stored in the selector "shape" and
        "data_type" channel properties.
        """
        self.clientSources = json.loads(
            self.selector.componentClientSources([]))
        if self.dynamicCP:
            self.clientSources.extend(json.loads(
                self.selector.componentClientSources([self.dynamicCP])))
        for elm in self.clientSources:
            print(pprint.pformat(elm, indent=1))

        self.mcl = {"INIT": {}, "STEP": {}, "FINAL": {}}
        shapes = json.loads(self.selector.channelProperties("shape"))
        types = json.loads(self.selector.channelProperties("data_type"))
        for cs in self.clientSources:
            record = cs["record"]
            self.mcl[cs["strategy"]][record] = cs["dsname"]
            dp = self.isTangoDevice(record)
            self.proxies[record] = dp
            if dp is None:
                continue
            attrName = self.findAttributeName(dp)
            self.attributeNames[record] = attrName
            if not attrName:
                continue
            source = "%s:%s/%s/%s" % (
                dp.get_db_host(), dp.get_db_port(), dp.name(), attrName)
            shape, nxstype = self.getShapeType(source)
            print("Source: %s %s %s" % (source, shape, nxstype))
            if shape is not None:
                shapes[cs["dsname"]] = shape
            if nxstype is not None:
                types[cs["dsname"]] = nxstype
        self.selector.setChannelProperties(["shape", json.dumps(shapes)])
        self.selector.setChannelProperties(
            ["data_type", json.dumps(types)])

    def createConfiguration(self, motors=None):
        """Build the writer configuration XML into ``configurationXML``.

        A dynamic component holding all dynamic data sources is created
        twice: first to fetch the client sources and update the channel
        properties, then again (so that it is built after that update)
        for the final configuration.  It is removed from the selector
        afterwards, also when an intermediate call fails.

        :param motors: optional list of motor names added to the scan
                       data sources and set as step data sources
        """
        # Selected components
        self.components = list(self.selector.Components or [])
        # copy into a list so that extend() works on any sequence type
        scandatasources = list(self.selector.selectedDataSources() or [])
        if motors:
            self.selector.stepdatasources = json.dumps(motors)
            scandatasources.extend(motors)
        dsarg = [json.dumps(scandatasources)]

        self.dynamicCP = str(self.selector.createDynamicComponent(dsarg))
        try:
            self.fetchClientSources()
        finally:
            self.selector.RemoveDynamicComponent(self.dynamicCP)

        self.dynamicCP = str(self.selector.createDynamicComponent(dsarg))
        try:
            self.selector.updateConfigVariables()
            if self.dynamicCP:
                self.components.append(self.dynamicCP)
            self.configurationXML = str(
                self.selector.CreateWriterConfiguration(self.components))
        finally:
            self.selector.RemoveDynamicComponent(self.dynamicCP)

    def _timestamp(self):
        """Return the current Europe/Berlin time formatted with ``fmt``."""
        # now(tz) converts from UTC, so the result does not depend on
        # the timezone the host happens to be configured with
        return datetime.now(self.berlin).strftime(self.fmt)

    def openEntry(self, data=None, motors=None):
        """Configure the writer and open a new entry.

        :param data: optional dict merged into the global client data
        :param motors: optional motor names, see ``createConfiguration``
        :raises Exception: when INIT client data is missing
        """
        self.createConfiguration(motors=motors)
        self.writer.XMLSettings = self.configurationXML
        self.globalDct = json.loads(self.datarecord)
        self.globalDct["start_time"] = self._timestamp()
        if isinstance(data, dict):
            self.globalDct.update(data)
        missing = list(set(self.mcl['INIT']) - set(self.globalDct))
        if missing:
            raise Exception("Missing INIT CLIENT data: %s" % missing)
        self.writer.JSONRecord = json.dumps({"data": self.globalDct})
        self.writer.OpenEntry()

    def execStep(self, data=None):
        """Record one scan point.

        STEP client records that are neither in ``data`` nor in the
        global data are read from their Tango device when a proxy and an
        attribute name are known; otherwise the placeholder "SomeValue"
        is recorded.

        :param data: optional dict with step data; it is not modified
        :raises Exception: when STEP client data is still missing
        """
        # work on a copy so the caller's dictionary stays untouched
        localDct = dict(data) if isinstance(data, dict) else {}
        stepKeys = set(self.mcl['STEP'])
        missing = stepKeys - set(self.globalDct) - set(localDct)
        # if the record name is Tango device we can try to read its Value
        for dv in missing:
            proxy = self.proxies.get(dv)
            attrName = self.attributeNames.get(dv)
            if proxy is not None and attrName:
                value = proxy.read_attribute(attrName).value
                if hasattr(value, "tolist"):
                    # array values are converted for JSON serialisation
                    value = value.tolist()
                localDct[dv] = value
            else:
                localDct[dv] = "SomeValue"
        # safeguard: every STEP record must have a value by now
        missing = list(stepKeys - set(localDct) - set(self.globalDct))
        if missing:
            raise Exception("Missing STEP CLIENT data: %s" % missing)
        print(pprint.pformat(localDct, indent=1, depth=1))
        self.writer.Record(json.dumps({"data": localDct}))

    def closeEntry(self, data=None):
        """Send the final client data and close the entry.

        :param data: optional dict merged into the global client data
        :raises Exception: when FINAL client data is missing
        """
        self.globalDct["end_time"] = self._timestamp()
        if isinstance(data, dict):
            self.globalDct.update(data)
        missing = list(set(self.mcl['FINAL']) - set(self.globalDct))
        if missing:
            raise Exception("Missing FINAL CLIENT data: %s" % missing)
        self.writer.JSONRecord = json.dumps({"data": self.globalDct})
        self.writer.CloseEntry()

    def closeFile(self):
        """Close the file opened by the writer."""
        self.writer.CloseFile()
if __name__ == '__main__':
    # Example scan: three points recorded together with two motors.
    scanmotors = ["exp_mot04", "exp_mot17"]
    scan = ScanComp("p09/nxsrecselector/hastodt")
    # In append mode a fixed file name is used; otherwise the name comes
    # from HasyUtils.createScanName (presumably a fresh scan name --
    # TODO confirm against HasyUtils).
    if scan.appendEntry:
        scan.openFile("/tmp/hasylab.nxs")
    else:
        scan.openFile(HasyUtils.createScanName("/tmp/hasylab") + ".nxs")
    try:
        scan.openEntry(motors=scanmotors)
        try:
            for el in scan.getClientSources():
                print("client sources %s" % pprint.pformat(el, indent=1))
            # experimental loop
            for li in range(3):
                scan.execStep({"point_nb": li})
        finally:
            # close the entry even when a step failed
            scan.closeEntry()
    finally:
        # always release the file held by the writer
        scan.closeFile()