External Triggers

Cylc dependencies allow us to trigger tasks off of events/outputs created within the workflow.

[scheduling]
    [[graph]]
        # "bar" will wait until "foo" has succeeded
        P1D = foo:succeeded => bar

External triggers allow us to trigger tasks off of events that are external to the workflow itself, for example the real-world time (aka wallclock time), the arrival of a data set, or the completion of tasks in other workflows.

[scheduling]
    [[graph]]
        # "bar" will wait until the real-world time matches the cycle point
        P1D = @wall_clock => bar

Cylc provides two mechanisms for implementing external triggers in a workflow:

Xtriggers (pull):

These are Python functions that are routinely “polled” until a condition is satisfied.

Ext-triggers (push):

These are commands that send messages to the scheduler once a condition has been satisfied.

Xtriggers (pull)

Xtriggers allow tasks to trigger off of arbitrary external conditions. The scheduler periodically calls a Python function to check the condition. Once the function reports success, the xtrigger prerequisites of dependent tasks are satisfied.

You can write Custom Xtrigger Functions, and Cylc has several built in: clock triggers, workflow state triggers, and the toy echo and xrandom xtriggers.

Periodic Checking

Xtrigger calls commence when the first dependent task enters the active window, repeating at configurable intervals until success is achieved. The default call interval is 10 seconds.

Note

The xtrigger prerequisites of future tasks that have yet to enter the active window will always show as unsatisfied, because the associated xtrigger function has not been called yet.

The scheduler satisfies future tasks when they enter the active window, if they depend on the same xtrigger, without calling the function again - see xtrigger Specificity.

Xtrigger functions must return quickly, or they will be killed by the global.cylc[scheduler]process pool timeout.

Warning

Each xtrigger call is made in a new Python subprocess. Consider increasing the call interval if you have many xtriggers, to reduce the associated system load.

Declaring Xtriggers

Xtriggers are declared under flow.cylc[scheduling][xtriggers] by associating a short label with a function name, arguments, and optional custom check interval.

The label must be prefixed by “@” for use in the graph, and must comply with basic naming rules:

class cylc.flow.unicode_rules.XtriggerNameValidator[source]

The rules for valid xtrigger labels:

  • can only contain: latin letters and numbers, _

  • cannot start with: _cylc
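The two rules above can be expressed as a short check. This is only a sketch: is_valid_xtrigger_label is a hypothetical helper, not Cylc's XtriggerNameValidator, and it assumes that "latin letters" means A-Z and a-z.

```python
import re

# Hypothetical helper: encodes only the two rules listed above.
# Assumption: "latin letters" means the ASCII ranges A-Z and a-z.
_ALLOWED = re.compile(r"[A-Za-z0-9_]+")


def is_valid_xtrigger_label(label: str) -> bool:
    """Return True if label obeys both xtrigger label rules."""
    if label.startswith("_cylc"):
        # Rule 2: the "_cylc" prefix is reserved.
        return False
    # Rule 1: only letters, digits and "_" (an empty label is rejected).
    return _ALLOWED.fullmatch(label) is not None


if __name__ == "__main__":
    for candidate in ("x1", "clock_1", "_cylc_x", "bad-label"):
        print(candidate, is_valid_xtrigger_label(candidate))
```

Running the module prints True for x1 and clock_1, and False for _cylc_x and bad-label.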

The following workflow declares an xtrigger x1 = check_data. The function has one argument, a file path, and will be called every 30 seconds until it succeeds - at which point process_data can trigger:

[scheduling]
    [[xtriggers]]
        x1 = check_data(loc="/path/to/data/source"):PT30S
    [[graph]]
        P1D = "@x1 => process_data"
[runtime]
    [[process_data]]

Argument keywords can be omitted, so long as argument order is preserved:

[scheduling]
    [[xtriggers]]
        x1 = check_data("/path/to/data/source"):PT30S

Note

Trigger labels can be used with & (AND) operators in the graph, but currently not with | (OR) - attempts to do that will fail validation.

Xtrigger Results

Xtrigger functions must return a flat dictionary of results to be broadcast to dependent tasks, via environment variables named as <xtrigger_label>_<dictionary_key>.

For example, if the x1 xtrigger returns this dictionary:

# returned by check_data() on success:
{
    "data_path": "/path/to/data",
    "data_type": "netcdf"
}

Then the process_data task, which depends on x1, will see the following environment variables:

# job environment of process_data:
x1_data_path="/path/to/data"
x1_data_type="netcdf"

The process_data task would likely run an application that needs this information in terms of its native configuration. You can translate from xtrigger to application in the workflow configuration:

[runtime]
    [[process_data]]
        script = run-process.py
        [[[environment]]]
            INPUT_DATA_LOCN = $x1_data_path
            INPUT_DATA_TYPE = $x1_data_type
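For illustration, a check_data function that produces results like those above might look roughly as follows. This is a minimal sketch, assuming the data source is a path on the local filesystem. The existence test, the fixed "netcdf" value, and the as_job_environment helper are assumptions made for this example. The helper is not part of Cylc and only demonstrates the <xtrigger_label>_<dictionary_key> naming rule.

```python
from pathlib import Path
from typing import Dict, Tuple


def check_data(loc: str) -> Tuple[bool, Dict[str, str]]:
    """Hypothetical xtrigger: succeed once something exists at loc."""
    path = Path(loc)
    if not path.exists():
        # Condition not met yet; the scheduler will call again later.
        return False, {}
    # Flat dictionary; keys must be valid environment variable names.
    # "netcdf" is a placeholder value for this sketch.
    return True, {"data_path": str(path), "data_type": "netcdf"}


def as_job_environment(label: str, results: Dict[str, str]) -> Dict[str, str]:
    """Illustrate how result keys are qualified with the xtrigger label."""
    return {f"{label}_{key}": value for key, value in results.items()}


if __name__ == "__main__":
    satisfied, results = check_data(".")
    print(satisfied, as_job_environment("x1", results))
```

If installed without the entry point mechanism, the function would live in a module named check_data.py, as described under Custom Xtrigger Functions.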

Task and Cycle Specificity

Cylc makes a call sequence for each unique xtrigger with one or more dependent tasks in the active window. Uniqueness is determined by the function signature, i.e. the function name and arguments.

Depending on the argument list, an xtrigger can be universal - the same for all tasks that depend on it; or specific - to the name and/or cycle point of tasks that depend on it.

Universal xtriggers

If an xtrigger has no arguments that vary as the workflow runs, a single call sequence will satisfy every dependent task in the entire graph.

Below, every cycle point instance of process_data depends on the same xtrigger, which presumably checks data for the entire workflow. Once satisfied it allows every instance of the task to run:

[scheduling]
    cycling mode = integer
    initial cycle point = 1
    final cycle point = 10
    [[xtriggers]]
        x = check_data("/path/to/data")
    [[graph]]
        P1 = "@x => process_data"
[runtime]
    [[process_data]]

Task and Cycle-Specific Xtriggers

Xtrigger arguments can incorporate string templates as placeholders for certain workflow parameters - see Custom Xtrigger Functions. Several of these are specific to the cycle point or name of tasks that depend on the xtrigger:

  • %(point)s - cycle point of the dependent task

  • %(name)s - name of the dependent task

  • %(id)s - identity (point/name) of the dependent task

If these are used, a new call sequence, with new arguments, will commence whenever a dependent task with a new name or cycle point enters the active window.

Below, every instance of process_data depends on a different, cycle point-specific xtrigger, which presumably checks data just for that instance. Each xtrigger, once satisfied, allows just one instance of the task to run:

[scheduling]
    cycling mode = integer
    initial cycle point = 1
    final cycle point = 10
    [[xtriggers]]
        x = check_data(loc="/path/to/data", cycle="%(point)s")
    [[graph]]
        P1 = "@x => process_data"

Note

You have to inspect the function signature to see the cycle and task specificity of xtriggers.

The following working example shows four xtriggers based on the toy “echo” xtrigger, which takes an arbitrary list of arguments:

[scheduling]
    cycling mode = integer
    initial cycle point = 1
    final cycle point = 2
    [[xtriggers]]
        w1 = echo(succeed=True)  # universal
        x2 = echo(succeed=True, task="%(name)s")  # task name specific
        y2 = echo(succeed=True, cycle="%(point)s")  # cycle point specific
        z4 = echo(succeed=True, task="%(name)s", cycle="%(point)s")  # both
    [[graph]]
        P1 = "@w1 & @x2 & @y2 & @z4 => foo & bar"
[runtime]
    [[foo, bar]]

Run this with cylc play --no-detach and watch when each xtrigger is called: w1 will be called once, because it has only static arguments; x2 will be called twice, once for each task name (regardless of cycle point); y2 will be called twice, once for each cycle point (regardless of task name); and z4 will be called four times, once for each task in each cycle point:

$ cylc cat-log xtr | grep 'xtrigger succeeded'
INFO - xtrigger succeeded: w1 = echo(succeed=True)
INFO - xtrigger succeeded: x2 = echo(succeed=True, task=bar)
INFO - xtrigger succeeded: y2 = echo(cycle=1, succeed=True)
INFO - xtrigger succeeded: z4 = echo(cycle=1, succeed=True, task=bar)
INFO - xtrigger succeeded: x2 = echo(succeed=True, task=foo)
INFO - xtrigger succeeded: z4 = echo(cycle=1, succeed=True, task=foo)
INFO - xtrigger succeeded: y2 = echo(cycle=2, succeed=True)
INFO - xtrigger succeeded: z4 = echo(cycle=2, succeed=True, task=bar)
INFO - xtrigger succeeded: z4 = echo(cycle=2, succeed=True, task=foo)
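The call counts in this log follow from counting distinct argument sets once the string templates have been filled in. The sketch below reproduces that reasoning in plain Python. It is an illustration of the uniqueness rule, not the scheduler's implementation, and count_call_sequences is a hypothetical name.

```python
from typing import Dict, List, Set, Tuple

# Xtrigger declarations from the example above: label -> keyword arguments.
XTRIGGERS: Dict[str, Dict[str, object]] = {
    "w1": {"succeed": True},
    "x2": {"succeed": True, "task": "%(name)s"},
    "y2": {"succeed": True, "cycle": "%(point)s"},
    "z4": {"succeed": True, "task": "%(name)s", "cycle": "%(point)s"},
}


def count_call_sequences(points: List[int], names: List[str]) -> Dict[str, int]:
    """Count distinct resolved argument sets per label over all tasks."""
    seen: Dict[str, Set[Tuple]] = {label: set() for label in XTRIGGERS}
    for point in points:
        for name in names:
            context = {"point": point, "name": name}
            for label, kwargs in XTRIGGERS.items():
                # Fill in templates; non-string arguments pass through.
                resolved = tuple(sorted(
                    (key, value % context if isinstance(value, str) else value)
                    for key, value in kwargs.items()
                ))
                seen[label].add(resolved)
    return {label: len(signatures) for label, signatures in seen.items()}


print(count_call_sequences(points=[1, 2], names=["foo", "bar"]))
# prints {'w1': 1, 'x2': 2, 'y2': 2, 'z4': 4}
```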

Sequential Xtriggers

Parentless tasks, which often depend on clock or other xtriggers, automatically spawn into the active window, multiple cycles ahead (to the runahead limit). If they depend on xtriggers that will only be satisfied in cycle point order, this causes unnecessary xtrigger checking and UI clutter.

Sequential xtriggers prevent this by delaying the spawning of the next dependent task instance until the current one is satisfied. To do this (in reverse order of precedence):

  • set [scheduling]sequential xtriggers=True for all xtriggers in the workflow

  • add a sequential=True argument to function definitions (in Python source files)

  • add a sequential=True argument to function declarations (in workflow configurations)

Note

The built in wall_clock xtrigger is sequential by default.

Forcing Xtriggers

You can manually satisfy a task’s xtrigger prerequisites via the GUI or command line, so the task can run even if the xtrigger has not yet succeeded.

This will not affect other tasks that depend on the same xtrigger, but the scheduler will stop checking the xtrigger if no other active tasks depend on it.

# Satisfy @x1 => foo in cycle point 1
$ cylc set --pre=xtrigger/x1 my_workflow//1/foo
# See cylc set --help

For this to work without causing task failure, set appropriate default values for xtrigger results in task scripting.

Built-in Clock Triggers

Clock xtriggers succeed when the real (“wall clock”) time reaches some offset from the task’s cycle point value.

Note

These should be used instead of the older task clock triggers documented in Clock Triggers.

The clock xtrigger function signature looks like this:

cylc.flow.xtriggers.wall_clock.wall_clock(offset='PT0S', sequential=True)[source]

Trigger at a specific real “wall clock” time relative to the cycle point in the graph.

Clock triggers, unlike other trigger functions, are executed synchronously in the main process.

Parameters:
  • offset (str) – ISO 8601 interval to wait after the cycle point is reached in real time before triggering. May be negative, in which case it will trigger before the real time reaches the cycle point.

  • sequential (bool) – Wall-clock xtriggers are run sequentially by default. See Sequential Xtriggers for more details.

Changed in version 8.3.0: The sequential argument was added.

Note

Clock xtriggers are cycle-point specific by nature; you don’t need to use function arguments to achieve this.

In the following workflow, task foo has a daily cycle point sequence, and each task instance will trigger when the real time is one hour past its cycle point value.

[scheduling]
    initial cycle point = 2018-01-01
    [[xtriggers]]
        clock_1 = wall_clock(offset=PT1H)
    [[graph]]
        P1D = "@clock_1 => foo"
[runtime]
    [[foo]]
        script = run-foo.sh

Or omitting the argument keyword:

[scheduling]
    [[xtriggers]]
        clock_1 = wall_clock(PT1H)

A zero-offset clock trigger does not need to be declared before use:

[scheduling]
    initial cycle point = 2018-01-01
    [[graph]]
         # zero-offset clock trigger:
        P1D = "@wall_clock => foo"
[runtime]
    [[foo]]
        script = run-foo.sh
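The clock trigger condition amounts to comparing the current time with the cycle point plus the offset. The sketch below shows that comparison. It is not Cylc's implementation: wall_clock_satisfied is a hypothetical name, the offset is given as a timedelta rather than an ISO 8601 string, and the cycle point is assumed to be in UTC.

```python
from datetime import datetime, timedelta, timezone


def wall_clock_satisfied(cycle_point: datetime, offset: timedelta,
                         now: datetime) -> bool:
    """Return True once real time has reached cycle_point + offset."""
    return now >= cycle_point + offset


cycle_point = datetime(2018, 1, 1, tzinfo=timezone.utc)
one_hour = timedelta(hours=1)  # stands in for the ISO 8601 offset PT1H

# 00:30 UTC is before the 01:00 trigger time; 01:00 UTC reaches it.
print(wall_clock_satisfied(cycle_point, one_hour,
                           cycle_point + timedelta(minutes=30)))  # False
print(wall_clock_satisfied(cycle_point, one_hour,
                           cycle_point + timedelta(hours=1)))  # True
```

A negative offset such as timedelta(hours=-1) makes the condition true before the real time reaches the cycle point, matching the description of the offset parameter.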

Built-in Workflow State Triggers

Workflow-state xtriggers succeed when a given task in another workflow achieves a given state or output.

Note

These should be used instead of the older workflow state polling tasks described in Workflow State Polling (old style).

The workflow state trigger function signature looks like this:

cylc.flow.xtriggers.workflow_state.workflow_state(workflow_task_id, offset=None, flow_num=None, is_trigger=False, is_message=False, alt_cylc_run_dir=None)[source]

Connect to a workflow DB and check a task status or output.

If the status or output has been achieved, return (True, result).

Parameters:
  • workflow_task_id (str) – ID (workflow//point/task:selector) of the target task.

  • offset (str | None) – Offset from cycle point as an ISO8601 or integer duration, e.g. PT1H (1 hour) or P1 (1 integer cycle)

  • flow_num (int | None) – Flow number of the target task.

  • is_trigger (bool) – Interpret the task:selector as a task trigger name rather than a task status.

  • is_message (bool) – Interpret the task:selector as a task output message rather than a task status.

  • alt_cylc_run_dir (str | None) – Alternate cylc-run directory, e.g. for another user.

Returns:

(satisfied, result)

satisfied:

True if satisfied else False.

result:

Dict containing the keys:

  • workflow

  • task

  • point

  • offset

  • status

  • message

  • trigger

  • flow_num

  • run_dir

Return type:

tuple

Changed in version 8.3.0: The workflow_task_id argument was introduced to replace the separate workflow, point, task, status, and message arguments (which are still supported for backwards compatibility). The flow_num argument was added. The cylc_run_dir argument was renamed to alt_cylc_run_dir.

The first argument identifies the target workflow, cycle, task, and status or output trigger name. The function arguments mirror the arguments and options of the cylc workflow-state command - see cylc workflow-state --help.
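To make the structure of the first argument concrete, the sketch below splits an ID of the form workflow//point/task:selector into its parts. It is a simplified illustration, not Cylc's ID parser. TargetID and split_workflow_task_id are hypothetical names, and the code assumes the cycle point and task are always present.

```python
from typing import NamedTuple, Optional


class TargetID(NamedTuple):
    workflow: str
    point: str
    task: str
    selector: Optional[str]


def split_workflow_task_id(workflow_task_id: str) -> TargetID:
    """Split "workflow//point/task:selector" into its parts (simplified)."""
    workflow, sep, relative = workflow_task_id.partition("//")
    if not sep or "/" not in relative:
        raise ValueError(
            f"expected workflow//point/task, got {workflow_task_id!r}"
        )
    point, _, task = relative.partition("/")
    # The selector (status, trigger or message) is optional.
    task, _, selector = task.partition(":")
    return TargetID(workflow, point, task, selector or None)


print(split_workflow_task_id("up//2010/foo:x"))
# prints TargetID(workflow='up', point='2010', task='foo', selector='x')
```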

As a simple example, consider the following “upstream” workflow (which we want to trigger off of):

[scheduler]
    cycle point format = %Y
[scheduling]
    initial cycle point = 2005
    final cycle point = 2015
   [[graph]]
      P1Y = "foo => bar"
[runtime]
   [[bar]]
      script = sleep 10
   [[foo]]
      script = sleep 5; cylc message "data ready"
      [[[outputs]]]
          x = "data ready"

It must be installed and run under the name up, as referenced in the “downstream” workflow that depends on it:

[scheduler]
  cycle point format = %Y
[scheduling]
    initial cycle point = 2010
    [[xtriggers]]
         upstream = workflow_state("up//%(point)s/foo:x", is_trigger=True):PT10S
         clock_0 = wall_clock(offset=PT0H)
   [[graph]]
        P1Y = """
            foo
            @clock_0 & @upstream => FAM:succeed-all => blam
        """
[runtime]
    [[root]]
        script = sleep 5
    [[foo, blam]]
    [[FAM]]
    [[f1,f2,f3]]
        inherit = FAM

Try starting the downstream workflow first, then the upstream, and watch what happens. In each cycle point the @upstream trigger in the downstream workflow waits for the task foo (with the same cycle point) in the upstream workflow to generate the “data ready” message.

Note

  • The workflow_state trigger function, like the cylc workflow-state command, must have read-access to the upstream workflow’s public database.

  • The task cycle point is supplied by a string template %(point)s. See Custom Xtrigger Functions for other string templates available to xtriggers.

The return value of the workflow_state trigger function looks like this:

results = {
    'workflow': workflow_id,
    'task': task_name,
    'point': cycle_point,
    'status': task_status,  # or
    'trigger': task_output_trigger,  # or
    'message': task_output_message,
    'flow_num': flow_num  # if given
}
return (satisfied, results)

The results dictionary contains the names and values of the target workflow state parameters. Each name gets qualified with the unique trigger name (“upstream” here) and passed to the environment of dependent tasks (the members of the FAM family in this case). To see this, take a look at the job script for one of the downstream tasks:

$ cylc cat-log -f j dn//2011/f2
...
cylc__job__inst__user_env() {
    # TASK RUNTIME ENVIRONMENT:
    export upstream_workflow upstream_cylc_run_dir upstream_offset \
      upstream_message upstream_status upstream_point upstream_task
    upstream_workflow="up"
    upstream_task="foo"
    upstream_point="2011"
    upstream_status="succeeded"
}
...

Note

The dependent task has to know the name of the xtrigger that it depends on - “upstream” in this case - in order to use this information. However, the name could be given to the task environment in the workflow configuration.

Built-in Toy Xtriggers

echo

The toy echo trigger simply prints any arguments that you give it to stdout, and then fails (trigger condition not met) or succeeds (trigger condition met) according to the value of a succeed=True argument (which defaults to False). On success, it returns all arguments in the result dictionary.

cylc.flow.xtriggers.echo.echo(*args, **kwargs)[source]

Print arguments to stdout, return kwargs[‘succeed’] and kwargs.

This may be a useful aid to understanding how xtriggers work.

Parameters:
  • succeed – Set the success or failure of this xtrigger.

  • *args – Print to stdout.

  • **kwargs – Print to stdout, and return as output.

Returns:

(True/False, kwargs)

Return type:

Tuple

Examples

>>> echo('Breakfast Time', succeed=True, egg='poached')
echo: ARGS: ('Breakfast Time',)
echo: KWARGS: {'succeed': True, 'egg': 'poached'}
(True, {'succeed': True, 'egg': 'poached'})

An example echo trigger workflow:

[scheduling]
    initial cycle point = now
    [[xtriggers]]
        echo_1 = echo(hello, 99, succeed=True, point=%(point)s, foo=10)
    [[graph]]
        PT1H = "@echo_1 => foo"
[runtime]
    [[foo]]
        script = "printenv | grep echo_1"

Run this with cylc play --no-detach <workflow> and watch your terminal to see the xtrigger calls. View the task job log with cylc cat-log <workflow_id>//1/foo to confirm that the dependent task received the xtrigger results.

xrandom

The toy xrandom function sleeps for a configurable amount of time (useful for testing the effect of a long-running trigger function - which should be avoided) and has a configurable random chance of success. The function signature is:

cylc.flow.xtriggers.xrandom.xrandom(percent, secs=0, _=None)[source]

Random xtrigger, with configurable sleep and percent success.

Sleep for secs seconds, and report satisfied with percent likelihood.

The _ argument is not used in the function code, but can be used to specialize the function signature to cycle point or task.

Parameters:
  • percent (float) – Percent likelihood of passing.

  • secs (int) – Seconds to sleep before starting the trigger.

  • _ (Any | None) – Used to allow users to specialize the trigger with extra parameters.

Returns:

(satisfied, results)

satisfied:

True if satisfied else False.

results:

A dictionary containing the following keys:

COLOR

A random colour (e.g. red, orange, …).

SIZE

A random size (e.g. small, medium, …).

Return type:

tuple

Examples

If the percent is zero, it returns that the trigger condition was not satisfied, and an empty dictionary.

>>> xrandom(0, 0)
(False, {})

If the percent is not zero, but the random percent success is not met, then it also returns that the trigger condition was not satisfied, and an empty dictionary.

>>> import sys
>>> mocked_random = lambda: 0.3
>>> sys.modules[__name__].random = mocked_random
>>> xrandom(15.5, 0)
(False, {})

Finally, if the percent is not zero, and the random percent success is met, then it returns that the trigger condition was satisfied, and a dictionary containing random colour and size as result.

>>> import sys
>>> mocked_random = lambda: 0.9
>>> sys.modules[__name__].random = mocked_random
>>> mocked_randint = lambda x, y: 1
>>> sys.modules[__name__].randint = mocked_randint
>>> xrandom(99.99, 0)
(True, {'COLOR': 'orange', 'SIZE': 'small'})

cylc.flow.xtriggers.xrandom.validate(args)[source]

Validate the args that xrandom is called with.

Cylc calls this function automatically when parsing the workflow.

The rules for args are:

  • percent: Must be 0 ≤ x ≤ 100

  • secs: Must be an integer.

Parameters:

args (Dict[str, Any])

An example xrandom trigger workflow:

[scheduling]
    cycling mode = integer
    initial cycle point = 1
    final cycle point = 5
    [[xtriggers]]
        # Called once for all dependent tasks (all cycles).
        x1 = xrandom(percent=25, secs=2):PT5S
        # Called once per dependent task name (all cycles).
        x2 = xrandom(percent=25, secs=2, _=%(name)s):PT5S
        # Called once per cycle for all dependent tasks.
        x3 = xrandom(percent=25, secs=2, _=%(point)s):PT5S
    [[graph]]
        P1 = """
            # all instances of foo and bar should trigger at once, together
            @x1 => foo & bar

            # all instances of cat should trigger at once, and separately, all
            # instances of dog should trigger at once.
            @x2 => cat & dog

            # each instance of qux should trigger separately
            @x3 => qux

            # Result:
            # - x1 should return True once, and not be called again.
            # - x2 should return True twice, and not be called again.
            # - x3 should return True five times, and not be called again.
            # i.e. 8 True returns in the 5-cycle workflow run.
        """
[runtime]
    [[root]]
        script = sleep 5
    [[foo, bar, cat, dog, qux]]

Custom Xtrigger Functions

Xtrigger functions are Python functions with some special requirements.

Requirements:

Xtrigger functions must be compatible with the Python version that runs the scheduler (see Distribution requirements for the latest version specification).

Xtrigger functions must return a Tuple of (Boolean, Dictionary):

  • (False, {}) - failed: trigger condition not met

  • (True, results) - succeeded: trigger condition met

where results is a flat (non-nested) dictionary of information to be passed to dependent tasks - see Xtrigger Results. Each dictionary key must be valid as an environment variable name.

Xtrigger functions can take arbitrary positional and keyword arguments, except for the keyword sequential, which is reserved for sequential xtriggers.

Xtrigger functions cannot store data between invocations, because each call is executed via a wrapper in a new subprocess. If necessary the filesystem could be used for persistent data.

If xtriggers depend on files (say) that might not exist when the function is first called, just return trigger condition not met (i.e., (False, {})).
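The sketch below pulls these requirements together, including the use of the filesystem for persistent data. It is a hypothetical example: count_calls, its state_file argument and its needed argument are invented for illustration, and the JSON state file is just one possible way to keep data between calls.

```python
import json
from pathlib import Path
from typing import Dict, Tuple


def count_calls(state_file: str, needed: int = 3) -> Tuple[bool, Dict[str, str]]:
    """Hypothetical xtrigger: succeed on the needed-th call.

    Each call runs in a fresh subprocess, so the call count is kept in a
    JSON file rather than in a module-level variable.
    """
    path = Path(state_file)
    try:
        calls = json.loads(path.read_text())["calls"]
    except (FileNotFoundError, KeyError, TypeError, json.JSONDecodeError):
        calls = 0  # first call, or unreadable state: start again
    calls += 1
    path.write_text(json.dumps({"calls": calls}))
    if calls < needed:
        # Trigger condition not met.
        return False, {}
    # Flat results dictionary; "calls" is a valid environment variable name.
    return True, {"calls": str(calls)}
```

Under the module naming rule described in the installation notes, this function would be saved as count_calls.py.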

Installation:

We recommend using the cylc.xtriggers entry point to install the xtrigger as a Python package - see Developing xtrigger plugins.

Otherwise, e.g., for installing custom xtriggers under your own user account, xtrigger functions must be:

  • defined in a module with the same name as the function

  • located in either:

      • <workflow-dir>/lib/python/

      • or anywhere in your $CYLC_PYTHONPATH

A custom xtrigger module can also provide a validate function for checking configured arguments; see Xtrigger Validation Functions for details.

Passing Workflow Parameters to Xtrigger Functions

Workflow and task parameters can be passed to function arguments from the workflow configuration via the following string templates. Task parameters affect xtrigger specificity.

Template variables for string replacement in xtrigger functions.

The following string templates are available for use, if the trigger function needs any of this information, in function arguments in the workflow configuration.

[scheduling]
    initial cycle point = now
    [[xtriggers]]
        my_xtrigger = my_xtrigger_fcn('%(workflow)s', '%(point)s')

For an explanation of the substitution syntax, see String Formatting Operations in the Python documentation.

point

The cycle point of the dependent task.

debug

True if Cylc is being run in debug mode (--debug, -vv).

workflow_run_dir

The path to the workflow run directory.

workflow_share_dir

The path to the workflow share directory.

id

The ID of the dependent task.

name

The name of the dependent task.

user_name

The user account under which the workflow is being run.

workflow

The workflow ID.

workflow_name

The workflow ID.

Deprecated since version 8.0.0: Use workflow instead.

suite_name

The workflow ID.

Deprecated since version 8.0.0: Use workflow instead.

suite_run_dir

The path to the workflow run directory.

Deprecated since version 8.0.0: Use workflow_run_dir instead.

suite_share_dir

The path to the workflow share directory.

Deprecated since version 8.0.0: Use workflow_share_dir instead.

Xtrigger Validation Functions

The arguments you call the xtrigger function with are automatically validated against the function signature. However, you might wish to add extra validation logic to your custom xtrigger, e.g. to check argument types or values.

If a function named validate is present alongside the xtrigger in its module, it will be automatically called with a dictionary of all the arguments passed to the xtrigger function in the workflow config. It should raise a cylc.flow.exceptions.WorkflowConfigError exception if an error is detected.

The cylc.flow.xtriggers.xrandom xtrigger module contains an example of an xtrigger validation function.
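As a further illustration, a validate function for a hypothetical check_data xtrigger might look like the sketch below. The rule that loc must be an absolute path is an assumption made for this example, as is the idea that the dictionary is keyed by argument name. The fallback exception class exists only so that the sketch can be exercised without Cylc installed.

```python
from typing import Any, Dict

try:
    from cylc.flow.exceptions import WorkflowConfigError
except ImportError:
    # Fallback so this sketch can be run without Cylc installed.
    class WorkflowConfigError(Exception):
        """Stand-in for cylc.flow.exceptions.WorkflowConfigError."""


def validate(args: Dict[str, Any]) -> None:
    """Check the arguments configured for a hypothetical check_data xtrigger.

    Assumed rule for this example: "loc" must be an absolute path string.
    """
    loc = args.get("loc")
    if not isinstance(loc, str) or not loc.startswith("/"):
        raise WorkflowConfigError(
            f"check_data: 'loc' must be an absolute path, got {loc!r}"
        )
```

The function returns nothing when the arguments are acceptable and raises WorkflowConfigError otherwise, so a bad configuration is reported when the workflow is parsed rather than when the xtrigger is first called.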

Filesystem Events?

Cylc does not have built-in support for triggering off of filesystem events such as inotify on Linux. There is no cross-platform standard for filesystem events, and on HPC clusters they can only be detected on the generating node.

Persistent Event Watchers?

For some applications a process that continually monitors an external condition might be preferred over periodic checking. This would be more difficult to support as a Cylc plugin, but we may decide to do it in the future. In the meantime, consider implementing a small daemon process as the watcher and have your Cylc xtrigger functions interact with that.

Ext-Triggers (push)

These external triggers are hidden task prerequisites that must be satisfied by using the cylc ext-trigger client command to send a pre-defined message to the workflow along with an ID string that distinguishes one instance of the event from another (the name of the target task and its current cycle point are not required). The event ID is just an arbitrary string to Cylc, but it can be used to identify something associated with the event to the workflow - such as the filename of a new externally-generated dataset.

When the scheduler receives the event notification it will trigger the next instance of any task waiting on that trigger (whatever its cycle point) and then broadcast (see Cylc Broadcast) the event ID to the cycle point of the triggered task as $CYLC_EXT_TRIGGER_ID. Downstream tasks with the same cycle point therefore know the new event ID too and can use it, if they need to, to identify the same new dataset.

In this way a whole workflow can be associated with each new dataset, and multiple datasets can be processed in parallel if they happen to arrive in quick succession.

An externally-triggered task must register the event it waits on in the workflow scheduling section:

# workflow "sat-proc"
[scheduling]
    cycling mode = integer
    initial cycle point = 1
    [[special tasks]]
        external-trigger = get-data("new sat X data avail")
    [[graph]]
        P1 = get-data => conv-data => products

Then, each time a new dataset arrives the external detection system should notify the workflow like this:

$ cylc ext-trigger sat-proc "new sat X data avail" passX12334a

where “sat-proc” is the workflow name and “passX12334a” is the ID string for the new event. The workflow passphrase must be installed on the triggering account.

Note

Only one task in a workflow can trigger off a particular external message. Other tasks can trigger off the externally triggered task as required, of course.
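An external detection system written in Python could send the same notification by running the cylc ext-trigger command as a subprocess. The following is a minimal sketch under that assumption. ext_trigger_command and notify are hypothetical helper names, and the cylc command is assumed to be on the PATH of the triggering account.

```python
import subprocess
from typing import List


def ext_trigger_command(workflow: str, message: str, event_id: str) -> List[str]:
    """Build the argument list for: cylc ext-trigger WORKFLOW MESSAGE ID."""
    return ["cylc", "ext-trigger", workflow, message, event_id]


def notify(workflow: str, message: str, event_id: str) -> None:
    """Send the notification; raises CalledProcessError if cylc fails."""
    subprocess.run(ext_trigger_command(workflow, message, event_id), check=True)


print(ext_trigger_command("sat-proc", "new sat X data avail", "passX12334a"))
# prints ['cylc', 'ext-trigger', 'sat-proc', 'new sat X data avail', 'passX12334a']
```

Passing the arguments as a list avoids shell quoting problems with the message, which contains spaces.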

Here is a working example of a simulated satellite processing workflow:

#!Jinja2

# WARNING: this uses the old-style push external trigger mechanism.

[meta]
    title = Real time satellite data processing demo, variant 3 of 3

    description = """
        Successive cycle points retrieve and process the next arbitrarily timed
        and labelled dataset, in parallel if the data comes in quickly. This
        variant of the workflow has initial get_data tasks with external triggers:
        they do not submit until triggered by an external system.
    """

# Note that the satellite simulator task here that supplies the external event
# trigger happens to be a workflow task - i.e. it is not really "external" - but
# this is only a convenience - an easy route to a self-contained example workflow.

# you can monitor output processing with:
# $ watch -n 1 \
#    "find ~/cylc-run/<workflow-id>/share; find ~/cylc-run/<workflow-id>/work"

{% set N_DATASETS = 5 %}

# define shared directories (could use runtime namespaces for this)
{% set DATA_IN_DIR = "$CYLC_WORKFLOW_SHARE_DIR/incoming" %}
{% set PRODUCT_DIR = "$CYLC_WORKFLOW_SHARE_DIR/products" %}

[scheduling]
    cycling mode = integer
    initial cycle point = 1
    final cycle point = {{N_DATASETS}}
    [[special tasks]]
        external-trigger = get_data("new dataset ready for processing")
    [[graph]]
        # first cycle
        R1 = prep => satsim & get_data
        P1 = """
            # Processing chain for each dataset
            get_data => proc1 => proc2 => products
            # As one dataset is retrieved, start waiting on another.
            get_data[-P1] => get_data
        """
        # last cycle
        R1//{{N_DATASETS}} = products => collate

[runtime]
    [[prep]]
        script = \
rm -rf $CYLC_WORKFLOW_SHARE_DIR $CYLC_WORKFLOW_WORK_DIR
        [[[meta]]]
            title = clean the workflow output directories
    [[satsim]]
        pre-script = mkdir -p {{DATA_IN_DIR}}
        script = """
            COUNT=0
            while true; do
                ((COUNT == {{N_DATASETS}})) && break
                # sleep $((RANDOM % 20))
                # Generate datasets very quickly to test parallel processing.
                DATA_ID=$(date +%s).$((RANDOM % 100))
                DATA_FILE=dataset-${DATA_ID}.raw
                touch {{DATA_IN_DIR}}/$DATA_FILE
                ((COUNT += 1))
                # (required to distinguish fast-arriving messages).
                # Trigger downstream processing in the workflow.
                cylc ext-trigger $CYLC_WORKFLOW_ID \
                   "new dataset ready for processing" $DATA_ID
            done
        """
        [[[meta]]]
            title = simulate a satellite data feed
            description = """
                Generates {{N_DATASETS}} arbitrarily labelled datasets very
                quickly, to show parallel processing streams.
            """
    [[WORKDIR]]
        # Define a common cycle-point-specific work-directory for all
        # processing tasks so that they all work on the same dataset.
        work sub-directory = proc-$CYLC_TASK_CYCLE_POINT
        post-script = sleep 5
        [[[environment]]]
            DATASET = dataset-$CYLC_EXT_TRIGGER_ID

    [[get_data]]
        inherit = WORKDIR
        script = mv {{DATA_IN_DIR}}/${DATASET}.raw $PWD
        [[[meta]]]
            title = retrieve next dataset
            description = just do it - we know it exists already
    [[proc1]]
        inherit = WORKDIR
        script = mv ${DATASET}.raw ${DATASET}.proc1
        [[[meta]]]
            title = convert .raw dataset to .proc1 form
    [[proc2]]
        inherit = WORKDIR
        script = mv ${DATASET}.proc1 ${DATASET}.proc2
        [[[meta]]]
            title = convert .proc1 dataset to .proc2 form
    [[products]]
        inherit = WORKDIR
        script = """
            mkdir -p {{PRODUCT_DIR}}
            mv ${DATASET}.proc2 {{PRODUCT_DIR}}/${DATASET}.prod
        """
        [[[meta]]]
            title = generate products from .proc2 processed dataset
    [[collate]]
        # Note you might want to use "cylc workflow-state" to check that
        # _all_ product tasks have finished before collating results.
        script = """
            echo PRODUCTS:
            ls {{PRODUCT_DIR}}
            sleep 20
        """
        [[[meta]]]
            title = collate all products from the workflow run

External triggers are not normally needed in datetime cycling workflows driven by real time data that comes in at regular intervals. In these cases a data retrieval task can be clock-triggered (and have appropriate retry intervals) to submit at the expected data arrival time, so little time is wasted in polling. However, if the arrival time of the cycle-point-specific data is highly variable, external triggering may be used with the cycle point embedded in the message:

# workflow "data-proc"
[scheduling]
    initial cycle point = 20150125T00
    final cycle point   = 20150126T00
    [[special tasks]]
        external-trigger = get-data("data arrived for $CYLC_TASK_CYCLE_POINT")
    [[graph]]
        T00 = init-process => get-data => post-process

Once the data has arrived, however long that takes, an external detection system should notify the workflow like this:

$ cylc ext-trigger data-proc "data arrived for 20150126T00" passX12334a

where “data-proc” is the workflow name, the cycle point has replaced the variable in the trigger string, and “passX12334a” is the ID string for the new event. The workflow passphrase must be installed on the triggering account. Because the message embeds the cycle point, this event triggers get-data at 20150126T00 (the second cycle point) but not at 20150125T00 (the first).
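As an illustration of the cycle-point matching, the sketch below shows how a hypothetical detection script might build the notification command. The function name build_ext_trigger_command is an assumption made for this example and is not part of Cylc; only the command form shown above is taken from this section.

```python
import shlex
from string import Template

# Trigger message template, as configured under [[special tasks]] above.
MESSAGE_TEMPLATE = Template("data arrived for $CYLC_TASK_CYCLE_POINT")


def build_ext_trigger_command(workflow, cycle_point, event_id):
    """Return the argument list for a "cylc ext-trigger" call.

    Hypothetical helper: the cycle point is substituted into the message so
    that it matches only the task instance at that cycle point.
    """
    message = MESSAGE_TEMPLATE.substitute(CYLC_TASK_CYCLE_POINT=cycle_point)
    return ["cylc", "ext-trigger", workflow, message, event_id]


command = build_ext_trigger_command("data-proc", "20150126T00", "passX12334a")
# Print the command in shell-quoted form for inspection.
print(shlex.join(command))
# To actually send the trigger: subprocess.run(command, check=True)
```

Building the command as an argument list avoids shell quoting problems with the spaces in the message.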

Workflow State Polling (old style)

Warning

Please read Built-in Workflow State Triggers before using the older inter-workflow triggering mechanism described in this section.

The cylc workflow-state command interrogates workflow run databases. It has a polling mode that waits for a given task in the target workflow to achieve a given state, or receive a given message. This can be used to make task scripting wait for a remote task to succeed (for example).

Automatic workflow-state polling tasks can be defined in the graph. Cylc generates their task scripting automatically, using cylc workflow-state appropriately (it is an error to give your own script item for these tasks).

Here’s how to trigger a task bar off a task foo in a remote workflow called other.workflow:

[scheduling]
    [[graph]]
        T00, T12 = "my-foo<other.workflow::foo> => bar"

Local task my-foo will poll for the success of foo in workflow other.workflow, at the same cycle point, and will succeed only if and when foo succeeds. Other task states can also be polled:

T00, T12 = "my-foo<other.workflow::foo:fail> => bar"

The default polling parameters (e.g. maximum number of polls and the interval between them) are printed by cylc workflow-state --help and can be configured if necessary under the local polling task runtime section:

[scheduling]
    [[graph]]
        T00,T12 = "my-foo<other.workflow::foo> => bar"
[runtime]
    [[my-foo]]
        [[[workflow state polling]]]
            max-polls = 100
            interval = PT10S
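The two settings above bound how long the polling task keeps trying: with max-polls = 100 and interval = PT10S it gives up after roughly 100 × 10 seconds. The following is a conceptual sketch of that bounded polling loop only. It is not Cylc's implementation, and the names poll_until and check are assumptions made for illustration.

```python
import time


def poll_until(check, max_polls, interval_seconds, sleep=time.sleep):
    """Call check() up to max_polls times; return True on the first success.

    Sleeps for interval_seconds between attempts and returns False if every
    attempt fails. The sleep function is injectable so the loop can be
    exercised without real delays.
    """
    for attempt in range(1, max_polls + 1):
        if check():
            return True
        if attempt < max_polls:
            sleep(interval_seconds)
    return False


# Usage: a stand-in condition that becomes true on the third poll.
attempts = []


def fake_check():
    attempts.append(1)
    return len(attempts) >= 3


result = poll_until(fake_check, max_polls=100, interval_seconds=10,
                    sleep=lambda seconds: None)
print(result, len(attempts))
```

In the workflow, a False outcome corresponds to the polling task failing once its polls are exhausted.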

To poll for the target task to receive a message rather than achieve a state, give the message in the runtime configuration (in which case the task status inferred from the graph syntax will be ignored):

[runtime]
    [[my-foo]]
        [[[workflow state polling]]]
            message = "the quick brown fox"

For workflows owned by others, or those with run databases in non-standard locations, use the --run-dir option on the command line, or the equivalent setting in the workflow configuration:

[runtime]
    [[my-foo]]
        [[[workflow state polling]]]
            run-dir = /path/to/top/level/cylc/run-directory

If the remote task has a different cycling sequence, just arrange for the local polling task to be on the same sequence as the remote task that it represents. For instance, if local task cat cycles 6-hourly at 0,6,12,18 but needs to trigger off a remote task dog at 3,9,15,21:

[scheduling]
    [[graph]]
        T03,T09,T15,T21 = "my-dog<other.workflow::dog>"
        T00,T06,T12,T18 = "my-dog[-PT3H] => cat"
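To make the offset arithmetic concrete, the sketch below lists which my-dog cycle point each cat cycle point waits for. The date 20150125 and the point format are arbitrary choices for this example, and offset_point is a hypothetical helper, not a Cylc function.

```python
from datetime import datetime, timedelta

POINT_FORMAT = "%Y%m%dT%H"  # assumed cycle point format for this sketch


def offset_point(cycle_point, hours):
    """Return the cycle point shifted by the given number of hours."""
    shifted = datetime.strptime(cycle_point, POINT_FORMAT) + timedelta(hours=hours)
    return shifted.strftime(POINT_FORMAT)


for cat_point in ("20150125T00", "20150125T06", "20150125T12", "20150125T18"):
    # "my-dog[-PT3H]" refers to my-dog three hours before the cat cycle point.
    dog_point = offset_point(cat_point, -3)
    print(f"cat at {cat_point} waits for my-dog at {dog_point}")
```

Note that cat at T00 depends on my-dog at T21 of the previous day.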

For workflow-state polling, the cycle point is automatically converted to the cycle point format of the target workflow.

The remote workflow does not have to be running when polling commences because the command interrogates the workflow run database, not the scheduler.

Note

The graph syntax for workflow polling tasks cannot be combined with cycle point offsets, family triggers, or parameterized task notation. This does not present a problem because workflow polling tasks can be put on the same cycling sequence as the remote-workflow target task (as recommended above), and there is no point in having multiple tasks (family members or parameterized tasks) performing the same polling operation.

Task state triggers can be used with workflow polling, e.g. to trigger another task if polling fails after 10 tries at 10-second intervals:

[scheduling]
    [[graph]]
        R1 = "poller<other-workflow::foo:succeed>:fail => another-task"
[runtime]
    [[poller]]
        [[[workflow state polling]]]
            max-polls = 10
            interval = PT10S