Workflow Runtime Interface

Cylc workflows are TCP servers which use the ZeroMQ protocol to communicate with clients and jobs.

Cylc provides a Python client, cylc.flow.network.client.WorkflowRuntimeClient, for communicating with this server:

>>> from cylc.flow.network.client import WorkflowRuntimeClient
>>> client = WorkflowRuntimeClient('my-workflow')
>>> client('ping_workflow')
True

Cylc also provides a sub-command, cylc client, which is a thin wrapper around the Python client:

$ cylc client generic ping_workflow -n
true

The available “commands” (or “endpoints”) are defined in the cylc.flow.network.server.WorkflowRuntimeServer class.

Client

class cylc.flow.network.client.WorkflowRuntimeClient(workflow: str, host: str | None = None, port: int | str | None = None, timeout: float | str | None = None, context: Context | None = None, srv_public_key_loc: str | None = None)

Initiate a client to the scheduler API.

Initiates the REQ part of a ZMQ REQ-REP pair.

This class contains the logic for the ZMQ message interface and client - server communication.

Determine host and port from the contact file unless provided.

If there is no socket bound to the specified host/port the client will bail after timeout seconds.

Args:
workflow:

Name of the workflow to connect to.

timeout:

Set the default timeout in seconds. The default is ZMQClient.DEFAULT_TIMEOUT. Note the default timeout can be overridden for individual requests.

host:

The host where the workflow is running, if known.

If both host and port are provided it is not necessary to load the contact file.

port:

The port on which the REQ-REP TCP server is listening.

If both host and port are provided it is not necessary to load the contact file.
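The host/port rule above can be sketched as a small helper. This is illustrative only and not part of Cylc: resolve_endpoint and its load_contact callback are hypothetical, and the sketch assumes that when either value is missing, both are taken from the contact file.

```python
from typing import Callable, Mapping, Optional, Tuple, Union


def resolve_endpoint(
    host: Optional[str],
    port: Optional[Union[int, str]],
    load_contact: Callable[[], Mapping[str, str]],
) -> Tuple[str, int]:
    """Return (host, port), reading contact data only when required."""
    if host is not None and port is not None:
        # Both supplied: the contact file is never touched.
        return host, int(port)
    # Assumption for this sketch: if either value is missing, take both
    # from the contact data (a hypothetical mapping with "host"/"port" keys).
    contact = load_contact()
    return contact["host"], int(contact["port"])
```

Passing the loader as a callable keeps the "no contact file needed" case free of any file access.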

Attributes:
host (str):

Workflow host name.

port (int):

Workflow host port.

timeout_handler (function):

Optional function which runs before ClientTimeout is raised. This provides an interface for raising more specific exceptions in the event of a communication timeout.

header (dict):

Request “header” data to attach to each request.

Usage:

Call endpoints using ZMQClient.__call__.

Message interface:
  • Accepts responses of the format: {“data”: {…}}

  • Accepts error in the format: {“error”: {“message”: MSG}}

  • Sends requests of the format: {“command”: CMD, “args”: {…}}
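A minimal sketch of the message interface above, using plain dicts. build_request, unpack_response and the local ClientError class are hypothetical stand-ins; serialisation and the ZMQ transport are deliberately left out.

```python
from typing import Any, Dict, Optional


class ClientError(Exception):
    """Local stand-in for the client's error type."""


def build_request(command: str, args: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    # Outgoing requests: {"command": CMD, "args": {...}}
    return {"command": command, "args": args or {}}


def unpack_response(response: Dict[str, Any]) -> Any:
    # Error responses: {"error": {"message": MSG}}
    if "error" in response:
        raise ClientError(response["error"]["message"])
    # Successful responses: {"data": ...}; return the payload unchanged.
    return response["data"]
```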

Raises:

WorkflowStopped: if the workflow is not running.

Call server “endpoints” using __call__ or serial_request.

serial_request(command: str, args: Dict[str, Any] | None = None, timeout: float | None = None, req_meta: Dict[str, Any] | None = None) → object

Send a request.

For convenience use __call__ to call this method.

Args:

command: The name of the endpoint to call.

args: Arguments to pass to the endpoint function.

timeout: Override the default timeout (seconds).

Raises:

ClientTimeout: If a response takes longer than timeout seconds to arrive.

ClientError: Catch-all for all other issues, including failed authentication.

Returns:

object: The data exactly as returned from the endpoint function, nothing more, nothing less.

async async_request(command: str, args: Dict[str, Any] | None = None, timeout: float | None = None, req_meta: Dict[str, Any] | None = None) → object

Send an asynchronous request using asyncio.

Has the same arguments and return values as serial_request.
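The sketch below shows how a per-request timeout and a timeout_handler hook (described under Attributes) might fit around an asyncio request. It assumes nothing about Cylc internals: request_with_timeout, fake_send and the local ClientTimeout class are hypothetical, and the fake coroutine stands in for the ZMQ round trip.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional


class ClientTimeout(Exception):
    """Local stand-in for the client's timeout error."""


async def request_with_timeout(
    send: Callable[[], Awaitable[Any]],
    timeout: float,
    timeout_handler: Optional[Callable[[], None]] = None,
) -> Any:
    """Await ``send()`` for at most ``timeout`` seconds."""
    try:
        return await asyncio.wait_for(send(), timeout)
    except asyncio.TimeoutError:
        # Let the caller raise something more specific before the generic error.
        if timeout_handler is not None:
            timeout_handler()
        raise ClientTimeout(f"no response within {timeout}s") from None


async def fake_send(delay: float, result: Any) -> Any:
    # Simulates a server that replies after ``delay`` seconds.
    await asyncio.sleep(delay)
    return result


async def main() -> List[Any]:
    # Two requests in flight at once; gather returns results in call order.
    return await asyncio.gather(
        request_with_timeout(lambda: fake_send(0.01, "a"), timeout=1),
        request_with_timeout(lambda: fake_send(0.02, "b"), timeout=1),
    )


results = asyncio.run(main())
```

Because each request is a coroutine, several can be awaited concurrently with asyncio.gather, which is the main practical reason to prefer an asynchronous request over a serial one.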

Server

class cylc.flow.network.server.WorkflowRuntimeServer(schd)

Workflow runtime service API facade exposed via ZMQ.

This class starts and coordinates the publisher and replier, and contains the Cylc endpoints invoked by the receiver to provide a response to incoming messages.

Args:
schd (object): The parent object instantiating the server; in this case, the workflow scheduler.

Usage:
  • Define endpoints using the expose decorator.

  • Endpoints are called via the receiver using the function name.

Message interface:
  • Accepts messages of the format: {“command”: CMD, “args”: {…}}

  • Returns responses of the format: {“data”: {…}}

  • Returns error in the format: {“error”: {“message”: MSG}}
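A toy illustration of the usage notes and message interface above: methods flagged by a decorator are collected by name, and a receiver dispatches {“command”: CMD, “args”: {…}} messages to them. The expose decorator, ToyServer and its ping_workflow and echo endpoints here are hypothetical and simplified; the real server also handles authentication and publishing.

```python
from typing import Any, Callable, Dict


def expose(func: Callable) -> Callable:
    # Hypothetical marker decorator: flag a method as a callable endpoint.
    func.exposed = True
    return func


class ToyServer:
    def __init__(self) -> None:
        # Collect flagged methods by name (compare register_endpoints()).
        self.endpoints: Dict[str, Callable] = {
            name: getattr(self, name)
            for name in dir(self)
            if getattr(getattr(self, name), "exposed", False)
        }

    @expose
    def ping_workflow(self) -> bool:
        return True

    @expose
    def echo(self, text: str) -> str:
        return text

    def receiver(self, message: Dict[str, Any]) -> Dict[str, Any]:
        # Dispatch {"command": CMD, "args": {...}} to the named endpoint.
        command = message.get("command")
        if command not in self.endpoints:
            return {"error": {"message": f"unknown command: {command}"}}
        try:
            return {"data": self.endpoints[command](**message.get("args", {}))}
        except Exception as exc:  # report endpoint failures as error responses
            return {"error": {"message": str(exc)}}
```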

Common Arguments:

Arguments which are shared between multiple commands.

task identifier (str):

A task identifier in the format cycle-point/task-name e.g. 1/foo or 20000101T0000Z/bar.
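A minimal sketch of splitting a task identifier into its two parts, assuming the identifier contains exactly one “/”. parse_task_id and TaskID are hypothetical names used only for illustration.

```python
from typing import NamedTuple


class TaskID(NamedTuple):
    cycle_point: str
    task_name: str


def parse_task_id(task_id: str) -> TaskID:
    # A task identifier has exactly two non-empty parts: cycle-point/task-name.
    parts = task_id.split("/")
    if len(parts) != 2 or not all(parts):
        raise ValueError(f"expected cycle-point/task-name, got {task_id!r}")
    return TaskID(*parts)


print(parse_task_id("20000101T0000Z/bar"))
# TaskID(cycle_point='20000101T0000Z', task_name='bar')
```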

task globs (list):

A list of Cylc IDs relative to the workflow.

  • 1 - The cycle point “1”.

  • 1/foo - The task “foo” in the cycle “1”.

  • 1/foo/01 - The first job of the task “foo” from the cycle “1”.

Glob-like patterns may be used to match multiple items, for example:

*

Matches everything.

1/*

Matches everything in cycle 1.

*/*:failed

Matches all failed tasks.
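The glob examples above can be approximated with fnmatch from the Python standard library. This is only a rough sketch under stated assumptions: match_tasks and split_selector are hypothetical, tasks are supplied as a dict mapping cycle/name IDs to state strings, IDs are assumed to contain no “:” (so extended-format cycle points with colons are not handled), a cycle-only pattern such as “1” is expanded to “1/*”, and Cylc's own matching rules are not reproduced.

```python
from fnmatch import fnmatchcase
from typing import Dict, List, Optional, Tuple


def split_selector(pattern: str) -> Tuple[str, Optional[str]]:
    # "*/*:failed" -> ("*/*", "failed"); "1/*" -> ("1/*", None)
    id_part, sep, state = pattern.partition(":")
    return id_part, (state if sep else None)


def match_tasks(pattern: str, tasks: Dict[str, str]) -> List[str]:
    """Return the IDs in ``tasks`` (cycle/name -> state) matching ``pattern``."""
    id_part, state = split_selector(pattern)
    if "/" not in id_part:
        # A cycle-only pattern such as "1" selects every task in that cycle.
        id_part += "/*"
    return sorted(
        task_id
        for task_id, task_state in tasks.items()
        if fnmatchcase(task_id, id_part) and (state is None or task_state == state)
    )


tasks = {"1/foo": "succeeded", "1/bar": "failed", "2/foo": "failed"}
print(match_tasks("*/*:failed", tasks))  # ['1/bar', '2/foo']
```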

api(endpoint: str | None = None, **_kwargs) → str | List[str]

Return information about this API.

Returns a list of callable endpoints.

Args:
endpoint:

If specified, the documentation for that endpoint is returned instead.

Returns:

List of endpoints or string documentation of the requested endpoint.
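The two modes of api (list endpoint names, or return one endpoint's documentation) can be sketched over a plain dict of callables. The api function here is a hypothetical stand-in that takes the endpoint mapping explicitly, and ping_workflow is a toy endpoint defined only for the example.

```python
import inspect
from typing import Callable, Dict, List, Optional, Union


def ping_workflow() -> bool:
    """Return True if the workflow is reachable."""
    return True


def api(
    endpoints: Dict[str, Callable], endpoint: Optional[str] = None
) -> Union[str, List[str]]:
    if endpoint is None:
        # No endpoint named: list every callable endpoint.
        return sorted(endpoints)
    # Endpoint named: return its documentation (KeyError if unknown).
    return inspect.getdoc(endpoints[endpoint]) or ""
```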

graphql(request_string: str | None = None, variables: Dict[str, Any] | None = None, meta: Dict[str, Any] | None = None)

Return the GraphQL schema execution result.

Args:

request_string: GraphQL request passed to Graphene.

variables: Dict of variables passed to Graphene.

meta: Dict containing auth user etc.

Returns:

object: Execution result, or a list with errors.
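A sketch of assembling the args dict for the graphql endpoint, with keys following the parameter names in the signature above. graphql_args is hypothetical, and the query's workflows(ids: …) field and its id field are assumptions about the schema that should be checked against the workflow's GraphQL schema before use.

```python
from typing import Any, Dict, Optional

# Assumed schema: a top-level ``workflows`` field accepting ``ids`` and
# exposing ``id``. Check this against the workflow's actual GraphQL schema.
QUERY = """
query ($ids: [ID]) {
  workflows(ids: $ids) {
    id
  }
}
"""


def graphql_args(
    request_string: str, variables: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    # Keys follow the graphql endpoint's parameter names.
    args: Dict[str, Any] = {"request_string": request_string}
    if variables:
        args["variables"] = variables
    return args


args = graphql_args(QUERY, {"ids": ["my-workflow"]})
```

With a client like the one in the introduction, the resulting dict would be passed as the args argument of serial_request (for example via client('graphql', args)); that call is not exercised here.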

operate() → None

Orchestrate the receive, send, publish of messages.

pb_data_elements(element_type: str, **_kwargs) → bytes

Send the specified data elements in delta form.

Args:

element_type: Key from DELTAS_MAP dictionary.

Returns a serialised Protobuf message.

pb_entire_workflow(**_kwargs) → bytes

Send the entire data-store in a single Protobuf message.

Returns a serialised Protobuf message.

async publish_queued_items() → None

Publish all queued items.

receiver(message)

Process incoming messages and coordinate response.

Wrap incoming messages, dispatch them to exposed methods and/or coordinate a publishing stream.

Args:

message (dict): message contents

register_endpoints()

Register all exposed methods.

start(barrier)

Start the TCP servers.

async stop(reason: BaseException | str) → None

Stop the TCP servers, and clean up authentication.

This method must be called/awaited from a different thread to the server’s self.thread in order to interrupt the self.operate() loop and wait for self.thread to terminate.
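The threading constraint on stop can be illustrated with a toy that runs its loop on a separate thread. ToyThreadedServer is hypothetical and far simpler than the real server; it assumes start signals readiness through its barrier argument (a threading.Barrier shared with the caller) and that stopping means setting a flag the loop polls.

```python
import asyncio
import threading
from typing import Optional, Union


class ToyThreadedServer:
    """Hypothetical stand-in: a loop on its own thread, stopped from another."""

    def __init__(self) -> None:
        self.thread: Optional[threading.Thread] = None
        self._stopping = threading.Event()

    def operate(self) -> None:
        # Stand-in for the receive/send/publish loop: poll the stop flag.
        while not self._stopping.is_set():
            self._stopping.wait(0.01)

    def start(self, barrier: threading.Barrier) -> None:
        def run() -> None:
            barrier.wait()  # tell the caller the server thread is up
            self.operate()

        self.thread = threading.Thread(target=run, daemon=True)
        self.thread.start()

    async def stop(self, reason: Union[BaseException, str]) -> None:
        # Called from a thread other than self.thread: set the flag that
        # interrupts operate(), then wait for the thread to finish.
        # ``reason`` is accepted for parity with the documented signature only.
        self._stopping.set()
        if self.thread is not None:
            await asyncio.to_thread(self.thread.join)


barrier = threading.Barrier(2)  # the server thread plus the caller
server = ToyThreadedServer()
server.start(barrier)
barrier.wait()  # block until the server thread has started
asyncio.run(server.stop("shutting down"))
```

The join runs via asyncio.to_thread so that awaiting stop does not block the caller's event loop while the server thread winds down.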