Source code for cylc.flow.main_loop.log_data_store

# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""Log the number and size of each type of object in the data store.

.. note::

   This plugin is for Cylc developers debugging the data store.

If ``matplotlib`` is installed this plugin will plot results as a PDF in
the run directory when the workflow is shut down (cleanly).

"""
import json
from pathlib import Path
from time import time

from cylc.flow.main_loop import (startup, shutdown, periodic)


try:
    import matplotlib
    matplotlib.use('Agg')
    from matplotlib import pyplot as plt
    PLT = True
except ModuleNotFoundError:
    PLT = False

from pympler.asizeof import asized


@startup
async def init(scheduler, state):
    """Prepare empty containers in ``state`` for the metrics to be logged.

    ``state['objects']`` and ``state['size']`` become dictionaries holding
    one empty list per key yielded by ``_iter_data_store``;
    ``state['times']`` becomes an empty list.
    """
    for field, empty in (('objects', {}), ('size', {}), ('times', [])):
        state[field] = empty

    counts = state['objects']
    sizes = state['size']
    store = scheduler.data_store_mgr.data
    for name, _unused in _iter_data_store(store):
        counts[name] = []
        sizes[name] = []
@periodic
async def log_data_store(scheduler, state):
    """Record one sample of the data store's object counts and sizes.

    Appends the current time to ``state['times']`` and, for every key
    yielded by ``_iter_data_store``, appends ``len(value)`` to
    ``state['objects'][key]`` and ``asized(value).size`` to
    ``state['size'][key]``.
    """
    timestamps = state['times']
    timestamps.append(time())

    store = scheduler.data_store_mgr.data
    for name, payload in _iter_data_store(store):
        count_series = state['objects'][name]
        count_series.append(len(payload))

        size_series = state['size'][name]
        size_series.append(asized(payload).size)
@shutdown
async def report(scheduler, state):
    """Write the collected metrics out as JSON, then try to plot them.

    Both outputs are written relative to ``scheduler.workflow_run_dir``.
    """
    # order matters: dump first, plot second
    for writer in (_dump, _plot):
        writer(state, scheduler.workflow_run_dir)
def _iter_data_store(data_store):
    """Yield ``(key, value)`` pairs from the first entry of ``data_store``.

    Only the first value of ``data_store`` is inspected; within it every
    item is yielded except the one stored under the ``'workflow'`` key.
    Nothing is yielded if ``data_store`` is empty.

    Args:
        data_store: Mapping whose values are themselves mappings.

    Yields:
        tuple: ``(key, value)`` for each non-``'workflow'`` item.
    """
    for item in data_store.values():
        for key, value in item.items():
            if key != 'workflow':
                yield (key, value)
        # there should only be one workflow in the data store
        break


def _dump(state, path):
    """Write the collected metrics to ``<path>/<module name>.json``.

    Args:
        state: Mapping providing the ``'times'``, ``'objects'`` and
            ``'size'`` entries; any other entries are ignored.
        path: Directory in which the JSON file is created.

    Returns:
        bool: Always ``True``.
    """
    data = {
        'times': state['times'],
        'objects': state['objects'],
        'size': state['size']
    }
    # Use a context manager so the file is flushed and closed
    # deterministically rather than whenever the handle is collected.
    with Path(path, f'{__name__}.json').open('w+') as handle:
        json.dump(data, handle)
    return True


def _plot(state, path):
    """Plot object counts and sizes against time as a PDF.

    The plot is skipped when matplotlib could not be imported (``PLT`` is
    false) or when fewer than two samples have been recorded.

    Args:
        state: Mapping providing the ``'times'``, ``'objects'`` and
            ``'size'`` entries.
        path: Directory in which ``<module name>.pdf`` is written.

    Returns:
        bool: ``True`` if a plot was written, ``False`` if it was skipped.
    """
    if (
        not PLT
        or len(state['times']) < 2
    ):
        return False

    # x-axis values are relative to the first recorded sample
    times = [tick - state['times'][0] for tick in state['times']]
    fig, ax1 = plt.subplots(figsize=(10, 7.5))
    try:
        ax1.set_xlabel('Time (s)')

        ax1.set_ylabel('Objects')
        for key, objects in state['objects'].items():
            ax1.plot(times, objects, label=key)

        # sizes share the x-axis but get their own y-axis on the right
        ax2 = ax1.twinx()
        ax2.set_ylabel('Size (kb)')
        for sizes in state['size'].values():
            ax2.plot(times, [x / 1000 for x in sizes], linestyle=':')

        ax1.legend(loc=0)
        # second legend distinguishing the two line styles
        ax2.legend(
            (ax1.get_children()[0], ax2.get_children()[0]),
            ('objects', 'size'),
            loc=1
        )

        # start the x-axis at zero
        ax1.set_xlim(0, ax1.get_xlim()[1])

        plt.savefig(
            Path(path, f'{__name__}.pdf')
        )
    finally:
        # pyplot keeps every figure alive until it is explicitly closed
        plt.close(fig)
    return True