Workflow Runtime Interface
Cylc workflows are TCP servers which use the ZeroMQ protocol to communicate with clients and jobs.
Cylc provides a Python client to communicate with this server:
cylc.flow.network.client.WorkflowRuntimeClient
>>> from cylc.flow.network.client import WorkflowRuntimeClient
>>> client = WorkflowRuntimeClient('my-workflow')
>>> client('ping_workflow')
True
Cylc also provides a sub-command called cylc client, which is a simple wrapper around the Python client:
$ cylc client generic ping_workflow -n
true
The available “commands” (or “endpoints”) are contained in the cylc.flow.network.server.WorkflowRuntimeServer class.
Client
- class cylc.flow.network.client.WorkflowRuntimeClient(workflow, host=None, port=None, timeout=None, context=None, srv_public_key_loc=None)[source]
Initiate a client to the scheduler API.
Initiates the REQ part of a ZMQ REQ-REP pair.
This class contains the logic for the ZMQ message interface and client-server communication.
Determine host and port from the contact file unless provided.
If there is no socket bound to the specified host/port the client will bail after timeout seconds.
- Parameters:
workflow (str) – Name of the workflow to connect to.
timeout (float | str | None) – Set the default timeout in seconds. The default is ZMQClient.DEFAULT_TIMEOUT. Note the default timeout can be overridden for individual requests.
host (str | None) – The host where the flow is running if known. If both host and port are provided it is not necessary to load the contact file.
port – The port on which the REQ-REP TCP server is listening. If both host and port are provided it is not necessary to load the contact file.
context (Context | None)
srv_public_key_loc (str | None)
- host
Workflow host name.
- port
Workflow host port.
- timeout_handler
Optional function which runs before ClientTimeout is raised. This provides an interface for raising more specific exceptions in the event of a communication timeout.
- header
Request “header” data to attach to each request.
- Usage:
Call endpoints using ZMQClient.__call__.
- Message interface:
Accepts responses of the format: {“data”: {…}}
Accepts error in the format: {“error”: {“message”: MSG}}
Returns requests of the format: {“command”: CMD, “args”: {…}}
- Raises:
WorkflowStopped – if the workflow is not running.
- Parameters:
- Call server “endpoints” using:
__call__, serial_request
- serial_request(command, args=None, timeout=None, req_meta=None)
Send a request.
For convenience use __call__ to call this method.
- Parameters:
- Raises:
ClientTimeout – If a response takes longer than timeout to arrive.
ClientError – Catch-all for all other issues, including failed auth.
- Returns:
- The data exactly as returned from the endpoint function,
nothing more, nothing less.
- Return type:
async_request
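The client message interface described above can be sketched without a running workflow. This is a minimal illustration of the documented message shapes only; the names build_request, unpack_response and EndpointError are hypothetical and are not part of the Cylc API, and the actual wire encoding is not covered here.

```python
class EndpointError(Exception):
    """Hypothetical exception raised for an {"error": ...} response."""


def build_request(command, args=None):
    """Return a request in the documented {"command", "args"} shape."""
    return {"command": command, "args": args or {}}


def unpack_response(response):
    """Return the "data" payload, or raise if the response is an error."""
    if "error" in response:
        raise EndpointError(response["error"]["message"])
    return response["data"]


# Build a request for the ping_workflow endpoint and unpack a
# hand-written response of the documented format.
print(build_request("ping_workflow"))
print(unpack_response({"data": True}))
```

With the real client these steps are handled internally, so calling client('ping_workflow') returns the unpacked data directly.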
Server
- class cylc.flow.network.server.WorkflowRuntimeServer(schd)[source]
Workflow runtime service API facade exposed via zmq.
This class starts and coordinates the publisher and replier, and contains the Cylc endpoints invoked by the receiver to provide a response to incoming messages.
- Parameters:
schd (object) – The parent object instantiating the server. In this case, the workflow scheduler.
- Usage:
Define endpoints using the expose decorator.
Endpoints are called via the receiver using the function name.
- Message interface:
Accepts messages of the format: {“command”: CMD, “args”: {…}}
Returns responses of the format: {“data”: {…}}
Returns error in the format: {“error”: {“message”: MSG}}
- Common Arguments:
Arguments which are shared between multiple commands.
- task identifier (str):
A task identifier in the format cycle-point/task-name, e.g. 1/foo or 20000101T0000Z/bar.
- task globs (list):
A list of Cylc IDs relative to the workflow.
1 - The cycle point “1”.
1/foo - The task “foo” in the cycle “1”.
1/foo/01 - The first job of the task “foo” from the cycle “1”.
Glob-like patterns may be used to match multiple items e.g.
* - Matches everything.
1/* - Matches everything in cycle 1.
*/*:failed - Matches all failed tasks.
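To make the common arguments concrete, the following sketch approximates the glob examples above using the standard library fnmatch module. It is not Cylc's own ID matcher: split_task_id and match_tasks are hypothetical helpers, the task states are made-up sample data, and the sketch assumes IDs contain no ":" other than the optional state suffix.

```python
from fnmatch import fnmatchcase


def split_task_id(task_id):
    """Split "cycle-point/task-name" into its two parts."""
    cycle_point, task_name = task_id.split("/", 1)
    return cycle_point, task_name


def match_tasks(pattern, tasks):
    """Return IDs from ``tasks`` ({task_id: state}) that match ``pattern``.

    An optional ":state" suffix restricts matches to tasks in that state.
    """
    id_pattern, _, state = pattern.partition(":")
    return [
        task_id
        for task_id, task_state in tasks.items()
        if fnmatchcase(task_id, id_pattern)
        and (not state or task_state == state)
    ]


# Sample data for illustration only.
tasks = {"1/foo": "succeeded", "1/bar": "failed", "2/foo": "failed"}
print(split_task_id("20000101T0000Z/bar"))
print(match_tasks("1/*", tasks))
print(match_tasks("*/*:failed", tasks))
```

A bare cycle point such as 1 is not handled by this sketch, since fnmatch compares the whole string rather than understanding the cycle/task/job hierarchy.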
- api(endpoint=None, **_kwargs)[source]
Return information about this API.
Returns a list of callable endpoints.
- graphql(request_string=None, variables=None, meta=None)[source]
Return the GraphQL schema execution result.
- pb_data_elements(element_type, **_kwargs)[source]
Send the specified data elements in delta form.
Returns serialised Protobuf message
- pb_entire_workflow(**_kwargs)[source]
Send the entire data-store in a single Protobuf message.
Returns serialised Protobuf message
- Return type:
- receiver(message)[source]
Process incoming messages and coordinate response.
Wrap incoming messages, dispatch them to exposed methods and/or coordinate a publishing stream.
- Parameters:
message (dict) – message contents
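The dispatch-by-function-name behaviour described for the receiver can be sketched as follows. This is a simplified stand-in, not the Cylc implementation: the expose function here only sets a marker attribute, SketchServer is a hypothetical class, and the error messages are invented for the example.

```python
def expose(func):
    """Mark a method as callable by name (stand-in for the real decorator)."""
    func.exposed = True
    return func


class SketchServer:
    """Hypothetical server showing name-based dispatch only."""

    @expose
    def ping_workflow(self):
        return True

    def _internal(self):
        # Not decorated, so the receiver refuses to call it.
        return "hidden"

    def receiver(self, message):
        """Dispatch {"command": CMD, "args": {...}} to an exposed method."""
        command = message.get("command")
        args = message.get("args") or {}
        method = getattr(self, command, None) if isinstance(command, str) else None
        if not getattr(method, "exposed", False):
            return {"error": {"message": f"No exposed endpoint: {command}"}}
        try:
            return {"data": method(**args)}
        except Exception as exc:
            # Report endpoint failures in the documented error shape.
            return {"error": {"message": str(exc)}}


server = SketchServer()
print(server.receiver({"command": "ping_workflow", "args": {}}))
print(server.receiver({"command": "_internal", "args": {}}))
```

Checking for the marker attribute, rather than calling any attribute by name, keeps undecorated methods unreachable from incoming messages.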
- async stop(reason)[source]
Stop the TCP servers, and clean up authentication.
This method must be called/awaited from a different thread to the server’s self.thread in order to interrupt the self.operate() loop and wait for self.thread to terminate.
- Parameters:
reason (BaseException | str)
- Return type:
None
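The threading constraint on stop can be illustrated with a generic sketch. It is a plain threading example under simplifying assumptions, not the Cylc server: SketchService is hypothetical, its stop is synchronous rather than async, and no ZeroMQ sockets or authentication are involved. It shows why the caller must be on a different thread: the stopping side has to wait for the loop's thread to terminate, and a thread cannot wait on itself.

```python
import threading
import time


class SketchService:
    """Hypothetical service with a loop running in its own thread."""

    def __init__(self):
        self._stopping = threading.Event()
        self.thread = threading.Thread(target=self.operate)

    def start(self):
        self.thread.start()

    def operate(self):
        # Stand-in for a server loop; runs until asked to stop.
        while not self._stopping.is_set():
            time.sleep(0.01)

    def stop(self):
        if threading.current_thread() is self.thread:
            # Joining the current thread is not possible.
            raise RuntimeError("stop() must be called from another thread")
        self._stopping.set()   # interrupt the operate() loop
        self.thread.join()     # wait for the thread to terminate


service = SketchService()
service.start()
service.stop()  # called from the main thread, not from service.thread
print(service.thread.is_alive())
```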