# Source code for pyion.proxies

""" 
# ===========================================================================
# Interface between the ION and Python. Provides ``Proxy`` object to connect
# to ION.
#
# Author: Marc Sanchez Net
# Date:   04/17/2019
# Copyright (c) 2019, California Institute of Technology ("Caltech").  
# U.S. Government sponsorship acknowledged.
# ===========================================================================
"""

import os
import signal
from pathlib import Path
from time import sleep
# General imports
from unittest.mock import Mock
from warnings import warn

# Module imports
import pyion
import pyion.bp as bp
import pyion.cfdp as cfdp
import pyion.constants as cst
import pyion.ltp as ltp
import pyion.mem as mem
import pyion.utils as utils

# Import C Extensions
import _bp
import _cfdp
import _ltp

# Define all methods/vars exposed at pyion
__all__ = ['get_bp_proxy', 'get_cfdp_proxy', 'get_ltp_proxy', 'get_sdr_proxy',
           'get_psm_proxy']

# ============================================================================
# === Singleton of proxies
# ============================================================================

# Map of proxies to BP defined as {node_nbr: BpProxy}
_bp_proxies = {}

# Map of proxies to CFDP defined as {peer_entity_nbr: CfdpProxy}
_cfdp_proxies = {}

# Map of proxies to LTP defined as {client_id: LtpProxy}
_ltp_proxies = {}

# Map of proxies to the SDR, registered under the node number: {node_nbr: SdrProxy}
_sdr_proxies = {}

# Map of proxies to the PSM, registered under the node number: {node_nbr: PsmProxy}
_psm_proxies = {}

def get_bp_proxy(node_nbr):
    """ Return the ``BpProxy`` for a given node number, attached to ION.

        The lookup/creation is delegated to ``utils._register_proxy`` using
        the string form of the node number as identifier, so a proxy that
        already exists is handed back instead of a new one. ``bp_attach`` is
        called on the proxy on every invocation.

        :param node_nbr: Node number
        :return: BpProxy object
    """
    registry_key = str(node_nbr)
    bp_proxy = utils._register_proxy(_bp_proxies, registry_key, BpProxy, node_nbr)

    # Make sure the proxy that is handed out is attached to ION
    bp_proxy.bp_attach()
    return bp_proxy
def get_cfdp_proxy(peer_entity_nbr):
    """ Return the ``CfdpProxy`` for a given peer entity number, attached to
        ION's CFDP engine.

        The lookup/creation is delegated to ``utils._register_proxy`` using
        the string form of the peer entity number as identifier, so a proxy
        that already exists is handed back instead of a new one.
        ``cfdp_attach`` is called on the proxy on every invocation.

        :param peer_entity_nbr: Peer entity number
        :return: CfdpProxy object
    """
    registry_key = str(peer_entity_nbr)
    cfdp_proxy = utils._register_proxy(_cfdp_proxies, registry_key, CfdpProxy,
                                       peer_entity_nbr)

    # Make sure the proxy that is handed out is attached to the CFDP engine
    cfdp_proxy.cfdp_attach()
    return cfdp_proxy
def get_ltp_proxy(client_id):
    """ Return the ``LtpProxy`` for a given client application, attached to
        ION's LTP engine.

        The lookup/creation is delegated to ``utils._register_proxy`` using
        the string form of the client id as identifier, so a proxy that
        already exists is handed back instead of a new one. ``ltp_attach`` is
        called on the proxy on every invocation.

        :param client_id: Client id (number)
        :return: LtpProxy object
    """
    registry_key = str(client_id)
    ltp_proxy = utils._register_proxy(_ltp_proxies, registry_key, LtpProxy, client_id)

    # Make sure the proxy that is handed out is attached to the LTP engine
    ltp_proxy.ltp_attach()
    return ltp_proxy
def get_sdr_proxy(node_nbr):
    """ Return the ``SdrProxy`` for a given node number.

        The lookup/creation is delegated to ``utils._register_proxy`` using
        the node number itself as identifier, so a proxy that already exists
        is handed back instead of a new one.

        :param node_nbr: Node number
        :return: SdrProxy object
    """
    sdr_proxy = utils._register_proxy(_sdr_proxies, node_nbr, mem.SdrProxy, node_nbr)
    return sdr_proxy
def get_psm_proxy(node_nbr):
    """ Return the ``PsmProxy`` for a given node number.

        The lookup/creation is delegated to ``utils._register_proxy`` using
        the node number itself as identifier, so a proxy that already exists
        is handed back instead of a new one.

        :param node_nbr: Node number
        :return: PsmProxy object
    """
    psm_proxy = utils._register_proxy(_psm_proxies, node_nbr, mem.PsmProxy, node_nbr)
    return psm_proxy
def shutdown():
    """ Shut down pyion: close all BP endpoints, CFDP entities, LTP access
        points, SDR proxies and PSM proxies known to the registered proxies.

        Progress is reported on stdout, one line per subsystem.
    """
    # Progress messages are flushed immediately so that they are visible while
    # the close calls are still running (they are printed without a newline).
    # Each registry is snapshotted so that a close call that ends up altering
    # a registry cannot break the iteration.

    # Iterate through all BP endpoints and close them
    print('Closing all BP endpoints... ', end='', flush=True)
    for proxy in tuple(_bp_proxies.values()):
        proxy.bp_close_all()
    print('DONE')

    # Iterate through all CFDP entities and close them
    print('Closing all CFDP entities... ', end='', flush=True)
    for proxy in tuple(_cfdp_proxies.values()):
        proxy.cfdp_cancel_all()
        proxy.cfdp_close_all()
    print('DONE')

    # Iterate through all LTP access points and close them
    print('Closing all LTP access points...', end='', flush=True)
    for proxy in tuple(_ltp_proxies.values()):
        proxy.ltp_interrupt_all()
        proxy.ltp_close_all()
    print('DONE')

    # Iterate through all SDR proxies and close them
    print('Closing all SDR proxies...', end='', flush=True)
    for proxy in tuple(_sdr_proxies.values()):
        proxy.close()
    print('DONE')

    # Iterate through all PSM proxies and close them
    print('Closing all PSM proxies...', end='', flush=True)
    for proxy in tuple(_psm_proxies.values()):
        proxy.close()
    print('DONE')

# ============================================================================
# === Capture the SIGINT and trigger cleanup
# ============================================================================

def pyion_sigint_handler(sig, frame):
    """ Default SIGINT handler. It announces the interruption and then runs
        ``shutdown`` to close everything opened through the proxies.

        :param sig: Signal number (unused)
        :param frame: Current stack frame (unused)
    """
    print('[pyion] KeyboardInterruption... ')
    shutdown()

# Register the signal handler function. If another Python-level handler was
# installed previously, do not override it, combine it with pyion's.
prev_sigint_handler = signal.getsignal(signal.SIGINT)

if callable(prev_sigint_handler):
    def combined_sigint_handler(sig, frame):
        """ Run pyion's cleanup first, then the previously installed handler """
        pyion_sigint_handler(sig, frame)
        prev_sigint_handler(sig, frame)

    signal.signal(signal.SIGINT, combined_sigint_handler)
else:
    # ``getsignal`` may also return ``signal.SIG_DFL``, ``signal.SIG_IGN`` or
    # ``None``. None of them can be called, so there is nothing to chain to.
    signal.signal(signal.SIGINT, pyion_sigint_handler)

# ============================================================================
# === Proxy to BP in ION for a given node
# ============================================================================

class BpProxy(utils.Proxy):
    """ Proxy to the ION engine running in this host. Do not instantiate manually,
        use ``pyion.get_bp_proxy`` instead.

        Private Variables
        -----------------
        :ivar _ept_map: {'ipn:1.1': Endpoint_1, 'ipn:1.2': Endpoint_2, ...}
    """
    def __init__(self, node_nbr):
        """ Initialize the proxy. Attaching to ION is done separately through
            ``bp_attach``.

            :param int node_nbr: Node number for this proxy
            :raises KeyError: If ``node_nbr`` is already present in the BP
                              proxy registry
        """
        # If a proxy for this node already exists, raise error
        # NOTE(review): ``get_bp_proxy`` registers proxies under
        # ``str(node_nbr)``. If ``node_nbr`` is an int, this membership test
        # does not match that identifier -- confirm against
        # ``utils._register_proxy``.
        if node_nbr in _bp_proxies:
            raise KeyError('A proxy for node {} already exists. Use '
                           '``pyion.get_bp_proxy`` to retrieve it'.format(node_nbr))

        # Call parent constructor
        super().__init__(node_nbr)

        # Map {eid: Endpoint}
        self._ept_map = {}

    def __del__(self):
        """ Close all endpoints associated with this proxy, detach from ION
            and hand the proxy to ``utils._unregister_proxy``.
        """
        self.bp_close_all()
        self.bp_detach()
        utils._unregister_proxy(_bp_proxies, self.node_nbr)

    def is_endpoint_open(self, eid):
        """ Check if this proxy thinks a given EID is currently opened

            :param eid: Endpoint id
            :return: True if it is open
        """
        return eid in self._ept_map

    @property
    def open_endpoints(self):
        """ List all endpoints that this proxy sees open

            :return: Tuple of ``Endpoint`` objects. It is a snapshot, and it is
                     empty if the endpoint map has not been created yet.
        """
        return tuple(getattr(self, '_ept_map', {}).values())

    @utils.in_ion_folder
    def bp_attach(self):
        """ Attach to ION """
        # Attach to ION instance
        _bp.bp_attach()

        # Mark as attached to ION
        self.attached = True

    @utils.in_ion_folder
    def bp_detach(self):
        """ Detach from ION """
        # Detach from ION instance
        _bp.bp_detach()

        # Mark as detached from ION
        self.attached = False

    @utils._chk_attached
    @utils.in_ion_folder
    def bp_open(self, eid, TTL=3600, priority=cst.BpPriorityEnum.BP_STD_PRIORITY,
                report_eid=None, custody=cst.BpCustodyEnum.NO_CUSTODY_REQUESTED,
                report_flags=cst.BpReportsEnum.BP_NO_RPTS,
                ack_req=cst.BpAckReqEnum.BP_NO_ACK_REQ, retx_timer=0,
                chunk_size=None, timeout=None, mem_ctrl=False):
        """ Open an endpoint. If it already exists, the existing instance is
            returned (and the remaining arguments are ignored).

            .. Tip:: The parameters specified when opening the endpoint are used
                     as default values. When calling ``bp_send``, they can be
                     overridden.
            .. Tip:: To see longer explanation for these parameters, see
                     ``man bp`` in shell terminal

            :param eid: ID of the endpoint being opened
            :param TTL: Time-to-live [sec] for all bundles through this endpoint.
                        Default is 3600 seconds
            :param priority: Class of service for all bundles through this endpoint.
                             Default is ``BpPriorityEnum.BP_STD_PRIORITY``
            :param report_eid: Endpoint id where report bundles will be sent to
            :param custody: Type of custody required for the bundles issued by
                            this endpoint. Default is
                            ``BpCustodyEnum.NO_CUSTODY_REQUESTED``
            :param report_flags: Flag indicating which reports to request for a
                                 bundle. Default is ``BpReportsEnum.BP_NO_RPTS``.
                                 Example ``BpReportsEnum.BP_RECEIVED_RPT |
                                 BpReportsEnum.BP_CUSTODY_RPT``.
            :param ack_req: Indicates whether the application on top of BP is
                            expecting to receive an end-to-end acknowledgement.
                            Default is ``BpAckReqEnum.BP_NO_ACK_REQ``
            :param retx_timer: Custodial timer retransmission in [sec]. Defaults
                               to 0, which means that no timer is created.
            :param chunk_size: Send/Receive data in bundles of ``chunk_size``
                               bytes (plus header), instead of a single
                               potentially very large bundle.
            :param timeout: If specified, the endpoint will be interrupted if
                            timeout seconds elapse without receiving anything.
            :param mem_ctrl: Forwarded (as an int) to ``_bp.bp_open`` and (as
                             given) to the ``Endpoint``; its meaning is defined
                             there.
            :return: Endpoint object
        """
        # If this EID is already open, return it
        if self.is_endpoint_open(eid):
            return self._ept_map[eid]

        # Detect if this endpoint "detains" bundles (see bp_open_source in ION manual)
        detained = (retx_timer > 0)

        # Open EID in ION. Get the address of the BpSapState as a long. If
        # retx_timer>0, then open the endpoint in "detained mode" (see
        # bp_open_source in ION manual).
        sap_addr = _bp.bp_open(eid, int(detained), int(mem_ctrl))

        # Create an endpoint
        ept_obj = bp.Endpoint(self, eid, sap_addr, TTL, int(priority), report_eid,
                              int(custody), int(report_flags), int(ack_req),
                              int(retx_timer), detained, chunk_size, timeout,
                              mem_ctrl)

        # Store it
        self._ept_map[eid] = ept_obj

        # Return it
        return ept_obj

    @utils._chk_attached
    @utils.in_ion_folder
    def bp_close(self, ept):
        """ Close an endpoint. The endpoint is interrupted first and then
            removed from this proxy.

            :param ept: Endpoint object to close
            :raises ValueError: If ``ept`` is not a ``bp.Endpoint``
            :raises ConnectionError: If interrupting the endpoint fails, e.g.
                                     because it is not open in this proxy
        """
        # If object passed is not endpoint, fail
        if not isinstance(ept, bp.Endpoint):
            raise ValueError('Expected an Endpoint instance, got ' + str(type(ept)))

        # Interrupt this endpoint first. Keep the original error as the cause.
        try:
            self.bp_interrupt(ept)
        except ConnectionError as err:
            raise ConnectionError('Cannot close endpoint {}. It is not open.'
                                  .format(ept.eid)) from err

        # Forget about this endpoint and get the object tracked by the proxy
        ept_obj = self._ept_map.pop(ept.eid)

        # If already closed, you are done
        if not ept_obj.is_open:
            return

        # Close EID in ION
        _bp.bp_close(ept_obj._sap_addr)

        # Mark object as inactive
        ept_obj._cleanup()

    @utils._chk_attached
    @utils.in_ion_folder
    def bp_interrupt(self, ept):
        """ Interrupt an endpoint while receiving data. If this endpoint is
            not blocked receiving, this call has no effect.

            :param ept: Endpoint object to interrupt
            :raises ValueError: If ``ept`` is not a ``bp.Endpoint``
            :raises ConnectionError: If the endpoint is not open in this proxy
        """
        # If object passed is not endpoint, fail
        if not isinstance(ept, bp.Endpoint):
            raise ValueError('Expected an Endpoint instance, got ' + str(type(ept)))

        # If this EID is not open, fail
        if not self.is_endpoint_open(ept.eid):
            raise ConnectionError('Cannot interrupt endpoint {}. It is not open.'
                                  .format(ept.eid))

        # Interrupt endpoint in ION
        _bp.bp_interrupt(ept._sap_addr)

        # This is little hack to ensure that SIGINT prints "interrupted" as
        # opposed to raising ConnectionAbortedError. However, this is not
        # guaranteed
        sleep(0.001)

    def bp_close_all(self):
        """ Close all opened endpoints """
        # ``open_endpoints`` is a snapshot, so closing (which removes entries
        # from the endpoint map) is safe while looping
        for ept in self.open_endpoints:
            self.bp_close(ept)

    def bp_interrupt_all(self):
        """ Interrupt all opened endpoints """
        for ept in self.open_endpoints:
            self.bp_interrupt(ept)

# ============================================================================
# === Proxy to CFDP engine in ION for a given node
# ============================================================================

class CfdpProxy(utils.Proxy):
    """ Proxy to the CFDP engine of ION. ``get_cfdp_proxy`` creates these
        objects and attaches them.

        Private Variables
        -----------------
        :ivar _ett_map: {peer_entity_nbr: Entity}
    """
    def __init__(self, node_nbr):
        """ Initialize the proxy.

            :param node_nbr: Identifier handed to the parent ``utils.Proxy``
                             (``get_cfdp_proxy`` passes the peer entity number)
        """
        # Call parent constructor
        super().__init__(node_nbr)

        # Map {entity_nbr: Entity}
        self._ett_map = {}

    def __del__(self):
        """ Close all entities associated with this proxy, detach from the
            CFDP engine and hand the proxy to ``utils._unregister_proxy``.
        """
        self.cfdp_close_all()
        self.cfdp_detach()
        utils._unregister_proxy(_cfdp_proxies, self.node_nbr)

    def is_entity_open(self, peer_entity_nbr):
        """ Check if this CfdpProxy thinks a given entity is currently opened

            :param peer_entity_nbr: Peer entity number
            :return: True if it is open
        """
        return peer_entity_nbr in self._ett_map

    @property
    def open_entities(self):
        """ List all entities that this proxy sees open

            :return: Tuple of peer entity numbers. It is a snapshot, and it is
                     empty if the entity map has not been created yet.
        """
        return tuple(getattr(self, '_ett_map', {}).keys())

    @utils.in_ion_folder
    def cfdp_attach(self):
        """ Attach to ION's CFDP engine """
        # Attach to ION instance
        _cfdp.cfdp_attach()

        # Mark as attached to ION
        self.attached = True

    @utils.in_ion_folder
    def cfdp_detach(self):
        """ Detach from ION's CFDP engine """
        # Detach from ION instance
        _cfdp.cfdp_detach()

        # Mark as detached from ION
        self.attached = False

    @utils._chk_attached
    @utils.in_ion_folder
    def cfdp_open(self, peer_entity_nbr, endpoint, mode=cst.CfdpMode.CFDP_BP_RELIABLE,
                  closure_latency=cst.CfdpClosure.CFDP_NO_CLOSURE,
                  seg_metadata=cst.CfdpMetadataEnum.CFDP_NO_SEG_METADATA):
        """ Open a CFDP entity. If it already exists, the existing instance is
            returned (and the remaining arguments are ignored).

            :param peer_entity_nbr: Number of the peer entity
            :param endpoint: Endpoint object through which the CFDP will run.
                             Its ``TTL``, ``priority``, ``sub_priority``,
                             ``report_flags`` and ``criticality`` are passed to
                             the CFDP engine.
            :param mode: Reliable vs. Non-reliable. See all options in
                         ``pyion.constants``
            :param closure_latency: See ``help(CfdpClosure)``
            :param seg_metadata: Segment metadata option, see
                                 ``CfdpMetadataEnum`` in ``pyion.constants``
            :return: Entity object
        """
        # If already open, return it
        if peer_entity_nbr in self._ett_map:
            return self._ett_map[peer_entity_nbr]

        # Open a proxy to the CFDP engine
        param_addr = _cfdp.cfdp_open(
            peer_entity_nbr,
            endpoint.TTL,
            endpoint.priority,
            endpoint.sub_priority,
            endpoint.report_flags,
            endpoint.criticality,
        )

        # Create a CFDP entity object
        ett_obj = cfdp.Entity(self, peer_entity_nbr, param_addr, endpoint,
                              int(mode), int(closure_latency), int(seg_metadata))

        # Store it
        self._ett_map[peer_entity_nbr] = ett_obj

        # Return it
        return ett_obj

    @utils._chk_attached
    @utils.in_ion_folder
    def cfdp_close(self, peer_entity_nbr):
        """ Close a CFDP entity

            :param peer_entity_nbr: Number of the peer entity to close
            :raises KeyError: If the entity is not open in this proxy
        """
        # Forget about this entity and get the object tracked by the proxy
        ett_obj = self._ett_map.pop(peer_entity_nbr)

        # If already closed, you are done
        if not ett_obj.is_open:
            return

        # Close Entity in ION
        _cfdp.cfdp_close(ett_obj._param_addr)

        # Mark object as inactive
        ett_obj._cleanup()

    def cfdp_close_all(self):
        """ Close all open entities in this node """
        # ``open_entities`` is a snapshot, so closing (which removes entries
        # from the entity map) is safe while looping
        for peer_entity_nbr in self.open_entities:
            self.cfdp_close(peer_entity_nbr)

    def cfdp_cancel_all(self):
        """ Cancel any transaction in all entities in this node """
        for peer_nbr in self.open_entities:
            self._ett_map[peer_nbr].cfdp_cancel()

# ============================================================================
# === Proxy to LTP engine in ION for a given node
# ============================================================================

class LtpProxy(utils.Proxy):
    """ Proxy to the LTP engine of ION. ``get_ltp_proxy`` creates these
        objects and attaches them.

        Private Variables
        -----------------
        :ivar _sap_map: {client_id: AccessPoint}
    """
    def __init__(self, node_nbr):
        """ Initialize the proxy.

            :param node_nbr: Identifier handed to the parent ``utils.Proxy``
                             (``get_ltp_proxy`` passes the client id)
        """
        # Call parent constructor
        super().__init__(node_nbr)

        # Map {client id: AccessPoint}
        self._sap_map = {}

    def __del__(self):
        """ Close all access points associated with this proxy, detach from
            the LTP engine and hand the proxy to ``utils._unregister_proxy``.
        """
        self.ltp_close_all()
        self.ltp_detach()
        utils._unregister_proxy(_ltp_proxies, self.node_nbr)

    def is_client_open(self, client_id):
        """ Check if this LtpProxy thinks a given client is already using the
            LTP engine

            :param client_id: The id of the client application (int)
            :return: True if it is open
        """
        return client_id in self._sap_map

    @property
    def open_clients(self):
        """ List all client ids that this proxy sees open

            :return: Tuple of client ids. It is a snapshot, and it is empty if
                     the access point map has not been created yet.
        """
        return tuple(getattr(self, '_sap_map', {}).keys())

    @utils.in_ion_folder
    def ltp_attach(self):
        """ Attach to ION's LTP engine """
        # Attach to ION instance
        _ltp.ltp_attach()

        # Mark as attached to ION
        self.attached = True

    @utils.in_ion_folder
    def ltp_detach(self):
        """ Detach from ION's LTP engine """
        # Detach from ION's LTP engine
        _ltp.ltp_detach()

        # Mark as detached from ION
        self.attached = False

    @utils._chk_attached
    @utils.in_ion_folder
    def ltp_open(self, client_id):
        """ Open a service access point for a client. If it already exists,
            the existing instance is returned.

            :param client_id: The id of the client application
            :return: AccessPoint object
        """
        # If already open, return it
        if client_id in self._sap_map:
            return self._sap_map[client_id]

        # Open ltp access point
        sap_addr = _ltp.ltp_open(client_id)

        # Create an LTP Access Point
        sap_obj = ltp.AccessPoint(self, client_id, sap_addr)

        # Store it
        self._sap_map[client_id] = sap_obj

        # Return it
        return sap_obj

    @utils._chk_attached
    @utils.in_ion_folder
    def ltp_close(self, client_id):
        """ Close an LTP access point

            :param client_id: The id of the client application to close
            :raises KeyError: If the client is not open in this proxy
        """
        # Forget about this client and get the object tracked by the proxy
        sap_obj = self._sap_map.pop(client_id)

        # If already closed, you are done
        if not sap_obj.is_open:
            return

        # Close access point in ION
        _ltp.ltp_close(sap_obj._sap_addr)

        # Mark object as inactive
        sap_obj._cleanup()

    def ltp_close_all(self):
        """ Close all open clients in this proxy """
        # ``open_clients`` is a snapshot, so closing (which removes entries
        # from the access point map) is safe while looping
        for client_id in self.open_clients:
            self.ltp_close(client_id)

    def ltp_interrupt_all(self):
        """ Interrupt any LTP transactions in all clients in this proxy """
        for client_id in self.open_clients:
            self._sap_map[client_id].ltp_interrupt()

# ============================================================================
# === EOF
# ============================================================================