Workflow Runtime Interface

Cylc workflows are TCP servers which use the ZeroMQ protocol to communicate with clients and jobs.

Cylc provides a Python client for communicating with this server: cylc.flow.network.client.WorkflowRuntimeClient.

>>> from cylc.flow.network.client import WorkflowRuntimeClient
>>> client = WorkflowRuntimeClient('my-workflow')
>>> client('ping_workflow')
True

Cylc also provides a sub-command, cylc client, which is a simple wrapper around the Python client.

$ cylc client generic ping_workflow -n
true

The available “commands” (or “endpoints”) are defined in the cylc.flow.network.server.WorkflowRuntimeServer class.

Client

class cylc.flow.network.client.WorkflowRuntimeClient(workflow, host=None, port=None, timeout=None, context=None, srv_public_key_loc=None)[source]

Initiate a client to the scheduler API.

Initiates the REQ part of a ZMQ REQ-REP pair.

This class contains the logic for the ZMQ message interface and client - server communication.

The host and port are read from the workflow's contact file unless they are provided.

If no socket is bound to the specified host and port, the client gives up after timeout seconds.

Parameters:
  • workflow (str) – Name of the workflow to connect to.

  • timeout (float | str | None) – Set the default timeout in seconds. The default is ZMQClient.DEFAULT_TIMEOUT. Note the default timeout can be overridden for individual requests.

  • host (str | None) –

    The host where the workflow is running, if known.

    If both host and port are provided it is not necessary to load the contact file.

  • port (int | str | None) –

    The port on which the REQ-REP TCP server is listening.

    If both host and port are provided it is not necessary to load the contact file.

  • context (Context | None)

  • srv_public_key_loc (str | None)

host

Workflow host name.

port

Workflow host port.

timeout_handler

Optional function which runs before ClientTimeout is raised. This provides an interface for raising more specific exceptions in the event of a communication timeout.
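
The role of timeout_handler can be illustrated with a small, self-contained sketch. It does not import Cylc: the ClientTimeout and WorkflowStopped classes below are local stand-ins, and the request helper is hypothetical. It only models the documented ordering, in which the handler runs before the timeout exception is raised and may raise something more specific.

```python
from typing import Callable, Optional


class ClientTimeout(Exception):
    """Local stand-in for the client's timeout exception."""


class WorkflowStopped(Exception):
    """Local stand-in for a more specific exception."""


def request(
    send: Callable[[], object],
    timeout_handler: Optional[Callable[[], None]] = None,
) -> object:
    """Run send(); on timeout, give the handler a chance to raise first."""
    try:
        return send()
    except TimeoutError:
        if timeout_handler is not None:
            timeout_handler()  # may raise something more specific
        raise ClientTimeout("no response within the timeout")


def fake_send() -> object:
    # Simulate a request that never receives a reply.
    raise TimeoutError


def handler() -> None:
    # Assumption: the caller has worked out why the request timed out.
    raise WorkflowStopped("workflow is no longer running")


try:
    request(fake_send, timeout_handler=handler)
except WorkflowStopped as exc:
    outcome = f"specific: {exc}"
```

If the handler returns without raising, the generic timeout exception is raised as usual.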

header

Request “header” data to attach to each request.

Usage:

Call endpoints using ZMQClient.__call__.

Message interface:
  • Accepts responses of the format: {“data”: {…}}

  • Accepts errors of the format: {“error”: {“message”: MSG}}

  • Sends requests of the format: {“command”: CMD, “args”: {…}}
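
The message shapes listed above can be explored without a running scheduler. The following is a minimal sketch using only the standard library. The helper names build_request and unpack_response are hypothetical and are not part of the Cylc API, and JSON is used here purely for illustration.

```python
import json
from typing import Any, Dict, Optional


def build_request(command: str, args: Optional[Dict[str, Any]] = None) -> str:
    """Serialise a request in the {"command": CMD, "args": {...}} shape."""
    return json.dumps({"command": command, "args": args or {}})


def unpack_response(raw: str) -> Any:
    """Return the "data" payload, or raise if the response carries an error."""
    response = json.loads(raw)
    if "error" in response:
        # Error responses have the shape {"error": {"message": MSG}}.
        raise RuntimeError(response["error"]["message"])
    return response["data"]


request = build_request("ping_workflow")
ok = unpack_response('{"data": true}')
```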

Raises:

WorkflowStopped – if the workflow is not running.


Call server “endpoints” using:
__call__, serial_request
serial_request(command, args=None, timeout=None, req_meta=None)

Send a request.

For convenience use __call__ to call this method.

Parameters:
  • command (str) – The name of the endpoint to call.

  • args (Dict[str, Any] | None) – Arguments to pass to the endpoint function.

  • timeout (float | None) – Override the default timeout (seconds).

  • req_meta (Dict[str, Any] | None)

Raises:
  • ClientTimeout – If a response takes longer than timeout to arrive.

  • ClientError – Catch-all for all other issues, including failed authentication.

Returns:

The data exactly as returned from the endpoint function, nothing more, nothing less.

Return type:

object
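
As a sketch of how serial_request might be used with a per-request timeout, the example below calls the documented signature on a stand-in object. StubClient is hypothetical and exists only so the example runs without a scheduler. With a real workflow, a WorkflowRuntimeClient instance would take its place.

```python
from typing import Any, Dict, Optional


class StubClient:
    """Hypothetical stand-in that mimics the serial_request signature."""

    def serial_request(
        self,
        command: str,
        args: Optional[Dict[str, Any]] = None,
        timeout: Optional[float] = None,
        req_meta: Optional[Dict[str, Any]] = None,
    ) -> object:
        # A real client would send the request over ZMQ; this just echoes.
        return {"command": command, "args": args or {}, "timeout": timeout}

    # Mirror the documented convenience of calling the client directly.
    __call__ = serial_request


def list_endpoints(client, timeout: float = 5.0) -> object:
    """Call the "api" endpoint, overriding the default timeout."""
    return client.serial_request("api", timeout=timeout)


client = StubClient()
result = list_endpoints(client)
same = client("api", timeout=5.0)
```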

async_request
async async_request(command, args=None, timeout=None, req_meta=None)[source]

Send an asynchronous request using asyncio.

Has the same arguments and return values as serial_request.

Return type:

object
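
One reason to prefer async_request is to contact several workflows concurrently. The sketch below assumes one client per workflow. StubAsyncClient is a hypothetical stand-in with the same method signature, so the example runs without any scheduler.

```python
import asyncio
from typing import Dict


class StubAsyncClient:
    """Hypothetical stand-in that mimics the async_request signature."""

    async def async_request(self, command, args=None, timeout=None, req_meta=None):
        await asyncio.sleep(0)  # yield control, as real network I/O would
        return True


async def ping_all(clients: Dict[str, StubAsyncClient]) -> Dict[str, object]:
    """Ping every workflow concurrently and map each name to its reply."""
    replies = await asyncio.gather(
        *(client.async_request("ping_workflow") for client in clients.values())
    )
    return dict(zip(clients, replies))


results = asyncio.run(
    ping_all({"one": StubAsyncClient(), "two": StubAsyncClient()})
)
```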

Server

class cylc.flow.network.server.WorkflowRuntimeServer(schd)[source]

Workflow runtime service API facade exposed via zmq.

This class starts and coordinates the publisher and replier, and contains the Cylc endpoints invoked by the receiver to provide a response to incoming messages.

Parameters:

schd (object) – The parent object instantiating the server. In this case, the workflow scheduler.

Usage:
  • Define endpoints using the expose decorator.

  • Endpoints are called via the receiver using the function name.

Message interface:
  • Accepts messages of the format: {“command”: CMD, “args”: {…}}

  • Returns responses of the format: {“data”: {…}}

  • Returns errors of the format: {“error”: {“message”: MSG}}
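
The usage and message interface above can be sketched as a toy dispatcher. This is not the WorkflowRuntimeServer implementation: MiniServer, its echo endpoint, and this local expose decorator are hypothetical and only illustrate the pattern of looking up an exposed function by name and wrapping its result.

```python
from typing import Any, Callable, Dict


def expose(func: Callable) -> Callable:
    """Mark a method as callable by clients (illustrative decorator)."""
    func.exposed = True
    return func


class MiniServer:
    """Hypothetical toy server used to illustrate endpoint dispatch."""

    def __init__(self) -> None:
        # Collect every method marked by @expose, keyed by function name.
        self.endpoints: Dict[str, Callable] = {
            name: getattr(self, name)
            for name in dir(self)
            if getattr(getattr(self, name), "exposed", False)
        }

    @expose
    def ping_workflow(self) -> bool:
        return True

    @expose
    def echo(self, text: str) -> str:
        return text

    def receiver(self, message: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch {"command": CMD, "args": {...}} to the named endpoint."""
        try:
            endpoint = self.endpoints[message["command"]]
            return {"data": endpoint(**message.get("args", {}))}
        except Exception as exc:
            # Any failure is reported in the documented error shape.
            return {"error": {"message": str(exc)}}


server = MiniServer()
pong = server.receiver({"command": "ping_workflow", "args": {}})
echoed = server.receiver({"command": "echo", "args": {"text": "hi"}})
unknown = server.receiver({"command": "no_such_endpoint", "args": {}})
```

Methods without the decorator, such as receiver itself, are not reachable by name.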

Common Arguments:

Arguments which are shared between multiple commands.

task identifier (str):

A task identifier in the format cycle-point/task-name e.g. 1/foo or 20000101T0000Z/bar.
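
A minimal sketch of splitting an identifier of this form into its two parts is shown below. The TaskID type and parse_task_id helper are hypothetical and are not part of Cylc, which has its own ID handling.

```python
from typing import NamedTuple


class TaskID(NamedTuple):
    cycle_point: str
    task_name: str


def parse_task_id(identifier: str) -> TaskID:
    """Split "cycle-point/task-name" into its two components."""
    cycle_point, sep, task_name = identifier.partition("/")
    if not sep or not cycle_point or not task_name or "/" in task_name:
        raise ValueError(f"not a cycle-point/task-name identifier: {identifier!r}")
    return TaskID(cycle_point, task_name)


first = parse_task_id("1/foo")
second = parse_task_id("20000101T0000Z/bar")
```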

task globs (list):

A list of Cylc IDs relative to the workflow.

  • 1 - The cycle point “1”.

  • 1/foo - The task “foo” in the cycle “1”.

  • 1/foo/01 - The first job of the task “foo” from the cycle “1”.

Glob-like patterns may be used to match multiple items e.g.

*

Matches everything.

1/*

Matches everything in cycle 1.

*/*:failed

Matches all failed tasks.
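
To give a feel for how such patterns select tasks, the sketch below approximates the matching with the standard library's fnmatch module. This is an assumption made for illustration: Cylc's own matcher may differ in detail, the match_tasks helper is hypothetical, and a bare cycle such as 1 is not handled here.

```python
from fnmatch import fnmatchcase
from typing import Dict, List


def match_tasks(pattern: str, tasks: Dict[str, str]) -> List[str]:
    """Return the IDs in tasks (ID -> status) selected by pattern.

    An optional ":status" suffix restricts the match to that status.
    """
    glob, _, status = pattern.partition(":")
    return sorted(
        task_id
        for task_id, task_status in tasks.items()
        if fnmatchcase(task_id, glob) and (not status or task_status == status)
    )


tasks = {"1/foo": "succeeded", "1/bar": "failed", "2/foo": "failed"}
everything = match_tasks("*", tasks)
cycle_one = match_tasks("1/*", tasks)
failed = match_tasks("*/*:failed", tasks)
```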

api(endpoint=None, **_kwargs)[source]

Return information about this API.

Returns a list of callable endpoints.

Parameters:

endpoint (str | None) – If specified the documentation for the endpoint will be returned instead.

Returns:

List of endpoints or string documentation of the requested endpoint.

Return type:

str | List[str]

graphql(request_string=None, variables=None, meta=None)[source]

Return the GraphQL schema execution result.

Parameters:
  • request_string (str | None) – GraphQL request passed to Graphene.

  • variables (Dict[str, Any] | None) – Dict of variables passed to Graphene.

  • meta (Dict[str, Any] | None) – Dict containing auth user etc.

Returns:

Execution result, or a list with errors.

Return type:

object
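
The arguments for this endpoint can be assembled as an ordinary dictionary using the parameter names documented above. In the sketch below the query text is only an illustrative assumption about the schema, and build_graphql_request is a hypothetical helper. The resulting message follows the {"command": CMD, "args": {…}} shape.

```python
from typing import Any, Dict, Optional

# Assumption: an illustrative query; check it against the actual schema.
QUERY = "query { workflows { id status } }"


def build_graphql_request(
    request_string: str,
    variables: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Build a request message addressed to the graphql endpoint."""
    return {
        "command": "graphql",
        "args": {
            "request_string": request_string,
            "variables": variables or {},
        },
    }


message = build_graphql_request(QUERY)
```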

operate()[source]

Orchestrate the receive, send, publish of messages.

Return type:

None

pb_data_elements(element_type, **_kwargs)[source]

Send the specified data elements in delta form.

Parameters:

element_type (str) – Key from DELTAS_MAP dictionary.

Returns serialised Protobuf message

Return type:

bytes

pb_entire_workflow(**_kwargs)[source]

Send the entire data-store in a single Protobuf message.

Returns serialised Protobuf message

Return type:

bytes

async publish_queued_items()[source]

Publish all queued items.

Return type:

None

receiver(message)[source]

Process incoming messages and coordinate response.

Wrap incoming messages, dispatch them to exposed methods and/or coordinate a publishing stream.

Parameters:

message (dict) – message contents

register_endpoints()[source]

Register all exposed methods.

start(barrier)[source]

Start the TCP servers.

async stop(reason)[source]

Stop the TCP servers, and clean up authentication.

This method must be called/awaited from a different thread to the server’s self.thread in order to interrupt the self.operate() loop and wait for self.thread to terminate.

Parameters:

reason (BaseException | str)

Return type:

None
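
The threading constraint on stop can be illustrated with a simplified, synchronous sketch. LoopingService is hypothetical and much smaller than the real server (whose stop method is a coroutine). It only shows why the loop thread has to be interrupted and joined from a different thread.

```python
import threading
import time


class LoopingService:
    """Hypothetical service with an operate() loop running in its own thread."""

    def __init__(self) -> None:
        self.stopping = threading.Event()
        self.thread = threading.Thread(target=self.operate, daemon=True)

    def start(self) -> None:
        self.thread.start()

    def operate(self) -> None:
        # Loop until another thread asks us to stop.
        while not self.stopping.is_set():
            time.sleep(0.01)

    def stop(self) -> None:
        # A thread cannot join itself, so refuse calls from the loop thread.
        if threading.current_thread() is self.thread:
            raise RuntimeError("stop() must be called from a different thread")
        self.stopping.set()  # interrupt the operate() loop
        self.thread.join()   # wait for the loop thread to terminate


service = LoopingService()
service.start()
service.stop()
stopped = not service.thread.is_alive()
```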