Scan API

Functionality for searching for workflows running as the current user.

The scan() asynchronous generator yields workflows. Iterate over them using an async for statement:

from cylc.flow.network.scan import scan

async for flow in scan():
    print(flow['name'])

For further functionality construct a pipe:

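from cylc.flow.network.scan import scan, is_active, contact_info

# keep only running workflows, then read each one's contact file
pipe = scan | is_active(True) | contact_info
async for flow in pipe:
    print(f'{flow["name"]} {flow["CYLC_WORKFLOW_HOST"]}')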

There are filters, which you can use to omit workflows (e.g. cylc_version()), and transformers, which acquire more information (e.g. contact_info()).
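The difference between a filter and a transformer can be illustrated with a toy pipe. The sketch below is not Cylc's implementation: the Pipe class, fake_scan, only_active and add_host are hypothetical names invented for this example, and it assumes that a step returning True or False acts as a filter while a step returning a dictionary acts as a transformer.

```python
import asyncio


class Pipe:
    """Toy async pipe: a source generator followed by per-item steps."""

    def __init__(self, source, steps=()):
        self.source = source        # zero-argument async generator function
        self.steps = tuple(steps)   # async functions taking one item

    def __or__(self, step):
        # `pipe | step` returns a new pipe with the step appended
        return Pipe(self.source, self.steps + (step,))

    async def __aiter__(self):
        async for item in self.source():
            for step in self.steps:
                result = await step(item)
                if result is False:
                    break           # a filter rejected this item
                if result is not True:
                    item = result   # a transformer returned an updated item
            else:
                yield item          # no step rejected the item


async def fake_scan():
    # Hypothetical stand-in for scan(): yields hard-coded flow dictionaries
    for name, active in [('foo', True), ('bar', False), ('baz', True)]:
        yield {'name': name, 'active': active}


async def only_active(flow):
    # Filter: True keeps the flow, False drops it
    return flow['active']


async def add_host(flow):
    # Transformer: returns the flow with extra information attached
    return {**flow, 'host': 'localhost'}


async def main():
    pipe = Pipe(fake_scan) | only_active | add_host
    return [flow async for flow in pipe]


flows = asyncio.run(main())
print([flow['name'] for flow in flows])  # ['foo', 'baz']
```

Because each `|` returns a new pipe, a partially built pipe can be reused as the starting point for several different queries.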

cylc.flow.network.scan.scan(run_dir: Path | None = None, scan_dir: Path | None = None, max_depth: int | None = None) → AsyncGenerator[Dict[str, str | Path], None]

List flows installed on the filesystem.

Args:
run_dir:

The run dir to look for workflows in, defaults to ~/cylc-run.

All workflow registrations will be given relative to this path.

scan_dir:

The directory to scan for workflows in.

Use in combination with run_dir if you want to scan a subdir within the run_dir.

max_depth:

The maximum number of levels to descend before bailing.

  • max_depth=1 will pick up top-level workflows (e.g. foo).

  • max_depth=2 will pick up nested workflows (e.g. foo/bar).

Yields:

dict - Dictionary containing information about the flow.
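The effect of max_depth can be seen in a small synchronous sketch. This is an illustration only, not how scan() is implemented: find_flows is a hypothetical helper, and it assumes that a directory counts as a workflow when it contains a flow.cylc file.

```python
import tempfile
from pathlib import Path


def find_flows(run_dir, max_depth):
    """Return names of workflow dirs found at most max_depth levels down."""
    found = []

    def walk(directory, depth):
        for child in sorted(directory.iterdir()):
            if not child.is_dir():
                continue
            if (child / 'flow.cylc').exists():
                # Assumed marker file; names are given relative to run_dir
                found.append(child.relative_to(run_dir).as_posix())
            elif depth < max_depth:
                # Not a workflow: descend one more level if allowed
                walk(child, depth + 1)

    walk(Path(run_dir), 1)
    return found


with tempfile.TemporaryDirectory() as tmp:
    run_dir = Path(tmp)
    # One top-level workflow and one nested workflow
    for name in ('foo', 'group/bar'):
        (run_dir / name).mkdir(parents=True)
        (run_dir / name / 'flow.cylc').touch()

    shallow = find_flows(run_dir, max_depth=1)
    deep = find_flows(run_dir, max_depth=2)

print(shallow)  # ['foo']
print(deep)     # ['foo', 'group/bar']
```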

cylc.flow.network.scan.filter_name(flow, pattern)

Filter flows by name.

Args:
flow (dict):

Flow information dictionary, provided by scan through the pipe.

pattern (re.Pattern):

One or more regex patterns as strings. This will return True if any of the patterns match.
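The "any pattern matches" rule can be sketched as follows. name_matches is a hypothetical stand-alone helper, not the library function, and it assumes patterns are matched from the start of the name with re.match.

```python
import re


def name_matches(flow, *patterns):
    """Return True if any of the regex patterns matches the flow's name."""
    # Assumption: matching is anchored at the start of the name (re.match)
    return any(re.match(pattern, flow['name']) for pattern in patterns)


flows = [{'name': 'foo'}, {'name': 'foo/run1'}, {'name': 'bar'}]
kept = [flow['name'] for flow in flows if name_matches(flow, 'foo', 'baz')]
print(kept)  # ['foo', 'foo/run1']
```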

cylc.flow.network.scan.is_active(flow, is_active)

Filter flows by the presence of a contact file.

Args:
flow (dict):

Flow information dictionary, provided by scan through the pipe.

is_active (bool):

True to filter for running flows. False to filter for stopped and unregistered flows.
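A rough sketch of a contact-file check is shown below. has_contact_file and keep are hypothetical helpers, and the .service/contact location inside the workflow's run directory is an assumption made for this example rather than something stated above.

```python
import tempfile
from pathlib import Path


def has_contact_file(flow_dir):
    # Assumed location of the contact file inside a workflow run directory
    return (Path(flow_dir) / '.service' / 'contact').exists()


def keep(flow_dir, is_active):
    # Keep the flow when its running state equals the requested state
    return has_contact_file(flow_dir) == is_active


with tempfile.TemporaryDirectory() as tmp:
    running = Path(tmp, 'running')
    stopped = Path(tmp, 'stopped')
    (running / '.service').mkdir(parents=True)
    (running / '.service' / 'contact').touch()
    stopped.mkdir()

    results = {
        'running, is_active=True': keep(running, True),
        'stopped, is_active=True': keep(stopped, True),
        'stopped, is_active=False': keep(stopped, False),
    }

print(results)
```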

cylc.flow.network.scan.contact_info(flow)

Read information from the contact file.

Requires:
  • is_active(True)

Args:
flow (dict):

Flow information dictionary, provided by scan through the pipe.
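As a transformer, contact_info adds keys such as CYLC_WORKFLOW_HOST to the flow dictionary. The sketch below shows the idea on a sample string. parse_contact is a hypothetical helper, the KEY=VALUE line format is an assumption, and the sample values (including the CYLC_VERSION key) are made up for illustration.

```python
def parse_contact(text):
    """Parse KEY=VALUE lines into a dictionary, skipping other lines."""
    info = {}
    for line in text.splitlines():
        key, sep, value = line.partition('=')
        if sep:  # only lines that actually contain '='
            info[key.strip()] = value.strip()
    return info


# Made-up contact file content (assumed format)
sample = 'CYLC_WORKFLOW_HOST=example-host\nCYLC_VERSION=8.0.0\n'

flow = {'name': 'foo'}
flow.update(parse_contact(sample))  # the transformer enriches the flow dict
print(flow['CYLC_WORKFLOW_HOST'])   # example-host
```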

cylc.flow.network.scan.cylc_version(flow, requirement)

Filter by Cylc version.

Requires:
  • is_active(True)

  • contact_info

Args:
flow (dict):

Flow information dictionary, provided by scan through the pipe.

requirement (str):

Requirement specifier in pkg_resources format e.g. > 8, < 9

cylc.flow.network.scan.api_version(flow, requirement)

Filter by the Cylc API version.

Requires:
  • is_active(True)

  • contact_info

Args:
flow (dict):

Flow information dictionary, provided by scan through the pipe.

requirement (str):

Requirement specifier in pkg_resources format e.g. > 8, < 9

cylc.flow.network.scan.graphql_query(flow, fields, filters=None)

Obtain information from a GraphQL request to the flow.

Requires:
  • is_active(True)

  • contact_info

Args:
flow (dict):

Flow information dictionary, provided by scan through the pipe.

fields (iterable):

Iterable containing the fields to request e.g:

['id', 'name']

One level of nesting is supported e.g:

{'name': None, 'meta': ['title']}

filters (list):

Filter by the data returned from the query. List in the form [(key, ...), value], e.g:

# state must be running
[('state',), 'running']

# state must be running or paused
[('state',), ('running', 'paused')]
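One way to read the filter form is sketched below for a single filter applied to one query result. matches is a hypothetical helper, not part of the library, and it assumes that the key tuple is a path into the (possibly nested) result and that a tuple value means "any of these values".

```python
def matches(data, filter_):
    """Return True if the value found at the key path is an allowed value."""
    keys, expected = filter_
    value = data
    for key in keys:
        # Assumption: each key descends one level into the nested result
        value = value[key]
    # A single value is treated as a one-element set of allowed values
    allowed = expected if isinstance(expected, tuple) else (expected,)
    return value in allowed


# Made-up query result for illustration
result = {'name': 'foo', 'state': 'paused', 'meta': {'title': 'Demo'}}

print(matches(result, [('state',), 'running']))              # False
print(matches(result, [('state',), ('running', 'paused')]))  # True
print(matches(result, [('meta', 'title'), 'Demo']))          # True
```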

cylc.flow.network.scan.title(flow)

Attempt to parse the workflow title out of the flow config file.

Warning

This uses a fast but dumb method which may fail to extract the workflow title.

Obtaining the workflow title via graphql_query() is preferable for running flows.
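To see why a quick text search can return the wrong title, consider the sketch below. It is not the method used by title(): naive_title is a hypothetical helper that takes the first "title = ..." line in the file, and both sample configurations are made up.

```python
import re

# First line of the form "title = <text>", ignoring surrounding whitespace
TITLE_RE = re.compile(r'^\s*title\s*=\s*(.*?)\s*$', re.MULTILINE)


def naive_title(config_text):
    """Return the text of the first 'title = ...' line, or None."""
    match = TITLE_RE.search(config_text)
    return match.group(1) if match else None


simple = '[meta]\n    title = My Workflow\n'

# A task-level title appears before the workflow-level one
misleading = (
    '[runtime]\n'
    '    [[task_a]]\n'
    '        [[[meta]]]\n'
    '            title = A task title\n'
    '[meta]\n'
    '    title = My Workflow\n'
)

print(naive_title(simple))      # My Workflow
print(naive_title(misleading))  # A task title
```

A search like this does not parse the file, so it has no notion of which section a line belongs to; a GraphQL query against a running flow avoids the problem.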

cylc.flow.network.scan.workflow_params(flow)

Extract workflow parameter entries from the workflow database.

Requires:
  • is_active(True)