Workflow Runtime Interface

Cylc workflows are TCP servers which use the ZeroMQ protocol to communicate with clients and jobs.

Cylc provides a Python client to communicate with this server

>>> from import WorkflowRuntimeClient
>>> client = WorkflowRuntimeClient('my-workflow')
>>> client('ping_workflow')

Cylc also provides sub-command called cylc client which is a simple wrapper of the Python client.

$ cylc client generic ping_workflow -n

The available “commands” or (“endpoints”) are contained in class.


class str, host: str | None = None, port: int | str | None = None, timeout: float | str | None = None, context: Context | None = None, srv_public_key_loc: str | None = None)

Initiate a client to the scheduler API.

Initiates the REQ part of a ZMQ REQ-REP pair.

This class contains the logic for the ZMQ message interface and client - server communication.

Determine host and port from the contact file unless provided.

If there is no socket bound to the specified host/port the client will bail after timeout seconds.


Name of the workflow to connect to.


Set the default timeout in seconds. The default is ZMQClient.DEFAULT_TIMEOUT. Note the default timeout can be overridden for individual requests.


The host where the flow is running if known.

If both host and port are provided it is not necessary to load the contact file.


The port on which the REQ-REP TCP server is listening.

If both host and port are provided it is not necessary to load the contact file.

host (str):

Workflow host name.

port (int):

Workflow host port.

timeout_handler (function):

Optional function which runs before ClientTimeout is raised. This provides an interface for raising more specific exceptions in the event of a communication timeout.

header (dict):

Request “header” data to attach to each request.


Call endpoints using ZMQClient.__call__.

Message interface:
  • Accepts responses of the format: {“data”: {…}}

  • Accepts error in the format: {“error”: {“message”: MSG}}

  • Returns requests of the format: {“command”: CMD, “args”: {…}}


WorkflowStopped: if the workflow is not running.

Call server “endpoints” using:
__call__, serial_request
serial_request(command: str, args: Dict[str, Any] | None = None, timeout: float | None = None, req_meta: Dict[str, Any] | None = None) object

Send a request.

For convenience use __call__ to call this method.


command: The name of the endpoint to call. args: Arguments to pass to the endpoint function. timeout: Override the default timeout (seconds).


ClientTimeout: If a response takes longer than timeout to arrive. ClientError: Coverall for all other issues including failed auth.

object: The data exactly as returned from the endpoint function,

nothing more, nothing less.

async async_request(command: str, args: Dict[str, Any] | None = None, timeout: float | None = None, req_meta: Dict[str, Any] | None = None) object

Send an asynchronous request using asyncio.

Has the same arguments and return values as serial_request.



Workflow runtime service API facade exposed via zmq.

This class starts and coordinates the publisher and replier, and contains the Cylc endpoints invoked by the receiver to provide a response to incoming messages.

schd (object): The parent object instantiating the server. In

this case, the workflow scheduler.

  • Define endpoints using the expose decorator.

  • Endpoints are called via the receiver using the function name.

Message interface:
  • Accepts messages of the format: {“command”: CMD, “args”: {…}}

  • Returns responses of the format: {“data”: {…}}

  • Returns error in the format: {“error”: {“message”: MSG}}

Common Arguments:

Arguments which are shared between multiple commands.

task identifier (str):

A task identifier in the format cycle-point/task-name e.g. 1/foo or 20000101T0000Z/bar.

task globs (list):

A list of Cylc IDs relative to the workflow.

  • 1 - The cycle point “1”.

  • 1/foo - The task “foo” in the cycle “1”.

  • 1/foo/01 - The first job of the task “foo” from the cycle “1”.

Glob-like patterns may be used to match multiple items e.g.


Matches everything.


Matches everything in cycle 1.


Matches all failed tasks.

api(endpoint: str | None = None, **_kwargs) str | List[str]

Return information about this API.

Returns a list of callable endpoints.


If specified the documentation for the endpoint will be returned instead.


List of endpoints or string documentation of the requested endpoint.

graphql(request_string: str | None = None, variables: Dict[str, Any] | None = None, meta: Dict[str, Any] | None = None)

Return the GraphQL schema execution result.


request_string: GraphQL request passed to Graphene. variables: Dict of variables passed to Graphene. meta: Dict containing auth user etc.


object: Execution result, or a list with errors.

operate() None

Orchestrate the receive, send, publish of messages.

pb_data_elements(element_type: str, **_kwargs) bytes

Send the specified data elements in delta form.


element_type: Key from DELTAS_MAP dictionary.

Returns serialised Protobuf message

pb_entire_workflow(**_kwargs) bytes

Send the entire data-store in a single Protobuf message.

Returns serialised Protobuf message

async publish_queued_items() None

Publish all queued items.


Process incoming messages and coordinate response.

Wrap incoming messages, dispatch them to exposed methods and/or coordinate a publishing stream.


message (dict): message contents


Register all exposed methods.


Start the TCP servers.

async stop(reason: BaseException | str) None

Stop the TCP servers, and clean up authentication.

This method must be called/awaited from a different thread to the server’s self.thread in order to interrupt the self.operate() loop and wait for self.thread to terminate.