Source code for pyion.mem
"""
# ===========================================================================
# Defines proxies to ION's SDR and PSM so that you can monitor them.
#
# Author: Marc Sanchez Net
# Date: 08/12/2019
# Copyright (c) 2019, California Institute of Technology ("Caltech").
# U.S. Government sponsorship acknowledged.
# ===========================================================================
"""
# Generic imports
import abc
from collections import defaultdict
from pprint import pprint
from unittest.mock import Mock
from threading import Thread
import time
from warnings import warn
# Import pyion modules
import pyion.utils as utils
# Import C Extension
import _mem
# Define all methods/vars exposed at pyion
__all__ = ['SdrProxy', 'PsmProxy']
# ============================================================================
# === Abstract Memory Proxy
# ============================================================================
class MemoryProxy(utils.Proxy, abc.ABC):
""" Abstract class parent to SdrProxy and PsmProxy """
def __init__(self, node_nbr):
# Call parent constructor
super().__init__(node_nbr)
# Variables for monitor
self._th = None
self._monitor_on = False
self._rate = None
self._tspan = -1
self._print_res = False
self._results = defaultdict(dict)
@abc.abstractmethod
def dump(self):
""" Dump memory state """
pass
def close(self):
# If no monitoring thread, return
if self._th is None: return
# If monitoring thread is not active, return
if not self._th.is_alive(): return
# Wait for thread to stop
self._monitor_on = False
self._th.join()
# Delete results
del self._results
def clean_results(self):
""" Clean all stored results """
self._results = defaultdict(dict)
def start_monitoring(self, rate=1, timespan=-1, print_results=False):
""" Start monitoring the SDR for a while
:param int rate: Rate in [Hz]
:param float timespan: Span of time during which measurements
are taken. After that, the measurements
are deleted and new data is acquired.
The default value of -1 indicates that
measurements are never deleted.
:param float print_results: If True, the state of the memory is printed
to stdout prior to being deleted
"""
# If a monitoring session is already on, error
if self._monitor_on:
raise ValueError('Memory is already being monitored.')
# Prepare monitoring session
self._rate = utils.Rate(rate)
self._monitor_on = True
self._tspan = timespan
self._print_res = print_results
# Start monitor in separate thread
self._th = Thread(target=self._start_monitoring, daemon=True)
self._th.start()
def _start_monitoring(self):
# Initialize variables
tic = time.time()
summary = {}
while self._monitor_on:
# Measurement index
t = time.time()
# If enough time has elapsed, clear the results
if self._tspan > 0 and time.time()-tic >= self._tspan:
# Print if necessary
if self._print_res:
print('Num. measurements:', len(self._results['summary']))
pprint(summary)
# Clean results and restart time counter
self.clean_results()
tic = time.time()
# Take measurement
try:
summary, small_pool, large_pool = self.dump()
except Exception as e:
summary, small_pool, large_pool = e, None, None
# Store results
self._results['summary'][t] = summary
self._results['small_pool'][t] = small_pool
self._results['large_pool'][t] = large_pool
# Sleep for a while
if self._rate: self._rate.sleep()
def stop_monitoring(self):
""" Stop monitoring the SDR and return results"""
# If no monitoring session is on, return
if not self._monitor_on: return
# Signal exit to the monitor thread
self._monitor_on = False
self._rate = None
self._tspan = -1
# Wait for monitoring thread to exit
self._th.join()
# Return collected results
return self._results
def __str__(self):
return '<{}>'.format(self.__class__.__name__)
# ============================================================================
# === SDR PROXY
# ============================================================================
[docs]
class SdrProxy(MemoryProxy):
""" Proxy object to ION's SDR
:ivar int: Node number
:ivar str: SDR name (see ``ionconfig``, option ``sdrName``)
"""
[docs]
@utils.in_ion_folder
def dump(self):
""" Dump the current state of the SDR
:return Tuple[Dict]: usage summary, small pool free block count,
large pool free block count
"""
return _mem.sdr_dump()
# ============================================================================
# === PSM PROXY
# ============================================================================
[docs]
class PsmProxy(MemoryProxy):
""" Proxy object to ION's PSM
:ivar int wm_key: See ``wmKey`` in ``ionconfig``. Defaults to 65281
(i.e., ION's default value)
:ivar int wm_size: Unused
:ivar str partition_name: Unused
"""
[docs]
@utils.in_ion_folder
def dump(self):
""" Dump the current state of the PSM
:return Tuple[Dict]: usage summary, small pool free block count,
large pool free block count
"""
return _mem.psm_dump()