Scan API
Functionality for searching for workflows running as the current user.
The scan() asynchronous generator yields workflows. Iterate over them using an async for statement:
    async for flow in scan():
        print(flow['name'])
For further functionality construct a pipe:
    pipe = scan | is_active(True) | contact_info
    async for flow in pipe:
        print(f'{flow["name"]} {flow["CYLC_WORKFLOW_HOST"]}')
There are filters, which you can use to omit workflows (e.g. cylc_version()), and transformers, which acquire more information (e.g. contact_info()).
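To make the distinction between filters and transformers concrete, here is a minimal, self-contained sketch of the idea. It does not use or reproduce cylc.flow.network.scan: fake_scan, run_pipe, only_active and add_owner are hypothetical stand-ins invented for illustration. It assumes that a filter returns True or False to keep or drop a flow, and that a transformer returns the flow dictionary with extra keys added. It also shows that an async for loop has to run inside a coroutine, here started with asyncio.run().

```python
import asyncio


async def fake_scan():
    """Hypothetical stand-in for scan(): yields flow dictionaries."""
    for name, active in [('foo', True), ('bar', False), ('baz/run1', True)]:
        yield {'name': name, 'active': active}


def only_active(flow):
    """Filter (assumed contract): return a bool to keep or drop the flow."""
    return flow['active']


def add_owner(flow):
    """Transformer (assumed contract): return the flow with extra keys."""
    return {**flow, 'owner': 'me'}


async def run_pipe(source, *stages):
    """Pass each flow through the stages in order, dropping rejected flows."""
    async for flow in source:
        for stage in stages:
            result = stage(flow)
            if result is False:
                break  # a filter rejected this flow
            if result is not True:
                flow = result  # a transformer returned an updated flow
        else:
            yield flow  # no stage rejected the flow


async def collect():
    # "async for" is only valid inside a coroutine
    return [
        flow
        async for flow in run_pipe(fake_scan(), only_active, add_owner)
    ]


flows = asyncio.run(collect())
for flow in flows:
    print(flow['name'], flow['owner'])
```

Only the flows that pass every filter reach the end of the pipe, and each one carries whatever keys the transformers added along the way.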
- cylc.flow.network.scan.scan(run_dir=None, scan_dir=None, max_depth=None)
List flows installed on the filesystem.
- Parameters:
run_dir (Path | None) –
The run dir to look for workflows in, defaults to ~/cylc-run.
All workflow registrations will be given relative to this path.
scan_dir (Path | None) –
The directory to scan for workflows in.
Use in combination with run_dir if you want to scan a subdir within the run_dir.
max_depth (int | None) –
The maximum number of levels to descend before bailing.
max_depth=1 will pick up top-level workflows (e.g. foo).
max_depth=2 will pick up nested workflows (e.g. foo/bar).
- Yields:
dict - Dictionary containing information about the flow.
- Return type:
AsyncGenerator[Dict[str, str | Path], None]
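The effect of max_depth can be illustrated with a small depth-limited directory walk. This is a sketch, not the library's implementation: find_flows is a hypothetical helper, and treating any directory that contains a flow.cylc file as a workflow is an assumption made for the example.

```python
import tempfile
from pathlib import Path


def find_flows(run_dir, max_depth):
    """Return names (relative to run_dir) of workflow dirs up to max_depth.

    Assumption for this sketch: a directory counts as a workflow if it
    contains a file called "flow.cylc".
    """
    run_dir = Path(run_dir)
    found = []
    stack = [(run_dir, 0)]
    while stack:
        path, depth = stack.pop()
        if depth >= max_depth:
            continue  # bail: do not descend any further
        for child in sorted(path.iterdir()):
            if not child.is_dir():
                continue
            if (child / 'flow.cylc').exists():
                # names are reported relative to the run directory
                found.append(child.relative_to(run_dir).as_posix())
            else:
                stack.append((child, depth + 1))
    return sorted(found)


# build a throwaway run directory containing "foo" and "nested/bar"
with tempfile.TemporaryDirectory() as tmp:
    for name in ('foo', 'nested/bar'):
        flow_dir = Path(tmp, name)
        flow_dir.mkdir(parents=True)
        (flow_dir / 'flow.cylc').touch()
    top_level = find_flows(tmp, max_depth=1)
    nested = find_flows(tmp, max_depth=2)

print(top_level)
print(nested)
```

With max_depth=1 only foo is found; raising it to 2 also picks up nested/bar.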
- cylc.flow.network.scan.filter_name(flow, pattern)
Filter flows by name.
- Parameters:
flow (dict) – Flow information dictionary, provided by scan through the pipe.
pattern (re.Pattern) – One or more regex patterns, given as strings. The filter returns True if any of the patterns match.
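The "any pattern matches" behaviour can be sketched with the standard re module. name_matches is a hypothetical stand-in rather than the library function, and matching each pattern from the start of the name with re.match is an assumption.

```python
import re


def name_matches(flow, *patterns):
    """Return True if any regex pattern matches the flow's name.

    Assumption: patterns are matched from the start of the name (re.match).
    """
    return any(re.match(pattern, flow['name']) for pattern in patterns)


flows = [{'name': 'foo'}, {'name': 'foo/run1'}, {'name': 'bar'}]

# keep flows whose name starts with "foo" or is exactly "baz"
kept = [f['name'] for f in flows if name_matches(f, r'foo', r'baz$')]
print(kept)
```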
- cylc.flow.network.scan.is_active(flow, is_active)
Filter flows by the presence of a contact file.
- cylc.flow.network.scan.contact_info(flow)
Read information from the contact file.
- Requires:
is_active(True)
- Parameters:
flow (dict) – Flow information dictionary, provided by scan through the pipe.
- cylc.flow.network.scan.cylc_version(flow, requirement)
Filter by cylc version.
- Requires:
is_active(True)
contact_info
- cylc.flow.network.scan.api_version(flow, requirement)
Filter by the cylc API version.
- Requires:
is_active(True)
contact_info
- cylc.flow.network.scan.graphql_query(flow, fields, filters=None)
Obtain information from a GraphQL request to the flow.
- Requires:
is_active(True)
contact_info
- Parameters:
flow (dict) – Flow information dictionary, provided by scan through the pipe.
fields (Iterable) –
Iterable containing the fields to request e.g:
['id', 'name']
One level of nesting is supported e.g:
{'name': None, 'meta': ['title']}
filters (list) –
Filter by the data returned from the query. List in the form [(key, ...), value], e.g:
    # state must be running
    [('state',), 'running']
    # state must be running or paused
    [('state',), ('running', 'paused')]
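One way to read the [(key, ...), value] form is as a path into the returned data followed by the accepted value or values. The sketch below applies that reading to a plain dictionary. passes_filters is a hypothetical helper. Treating filters as a list of such entries, and treating a tuple value as "any of these", are assumptions inferred from the comments in the documented example, not taken from the library's code.

```python
def passes_filters(data, filters):
    """Check a result dictionary against [(key, ...), value] filters."""
    for keys, value in filters:
        item = data
        for key in keys:
            item = item[key]  # follow the key path into nested data
        if isinstance(value, tuple):
            if item not in value:  # assumed: a tuple means "any of these"
                return False
        elif item != value:
            return False
    return True


# hypothetical query result for one flow
result = {'name': 'foo', 'state': 'paused', 'meta': {'title': 'Demo'}}

must_run = passes_filters(result, [[('state',), 'running']])
run_or_paused = passes_filters(result, [[('state',), ('running', 'paused')]])
title_is_demo = passes_filters(result, [[('meta', 'title'), 'Demo']])
print(must_run, run_or_paused, title_is_demo)
```

A key tuple with more than one element, such as ('meta', 'title'), walks one level deeper into the nested data for each key.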
- cylc.flow.network.scan.title(flow)
Attempt to parse the workflow title out of the flow config file.
Warning
This uses a fast but dumb method which may fail to extract the workflow title.
Obtaining the workflow title via graphql_query() is preferable for running flows.
- cylc.flow.network.scan.workflow_params(flow)
Extract workflow parameter entries from the workflow database.
- Requires:
is_active(True)