Scan API

Functionality for searching for workflows running as the current user.

The scan() asynchronous generator yields workflows. Iterate over them using an async for statement:

async for flow in scan():
    print(flow['name'])

For further functionality construct a pipe:

pipe = scan | is_active(True) | contact_info
async for flow in pipe:
    print(f'{flow["name"]} {flow["CYLC_WORKFLOW_HOST"]}')

There are filters which you can you to omit workflows e.g. cylc_version() and transformers which acquire more information e.g. contact_info().

cylc.flow.network.scan.scan(run_dir=None, scan_dir=None, max_depth=None)

List flows installed on the filesystem.

Parameters:
  • run_dir (Path | None) –

    The run dir to look for workflows in, defaults to ~/cylc-run.

    All workflow registrations will be given relative to this path.

  • scan_dir (Path | None) –

    The directory to scan for workflows in.

    Use in combination with run_dir if you want to scan a subdir within the run_dir.

  • max_depth (int | None) –

    The maximum number of levels to descend before bailing.

    • max_depth=1 will pick up top-level workflows (e.g. foo).

    • max_depth=2 will pick up nested workflows (e.g. foo/bar).

Yields:

dict - Dictionary containing information about the flow.

Return type:

AsyncGenerator[Dict[str, Path | str], None]

cylc.flow.network.scan.filter_name(flow, pattern)

Filter flows by name.

Parameters:
  • flow (dict) – Flow information dictionary, provided by scan through the pipe.

  • pattern (re.Pattern) – One or more regex patterns as strings. This will return True if any of the patterns match.

cylc.flow.network.scan.is_active(flow, is_active)

Filter flows by the presence of a contact file.

Parameters:
  • flow (dict) – Flow information dictionary, provided by scan through the pipe.

  • is_active (bool) – True to filter for running flows. False to filter for stopped and unregistered flows.

cylc.flow.network.scan.contact_info(flow)

Read information from the contact file.

Requires:
  • is_active(True)

Parameters:

flow (dict) – Flow information dictionary, provided by scan through the pipe.

cylc.flow.network.scan.cylc_version(flow, requirement)

Filter by cylc version.

Requires:
  • is_active(True)

  • contact_info

Parameters:
  • flow (dict) – Flow information dictionary, provided by scan through the pipe.

  • requirement (str) – Requirement specifier in PEP 440 format e.g. > 8, < 9

cylc.flow.network.scan.api_version(flow, requirement)

Filter by the cylc API version.

Requires:
  • is_active(True)

  • contact_info

Parameters:
  • flow (dict) – Flow information dictionary, provided by scan through the pipe.

  • requirement (str) – Requirement specifier in PEP 440 format e.g. > 8, < 9

cylc.flow.network.scan.graphql_query(flow, fields, filters=None)

Obtain information from a GraphQL request to the flow.

Requires:
  • is_active(True)

  • contact_info

Parameters:
  • flow (dict) – Flow information dictionary, provided by scan through the pipe.

  • fields (Iterable) –

    Iterable containing the fields to request e.g:

    ['id', 'name']
    

    One level of nesting is supported e.g:

    {'name': None, 'meta': ['title']}
    

  • filters (list) –

    Filter by the data returned from the query. List in the form [(key, ...), value], e.g:

    # state must be running
    [('state',), 'running']
    
    # state must be running or paused
    [('state',), ('running', 'paused')]
    

cylc.flow.network.scan.title(flow)

Attempt to parse the workflow title out of the flow config file.

Warning

This uses a fast but dumb method which may fail to extract the workflow title.

Obtaining the workflow title via graphql_query() is preferable for running flows.

cylc.flow.network.scan.workflow_params(flow)

Extract workflow parameter entries from the workflow database.

Requires:
  • is_active(True)