# VmExecutor example

 
#!/bin/env python3
"""
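Properties
  PlcIpAddress:   '192.168.999.999.1.1'
      handed to pyads.Connection as its first argument (the AMS Net ID)
  PlcPort:   851
  AdsMotor:  MAIN.temp_water
      base name of the PLC symbols; the suffixed symbols below are accessed:
    MAIN.temp_water_status
    MAIN.temp_water_read
    MAIN.temp_water_set
    MAIN.temp_water_min
    MAIN.temp_water_max
"""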
import PyTango
import sys, os, time, inspect
import pyads
#
#
# 
def getDeviceProperty( devName, propName, tangoHost = None):
    '''
    Return a list containing the values of one device property.

      devName:   device name, e.g. 'p17/macroserver/haspp17.01'
      propName:  property name, e.g. 'MacroPath'
      tangoHost: optional Tango database address, e.g. "haspp99:10000".
                 None selects the default database, PyTango.Database().
                 A host given without ":port" is queried on port 10000,
                 the port used in the example.

    Returns False, after printing the error, if the database query
    raises PyTango.DevFailed.
    '''
    if tangoHost is None:
        db = PyTango.Database()
    else:
        # split "host:port"; the port part is empty if no ':' is present
        host, _, port = tangoHost.partition( ":")
        db = PyTango.Database( host, int( port) if port else 10000)

    try:
        dct = db.get_device_property( devName, [ propName])
    except PyTango.DevFailed as e:
        PyTango.Except.print_exception( e)
        print( "getDeviceProperty: %s " % (propName))
        # callers index the result, so False makes them fail visibly
        return False
    return list( dct[ propName])

class VM:
    """
    VmExecutor class exposing PLC symbols, accessed via pyads, as a motor.

    The properties PlcIpAddress, PlcPort and AdsMotor of the parent device
    select the PLC connection and the base name of the symbols. The symbols
    <AdsMotor>_status, _read, _set, _min and _max are accessed as
    pyads.PLCTYPE_INT.
    """
    #
    # init_device
    #
    def __init__( self):
        """
        Read the device properties of the parent and open the PLC connection.

        Exits the process with status 255 if one of the properties cannot
        be read or if PlcPort is not an integer.
        """
        # the creator of this object is taken from the 'self' local
        # of the calling frame
        self.parent = inspect.currentframe().f_back.f_locals['self']
        try:
            devName = self.parent.get_name()
            self.plcIpAddress = getDeviceProperty( devName, "PlcIpAddress")[0]
            self.plcPort = getDeviceProperty( devName, "PlcPort")[0]
            self.adsMotor = getDeviceProperty( devName, "AdsMotor")[0]
            # validate the port here so that a malformed value is reported
            # together with the other property problems
            port = int( self.plcPort)
        except Exception as e:
            print( "pyAdsMotor.py: expecting the properties: PlcIpAddress, PlcPort, AdsMotor")
            # show the actual cause, it is not necessarily a missing property
            print( "pyAdsMotor.py: %s" % repr( e))
            sys.exit( 255)
        print( "pyAdsMotor.__init__: connecting to %s, %s" % ( self.plcIpAddress, repr( self.plcPort)))

        self.plc = pyads.Connection( self.plcIpAddress, port)
        self.plc.open()

        # the trace output of the methods is produced only if stdout is a terminal
        self.isatty = os.isatty( 1)

    def _readInt( self, suffix):
        """Read the PLC symbol <AdsMotor>_<suffix> as PLCTYPE_INT."""
        return self.plc.read_by_name( "%s_%s" % ( self.adsMotor, suffix), pyads.PLCTYPE_INT)

    def _writeInt( self, suffix, value):
        """Write int( value) to the PLC symbol <AdsMotor>_<suffix> as PLCTYPE_INT."""
        return self.plc.write_by_name( "%s_%s" % ( self.adsMotor, suffix), int( value), pyads.PLCTYPE_INT)
    #
    # dev_state
    #
    def dev_state( self):
        """Return PyTango.DevState.ON after reading the PLC status symbol."""
        # the status value is not translated into a Tango state; the read
        # is kept, an exception raised by it still propagates to the caller
        self._readInt( "status")

        argout = PyTango.DevState.ON

        if self.isatty:
            print( "pyAdsMotor.state, %s" % repr( argout))
        return argout
    #
    # Position
    #
    def read_Position( self):
        """Return the value of <AdsMotor>_read as a float."""
        argout = self._readInt( "read")
        if self.isatty:
            print( "pyAdsMotor.read_position: %s " % repr( argout))
        return float( argout)

    def write_Position( self, argin):
        """Write int( argin) to <AdsMotor>_set, return 1."""
        argout = self._writeInt( "set", argin)
        if self.isatty:
            print( "pyAdsMotor.write_position %s, %s " % (repr( argin), repr( argout)))
        return 1
    #
    # UnitLimitMax
    #
    def read_UnitLimitMax( self):
        """Return the value of <AdsMotor>_max."""
        argout = self._readInt( "max")
        if self.isatty:
            print( "pyAdsMotor.read_UnitLimitMax %s " % repr( argout))
        return argout

    def write_UnitLimitMax( self, argin):
        """Write int( argin) to <AdsMotor>_max."""
        argout = self._writeInt( "max", argin)
        if self.isatty:
            print( "pyAdsMotor.write_UnitLimitMax %s, %s " % ( repr( argin), repr( argout)))
    #
    # UnitLimitMin
    #
    def read_UnitLimitMin( self):
        """Return the value of <AdsMotor>_min."""
        argout = self._readInt( "min")
        if self.isatty:
            print( "pyAdsMotor.read_UnitLimitMin %s " % repr( argout))
        return argout

    def write_UnitLimitMin( self, argin):
        """Write int( argin) to <AdsMotor>_min."""
        argout = self._writeInt( "min", argin)
        if self.isatty:
            print( "pyAdsMotor.write_UnitLimitMin %s, %s " % ( repr( argin), repr( argout)))
    #
    # CwLimit, CcwLimit
    #
    def read_CwLimit( self):
        """Return 0, no PLC symbol is read for this limit."""
        return 0

    def read_CcwLimit( self):
        """Return 0, no PLC symbol is read for this limit."""
        return 0

    def StopMove( self):
        """Return 1 without sending anything to the PLC."""
        if self.isatty:
            print( "pyAdsMotor.stopMove: no action")
        return 1