Source code for tango_simlib.model

#########################################################################################
# Copyright 2017 SKA South Africa (http://ska.ac.za/)                                   #
#                                                                                       #
# BSD license - see LICENSE.txt for details                                             #
#########################################################################################
from __future__ import absolute_import, division, print_function
from future import standard_library

standard_library.install_aliases()  # noqa: E402
from future.utils import iteritems

import importlib
import logging
import sys
import time
import weakref
from builtins import map, object, range
from functools import partial

from tango import CmdArgType
from tango_simlib import quantities

# Module-level fallback logger; `Model` uses it when no logger is supplied.
MODULE_LOGGER = logging.getLogger(__name__)

# Registry of model instances keyed by model name. Values are held through
# weak references, so the registry itself does not keep a model alive.
model_registry = weakref.WeakValueDictionary()

# TANGO commands that are skipped when populating model actions, because they
# have their own built-in handlers.
DEFAULT_TANGO_COMMANDS = frozenset(["State", "Status", "Init"])
# Maximum number of override-class attributes allowed to match a command's
# handler name when names are compared case-insensitively.
MAX_NUM_OF_CLASS_ATTR_OCCURENCE = 1
# Placeholder return value per command output data type, used by generated
# action handlers for actions other than 'output_return'.
ARBITRARY_DATA_TYPE_RETURN_VALUES = {
    CmdArgType.DevString: "Ok!",
    CmdArgType.DevBoolean: True,
    CmdArgType.DevDouble: 4.05,
    CmdArgType.DevFloat: 8.1,
    CmdArgType.DevLong: 3,
    CmdArgType.DevVoid: None,
}

# In the case where an attribute with constant quantity simulation type is
# specified, this dict is used to convert the initial value, if specified, to
# the data-type corresponding to the attribute data-type.
# Each entry maps a TANGO data type to a `(converter, default value)` pair; the
# default value is used when no initial value is given.
INITIAL_CONSTANT_VALUE_TYPES = {
    CmdArgType.DevString: (str, ""),
    CmdArgType.DevFloat: (float, 0.0),
    CmdArgType.DevDouble: (float, 0.0),
    CmdArgType.DevBoolean: (bool, False),
    CmdArgType.DevEnum: (int, 0),
    CmdArgType.DevUChar: (int, 0),
    CmdArgType.DevShort: (int, 0),
    CmdArgType.DevUShort: (int, 0),
    CmdArgType.DevLong: (int, 0),
    CmdArgType.DevULong: (int, 0),
    CmdArgType.DevLong64: (int, 0),
    CmdArgType.DevULong64: (int, 0),
    # DevVoid has no converter; consumers check for `None` before converting.
    CmdArgType.DevVoid: (None, None),
    CmdArgType.DevState: (int, 0),
    CmdArgType.DevEncoded: (bytearray, 0),
}


class Model(object):
    """Tango Device main model with quantities and actions.

    Parameters
    ----------
    name : str
        Model name identifier
    start_time : float
        Time at instantiation of the model. When omitted (``None``) the
        current value returned by `time_func` is used.
    min_update_period : float
        Minimum update period of the quantities in the model
    time_func : time function
        Function that return current time i.e. time.time
    logger : logger object
        Logger used by the model. The module logger is used when omitted.

    """

    def __init__(
        self,
        name,
        start_time=None,
        min_update_period=0.99,
        time_func=time.time,
        logger=None,
    ):
        self.name = name
        model_registry[self.name] = self
        self.min_update_period = min_update_period
        self.time_func = time_func
        # An explicit start time of 0 is a legitimate value (e.g. with a fake
        # clock), so only fall back to the clock when nothing was supplied.
        self.start_time = time_func() if start_time is None else start_time
        self.last_update_time = self.start_time
        self.sim_quantities = {}
        self.sim_actions = {}
        self.sim_properties = {}
        self.test_sim_actions = {}
        self.sim_actions_meta = {}
        self._sim_state = {}
        self.setup_sim_quantities()
        self.override_pre_updates = []
        self.override_post_updates = []
        self.paused = False  # Flag to pause updates
        # Making a public reference to _sim_state. Allows us to hook read-only
        # views or updates or whatever the future requires of this humble
        # public attribute.
        self.quantity_state = self._sim_state
        self.logger = logger if logger else MODULE_LOGGER

    def setup_sim_quantities(self):
        """Set up self.sim_quantities with simulated quantities.

        Subclasses should implement this method. Should place simulated
        quantities in self.sim_quantities dict. Keyed by name of quantity,
        value must be instances satisfying the
        :class:`quantities.Quantity` interface.

        Notes
        =====
        - Must use self.start_time to set initial time values.
        - Must call super method after setting up `sim_quantities`

        """
        self._sim_state.update(
            {
                var: (quant.last_val, quant.last_update_time)
                for var, quant in self.sim_quantities.items()
            }
        )

    def update(self):
        """Step every quantity to the current time and refresh the sim state.

        When less than `min_update_period` has elapsed since the last update,
        or the model is paused, the quantities are not stepped; the sim state
        is only re-synchronised with the quantities' current values.
        Otherwise the pre-update overrides run, every quantity is stepped and
        the post-update overrides run. Exceptions raised while stepping the
        quantities are logged and not propagated.
        """
        sim_time = self.time_func()
        dt = sim_time - self.last_update_time
        if dt < self.min_update_period or self.paused:
            # Updating the sim_state in case the test interface or external
            # command updated the quantities.
            for var, quant in self.sim_quantities.items():
                self._sim_state[var] = (quant.last_val, quant.last_update_time)
            self.logger.debug(
                "Sim {} skipping update at {}, dt {} < {} and pause {}".format(
                    self.name, sim_time, dt, self.min_update_period, self.paused
                )
            )
            return

        for override_update in self.override_pre_updates:
            override_update(self, sim_time, dt)

        self.logger.debug("Stepping at {}, dt: {}".format(sim_time, dt))
        self.last_update_time = sim_time
        try:
            for var, quant in self.sim_quantities.items():
                self._sim_state[var] = (quant.next_val(sim_time), sim_time)
        except Exception:
            self.logger.exception("Exception in update loop")

        for override_update in self.override_post_updates:
            override_update(self, sim_time, dt)

    def set_sim_action(self, name, handler):
        """Add an action handler function.

        Parameters
        ----------
        name : str
            Name of the action
        handler : callable(model_instance, action_args)
            Callable that handles action (name). Is called with the model
            instance as the first parameter.

        """
        self.sim_actions[name] = partial(handler, self)

    def set_test_sim_action(self, name, handler):
        """Add a test action handler function.

        Parameters
        ----------
        name : str
            Name of the action
        handler : callable(model_instance, action_args)
            Callable that handles action (name). Is called with the model
            instance as the first parameter.

        """
        self.test_sim_actions[name] = partial(handler, self)

    def set_sim_property(self, device_prop):
        """Merge the `device_prop` mapping into the model's properties."""
        self.sim_properties.update(device_prop)

    def reset_model_state(self):
        """Reset the model's quantities' adjustable attributes to their
        default values.
        """
        # Local name deliberately differs from the imported `quantities`
        # module to avoid shadowing it.
        for quantity in self.sim_quantities.values():
            self._reset_quantity_adjustable_attributes_values(quantity)

    def _reset_quantity_adjustable_attributes_values(self, quantity):
        """Restore every adjustable attribute of `quantity` to its default.

        The default is derived from the quantity's metadata (`quantity.meta`):

        - ``last_update_time`` is always reset to the model start time.
        - With a ``quantity_simulation_type`` of ``ConstantQuantity`` the
          attribute is set to the converted ``initial_value`` (or the data
          type's default when none is given).
        - With any other simulation type ``last_val`` is reset to the
          ``mean`` and every other attribute to the metadata entry of the
          same name, as floats.
        - Without a simulation type the ``value``/``possiblevalues`` entry is
          used, or a default shaped according to the data format.
        """
        meta = quantity.meta
        attr_data_type = meta["data_type"]
        # the xmi, json and fgo files have data_format attributes indicating
        # SPECTRUM, SCALAR OR IMAGE data formats. The xml file does not have
        # this key in its attribute list. It has a key labelled possible
        # values which is a list. Hence, SPECTRUM if no data_format is found.
        try:
            attr_data_format = str(meta["data_format"])
        except KeyError:
            attr_data_format = "SPECTRUM"
        max_dim_x, max_dim_y = self._get_quantity_dimensions(meta)
        val_type, val = INITIAL_CONSTANT_VALUE_TYPES[attr_data_type]

        for attribute in quantity.adjustable_attributes:
            if attribute == "last_update_time":
                quantity.last_update_time = self.start_time
                continue

            if "quantity_simulation_type" not in meta:
                if "value" in meta or "possiblevalues" in meta:
                    if "value" in meta:
                        raw_val = meta["value"]
                    else:
                        raw_val = meta["possiblevalues"]
                    if attr_data_format == "SCALAR":
                        adjustable_val = val_type(raw_val)
                    elif attr_data_format == "SPECTRUM":
                        adjustable_val = [val_type(item) for item in raw_val]
                    else:
                        adjustable_val = [
                            [val_type(item) for item in row] for row in raw_val
                        ]
                elif attr_data_format == "SCALAR":
                    adjustable_val = val
                elif attr_data_format == "SPECTRUM":
                    adjustable_val = [val] * max_dim_x
                else:
                    adjustable_val = [[val] * max_dim_x for _ in range(max_dim_y)]
            elif meta["quantity_simulation_type"] == "ConstantQuantity":
                initial_value = meta.get("initial_value", None)
                if initial_value in [None, ""]:
                    initial_value = val
                # Data types without a converter (DevVoid) carry no value.
                if val_type is None:
                    adjustable_val = None
                else:
                    adjustable_val = val_type(initial_value)
            elif attribute == "last_val":
                quantity.last_val = float(meta["mean"])
                continue
            else:
                adjustable_val = float(meta[attribute])

            setattr(quantity, attribute, adjustable_val)

    def _get_quantity_dimensions(self, quantity_metadata):
        """Return the `(max_dim_x, max_dim_y)` pair found in the metadata.

        The xmi, json and fgo files have either (max_dim_x, max_dim_y) or
        (maxX, maxY) keys. If none of these keys are found, or a key exists
        but has an empty value, the defaults of 1 for x and 2 for y are used.
        """
        try:
            max_dim_x = quantity_metadata["max_dim_x"]
            max_dim_y = quantity_metadata["max_dim_y"]
        except KeyError:
            max_dim_x = quantity_metadata.get("maxX", 1)
            max_dim_y = quantity_metadata.get("maxY", 2)
        # Covers keys that exist but have no (i.e. a falsy) value.
        return max_dim_x or 1, max_dim_y or 2
class PopulateModelQuantities(object):
    """Used to populate/update model quantities.

    Populates the model quantities using the data from the TANGO device
    information captured in the json file / POGO generated xmi / FANDANGO
    generated fgo file.

    Attributes
    ----------
    parser_instance : Parser instance
        The Parser object which reads an xmi/xml/json file and parses it into
        device attributes, commands, and properties.
    sim_model : Model instance
        An instance of the Model class which is used for simulation of simple
        attributes.

    Raises
    ------
    SimModelException
        If `sim_model` is supplied but is not a `Model` instance.

    """

    def __init__(self, parser_instance, tango_device_name, sim_model=None):
        self.parser_instance = parser_instance
        if sim_model:
            if isinstance(sim_model, Model):
                self.sim_model = sim_model
            else:
                raise SimModelException(
                    "The sim_model object passed is not an instance"
                    " of the class mkat_tango.simlib.model.Model"
                )
        else:
            self.sim_model = Model(tango_device_name)
        self.logger = self.sim_model.logger
        self.setup_sim_quantities()

    def setup_sim_quantities(self):
        """Set up self.sim_quantities from Model with simulated quantities.

        Places simulated quantities in sim_quantities dict. Keyed by name of
        quantity, value must be instances satisfying the
        :class:`quantities.Quantity` interface

        Raises
        ------
        ValueError
            If an attribute with a non-constant simulation type lacks one of
            the bound/slew-rate/mean/std-dev parameters.

        Notes
        =====
        - Must use self.start_time to set initial time values.
        - Must call super method after setting up `sim_quantities`

        """
        start_time = self.sim_model.start_time
        attributes = self.parser_instance.get_device_attribute_metadata()

        for attr_name, attr_props in attributes.items():
            model_attr_props = self._merge_attribute_metadata(attr_name, attr_props)

            if "quantity_simulation_type" not in model_attr_props:
                quantity = quantities.ConstantQuantity(
                    start_time=start_time,
                    meta=model_attr_props,
                    start_value=self._default_start_value(model_attr_props),
                )
            else:
                # The merged metadata is consulted (not the current file's
                # `attr_props`): a later config file may legitimately omit
                # the simulation type already known to the model.
                simulation_type = model_attr_props["quantity_simulation_type"]
                if simulation_type == "ConstantQuantity":
                    factory_kwargs = {
                        "start_value": self._constant_start_value(
                            attr_name, model_attr_props
                        )
                    }
                else:
                    factory_kwargs = self._gaussian_parameters(
                        attr_name, model_attr_props
                    )
                quantity_factory = quantities.registry[simulation_type]
                quantity = quantity_factory(
                    start_time=start_time, meta=model_attr_props, **factory_kwargs
                )

            self.sim_model.sim_quantities[attr_name] = quantity

        self.sim_model.setup_sim_quantities()

    def _merge_attribute_metadata(self, attr_name, attr_props):
        """Return the metadata dict to use for the `attr_name` quantity.

        When using more than one config file, the attribute meta data can be
        overwritten, so an existing quantity's metadata is updated in place
        instead of being replaced by a different object.
        """
        try:
            model_attr_props = self.sim_model.sim_quantities[attr_name].meta
        except KeyError:
            self.logger.debug(
                "Initializing '{}' quantity meta information using config file:"
                " '{}'.".format(
                    attr_name, self.parser_instance.data_description_file_name
                )
            )
            return attr_props

        # Before the model attribute props dict is updated, the parameter
        # keys with no values specified from the attribute props template are
        # removed, i.e. all optional parameters not provided in the SimDD.
        # NOTE(review): the truthiness filter also drops values such as 0 or
        # False - confirm the parsers never emit those as real values.
        specified_props = dict(
            (param_key, param_val)
            for param_key, param_val in iteritems(attr_props)
            if param_val
        )
        model_attr_props.update(specified_props)
        return model_attr_props

    def _constant_start_value(self, attr_name, model_attr_props):
        """Return the converted start value of a `ConstantQuantity`."""
        try:
            initial_value = model_attr_props["initial_value"]
        except KeyError:
            # `initial_value` is an optional parameter, thus if not specified
            # in the SimDD datafile, the default value of the data type is
            # assigned as the attribute quantity's initial value.
            initial_value = None
            self.logger.info(
                "Parameter `initial_value` does not exist for"
                " attribute {}. Default will be used".format(
                    model_attr_props.get("name", attr_name)
                )
            )
        val_type, val = INITIAL_CONSTANT_VALUE_TYPES[model_attr_props["data_type"]]
        init_val = initial_value if initial_value not in [None, ""] else val
        # Data types without a converter (DevVoid) carry no value.
        return None if val_type is None else val_type(init_val)

    def _gaussian_parameters(self, attr_name, model_attr_props):
        """Return the keyword arguments for a non-constant simulated quantity."""
        try:
            return self.sim_attribute_quantities(
                float(model_attr_props["min_bound"]),
                float(model_attr_props["max_bound"]),
                float(model_attr_props["max_slew_rate"]),
                float(model_attr_props["mean"]),
                float(model_attr_props["std_dev"]),
            )
        except KeyError:
            raise ValueError(
                "Attribute with name '{}' specified in the configuration"
                " file [{}] has no minimum or maximum values set".format(
                    attr_name, self.parser_instance.data_description_file_name
                )
            )

    @staticmethod
    def _get_dimensions(model_attr_props):
        """Return `(max_dim_x, max_dim_y)` for one attribute.

        The xmi, json and fgo files have either (max_dim_x, max_dim_y) or
        (maxX, maxY) keys. If none of these keys are found, or the keys have
        empty values, the defaults of 1 for x and 2 for y are used. The
        values are resolved afresh for every attribute so that nothing
        carries over from a previously processed one.
        """
        try:
            max_dim_x = model_attr_props["max_dim_x"]
            max_dim_y = model_attr_props["max_dim_y"]
        except KeyError:
            max_dim_x = model_attr_props.get("maxX", 1)
            max_dim_y = model_attr_props.get("maxY", 2)
        return max_dim_x or 1, max_dim_y or 2

    def _default_start_value(self, model_attr_props):
        """Return the start value of an attribute without a simulation type.

        Uses the `value` (or else `possiblevalues`) metadata entry converted
        to the attribute's data type, or a default of the right shape when
        neither entry exists.
        """
        # the xmi, json and fgo files have data_format attributes indicating
        # SPECTRUM, SCALAR OR IMAGE data formats. The xml file does not have
        # this key in its attribute list. It has a key labelled possible
        # values which is a list. Hence, SPECTRUM if no data_format is found.
        try:
            attr_data_format = str(model_attr_props["data_format"])
        except KeyError:
            attr_data_format = "SPECTRUM"
        max_dim_x, max_dim_y = self._get_dimensions(model_attr_props)
        val_type, val = INITIAL_CONSTANT_VALUE_TYPES[model_attr_props["data_type"]]

        if "value" in model_attr_props or "possiblevalues" in model_attr_props:
            try:
                raw_val = model_attr_props["value"]
            except KeyError:
                raw_val = model_attr_props["possiblevalues"]
            if attr_data_format == "SCALAR":
                return val_type(raw_val)
            if attr_data_format == "SPECTRUM":
                return [val_type(item) for item in raw_val]
            return [[val_type(item) for item in row] for row in raw_val]

        if attr_data_format == "SCALAR":
            return val
        if attr_data_format == "SPECTRUM":
            return [val] * max_dim_x
        return [[val] * max_dim_x for _ in range(max_dim_y)]

    def sim_attribute_quantities(
        self, min_bound, max_bound, max_slew_rate, mean, std_dev
    ):
        """Simulate attribute quantities with a Gaussian value distribution.

        Parameters
        ----------
        min_bound : float
            minimum attribute value to be simulated
        max_bound : float
            maximum attribute value to be simulated
        max_slew_rate : float
            maximum changing rate of the simulated quantities between min
            and max values
        mean : float
            average value of the simulated quantity
        std_dev : float
            standard deviation value of the simulated quantity

        Returns
        -------
        sim_attribute_quantities : dict
            Dict of Gaussian simulated quantities

        """
        return {
            "max_slew_rate": max_slew_rate,
            "min_bound": min_bound,
            "max_bound": max_bound,
            "mean": mean,
            "std_dev": std_dev,
        }
class PopulateModelActions(object):
    """Used to populate/update model actions.

    Populates the model actions using the data from the TANGO device
    information captured in the json file / POGO generated xmi / FANDANGO
    generated fgo file.

    Attributes
    ----------
    cmd_info : dict
        A dictionary of all the device commands together with their
        metadata specified in the xmi, json or fgo file(s).
    override_info : dict
        A dictionary of device override info specified in the xmi, json or
        fgo file(s).
    sim_model : Model instance
        An instance of the Model class which is used for simulation of simple
        attributes and/or commands.

    """

    # Commands carrying this prefix are registered as test actions.
    _TEST_COMMAND_PREFIX = "test_"

    def __init__(self, cmd_info, override_info, tango_device_name, model_instance=None):
        self.cmd_info = cmd_info
        self.override_info = override_info
        if model_instance is None:
            self.sim_model = Model(tango_device_name)
        else:
            self.sim_model = model_instance
        self.logger = self.sim_model.logger
        self.add_actions()

    def add_actions(self):
        """Register update overrides and one action handler per command.

        Override classes whose name starts with ``Sim`` may contribute
        `pre_update`/`post_update` hooks and ``action_<command>`` handlers;
        override classes whose name starts with ``SimControl`` may contribute
        ``test_action_<command>`` handlers for commands prefixed ``test_``.
        Commands without an override method get a generated handler. The
        command metadata is stored in the model's `sim_actions_meta`, keyed
        by the command name without the ``test_`` prefix.
        """
        instances = {}
        if self.override_info != {}:
            instances = self._get_class_instances(self.override_info)

        # Need to override the model's update method if the override class
        # provides one.
        sim_instances = [
            instance
            for instance_name, instance in instances.items()
            if instance_name.startswith("Sim")
        ]
        sim_control_instances = [
            instance
            for instance_name, instance in instances.items()
            if instance_name.startswith("SimControl")
        ]
        for inst in sim_instances:
            try:
                pre_update_overwrite = getattr(inst, "pre_update")
            except AttributeError:
                self.logger.info(
                    "No pre-update method defined in the '{}'"
                    " override class.".format(type(inst).__name__)
                )
            else:
                self.sim_model.override_pre_updates.append(pre_update_overwrite)

            try:
                post_update_overwrite = getattr(inst, "post_update")
            except AttributeError:
                self.logger.info(
                    "No post-update method defined in the '{}'"
                    " override class.".format(type(inst).__name__)
                )
            else:
                self.sim_model.override_post_updates.append(post_update_overwrite)

        # The override instance consulted for handlers does not depend on the
        # command, so it is resolved once: the last matching instance wins.
        sim_instance = sim_instances[-1] if sim_instances else None
        sim_control_instance = (
            sim_control_instances[-1] if sim_control_instances else None
        )

        for cmd_name, cmd_meta in self.cmd_info.items():
            # Exclude the TANGO default commands as they have their own built
            # in handlers provided.
            if cmd_name in DEFAULT_TANGO_COMMANDS:
                continue
            # Every command is to be declared to have one or more action
            # behaviour. Example of a list of actions handled at this moment
            # is as follows
            # [{'behaviour': 'input_transform',
            #   'destination_variable': 'temporary_variable'},
            #  {'behaviour': 'side_effect',
            #   'destination_quantity': 'temperature',
            #   'source_variable': 'temporary_variable'},
            #  {'behaviour': 'output_return',
            #   'source_variable': 'temporary_variable'}]
            actions = cmd_meta.get("actions", [])

            if cmd_name.startswith(self._TEST_COMMAND_PREFIX):
                # Strip only the leading prefix; splitting on "test_" would
                # truncate names that contain it a second time.
                cmd_name = cmd_name[len(self._TEST_COMMAND_PREFIX):]
                instance = sim_control_instance
                method_template = "test_action_{}"
                register_action = self.sim_model.set_test_sim_action
            else:
                instance = sim_instance
                method_template = "action_{}"
                register_action = self.sim_model.set_sim_action

            self._check_override_action_presence(cmd_name, instance, method_template)
            handler = getattr(
                instance,
                method_template.format(cmd_name.lower()),
                self.generate_action_handler(cmd_name, cmd_meta["dtype_out"], actions),
            )
            register_action(cmd_name, handler)
            # Might store the action's metadata in the sim_actions dictionary
            # instead of creating a separate dict.
            # `cmd_name` no longer carries the test prefix at this point.
            self.sim_model.sim_actions_meta[cmd_name] = cmd_meta

    def _get_class_instances(self, override_class_info):
        """Import and instantiate every override class.

        Returns a dict mapping each override's ``name`` entry to an instance
        of its class. A ``module_directory`` other than the string "None" is
        placed on `sys.path` only for the duration of the import.
        """
        instances = {}
        for klass_info in override_class_info.values():
            module_directory = klass_info["module_directory"]
            if module_directory == "None":
                module = importlib.import_module(klass_info["module_name"])
            else:
                sys.path.append(module_directory)
                try:
                    module = importlib.import_module(klass_info["module_name"])
                finally:
                    # Restore sys.path even when the import fails.
                    sys.path.remove(module_directory)
            klass = getattr(module, klass_info["class_name"])
            instances[klass_info["name"]] = klass()
        return instances

    def _check_override_action_presence(self, cmd_name, instance, action_type):
        """Validate the override method defined for `cmd_name`, if any.

        Raises
        ------
        Exception
            If more than one attribute of `instance` matches the handler
            name case-insensitively, or if the single match is not spelled
            in lower case.
        """
        expected_name = action_type.format(cmd_name.lower())
        instance_attributes = dir(instance)
        attr_occurences = sum(
            1 for attr in instance_attributes if attr.lower() == expected_name
        )
        # Check if there is only one override class method defined for each
        # command
        if attr_occurences > MAX_NUM_OF_CLASS_ATTR_OCCURENCE:
            raise Exception(
                "The command '{}' has multiple override methods defined"
                " in the override class".format(cmd_name)
            )
        # Assuming that there is only one override method defined, now we
        # check if it is in the correct letter case.
        if (
            attr_occurences == MAX_NUM_OF_CLASS_ATTR_OCCURENCE
            and expected_name not in instance_attributes
        ):
            raise Exception("Only lower-case override method names are supported.")

    def generate_action_handler(self, action_name, action_output_type, actions=None):
        """Generates and returns an action handler to manage tango commands.

        Parameters
        ----------
        action_name : str
            Name of action handler to generate
        action_output_type : PyTango._PyTango.CmdArgType
            Tango command argument type
        actions : list
            List of actions that the handler will provide

        Returns
        -------
        action_handler : function
            action handler, taking command input argument in case of tango
            commands with input arguments.

        """
        if actions is None:
            actions = []

        def action_handler(model, data_input=None, tango_dev=None):
            """Action handler taking command input arguments.

            Parameters
            ----------
            model : model.Model
                Model instance
            data_input : float, string, int, etc.
                Input arguments of tango command
            tango_dev : object
                Unused by the generated handler.

            Returns
            -------
            return_value : float, string, int, etc.
                Output value of an executed tango command

            Raises
            ------
            ValueError
                If an 'output_return' action names both or neither of
                'source_variable' and 'source_quantity', or names a variable
                or quantity that is not defined.

            """
            # TODO (KM 18-01-2016): Need to remove the tango_dev parameter
            # from action handler, currently used for testing functionality
            # of the override class actions.
            temp_variables = {}
            # NOTE(review): with an empty action list the handler returns
            # None rather than the default for the output type - confirm
            # this is intended for non-void commands.
            return_value = None
            for action in actions:
                behaviour = action["behaviour"]
                if behaviour == "long_running":
                    time.sleep(float(action["execution_time_secs"]))
                elif behaviour == "input_transform":
                    temp_variables[action["destination_variable"]] = data_input
                elif behaviour == "side_effect":
                    quantity = action["destination_quantity"]
                    temp_variables[action["source_variable"]] = data_input
                    model_quantity = model.sim_quantities[quantity]
                    model_quantity.set_val(data_input, model.time_func())

                if behaviour != "output_return":
                    # Return a default value if output_return is not
                    # specified.
                    return_value = ARBITRARY_DATA_TYPE_RETURN_VALUES[action_output_type]
                elif "source_variable" in action and "source_quantity" in action:
                    raise ValueError(
                        "{}: Either 'source_variable' or 'source_quantity'"
                        " for 'output_return' action, not both".format(action_name)
                    )
                elif "source_variable" in action:
                    source_variable = action["source_variable"]
                    try:
                        return_value = temp_variables[source_variable]
                    except KeyError:
                        raise ValueError(
                            "{}: Source variable {} not defined".format(
                                action_name, source_variable
                            )
                        )
                elif "source_quantity" in action:
                    quantity = action["source_quantity"]
                    try:
                        model_quantity = model.sim_quantities[quantity]
                    except KeyError:
                        raise ValueError(
                            "{}: Source quantity {} not defined".format(
                                action_name, quantity
                            )
                        )
                    return_value = model_quantity.last_val
                else:
                    raise ValueError(
                        "{}: Need to specify one of 'source_variable' "
                        "or 'source_quantity' for 'output_return' action".format(
                            action_name
                        )
                    )
            return return_value

        action_handler.__name__ = action_name
        return action_handler
class PopulateModelProperties(object):
    """Used to populate/update model properties.

    Populates the model properties using the data from the TANGO device
    information captured in the json file / POGO generated xmi / FANDANGO
    generated fgo file.

    Attributes
    ----------
    properties_info : dict
        A dictionary of device property configuration specified in the xmi,
        json or fgo file(s).
    sim_model : Model instance
        An instance of the Model class which is used for simulation of simple
        attributes.

    """

    def __init__(self, properties_info, tango_device_name, sim_model=None):
        self.properties_info = properties_info
        if not sim_model:
            # No model supplied: create one named after the device.
            sim_model = Model(tango_device_name)
        elif not isinstance(sim_model, Model):
            raise SimModelException(
                "The sim_model object passed is not an instance"
                " of the class mkat_tango.simlib.model.Model"
            )
        self.sim_model = sim_model
        self.logger = self.sim_model.logger
        self.setup_sim_properties()

    def setup_sim_properties(self):
        """Set up self.sim_properties from Model with simulated properties.

        Hands the property configuration to the model, which merges it into
        its `sim_properties` dict. Keyed by name of property, value must be a
        string, number or array and it is optional.
        """
        target_model = self.sim_model
        target_model.set_sim_property(self.properties_info)
class SimModelException(Exception):
    """Raised when the object passed as a sim model is not a `Model` instance."""

    def __init__(self, message):
        # Plain delegation to the base class; `message` ends up in `args`.
        Exception.__init__(self, message)