Workflow Runtime Interface
Cylc workflows are TCP servers which use the ZeroMQ protocol to communicate with clients and jobs.
Cylc provides a Python client to communicate with this server:
cylc.flow.network.client.WorkflowRuntimeClient
>>> from cylc.flow.network.client import WorkflowRuntimeClient
>>> client = WorkflowRuntimeClient('my-workflow')
>>> client('ping_workflow')
True
Cylc also provides a sub-command called cylc client, which is a simple wrapper around the Python client.
$ cylc client generic ping_workflow -n
true
The available “commands” (or “endpoints”) are contained in the cylc.flow.network.server.WorkflowRuntimeServer class.
Client
- class cylc.flow.network.client.WorkflowRuntimeClient(workflow: str, host: str | None = None, port: int | str | None = None, timeout: float | str | None = None, context: Context | None = None, srv_public_key_loc: str | None = None)
Initiate a client to the scheduler API.
Initiates the REQ part of a ZMQ REQ-REP pair.
This class contains the logic for the ZMQ message interface and client - server communication.
Determine host and port from the contact file unless provided.
If there is no socket bound to the specified host/port the client will bail after timeout seconds.
- Args:
- workflow:
Name of the workflow to connect to.
- timeout:
Set the default timeout in seconds. The default is ZMQClient.DEFAULT_TIMEOUT. Note the default timeout can be overridden for individual requests.
- host:
The host where the flow is running if known.
If both host and port are provided it is not necessary to load the contact file.
- port:
The port on which the REQ-REP TCP server is listening.
If both host and port are provided it is not necessary to load the contact file.
- Attributes:
- host (str):
Workflow host name.
- port (int):
Workflow host port.
- timeout_handler (function):
Optional function which runs before ClientTimeout is raised. This provides an interface for raising more specific exceptions in the event of a communication timeout.
- header (dict):
Request “header” data to attach to each request.
- Usage:
Call endpoints using ZMQClient.__call__.
- Message interface:
Accepts responses of the format: {“data”: {…}}
Accepts error in the format: {“error”: {“message”: MSG}}
Sends requests of the format: {“command”: CMD, “args”: {…}}
- Raises:
WorkflowStopped: if the workflow is not running.
- Call server “endpoints” using:
__call__, serial_request
- serial_request(command: str, args: Dict[str, Any] | None = None, timeout: float | None = None, req_meta: Dict[str, Any] | None = None) → object
Send a request.
For convenience use __call__ to call this method.
- Args:
- command: The name of the endpoint to call.
- args: Arguments to pass to the endpoint function.
- timeout: Override the default timeout (seconds).
- Raises:
- ClientTimeout: If a response takes longer than timeout to arrive.
- ClientError: Catch-all for all other issues, including failed auth.
- Returns:
- object: The data exactly as returned from the endpoint function,
nothing more, nothing less.
async_request
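The client message interface described above can be illustrated with a minimal sketch. It assumes JSON as the wire encoding, and the names `build_request`, `parse_response` and `EndpointError` are hypothetical helpers invented for this illustration, not part of the Cylc API. The real client raises its own exception types (for example ClientError).

```python
import json
from typing import Any, Dict, Optional


class EndpointError(Exception):
    """Hypothetical exception for an error response (not a Cylc class)."""


def build_request(command: str, args: Optional[Dict[str, Any]] = None) -> str:
    """Encode a request in the form {"command": CMD, "args": {...}}."""
    return json.dumps({"command": command, "args": args or {}})


def parse_response(raw: str) -> Any:
    """Return the "data" payload, or raise if the response is an error.

    Handles the two documented response shapes:
    {"data": {...}} and {"error": {"message": MSG}}.
    """
    message = json.loads(raw)
    if "error" in message:
        raise EndpointError(message["error"]["message"])
    return message["data"]


# Build the request a client would send for the ping_workflow endpoint.
request = build_request("ping_workflow")
# Decode a successful response as a client would receive it.
result = parse_response('{"data": true}')
```

Keeping encoding and decoding in two small functions mirrors the REQ-REP pattern: each request produces exactly one response, which is either data or an error.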
Server
- class cylc.flow.network.server.WorkflowRuntimeServer(schd)
Workflow runtime service API facade exposed via zmq.
This class starts and coordinates the publisher and replier, and contains the Cylc endpoints invoked by the receiver to provide a response to incoming messages.
- Args:
- schd (object): The parent object instantiating the server. In
this case, the workflow scheduler.
- Usage:
Define endpoints using the expose decorator.
Endpoints are called via the receiver using the function name.
- Message interface:
Accepts messages of the format: {“command”: CMD, “args”: {…}}
Returns responses of the format: {“data”: {…}}
Returns error in the format: {“error”: {“message”: MSG}}
- Common Arguments:
Arguments which are shared between multiple commands.
- task identifier (str):
A task identifier in the format cycle-point/task-name, e.g. 1/foo or 20000101T0000Z/bar.
- task globs (list):
A list of Cylc IDs relative to the workflow.
1
- The cycle point “1”.
1/foo
- The task “foo” in the cycle “1”.
1/foo/01
- The first job of the task “foo” from the cycle “1”.
Glob-like patterns may be used to match multiple items e.g.
*
Matches everything.
1/*
Matches everything in cycle 1.
*/*:failed
Matches all failed tasks.
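The glob patterns above can be illustrated with a rough sketch built on the standard library's fnmatch module. This is not how Cylc parses IDs: `match_tasks` is a hypothetical helper, the task pool is a plain dict invented for the example, and job-level IDs such as 1/foo/01 are not handled.

```python
from fnmatch import fnmatchcase
from typing import Dict, List


def match_tasks(pattern: str, tasks: Dict[str, str]) -> List[str]:
    """Return the task IDs ("cycle/task") that match a glob-like pattern.

    `tasks` maps a task ID to its state. An optional ":state" suffix on
    the pattern filters by state. A pattern with no "/" is treated as a
    cycle and selects every task in it (an assumption of this sketch).
    """
    glob, _, state = pattern.partition(":")
    if "/" not in glob:
        glob += "/*"
    return sorted(
        task_id
        for task_id, task_state in tasks.items()
        if fnmatchcase(task_id, glob) and (not state or task_state == state)
    )


# Hypothetical task pool used only for illustration.
pool = {"1/foo": "succeeded", "1/bar": "failed", "2/foo": "failed"}
everything = match_tasks("*", pool)
cycle_one = match_tasks("1/*", pool)
failed = match_tasks("*/*:failed", pool)
```

With this pool, "*" selects all three tasks, "1/*" selects the two tasks in cycle 1, and "*/*:failed" selects 1/bar and 2/foo.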
- api(endpoint: str | None = None, **_kwargs) → str | List[str]
Return information about this API.
Returns a list of callable endpoints.
- Args:
- endpoint:
If specified the documentation for the endpoint will be returned instead.
- Returns:
List of endpoints or string documentation of the requested endpoint.
- graphql(request_string: str | None = None, variables: Dict[str, Any] | None = None, meta: Dict[str, Any] | None = None)
Return the GraphQL schema execution result.
- Args:
- request_string: GraphQL request passed to Graphene.
- variables: Dict of variables passed to Graphene.
- meta: Dict containing auth user etc.
- Returns:
object: Execution result, or a list with errors.
- pb_data_elements(element_type: str, **_kwargs) → bytes
Send the specified data elements in delta form.
- Args:
element_type: Key from DELTAS_MAP dictionary.
Returns serialised Protobuf message
- pb_entire_workflow(**_kwargs) → bytes
Send the entire data-store in a single Protobuf message.
Returns serialised Protobuf message
- receiver(message)
Process incoming messages and coordinate response.
Wrap incoming messages, dispatch them to exposed methods and/or coordinate a publishing stream.
- Args:
message (dict): message contents
- register_endpoints()
Register all exposed methods.
- start(barrier)
Start the TCP servers.
- async stop(reason: BaseException | str) → None
Stop the TCP servers, and clean up authentication.
This method must be called/awaited from a different thread to the server’s self.thread in order to interrupt the self.operate() loop and wait for self.thread to terminate.
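The way the server exposes endpoints and dispatches messages by function name can be sketched with a toy class. `MiniServer` is a hypothetical stand-in written for this illustration. It has no sockets, threads or authentication, and it should not be read as the WorkflowRuntimeServer implementation. The `expose`, `register_endpoints`, `receiver` and `api` names follow the documentation above, but their bodies here are assumptions.

```python
from typing import Any, Callable, Dict, Optional


def expose(func: Callable) -> Callable:
    """Mark a method as an endpoint (stand-in for the expose decorator)."""
    func.exposed = True
    return func


class MiniServer:
    """Toy dispatcher showing the message interface, without any sockets."""

    def __init__(self) -> None:
        self.endpoints: Dict[str, Callable] = {}
        self.register_endpoints()

    def register_endpoints(self) -> None:
        """Collect every method that was marked with @expose."""
        for name in dir(self):
            attr = getattr(self, name)
            if callable(attr) and getattr(attr, "exposed", False):
                self.endpoints[name] = attr

    def receiver(self, message: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch {"command": CMD, "args": {...}} to the named endpoint."""
        command = message.get("command")
        if command not in self.endpoints:
            return {"error": {"message": f"No endpoint named {command!r}"}}
        try:
            return {"data": self.endpoints[command](**message.get("args", {}))}
        except Exception as exc:  # report failures in the error format
            return {"error": {"message": str(exc)}}

    @expose
    def ping_workflow(self) -> bool:
        """Return True to show the server is responding."""
        return True

    @expose
    def api(self, endpoint: Optional[str] = None):
        """Return the endpoint names, or one endpoint's docstring."""
        if endpoint is None:
            return sorted(self.endpoints)
        return self.endpoints[endpoint].__doc__


server = MiniServer()
reply = server.receiver({"command": "ping_workflow", "args": {}})
```

Looking endpoints up by name in a registry means that only methods explicitly marked with the decorator can be reached by a client message.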