import asyncio
import logging
import basyx.aas.model.submodel
from basyx.aas.util import traversal
from smia.logic.exceptions import CapabilityCheckingError, AASModelReadingError
from smia.css_ontology.css_ontology_utils import CapabilitySkillOntologyUtils, CapabilitySkillACLInfo, \
CapabilitySkillOntologyInfo
from smia.utilities.smia_info import AssetInterfacesInfo
_logger = logging.getLogger(__name__)
# [docs]  (Sphinx viewcode link marker left over from HTML extraction; not part of the module)
class ExtendedAASModel:
    """Helper methods to work with an AAS model that has been loaded into Python objects.

    The class keeps two pieces of shared state: the object store with every Python object read from the AAS
    model, and a dictionary with the Capability-Skill information split into agent (DT) and asset capabilities.
    Access through the ``set_*``/``get_*`` coroutines is serialized with an ``asyncio.Lock``.
    """

    aas_model_object_store = None  #: Storage with all Python objects obtained from the AAS model
    capabilities_skills_dict = None  #: Dictionary with all information related to Capability-Skill model obtained from the AAS model
    lock = None  #: Asyncio Lock object for secure access to shared AAS model objects

    def __init__(self):
        # The object store is unknown until the AAS model has been read.
        self.aas_model_object_store = None

        # Capabilities are divided in those related to the agent (DT) and those related to the asset.
        self.capabilities_skills_dict = {'AgentCapabilities': {}, 'AssetCapabilities': {}}

        # NOTE: asyncio.Lock is not reentrant; a method holding it must not await another method that takes it.
        self.lock = asyncio.Lock()

    # -----------------------------------------
    # Methods related to AAS model object store
    # -----------------------------------------
    async def set_aas_model_object_store(self, object_store):
        """
        This method updates the object store for the AAS model.

        Args:
            object_store (basyx.aas.model.DictObjectStore): object to store all Python elements of the AAS model.
        """
        async with self.lock:
            self.aas_model_object_store = object_store

    async def get_aas_model_object_store(self):
        """
        This method returns the object store for the AAS model.

        Returns:
            basyx.aas.model.DictObjectStore: object with all Python elements of the AAS model.
        """
        async with self.lock:
            return self.aas_model_object_store

    # -------------------------------------------
    # Methods related to capability skills object
    # -------------------------------------------
    async def set_capabilities_skills_object(self, cap_skills_object):
        """
        This method replaces the object that contains all the information about Capabilities and Skills of the DT
        and the asset.

        Args:
            cap_skills_object (dict): object with all the information about Capabilities and Skills of the DT and
                the asset.
        """
        async with self.lock:
            self.capabilities_skills_dict = cap_skills_object

    async def get_capabilities_skills_object(self):
        """
        This method returns the object with all the information about Capabilities and Skills of the DT and the
        asset.

        Returns:
            dict: all the information about Capabilities and Skills of the DT and the asset.
        """
        async with self.lock:
            return self.capabilities_skills_dict

    async def get_capability_dict_by_type(self, cap_type):
        """
        This method returns the capability dictionary related to the given capability type.

        Args:
            cap_type (str): type of the capability (AgentCapabilities or AssetCapabilities).

        Returns:
            dict: dictionary with the information of all capabilities of the given type (empty dictionary if the
            type is not one of the two known types).
        """
        async with self.lock:
            if cap_type == CapabilitySkillOntologyUtils.AGENT_CAPABILITY_TYPE:
                return self.capabilities_skills_dict['AgentCapabilities']
            if cap_type == CapabilitySkillOntologyUtils.ASSET_CAPABILITY_TYPE:
                return self.capabilities_skills_dict['AssetCapabilities']
            return {}

    # -----------------------------------------
    # General methods related to AAS meta-model
    # -----------------------------------------
    async def get_object_by_reference(self, reference):
        """
        This method gets the AAS meta-model Python object using the reference, distinguishing between
        ExternalReference and ModelReference.

        Args:
            reference (basyx.aas.model.Reference): reference object related to desired element

        Returns:
            object: Python object of the desired element associated to the reference (None if the reference is
            of another kind or an ExternalReference has no keys).

        Raises:
            AASModelReadingError: if the lookup in the object store raises ``KeyError``.
        """
        try:
            if isinstance(reference, basyx.aas.model.ExternalReference):
                # Only the first key of an external reference is used for the lookup.
                first_key = next(iter(reference.key), None)
                if first_key is None:
                    return None
                return self.aas_model_object_store.get_identifiable(first_key.value)
            if isinstance(reference, basyx.aas.model.ModelReference):
                return reference.resolve(self.aas_model_object_store)
        except KeyError as e:
            raise AASModelReadingError("The object within the AAS model with reference {} does not "
                                       "exist".format(reference), sme_class=None,
                                       reason='AASModelObjectNotExist') from e
        return None

    def _iter_submodel_elements(self, sme_class=None):
        """Yield every SubmodelElement of ``sme_class`` found in the Submodels of the object store."""
        if sme_class is None:
            sme_class = basyx.aas.model.SubmodelElement
        for aas_object in self.aas_model_object_store:
            if not isinstance(aas_object, basyx.aas.model.Submodel):
                continue
            for submodel_element in traversal.walk_submodel(aas_object):
                if isinstance(submodel_element, sme_class):
                    yield submodel_element

    async def get_submodel_elements_by_semantic_id(self, semantic_id_external_ref, sme_class=None):
        """
        This method gets all SubmodelElements by the semantic id in form of an external reference. The
        SubmodelElements to obtain can be filtered by the meta-model class.

        Args:
            semantic_id_external_ref (str): semantic id in form of an external reference
            sme_class (basyx.aas.model.SubmodelElement): Submodel Element class of the elements to be found (None
                if no filtering is required).

        Returns:
            list(basyx.aas.model.SubmodelElement): list with all SubmodelElements of the given class.
        """
        rels_elements = []
        for submodel_element in self._iter_submodel_elements(sme_class):
            if submodel_element.check_semantic_id_exist(semantic_id_external_ref):
                rels_elements.append(submodel_element)
            # NOTE(review): the Operation check is assumed to be nested inside the class filter - confirm
            # against the original, properly indented file.
            if isinstance(submodel_element, basyx.aas.model.Operation):
                # In case of Operation, OperationVariables need to be analyzed
                rels_elements.extend(
                    submodel_element.get_operation_variables_by_semantic_id(semantic_id_external_ref))
        return rels_elements

    async def get_submodel_elements_by_semantic_id_list(self, semantic_id_external_refs, sme_class=None):
        """
        This method obtains all the SubmodelElements that have any of the given semantic identifiers (in form of
        external references). The SubmodelElements to obtain can be filtered by the meta-model class. Each
        matching element is returned once, even if it matches several of the given identifiers.

        Args:
            semantic_id_external_refs (list(str)): semantic identifiers in form of a list of external references
            sme_class (basyx.aas.model.SubmodelElement): Submodel Element class of the elements to be found (None
                if no filtering is required).

        Returns:
            list(basyx.aas.model.SubmodelElement): list with all SubmodelElements of the given class.
        """
        # Materialized once because the identifiers are traversed several times per element.
        semantic_ids = list(semantic_id_external_refs)
        rels_elements = []
        seen_object_ids = set()

        def remember(element):
            # Identity-based de-duplication: the same Python object must not be reported twice.
            if id(element) not in seen_object_ids:
                seen_object_ids.add(id(element))
                rels_elements.append(element)

        for submodel_element in self._iter_submodel_elements(sme_class):
            if any(submodel_element.check_semantic_id_exist(semantic_id) for semantic_id in semantic_ids):
                remember(submodel_element)
            # NOTE(review): the Operation check is assumed to be nested inside the class filter - confirm
            # against the original, properly indented file.
            if isinstance(submodel_element, basyx.aas.model.Operation):
                # In case of Operation, OperationVariables need to be analyzed
                for semantic_id in semantic_ids:
                    for operation_variable in submodel_element.get_operation_variables_by_semantic_id(
                            semantic_id):
                        remember(operation_variable)
        return rels_elements

    async def get_submodel_by_semantic_id(self, sm_semantic_id):
        """
        This method gets the Submodel object using its semantic identifier.

        Args:
            sm_semantic_id (str): semantic identifier of the Submodel.

        Returns:
            basyx.aas.model.Submodel: Submodel in form of a Python object (None if no Submodel has a semantic id
            key with the given value).
        """
        for aas_object in self.aas_model_object_store:
            if not isinstance(aas_object, basyx.aas.model.Submodel) or aas_object.semantic_id is None:
                continue
            if any(key.value == sm_semantic_id for key in aas_object.semantic_id.key):
                return aas_object
        return None

    async def check_element_exist_in_namespaceset_by_id_short(self, namespaceset_elem, elem_id_short):
        """
        This method checks if an element exists in the NamespaceSet using its id_short.

        Args:
            namespaceset_elem (basyx.aas.model.NamespaceSet): NamespaceSet element
            elem_id_short (str): id_short of the element.

        Returns:
            bool: result of the check
        """
        return any(namespace_elem.id_short == elem_id_short for namespace_elem in namespaceset_elem)

    async def get_concept_description_pair_value_id_by_value_name(self, concept_description_id, value_name):
        """
        This method gets the value_id of a pair within a Concept Description using the value name.

        Args:
            concept_description_id (str): globally unique identifier of the Concept Description.
            value_name (str): name of the value inside the pair to find.

        Returns:
            str: value_id of the pair that contains the provided value name (None if the Concept Description or
            the value name cannot be found).
        """
        try:
            concept_description = self.aas_model_object_store.get_identifiable(concept_description_id)
        except KeyError:
            # The store signals a missing identifiable with KeyError; it is handled as "not found" below.
            concept_description = None
        if not concept_description:
            _logger.error("Concept Description with id [{}] not found.".format(concept_description_id))
            return None

        # The embedded_data_specifications (the specification of the data inside the element) hold the valueList,
        # where the possible values for a property are added.
        for embedded_data_spec in concept_description.embedded_data_specifications or ():
            spec_content = embedded_data_spec.data_specification_content
            if not isinstance(spec_content, basyx.aas.model.DataSpecificationIEC61360):
                continue
            for value_elem in spec_content.value_list or ():
                if value_elem.value == value_name:
                    # As it is another ConceptDescription, it is an ExternalReference (first key)
                    return value_elem.value_id.key[0].value

        _logger.warning("Value name [{}] not found in Concept Description with id [{}].".format(
            value_name, concept_description_id))
        return None

    # ---------------------------------------------------------------
    # Methods related to Capability-Skill ontology and AAS meta-model
    # ---------------------------------------------------------------
    async def get_capability_by_id_short(self, cap_type, cap_id_short):
        """
        This method gets the capability object using its id_short attribute and the type of the Capability.

        Args:
            cap_type (str): type of the capability (AgentCapabilities or AssetCapabilities).
            cap_id_short (str): id_short of the Capability to find.

        Returns:
            basyx.aas.model.Capability: Python object of capability to find (None if the Capability does not exist)
        """
        # The capability objects are the keys of the dictionary of the given type.
        for cap_elem in await self.get_capability_dict_by_type(cap_type):
            if cap_elem.id_short == cap_id_short:
                return cap_elem
        return None

    async def get_cap_skill_elem_from_relationship(self, rel_element):
        """
        This method returns the Capability and Skill objects from the Relationship element, no matter in which
        order they are specified.

        Args:
            rel_element (basyx.aas.model.RelationshipElement): Python object of the RelationshipElement.

        Returns:
            basyx.aas.model.Capability, basyx.aas.model.SubmodelElement: capability and skill SME in Python
            reference objects (None, None if neither side of the relationship is a Capability).
        """
        first_rel_elem = await self.get_object_by_reference(rel_element.first)
        second_rel_elem = await self.get_object_by_reference(rel_element.second)
        if isinstance(first_rel_elem, basyx.aas.model.Capability):
            return first_rel_elem, second_rel_elem
        if isinstance(second_rel_elem, basyx.aas.model.Capability):
            return second_rel_elem, first_rel_elem
        _logger.error(
            "This method has been used incorrectly. This Relationship does not have a Capability element.")
        return None, None

    async def get_elements_from_relationship(self, rel_element, first_elem_class=None, second_elem_class=None):
        """
        This method returns the objects of a given Relationship element taking into account the type of class
        that is required for the objects referenced within the relationship. The objects will be returned in the
        order specified by the classes, no matter in which order they are defined in the AAS model (in the case
        of not specifying any class, it is returned in the original order).

        Args:
            rel_element (basyx.aas.model.RelationshipElement): Python object of the RelationshipElement.
            first_elem_class (basyx.aas.model.SubmodelElement): Class required for the first element returned.
            second_elem_class (basyx.aas.model.SubmodelElement): Class required for the second element returned.

        Returns:
            basyx.aas.model.SubmodelElement, basyx.aas.model.SubmodelElement: SME Python objects with the
            required format.

        Raises:
            AASModelReadingError: if a referenced element does not exist or the elements are not of the required
                classes.
        """
        invalid_reason = "Relationship referenced element invalid"

        # Using the references within the relationship, both SubmodelElement are obtained
        first_rel_elem = await self.get_object_by_reference(rel_element.first)
        second_rel_elem = await self.get_object_by_reference(rel_element.second)
        if first_rel_elem is None or second_rel_elem is None:
            raise AASModelReadingError("Elements of the relationship {} does not exist in the AAS model"
                                       "".format(rel_element.id_short), sme_class=rel_element,
                                       reason=invalid_reason)

        if first_elem_class is None and second_elem_class is None:
            # If no one is required, it simply returns the elements
            return first_rel_elem, second_rel_elem

        if first_elem_class is not None and second_elem_class is not None:
            # If both are required, both have to be checked
            if isinstance(first_rel_elem, first_elem_class) and isinstance(second_rel_elem, second_elem_class):
                return first_rel_elem, second_rel_elem
            if isinstance(first_rel_elem, second_elem_class) and isinstance(second_rel_elem, first_elem_class):
                return second_rel_elem, first_rel_elem
            raise AASModelReadingError("Elements of the relationship {} are not exist of the required classes {}, "
                                       "{}".format(rel_element.id_short, first_elem_class, second_elem_class),
                                       sme_class=rel_element, reason=invalid_reason)

        if first_elem_class is not None:
            # Only the class of the first returned element is constrained
            if isinstance(first_rel_elem, first_elem_class):
                return first_rel_elem, second_rel_elem
            if isinstance(second_rel_elem, first_elem_class):
                return second_rel_elem, first_rel_elem
            raise AASModelReadingError("The element {} within the relationship {} is not of the required class "
                                       "{}".format(first_rel_elem, rel_element.id_short, first_elem_class),
                                       sme_class=rel_element, reason=invalid_reason)

        # Only the class of the second returned element is constrained
        if isinstance(first_rel_elem, second_elem_class):
            return second_rel_elem, first_rel_elem
        if isinstance(second_rel_elem, second_elem_class):
            return first_rel_elem, second_rel_elem
        raise AASModelReadingError("The element {} within the relationship {} is not of the required class "
                                   "{}".format(second_rel_elem, rel_element.id_short, second_elem_class),
                                   sme_class=rel_element, reason=invalid_reason)

    async def _get_relationship_counterparts(self, rel_semantic_id, element, first_only=False):
        """Return the elements linked to ``element`` by relationships that have the given semantic id.

        With ``first_only`` the search stops at the first relationship that contains ``element``, so later
        relationships are not resolved at all.
        """
        counterparts = []
        relationships = await self.get_submodel_elements_by_semantic_id(
            rel_semantic_id, basyx.aas.model.RelationshipElement)
        for rel in relationships:
            first_elem = await self.get_object_by_reference(rel.first)
            second_elem = await self.get_object_by_reference(rel.second)
            if first_elem == element:
                counterparts.append(second_elem)
            elif second_elem == element:
                counterparts.append(first_elem)
            else:
                continue
            if first_only:
                break
        return counterparts

    async def get_capability_associated_constraints(self, capability_elem):
        """
        This method gets the constraints associated to a capability.

        Args:
            capability_elem (basyx.aas.model.Capability): capability Python object.

        Returns:
            list: list with all constraints of the selected capability in form of Python objects.
        """
        return await self._get_relationship_counterparts(
            CapabilitySkillOntologyInfo.CSS_ONTOLOGY_PROP_ISRESTRICTEDBY_IRI, capability_elem)

    async def get_capability_associated_constraints_by_qualifier_data(self, capability_elem, qualifier_type,
                                                                      qualifier_value):
        """
        This method gets the first constraint associated to a capability that has specific qualifier data.

        Args:
            capability_elem (basyx.aas.model.Capability): capability Python object.
            qualifier_type (str): type of the qualifier
            qualifier_value (str): value of the qualifier

        Returns:
            object: the first constraint of the selected capability that has a qualifier with the given type and
            value, in form of a Python object (None if there is no such constraint).
        """
        for constraint in await self.get_capability_associated_constraints(capability_elem):
            for qualifier in constraint.qualifier or ():
                if qualifier.type == qualifier_type and qualifier.value == qualifier_value:
                    return constraint
        return None

    @staticmethod
    def _fail_capability_check(cap_name, message):
        """Log ``message`` as a warning and raise it as a CapabilityCheckingError for ``cap_name``."""
        _logger.warning(message)
        raise CapabilityCheckingError(cap_name, message)

    async def check_skill_elem_by_capability(self, cap_type, cap_elem, skill_data):
        """
        This method checks if a skill SubmodelElement is defined associated to a Capability using given skill
        data.

        Args:
            cap_type (str): type of the capability (AgentCapabilities or AssetCapabilities).
            cap_elem (basyx.aas.model.Capability): capability Python object.
            skill_data (dict): data of the skill.

        Returns:
            bool: True when every check has been passed.

        Raises:
            CapabilityCheckingError: if the skill data is incomplete or does not match the skill linked to the
                capability.
        """
        # First, the skill id_short will be checked
        if CapabilitySkillACLInfo.REQUIRED_SKILL_NAME not in skill_data:
            self._fail_capability_check(
                cap_elem.id_short,
                f"The received data is invalid due to missing the required field #{CapabilitySkillACLInfo.REQUIRED_SKILL_NAME}")
        required_skill_name = skill_data[CapabilitySkillACLInfo.REQUIRED_SKILL_NAME]

        capabilities_of_type = await self.get_capability_dict_by_type(cap_type)
        try:
            skill_elem = capabilities_of_type[cap_elem]['skillObject']
        except KeyError:
            # The capability has no skill registered for this type: it is reported as a failed check instead of
            # leaking a bare KeyError to the caller.
            self._fail_capability_check(cap_elem.id_short, "The given skill does not exist in this DT.")
        if skill_elem.id_short != required_skill_name:
            self._fail_capability_check(cap_elem.id_short, "The given skill does not exist in this DT.")

        # Then, the skill SubmodelElement type will be checked
        if CapabilitySkillACLInfo.REQUIRED_SKILL_ELEMENT_TYPE not in skill_data:
            self._fail_capability_check(cap_elem.id_short,
                                        "The given data does not contain required skill SubmodelElement type.")
        required_skill_sme_type = skill_data[CapabilitySkillACLInfo.REQUIRED_SKILL_ELEMENT_TYPE]
        if skill_elem.__class__.__name__ != required_skill_sme_type:
            self._fail_capability_check(
                cap_elem.id_short,
                "The given skill SubmodelElement type is not the same as the element exists in this DT.")

        # The skill parameters will be also checked
        # TODO NEXT STEP: think how to handle several parameters (one option is to define 'inputs' in the JSON as
        #  lists and go through each parameter, both input and output. It will be necessary to check that all of
        #  them exist as Python elements, looking them up by id_short. If any is missing, the checking fails)
        if CapabilitySkillACLInfo.REQUIRED_SKILL_PARAMETERS not in skill_data:
            self._fail_capability_check(cap_elem.id_short,
                                        "The given data does not contain required skill parameters information.")
        required_skill_parameters = skill_data[CapabilitySkillACLInfo.REQUIRED_SKILL_PARAMETERS]
        if 'input' in required_skill_parameters:
            if not await self.check_element_exist_in_namespaceset_by_id_short(
                    skill_elem.input_variable, required_skill_parameters['input']):
                self._fail_capability_check(
                    cap_elem.id_short,
                    "The given skill does not have the same input parameter as the element exists in this DT.")
        if 'output' in required_skill_parameters:
            if not await self.check_element_exist_in_namespaceset_by_id_short(
                    skill_elem.output_variable, required_skill_parameters['output']):
                self._fail_capability_check(
                    cap_elem.id_short,
                    "The given skill does not have the same output parameter as the element exists in this DT.")

        # When all checks have been passed, the given skill is valid
        return True

    async def get_skill_data_by_capability(self, required_cap_elem, required_data):
        """
        This method gets a piece of the skill information stored for a given Capability element.

        Args:
            required_cap_elem (basyx.aas.model.Capability): capability Python object.
            required_data (str): key of the required data within the information of the capability.

        Returns:
            object: value stored under ``required_data`` for the capability (None if the Capability does not
            exist).
        """
        for cap_info_dict in self.capabilities_skills_dict.values():
            for cap_elem, cap_info in cap_info_dict.items():
                if cap_elem == required_cap_elem:
                    return cap_info[required_data]
        return None

    async def get_skill_interface_by_skill_elem(self, skill_elem):
        """
        This method gets the interface associated to a skill.

        Args:
            skill_elem (basyx.aas.model.SubmodelElement): skill Python object in form of a SubmodelElement.

        Returns:
            (basyx.aas.model.SubmodelElement): the interface of the selected skill in form of Python object (None
            if it does not exist).
        """
        interfaces = await self._get_relationship_counterparts(
            CapabilitySkillOntologyInfo.CSS_ONTOLOGY_PROP_ACCESSIBLETHROUGH_IRI, skill_elem, first_only=True)
        return interfaces[0] if interfaces else None

    async def get_skill_parameters_exposure_interface_elem(self, skill_elem):
        """
        This method gets the exposure element within the skill interface linked to the parameters of the given
        skill.

        Args:
            skill_elem (basyx.aas.model.SubmodelElement): skill Python object in form of a SubmodelElement.

        Returns:
            basyx.aas.model.SubmodelElement: exposure submodel element of skill parameters (None if it does not
            exist).
        """
        # The exposure elements can be obtained with the related relationship semanticID
        exposure_elems = await self._get_relationship_counterparts(
            CapabilitySkillOntologyUtils.SEMANTICID_REL_SKILL_PARAMETER_SKILL_INTERFACE, skill_elem,
            first_only=True)
        return exposure_elems[0] if exposure_elems else None

    async def capability_checking_from_acl_request(self, required_capability_data):
        """
        This method checks if the DT has a capability required in an ACL message.

        Args:
            required_capability_data (dict): all the information about the required capability

        Returns:
            bool: True if the DT can perform the capability.

        Raises:
            CapabilityCheckingError: if any of the checks fails (the exception carries the reason).
        """
        required_cap_name = required_capability_data[CapabilitySkillACLInfo.REQUIRED_CAPABILITY_NAME]
        required_cap_type = required_capability_data[CapabilitySkillACLInfo.REQUIRED_CAPABILITY_TYPE]

        # It will be checked if the type is among the available options
        if required_cap_type not in CapabilitySkillOntologyUtils.CAPABILITY_TYPE_POSSIBLE_VALUES:
            _logger.warning("The required capability does not have a valid type.")
            raise CapabilityCheckingError(required_cap_name, "The received capability does not have a valid type.")

        # It will be checked if the capability is among the defined in this DT
        capability_elem = await self.get_capability_by_id_short(required_cap_type, required_cap_name)
        if capability_elem is None:
            self._fail_capability_check(required_cap_name,
                                        "A capability has been requested that this DT does not have.")

        # TODO the constraints remain to be analyzed
        # TODO NEXT STEP: to analyze the constraints, it will simply be checked whether the name of the constraint
        #  has been defined in the required capability (the capability is not executed in this step, so the
        #  constraints are not evaluated)

        # It will be also checked the skill id_short
        required_skill_data = required_capability_data[CapabilitySkillACLInfo.REQUIRED_SKILL_INFO]
        if await self.check_skill_elem_by_capability(required_cap_type, capability_elem,
                                                     required_skill_data) is False:
            self._fail_capability_check(
                required_cap_name, "A capability has been requested that its skill does not exist in this DT.")

        # When all checks have been passed, the given skill is valid
        return True

    async def skill_feasibility_checking_post_conditions(self, capability_elem, constraints_data):
        """
        This method checks the feasibility of a Capability element in relation with its post-conditions. The
        analysis itself is not implemented yet: the post-condition constraint is only looked up.

        Args:
            capability_elem (basyx.aas.model.Capability): capability Python object.
            constraints_data (dict): JSON object with the data of the constraints (with required values)
        """
        # First, the postcondition constraints are obtained
        post_condition_constraints = await self.get_capability_associated_constraints_by_qualifier_data(
            capability_elem, CapabilitySkillOntologyUtils.QUALIFIER_FEASIBILITY_CHECKING_TYPE, 'POSTCONDITION')
        if post_condition_constraints:
            # TODO it has to be decided how to analyze the post-conditions
            pass