#!/usr/bin/env python
#########################################################################################
# Copyright 2017 SKA South Africa (http://ska.ac.za/) #
# #
# BSD license - see LICENSE.txt for details #
#########################################################################################
"""
Simlib library generic simulator generator utility to be used to generate an actual
TANGO device that exhibits the behaviour defined in the data description file.
"""
from __future__ import absolute_import, division, print_function
from future import standard_library
standard_library.install_aliases() # noqa: E402
import argparse
import logging
import os
import time
import weakref
from builtins import map, object, range
from functools import partial
import numpy as np
from tango import (
Attr,
AttrDataFormat,
AttrQuality,
AttrWriteType,
CmdArgType,
DevState,
UserDefaultAttrProp,
)
from tango.server import Device, attribute, device_property
from tango_simlib.model import (
INITIAL_CONSTANT_VALUE_TYPES,
Model,
PopulateModelActions,
PopulateModelProperties,
PopulateModelQuantities,
)
from future.utils import with_metaclass
from future.utils import itervalues
from tango_simlib.sim_test_interface import TangoTestDeviceServerBase
from tango_simlib.utilities import helper_module
from tango_simlib.utilities.fandango_json_parser import FandangoExportDeviceParser
from tango_simlib.utilities.sim_sdd_xml_parser import SDDParser
from tango_simlib.utilities.sim_xmi_parser import XmiParser
from tango_simlib.utilities.simdd_json_parser import SimddParser
MODULE_LOGGER = logging.getLogger(__name__)
class TangoDeviceServerBase(Device):
instances = weakref.WeakValueDictionary()
def init_device(self):
super(TangoDeviceServerBase, self).init_device()
name = self.get_name()
self.model = None
self.instances[name] = self
self.set_state(DevState.ON)
def always_executed_hook(self):
self.model.update()
def read_attributes(self, attr):
"""Method reading an attribute value.
Parameters
----------
attr : PyTango.DevAttr
The attribute to read from.
"""
if self.get_state() != DevState.OFF:
name = attr.get_name()
value, update_time = self.model.quantity_state[name]
quality = AttrQuality.ATTR_VALID
attr.set_value_date_quality(value, update_time, quality)
def write_attributes(self, attr):
"""Method writing an attribute value.
Parameters
----------
attr : PyTango.DevAttr
The attribute to write to.
"""
if self.get_state() != DevState.OFF:
name = attr.get_name()
data = attr.get_write_value()
self.model.sim_quantities[name].set_val(data, self.model.time_func())
[docs]def add_static_attribute(tango_device_class, attr_name, attr_meta):
"""Add any TANGO attribute of to the device server before start-up.
Parameters
----------
cls: class
class object that the device server will inherit from
attr_name: str
Tango attribute name
attr_meta: dict
Meta data that enables the creation of a well configured attribute
Note
====
This is needed for DevEnum and spectrum type attributes
"""
enum_labels = attr_meta.get("enum_labels", "")
attr = attribute(
label=attr_meta.get("label", attr_name),
dtype=attr_meta["data_type"],
enum_labels=enum_labels,
doc=attr_meta.get("description", ""),
dformat=attr_meta["data_format"],
max_dim_x=attr_meta["max_dim_x"],
max_dim_y=attr_meta["max_dim_y"],
access=getattr(AttrWriteType, attr_meta["writable"]),
polling_period=int(attr_meta.get("period", "-1")),
min_value=attr_meta.get("min_value", ""),
max_value=attr_meta.get("max_value", ""),
min_alarm=attr_meta.get("min_alarm", ""),
max_alarm=attr_meta.get("max_alarm", ""),
min_warning=attr_meta.get("min_warning", ""),
max_warning=attr_meta.get("max_warning", ""),
delta_val=attr_meta.get("delta_val", ""),
delta_t=attr_meta.get("delta_t", ""),
abs_change=attr_meta.get("abs_change", ""),
rel_change=attr_meta.get("rel_change", ""),
event_period=attr_meta.get("event_period", ""),
archive_abs_change=attr_meta.get("archive_abs_change", ""),
archive_rel_change=attr_meta.get("archive_rel_change", ""),
archive_period=attr_meta.get("archive_period", ""),
)
attr.__name__ = attr_name
# Attribute read method
def read_meth(tango_device_instance, attr):
name = attr.get_name()
value, update_time = tango_device_instance.model.quantity_state[name]
quality = AttrQuality.ATTR_VALID
# For attributes that have a SPECTRUM data format, there is no need to
# type cast them to an integer data type. we need assign the list of values
# to the attribute value parameter.
if type(value) in (list, np.ndarray):
attr.set_value_date_quality(value, update_time, quality)
else:
attr.set_value_date_quality(int(value), update_time, quality)
# Attribute write method for writable attributes
if str(attr_meta["writable"]) in ("READ_WRITE", "WRITE"):
@attr.write
def attr(tango_device_instance, new_val):
# When selecting a model quantity we use the enum labels list indexing
# to return the string value corresponding to the respective enum value
# since an integer value is returned by device server when
# attribute value is read
_sim_quantities = tango_device_instance.model.sim_quantities
tango_device_instance.model_quantity = _sim_quantities[attr_name]
tango_device_instance.model_quantity.set_val(
new_val, tango_device_instance.model.time_func()
)
read_meth.__name__ = "read_{}".format(attr_name)
# Add the read method and the attribute to the class object
setattr(tango_device_class, read_meth.__name__, read_meth)
setattr(tango_device_class, attr.__name__, attr)
MODULE_LOGGER.info("Adding static attribute {} to the device.".format(attr_name))
def _create_sim_test_interface_atttribute(models, class_instance):
# Pick the first model instance in the dict.
controllable_attribute_names = list(itervalues(models))[0].sim_quantities.keys()
attr_control_meta = {}
attr_control_meta["enum_labels"] = sorted(controllable_attribute_names)
attr_control_meta["data_format"] = AttrDataFormat.SCALAR
attr_control_meta["data_type"] = CmdArgType.DevEnum
attr_control_meta["label"] = "Attribute name"
attr_control_meta["description"] = "Attribute name to control"
attr_control_meta["max_dim_x"] = 1
attr_control_meta["max_dim_y"] = 0
attr_control_meta["writable"] = "READ_WRITE"
enum_labels = attr_control_meta.get("enum_labels", "")
attr = attribute(
label=attr_control_meta["label"],
dtype=attr_control_meta["data_type"],
enum_labels=enum_labels,
doc=attr_control_meta["description"],
dformat=attr_control_meta["data_format"],
max_dim_x=attr_control_meta["max_dim_x"],
max_dim_y=attr_control_meta["max_dim_y"],
access=getattr(AttrWriteType, attr_control_meta["writable"]),
fget=class_instance.read_fn,
fset=class_instance.write_fn,
)
return attr
[docs]def get_tango_device_server(models, sim_data_files):
"""Declares a tango device class that inherits the Device class and then
adds tango attributes (DevEnum and Spectrum type).
Parameters
----------
models: dict
A dictionary of model.Model instances.
e.g. {'model-name': model.Model}
sim_data_files: list
A list of direct paths to either xmi/xml/json data files.
Returns
-------
TangoDeviceServer : PyTango.Device
Tango device that has the commands dictionary populated.
"""
# Declare a Tango Device class for specifically adding static
# attributes prior running the device server and controller
class TangoDeviceServerStaticAttrs(object):
pass
class TangoTestDeviceServerStaticAttrs(object):
pass
def read_fn(tango_device_instance):
return tango_device_instance._attribute_name_index
def write_fn(tango_device_instance, val):
tango_device_instance._attribute_name_index = val
tango_device_instance.model_quantity = tango_device_instance.model.sim_quantities[
sorted(tango_device_instance.model.sim_quantities.keys())[val]
]
# Sim test interface static attribute `attribute_name` info
TangoTestDeviceServerStaticAttrs.read_fn = read_fn
TangoTestDeviceServerStaticAttrs.write_fn = write_fn
attr = _create_sim_test_interface_atttribute(models, TangoTestDeviceServerStaticAttrs)
attr.setter(TangoTestDeviceServerStaticAttrs.write_fn)
TangoTestDeviceServerStaticAttrs.attribute_name = attr
# We use the `add_static_attribute` method to add DevEnum and Spectrum type
# attributes statically to the tango device before start-up since the
# cannot be well configured when added dynamically. This is suspected
# to be a bug.
# TODO(AR 02-03-2017): Ask the tango community on the upcoming Stack
# Exchange community (AskTango) and also make follow ups on the next tango
# releases.
for quantity_name, quantity in list(itervalues(models))[0].sim_quantities.items():
d_type = str(quantity.meta["data_type"])
d_format = str(quantity.meta["data_format"])
if d_type == "DevEnum" or d_format == "SPECTRUM":
add_static_attribute(
TangoDeviceServerStaticAttrs, quantity_name, quantity.meta
)
class TangoDeviceServer(TangoDeviceServerBase, TangoDeviceServerStaticAttrs):
_models = models
min_update_period = device_property(
dtype=float,
default_value=0.99,
doc="Minimum time before model update method can be called again [seconds].",
)
def init_device(self):
super(TangoDeviceServer, self).init_device()
self.model = self._models[self.get_name()]
self._not_added_attributes = []
write_device_properties_to_db(self.get_name(), self.model)
self.model.reset_model_state()
self.model.min_update_period = self.min_update_period
self.initialize_dynamic_commands()
# Only the .fgo file has the State as an attribute. The .xmi files has it as
# a command, so it won't have an initial value. And in some other data
# description files the State attribute is not specified.
if "State" in self.model.sim_quantities:
# Set default device state
state_quantity = self.model.sim_quantities["State"].meta
state_value = int(state_quantity["value"])
self.set_state(DevState.values[state_value])
def initialize_dynamic_commands(self):
for action_name, action_handler in self.model.sim_actions.items():
cmd_handler = helper_module.generate_cmd_handler(
self.model, action_name, action_handler
)
setattr(TangoDeviceServer, action_name, cmd_handler)
self.add_command(cmd_handler, device_level=True)
def initialize_dynamic_attributes(self):
model_sim_quants = self.model.sim_quantities
attribute_list = set([attr for attr in model_sim_quants.keys()])
for attribute_name in attribute_list:
meta_data = model_sim_quants[attribute_name].meta
# Dynamically add all attributes except those with DevEnum data type,
# and SPECTRUM data format since they are added statically to the device
# class prior to start-up. Also exclude attributes with a data format
# 'IMAGE' as we currently do not handle them.
if not self._is_attribute_addable_dynamically(meta_data):
continue
# The return value of rwType is a string and it is required as a
# PyTango data type when passed to the Attr function.
# e.g. 'READ' -> tango._tango.AttrWriteType.READ
rw_type = meta_data["writable"]
rw_type = getattr(AttrWriteType, rw_type)
attr = self._create_attribute(
attribute_name, meta_data["data_type"], rw_type
)
if attr is None:
continue
self._configure_attribute_default_properties(attr, meta_data)
self._add_dynamic_attribute(attr, rw_type)
MODULE_LOGGER.debug("Added dynamic {} attribute".format(attribute_name))
def _add_dynamic_attribute(self, attribute, read_write_type):
if read_write_type in (AttrWriteType.READ, AttrWriteType.READ_WITH_WRITE):
self.add_attribute(attribute, r_meth=self.read_attributes)
elif read_write_type == AttrWriteType.WRITE:
self.add_attribute(attribute, w_meth=self.write_attributes)
elif read_write_type == AttrWriteType.READ_WRITE:
self.add_attribute(
attribute, r_meth=self.read_attributes, w_meth=self.write_attributes
)
def _is_attribute_addable_dynamically(self, quantity_meta_data):
attr_dtype = quantity_meta_data["data_type"]
d_format = quantity_meta_data["data_format"]
if str(attr_dtype) == "DevEnum" or str(d_format) == "SPECTRUM":
return False
elif str(d_format) == "IMAGE":
self._not_added_attributes.append(quantity_meta_data["name"])
return False
return True
def _create_attribute(self, attribute_name, attr_dtype, rw_type):
attribute = None
# Add a try/except clause when creating an instance of Attr class
# as PyTango may raise an error when things go wrong.
try:
attribute = Attr(attribute_name, attr_dtype, rw_type)
except Exception as e:
self._not_added_attributes.append(attribute_name)
MODULE_LOGGER.debug(
"Attribute %s could not be added dynamically"
" due to an error raised %s.",
attribute_name,
str(e),
)
return attribute
def _configure_attribute_default_properties(self, attribute, quantity_meta_data):
attribute_properties = UserDefaultAttrProp()
for prop, prop_value in quantity_meta_data.items():
# NB: Calling 'set_enum_labels' or setting the 'enum_labels' results
# in a error, and we do not need to do anyway as DevEnum attributes are
# handled by the `add_static_attribute` method.
if prop == "enum_labels":
continue
# UserDefaultAttrProp does not have the property 'event_period' but does
# have a setter method for it.
if prop == "event_period":
attribute_properties.set_event_period(prop_value)
continue
if hasattr(attribute_properties, prop):
try:
setattr(attribute_properties, prop, prop_value)
except Exception as e:
attribute_name = quantity_meta_data["name"]
MODULE_LOGGER.error(
"The attribute '%s's property '%s' could not be set to "
"value '%s' due to an error raised %s.",
attribute_name,
prop,
prop_value,
str(e),
)
else:
MODULE_LOGGER.debug(
"UserDefaultAttrProp has no attribute named '%s'", prop
)
attribute.set_default_properties(attribute_properties)
@attribute(
dtype=(str,),
doc="List of attributes that were not added to the "
"device due to an error.",
max_dim_x=10000,
)
def AttributesNotAdded(self):
return self._not_added_attributes
@attribute(
dtype=int,
doc="Number of attributes not added to the device due to an error.",
)
def NumAttributesNotAdded(self):
return len(self._not_added_attributes)
class SimControl(TangoTestDeviceServerBase, TangoTestDeviceServerStaticAttrs):
instances = weakref.WeakValueDictionary()
def init_device(self):
super(SimControl, self).init_device()
name = self.get_name()
self.instances[name] = self
klass_name = get_device_class(sim_data_files)
TangoDeviceServer.TangoClassName = klass_name
TangoDeviceServer.__name__ = klass_name
SimControl.TangoClassName = "%sSimControl" % klass_name
SimControl.__name__ = "%sSimControl" % klass_name
return [TangoDeviceServer, SimControl]
[docs]def write_device_properties_to_db(device_name, model, db_instance=None):
"""Writes device properties, including optional default value, to tango DB.
Parameters
----------
device_name : str
A TANGO device name
model : model.Model instance
Device model instance
db_instance : tango._tango.Database instance
Tango database instance
"""
if not db_instance:
db_instance = helper_module.get_database()
for prop_name, prop_meta in model.sim_properties.items():
db_instance.put_device_property(
device_name, {prop_name: prop_meta["DefaultPropValue"]}
)
[docs]def get_parser_instance(sim_datafile):
"""This method returns an appropriate parser instance to generate a Tango device.
Parameters
----------
sim_datafile : str
A direct path to the xmi/xml/json/fgo file.
Returns
------
parser_instance: Parser instance
The Parser object which reads an xmi/xml/json/fgo file and parses it into device
attributes, commands, and properties.
"""
extension = os.path.splitext(sim_datafile)[-1]
extension = extension.lower()
parser_instance = None
if extension in [".xmi"]:
parser_instance = XmiParser()
parser_instance.parse(sim_datafile)
elif extension in [".json"]:
parser_instance = SimddParser()
parser_instance.parse(sim_datafile)
elif extension in [".xml"]:
parser_instance = SDDParser()
parser_instance.parse(sim_datafile)
elif extension in [".fgo"]:
parser_instance = FandangoExportDeviceParser()
parser_instance.parse(sim_datafile)
return parser_instance
[docs]def generate_device_server(server_name, sim_data_files, directory=""):
"""Create a tango device server python file.
Parameters
----------
server_name: str
Tango device server name
sim_data_files: list
A list of direct paths to either xmi/xml/json data files.
"""
lines = [
"#!/usr/bin/env python",
"from tango.server import server_run",
(
"from tango_simlib.tango_sim_generator import ("
"configure_device_models, get_tango_device_server)"
),
"\n\n# File generated on {} by tango-simlib-generator".format(time.ctime()),
"\n\ndef main():",
" sim_data_files = {}".format(sim_data_files),
" models = configure_device_models(sim_data_files)",
" TangoDeviceServers = get_tango_device_server(models, sim_data_files)",
" server_run(TangoDeviceServers)",
'\nif __name__ == "__main__":',
" main()\n",
]
with open(os.path.join(directory, "%s" % server_name), "w") as dserver:
dserver.write("\n".join(lines))
# Make the script executable
os.chmod(os.path.join(directory, "%s" % server_name), 477)
[docs]def get_device_class(sim_data_files):
"""Get device class name from specified xmi/simdd description file.
Parameters
----------
sim_data_files: list
A list of direct paths to either xmi/xml/json/fgo data files.
Returns
-------
klass_name: str
Tango device class name
"""
if len(sim_data_files) < 1:
raise Exception("No simulator data file specified.")
parser_instance = None
klass_name = ""
precedence_map = {".xmi": 1, ".fgo": 2, ".json": 3}
def get_precedence(file_name):
extension = os.path.splitext(file_name)[-1]
extension = extension.lower()
return precedence_map.get(extension, 100)
sorted_files = sorted(sim_data_files, key=get_precedence)
parser_instance = get_parser_instance(sorted_files[0])
# Since at the current moment the class name of the tango simulator to be
# generated must be specified in the xmi data file, if no xmi if provided
# the simulator will be given a default name.
if parser_instance:
klass_name = parser_instance.device_class_name
else:
klass_name = "TangoDeviceServer"
return klass_name
[docs]def get_argparser():
parser = argparse.ArgumentParser(
description="Generate a tango data driven simulator, handling"
" registration as needed. Supports multiple device per process."
)
required_argument = partial(parser.add_argument, required=True)
required_argument(
"--sim-data-file",
action="append",
help="Simulator description data files(s) " ".i.e. can specify multiple files",
)
required_argument("--directory", help="TANGO server executable path", default="")
required_argument("--dserver-name", help="TANGO server executable command")
return parser
[docs]def main():
arg_parser = get_argparser()
opts = arg_parser.parse_args()
generate_device_server(
opts.dserver_name, opts.sim_data_file, directory=opts.directory
)
if __name__ == "__main__":
main()