Source code for pyion.cfdp

""" 
# ===========================================================================
# Interface between the ION's implementation of the CCSDS CFDP and Python. 
# Internally, all classes call the C Extension _cfdp.
#
# Author: Marc Sanchez Net
# Date:   04/17/2019
# Copyright (c) 2019, California Institute of Technology ("Caltech").  
# U.S. Government sponsorship acknowledged.
# ===========================================================================
"""

# General imports
from unittest.mock import Mock
from pathlib import Path
from threading import Event, Thread
from warnings import warn

# Module imports
import pyion
import pyion.utils as utils
from pyion.constants import CfdpEventEnum

# Import C Extension
import _cfdp

# Define all methods/vars exposed at pyion
__all__ = ['Entity']

# ============================================================================
# === Entity class
# ============================================================================

[docs] class Entity(): """ Class to represent a CFDP entity. """ def __init__(self, proxy, peer_entity_nbr, param_addr, endpoint, mode, closure_latency, seg_metadata): """ Class initializer """ # Store variables self.proxy = proxy self.entity_nbr = peer_entity_nbr self.node_dir = proxy.node_dir self._param_addr = param_addr self.endpoint = endpoint self.mode = mode self.closure_lat = closure_latency self.seg_metadata = seg_metadata # Map {event_id}: event handler function self.event_handers = {} # Create a condition to signal end of transaction self._end_transaction = Event() self._ok_transaction = False # Start a thread to monitor all events self.th = Thread(target=self._monitor_events, daemon=False) self.th.start() def __del__(self): # If you have already been closed, return if not self.is_open: return # Close the Entity and free C memory self.proxy.cfdp_close(self.entity_nbr) @property def is_open(self): """ Returns True if entity is opened """ return (self.proxy is not None and self._param_addr is not None) def _cleanup(self): """ Clean entity after closing. Do not call directly, use ``proxy.cfdp_close`` """ # Clear variables self.proxy = None self.entity_nbr = None self._param_addr = None self.endpoint = None # Stop the thread monitoring CFDP events _cfdp.cfdp_interrupt_events() # Wait until the event thread has finished self.th.join() # Mark end of transactions to wake up threads self._mark_transaction_end(False)
[docs] @utils._chk_is_open @utils.in_ion_folder def cfdp_send(self, source_file, dest_file=None, mode=None, closure_lat=None, seg_metadata=None): """ Send a file using CFDP to the peer engine for this entity :param source_file: str or Path of file to send :param dest_file: str or Path. Name of file at receiving engine. It defaults to source_file :param **kwargs: See ``proxy.cfdp_send`` """ # Create path object src_file = Path(source_file) # Check that the source file exists if not src_file.exists(): raise FileNotFoundError('Source file {} does not exist'.format(src_file)) # Initialize variables src_file = src_file.resolve().absolute() # Set default values dst_file = source_file if dest_file is None else dest_file # Set default values if necessary if mode is None: mode = self.mode if closure_lat is None: closure_lat = self.closure_lat if seg_metadata is None: seg_metadata = self.seg_metadata # Mark that the current transaction has not succeeded yet self._ok_transaction = False # Trigger CFDP send _cfdp.cfdp_send(self._param_addr, str(src_file), str(dst_file), closure_lat, seg_metadata, mode)
[docs] @utils._chk_is_open @utils.in_ion_folder def cfdp_request(self, source_file, dest_file=None, mode=None, closure_lat=None, seg_metadata=None): """ Request a file to be sent to this node using CFDP :param source_file: str or Path of file to request :param dest_file: str or Path. Name of file at this node once it is received. Defaults to ``source_file`` :param **kwargs: See ``proxy.cfdp_send`` """ # Set default values if necessary if mode is None: mode = self.mode if closure_lat is None: closure_lat = self.closure_lat if seg_metadata is None: seg_metadata = self.seg_metadata # Mark that the current transaction has not succeeded yet self._ok_transaction = False # Trigger CFDP request _cfdp.cfdp_request(self._param_addr, str(source_file), str(dest_file), closure_lat, seg_metadata, mode)
[docs] @utils._chk_is_open @utils.in_ion_folder def cfdp_cancel(self): """ Cancel the current CFDP transaction """ _cfdp.cfdp_cancel(self._param_addr)
[docs] @utils._chk_is_open @utils.in_ion_folder def cfdp_suspend(self): """ Suspend the current CFDP transaction """ _cfdp.cfdp_suspend(self._param_addr)
[docs] @utils._chk_is_open @utils.in_ion_folder def cfdp_resume(self): """ Resume the current CFDP transaction """ _cfdp.cfdp_resume(self._param_addr)
[docs] @utils._chk_is_open @utils.in_ion_folder def cfdp_report(self): """ Request issuance on the transmission/reception progress of the current CFDP transaction. """ _cfdp.cfdp_report(self._param_addr)
[docs] @utils._chk_is_open def add_usr_message(self, msg): """ Add a user message to all CFDP PDUs in the next transaction :param str: User message to add """ _cfdp.cfdp_add_usr_msg(self._param_addr, msg)
[docs] @utils._chk_is_open def add_filestore_request(self, action, file1, file2=None): """ Add a user message to all CFDP PDUs in the next transaction :param action: See ``pyion.CFDP_CREATE_FILE``, etc. :param file1: String or Path-object :param file2: None, string or Path-object. """ # Transform files to string file1 = str(file1) if file2 is not None: file2 = str(file2) # Call pyion function _cfdp.cfdp_add_filestore_request(self._param_addr, action, file1, file2)
[docs] def register_event_handler(self, event, func): """ Register and event handler for this entity :param event: See ``pyion.constants.CFDP_CREATE_FILE``, etc. :param func: Function handle with the following signature ``def ev_handler(params)``, where params is a dictionary with contents that depend on the type of event (see CCSDS CFDP spec, section 3.5.6) """ self.event_handers[event] = func
[docs] def wait_for_transaction_end(self, timeout=None): """ Blocks the calling thread until the transaction has finished or the timeout expires. :param timeout: Time to wait in [seconds] :return: True if transaction finished successfully """ # Wait for the end of the transaction self._end_transaction.wait(timeout=timeout) # Return state of transaction return self._ok_transaction
def _mark_transaction_end(self, success): """ Mark that the current CFDP transaction has ended :param success: True/False """ # Mark the transaction as successful self._ok_transaction = success # Awake all threads that were waiting self._end_transaction.set() # Reset the event self._end_transaction.clear() def _monitor_events(self): """ Monitor all CFDP events """ while self.is_open: # Get the next event evt, ev_params = _cfdp.cfdp_next_event() # Create event type class from integer code evt = CfdpEventEnum(evt) # If no event, continue if evt == CfdpEventEnum.CFDP_NO_EVENT: continue # If you have an event handler for this event type, use it try: func = self.event_handers[evt] func(evt, ev_params) except KeyError: pass # If you have a handler for all events, use it try: func = self.event_handers[CfdpEventEnum.CFDP_ALL_IND] func(evt, ev_params) except KeyError: pass # If transaction finished ok, report it if evt == CfdpEventEnum.CFDP_TRANSACTION_FINISHED_IND: self._mark_transaction_end(True) # If transaction failed, report it if evt == CfdpEventEnum.CFDP_ABANDONED_IND: self._mark_transaction_end(False) def __enter__(self): """ Allows an endpoint to be used as context manager """ return self def __exit__(self, exc_type, exc_val, exc_tb): """ Allows an endpoint to be used as context manager """ pass def __str__(self): return '<Entity: {} ({})>'.format(self.entity_nbr, 'Open' if self.is_open else 'Closed') def __repr__(self): return '<Entity: {} ({})>'.format(self.entity_nbr, self._param_addr)