Source code for pyion.ltp
"""
# ===========================================================================
# Interface between the ION's implementation of the CCSDS LTP and Python.
# Internally, all classes call the C Extension _ltp.
#
# Author: Marc Sanchez Net
# Date: 05/30/2019
# Copyright (c) 2019, California Institute of Technology ("Caltech").
# U.S. Government sponsorship acknowledged.
# ===========================================================================
"""
# General imports
from unittest.mock import Mock
from pathlib import Path
from threading import Event, Thread
from warnings import warn
import contextlib
# Module imports
import pyion
import pyion.utils as utils
from pyion.mgmt import sm_task_yield
# Import C Extension
import _ltp
# Define all methods/vars exposed at pyion
__all__ = [
'ltp_init',
'ltp_dequeue_outbound_segment',
'ltp_handle_inbound_segment',
'AccessPoint',
]
# ============================================================================
# === Segment queueing functionality
# ============================================================================
def ltp_init(estMaxExportSessions):
    """ Initialize LTP through the C extension.

    Thin wrapper: the argument is forwarded untouched to ``_ltp.ltp_init``
    and nothing is returned.

    :param estMaxExportSessions: Value handed to ``_ltp.ltp_init``. Presumably
                                 the estimated maximum number of export
                                 sessions (inferred from the name only --
                                 confirm against the ION documentation)
    """
    _ltp.ltp_init(estMaxExportSessions)
@contextlib.contextmanager
def ltp_dequeue_outbound_segment(vspan):
    """ Context manager around ``_ltp.ltp_dequeue_outbound_segment``.

    The value returned by the C extension is what the ``with`` statement
    binds. ``sm_task_yield`` is always called when the block is left, whether
    the dequeue itself failed, the ``with`` body raised, or everything
    completed normally.

    :param vspan: Value handed to ``_ltp.ltp_dequeue_outbound_segment``
    """
    try:
        # Keep the dequeue inside the ``try`` so that a failure in the C
        # extension is still followed by the task yield below
        segment = _ltp.ltp_dequeue_outbound_segment(vspan)
        yield segment
    finally:
        # Close every LTP dequeue segment handling block with a task yield
        sm_task_yield()
def ltp_handle_inbound_segment(data):
    """ Hand an inbound segment over to the C extension.

    Thin wrapper: the argument is forwarded untouched to
    ``_ltp.ltp_handle_inbound_segment`` and nothing is returned.

    :param data: Segment contents handed to the C extension. The accepted
                 types are decided by ``_ltp`` and are not visible here
    """
    _ltp.ltp_handle_inbound_segment(data)
# ============================================================================
# === AccessPoint class
# ============================================================================
[docs]
class AccessPoint():
    """ Class to represent an LTP access point.

    Send, receive and interrupt operations are forwarded to the ``_ltp`` C
    extension using the SAP address provided at construction time. Closing
    is delegated to ``proxy.ltp_close``, which is expected to call
    ``_cleanup`` on this object.
    """

    def __init__(self, proxy, client_id, sap_addr):
        """ Store the state of the access point. No ``_ltp`` call is made.

        :param proxy: Object exposing ``node_dir`` and ``ltp_close(client_id)``
        :param client_id: Identifier later handed to ``proxy.ltp_close``
        :param sap_addr: Address handed to the ``_ltp`` functions
        """
        # Store variables
        self.proxy = proxy
        self.client_id = client_id
        self._sap_addr = sap_addr

        # NOTE(review): presumably read by ``utils.in_ion_folder`` -- confirm
        self.node_dir = proxy.node_dir

        # Outcome of the most recent ``_ltp.ltp_send``/``_ltp.ltp_receive``
        self._result = None

    def __del__(self):
        """ Close the access point through the proxy if it is still open. """
        # If you have already been closed, return
        if not self.is_open:
            return

        # Close the access point and free C memory
        self.proxy.ltp_close(self.client_id)

    @property
    def is_open(self):
        """ Returns True if the access point is opened """
        return (self.proxy is not None and self._sap_addr is not None)

    def _cleanup(self):
        """ Clean access point after closing. Do not call directly,
            use ``proxy.ltp_close``
        """
        # Clear variables. Once cleared, ``is_open`` reports False
        self.proxy = None
        self.client_id = None
        self._sap_addr = None

    @utils._chk_is_open
    @utils.in_ion_folder
    def ltp_send(self, dest_engine_nbr, data):
        """ Trigger LTP to send data

        The value returned by ``_ltp.ltp_send`` is kept in ``self._result``;
        this method itself returns nothing.

        :param dest_engine_nbr: Destination engine number
        :param data: Data as str, bytes or bytearray
        """
        self._result = _ltp.ltp_send(self._sap_addr, dest_engine_nbr, data)

    @utils._chk_is_open
    def ltp_receive(self):
        """ Trigger LTP to receive data

        The call blocks until the worker thread running ``_ltp.ltp_receive``
        has finished.

        :return: Value produced by ``_ltp.ltp_receive``, or ``None`` if the
                 worker thread ended without storing an outcome
        :raises Exception: Any exception caught while receiving in the worker
                           thread is re-raised in the calling thread
        """
        # Forget the outcome of any previous send/receive. Otherwise, if the
        # worker dies before storing its own outcome (e.g. a failure outside
        # the try/except of ``_ltp_receive``), stale data would be returned
        self._result = None

        # Receive on another thread. Otherwise you cannot handle a SIGINT
        th = Thread(target=self._ltp_receive, daemon=True)
        th.start()
        th.join()

        # If exception, raise it
        if isinstance(self._result, Exception):
            raise self._result

        return self._result

    @utils.in_ion_folder
    def _ltp_receive(self):
        """ Worker for ``ltp_receive``. Stores either the received value or
            the exception raised while receiving in ``self._result``
        """
        # Get the next received data. If exception, store it too so that
        # ``ltp_receive`` can re-raise it in the calling thread
        try:
            self._result = _ltp.ltp_receive(self._sap_addr)
        except Exception as e:
            self._result = e

    @utils._chk_is_open
    @utils.in_ion_folder
    def ltp_interrupt(self):
        """ Trigger LTP to be interrupted """
        _ltp.ltp_interrupt(self._sap_addr)

    def __enter__(self):
        """ Allows an access point to be used as context manager """
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """ Allows an access point to be used as context manager.

        Nothing is done on exit: the access point is NOT closed and any
        exception raised inside the ``with`` block propagates.
        """
        pass

    def __str__(self):
        return '<AccessPoint: {} ({})>'.format(self.client_id, 'Open' if self.is_open else 'Closed')

    def __repr__(self):
        return '<AccessPoint: {} ({})>'.format(self.client_id, self._sap_addr)