Source code for tango_simlib.quantities

#########################################################################################
# Copyright 2017 SKA South Africa (http://ska.ac.za/)                                   #
#                                                                                       #
# BSD license - see LICENSE.txt for details                                             #
#########################################################################################
from __future__ import absolute_import, division, print_function
from future import standard_library

standard_library.install_aliases()  # noqa: E402

import abc
import logging
import time

from random import gauss

from builtins import object
from future.utils import with_metaclass
from past.builtins import cmp

MODULE_LOGGER = logging.getLogger(__name__)

inf = float("inf")
ninf = float("-inf")
registry = {}


[docs]def register_quantity_class(cls): assert cls.__name__ not in registry registry[cls.__name__] = cls Quantity.register(cls)
[docs]class Quantity(with_metaclass(abc.ABCMeta, object)): """Attributes that should be adjustable via a simulation control interface. Parameters ---------- start_time : float The initial time when a quantity is updated. start_value : float The initial value of a quantity. meta : dict This data structure must contain all the attribute description data of all quantities that represent tango device simulator attributes. List of all available tango attribute description data: abs_change, archive_abs_change, archive_period, archive_rel_change, label, max_alarm, max_value, max_warning, min_alarm, min_value, delta_t, delta_val, description, display_unit, format, min_warning, period, rel_change e.g. meta=dict(label="Outside Temperature", dtype=float) TODO (AR) 2016-07-27 : Ideally these properties should not be TANGO specific as is at the moment. Notes ===== Subclasses should add all the attributes to this set that users should be able to adjust via a user interface at simulation runtime, also initialise the `last_val` attribute with the initial quantity value. """ adjustable_attributes = frozenset(["last_val", "last_update_time"]) def __init__(self, start_value=None, start_time=None, meta=None): """Subclasses must call this super __init__()""" self.last_update_time = start_time or time.time() self.meta = meta if start_value is not None: self.last_val = start_value
[docs] @abc.abstractmethod def next_val(self, t): """Return the next simulated value for simulation time at t seconds. Must update attributes `last_val` with the new value and `last_update_time` with the simulation time Parameters ---------- t : float Time to update quantity """ pass
[docs] def set_val(self, val, t): """Set a value to the quantity. Parameters ---------- t : float Time to update quantity val : int/float/string Value to update quantity """ self.last_update_time = t self.last_val = val
[docs] def default_val(self, t): """Set a default value of 0 to the quantity. Parameters ---------- t : float Time to update quantity """ self.last_val = 0 self.last_update_time = t
[docs]class GaussianSlewLimited(Quantity): """A Gaussian random variable a slew-rate limit and clipping. Parameters ---------- mean : float Gaussian mean value std_dev : float Gaussian standard deviation max_slew_rate : float Maximum quantity slew rate in amount per second. Random values will be clipped to satisfy this condition. min_bound : float Minimum quantity value, random values will be clipped if needed. max_bound : float Maximum quantity value, random values will be clipped if needed. """ adjustable_attributes = Quantity.adjustable_attributes | frozenset( ["mean", "std_dev", "max_slew_rate", "min_bound", "max_bound"] ) def __init__( self, mean, std_dev, max_slew_rate=inf, meta=None, min_bound=ninf, max_bound=inf, start_value=None, start_time=None, ): start_value = start_value if start_value is not None else mean super(GaussianSlewLimited, self).__init__( start_value=start_value, start_time=start_time, meta=meta ) self.mean = mean self.std_dev = std_dev assert max_slew_rate > 0 self.max_slew_rate = max_slew_rate self.min_bound = min_bound self.max_bound = max_bound self.last_val = mean
[docs] def next_val(self, t): """Returns the next value of the simulation. Parameters ---------- t : float Time to update quantity """ dt = t - self.last_update_time max_slew = self.max_slew_rate * dt new_val = gauss(self.mean, self.std_dev) delta = new_val - self.last_val val = self.last_val + cmp(delta, 0) * min(abs(delta), max_slew) val = min(val, self.max_bound) val = max(val, self.min_bound) self.last_val = val self.last_update_time = t return val
register_quantity_class(GaussianSlewLimited)
[docs]class ConstantQuantity(Quantity): """A quantity that does not change unless explicitly set."""
[docs] def next_val(self, t): """Returns the last value as the next simulated value. Parameters ---------- t : float Time to update quantity """ return self.last_val
[docs] def default_val(self, t): """Set a default value of `True` to the quantity. Parameters ---------- t : float Time to update quantity """ self.last_val = True self.last_update_time = t
register_quantity_class(ConstantQuantity)