Source code for smia.logic.intra_aas_interactions_utils
"""This class groups the methods related to the Intra AAS interactions between the Manager and the Core."""importjsonimportloggingfromdatetimeimportdatetimefromaiokafkaimportAIOKafkaConsumer,TopicPartitionfromsmia.utilities.general_utilsimportSMIAGeneralInfofromsmia.utilities.smia_archive_utilsimportfile_to_json,update_json_filefromsmia.utilities.kafka_infoimportKafkaInfo_logger=logging.getLogger(__name__)# ----------------------------# Methods related to responses# ----------------------------
def save_svc_info_in_log_file(requested_entity, svc_type_log_file_name, interaction_id_num, *,
                              svc_request_info=None, svc_response_info=None):
    """
    Save the information of a service request in the associated log file.

    The request and response information must be supplied by the caller through the keyword-only
    arguments, because there is not yet a way to look them up from inside this method.

    Args:
        requested_entity (str): The entity that has requested the service. Currently unused by the body; kept
            for interface compatibility.
        svc_type_log_file_name (str): The log file name of the service type.
        interaction_id_num (int): Identifier of the interaction.
        svc_request_info (dict): Information of the service request. The keys 'serviceID', 'serviceType' and
            'serviceData' (containing 'timestamp' and optionally 'requestedData') are read.
        svc_response_info (dict): Information of the service response. The keys 'serviceStatus' and
            'serviceData' (containing 'timestamp' and, if some data was requested, that data) are read.

    Raises:
        TypeError: If the request or the response information has not been provided.
    """
    if svc_request_info is None or svc_response_info is None:
        # TODO: decide how to retrieve the request/response information now that there is no interactionID.
        # Until then the caller has to pass it explicitly. Previously this failed on every call with an
        # opaque "cannot unpack non-iterable NoneType" TypeError; the exception type is kept on purpose.
        raise TypeError("Both svc_request_info and svc_response_info are required to save the service "
                        "information of interaction " + str(interaction_id_num) + " in the log file.")

    request_data = svc_request_info['serviceData']
    response_data = svc_response_info['serviceData']

    # Create the JSON structure (timestamps are passed to datetime.fromtimestamp, i.e. local time)
    log_structure = {
        'level': 'INFO',
        'interactionID': interaction_id_num,
        'serviceStatus': svc_response_info['serviceStatus'],
        'serviceInfo': {
            'serviceID': svc_request_info['serviceID'],
            'serviceType': svc_request_info['serviceType'],
            'requestTimestamp': str(datetime.fromtimestamp(request_data['timestamp'])),
            'responseTimestamp': str(datetime.fromtimestamp(response_data['timestamp'])),
        },
    }

    # If some data has been requested, add it to the structure
    if 'requestedData' in request_data:
        requested_data_name = request_data['requestedData']
        log_structure['serviceInfo']['serviceData'] = {
            'requestedData': requested_data_name,
            'dataValue': response_data[requested_data_name],
        }

    # Get the content of the log file, append the new structure and write it back
    log_file_path = SMIAGeneralInfo.SVC_LOG_FOLDER_PATH + '/' + svc_type_log_file_name
    log_file_json = file_to_json(log_file_path)
    log_file_json.append(log_structure)
    update_json_file(file_path=log_file_path, content=log_file_json)
    _logger.info("Service information related to interaction " + str(interaction_id_num)
                 + " added in log file.")
# ------------------------
# Methods related to Kafka
# ------------------------
def create_interaction_kafka_consumer(client_id):
    """
    Build the Kafka consumer assigned to the AAS Core partition of the AAS topic.

    The consumer is created with auto-commit enabled, within the 'i4-0-smia-managers' group, and decodes
    every message value as UTF-8 JSON. The partition is assigned manually.

    Args:
        client_id (str): the id of the client of the Kafka consumer.

    Returns:
        AIOKafkaConsumer: the object of the Kafka consumer.
    """

    def _decode_json_value(raw_value):
        # Message values arrive as bytes containing UTF-8 encoded JSON
        return json.loads(raw_value.decode('utf-8'))

    bootstrap_address = KafkaInfo.KAFKA_SERVER_IP + ':9092'
    consumer = AIOKafkaConsumer(
        bootstrap_servers=[bootstrap_address],
        client_id=client_id,
        value_deserializer=_decode_json_value,
        enable_auto_commit=True,
        group_id='i4-0-smia-managers',
    )

    # Manually assign the AAS Core partition of the topic
    core_partition = TopicPartition(KafkaInfo.KAFKA_TOPIC, KafkaInfo.CORE_TOPIC_PARTITION)
    consumer.assign([core_partition])
    return consumer