Source code for cylc.flow.main_loop.log_memory

# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""Log the memory usage of a running scheduler over time.

.. note::

   This plugin is for Cylc developers debugging cylc memory usage.

   For general interest memory measurement try
   ``/usr/bin/time -v cylc play`` or ``cylc play --profile``.

.. note::

   Pympler associates memory with the first object which references it.

   In Cylc we have some objects (e.g. the configuration) which are references
   from multiple places.

   This can result in a certain amount of "jitter" in the results where
   pympler has swapper from associating memory with one object to another.

   Watch out for matching increase/decrease in reported memory in
   different objects.

.. warning::

   This plugin can slow down a workflow significantly due to the
   complexity of memory calculations.

   Set a sensible interval before running workflows.

If ``matplotlib`` is installed this plugin will plot results as a PDF in
the run directory when the workflow is shut down (cleanly).

"""

import json
from pathlib import Path
from time import time

from cylc.flow.main_loop import (startup, shutdown, periodic)

try:
    import matplotlib
    matplotlib.use('Agg')
    from matplotlib import pyplot as plt
    PLT = True
except ModuleNotFoundError:
    PLT = False

from pympler.asizeof import asized


# TODO: make this configurable in the global config
MIN_SIZE = 10000


[docs] @startup async def init(scheduler, state): """Take an initial memory snapshot.""" state['data'] = [] await take_snapshot(scheduler, state)
[docs] @periodic async def take_snapshot(scheduler, state): """Take a memory snapshot""" state['data'].append(( time(), _compute_sizes(scheduler, min_size=MIN_SIZE) ))
[docs] @shutdown async def report(scheduler, state): """Take a final memory snapshot and dump the results.""" await take_snapshot(scheduler, state) _dump(state['data'], scheduler.workflow_run_dir) fields, times = _transpose(state['data']) _plot( fields, times, scheduler.workflow_run_dir, f'cylc.flow.scheduler.Scheduler attrs > {MIN_SIZE / 1000}kb' )
def _compute_sizes(obj, min_size=10000): """Return the sizes of the attributes of an object.""" size = asized(obj, detail=2) for ref in size.refs: if ref.name == '__dict__': break else: raise Exception('Cannot find __dict__ reference') return { **{ item.name.split(':')[0][4:]: item.size for item in ref.refs if item.size > min_size }, **{'total': size.size}, } def _transpose(data): """Pivot data from snapshot to series oriented.""" all_keys = set() for _, datum in data: all_keys.update(datum.keys()) # sort keys by the size of the last checkpoint so that the fields # get plotted from largest to smallest all_keys = list(all_keys) all_keys.sort(key=lambda x: data[-1][1].get(x, 0), reverse=True) # extract data for each field, if not present fields = {} for key in all_keys: fields[key] = [ datum.get(key, -1) for _, datum in data ] start_time = data[0][0] times = [ timestamp - start_time for timestamp, _ in data ] return fields, times def _dump(data, path): json.dump( data, Path(path, f'{__name__}.json').open('w+') ) return True def _plot(fields, times, path, title='Objects'): if ( not PLT or len(times) < 2 ): return False fig, ax1 = plt.subplots(figsize=(10, 7.5)) fig.suptitle(title) ax1.set_xlabel('Time (s)') ax1.set_ylabel('Memory (kb)') for key, sizes in fields.items(): ax1.plot(times, [x / 1000 for x in sizes], label=key) ax1.legend(loc=0) # start both axis at 0 ax1.set_xlim(0, ax1.get_xlim()[1]) ax1.set_ylim(0, ax1.get_ylim()[1]) plt.savefig( Path(path, f'{__name__}.pdf') ) return True