External Triggers
External triggers allow tasks to trigger directly off of external events, which is often preferable to implementing long-running polling tasks in the workflow. The triggering mechanism described in this section is intended to replace the one documented in Push External Triggers (note, however, that the older one is a push mechanism, whereas this one involves regular polling by the scheduler).
If you can write a Python function to check the status of an external condition or event, the scheduler can call it at configurable intervals until it reports success, at which point dependent tasks can trigger and data returned by the function will be passed to the job environments of those tasks. Functions can be written for triggering off of almost anything, such as delivery of a new dataset, creation of a new entry in a database table, or appearance of new data availability notifications in a message broker.
Cylc has several built-in external trigger functions:
clock triggers - see Built-in Clock Triggers
inter-workflow triggers - see Built-in Workflow State Triggers
Trigger functions are normal Python functions, with certain constraints as described below in:
custom trigger functions - see Custom Trigger Functions
External triggers are configured in the flow.cylc[scheduling][xtriggers] section.
Built-in Clock Triggers
These are more transparent (exposed in the graph) and efficient (shared among dependent tasks) than the older clock triggers described in Clock Triggers.
Clock triggers, unlike other trigger functions, are executed synchronously in the main process. The clock trigger function signature looks like this:
wall_clock(offset=None)
The offset argument is a datetime duration (PT1H is 1 hour) relative to the dependent task’s cycle point (automatically passed to the function via a second argument not shown above).
In the following workflow, task foo has a daily cycle point sequence, and each task instance can trigger once the wallclock time has passed its cycle point value by one hour:
[scheduling]
initial cycle point = 2018-01-01
[[xtriggers]]
clock_1 = wall_clock(offset=PT1H):PT10S
[[graph]]
P1D = "@clock_1 => foo"
[runtime]
[[foo]]
script = run-foo.sh
Notice that the short label clock_1 is used to represent the trigger function in the graph. The function call interval, which determines how often the scheduler checks the clock, is optional. Here it is PT10S (i.e. 10 seconds, which is also the default value).
Argument keywords can be omitted if the arguments are given in the right order, so the clock_1 trigger can also be declared like this:
[[xtriggers]]
clock_1 = wall_clock(PT1H)
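The condition behind a clock trigger can be sketched in plain Python. This is an illustration only, not Cylc's implementation: the function name clock_satisfied and its arguments are assumptions, and the offset is given as a timedelta rather than as an ISO 8601 duration string.

```python
from datetime import datetime, timedelta, timezone


def clock_satisfied(cycle_point, offset=timedelta(0), now=None):
    """Return True once the wall clock has passed cycle_point + offset.

    Hypothetical helper for illustration; not part of Cylc.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    return now >= cycle_point + offset


point = datetime(2018, 1, 1, tzinfo=timezone.utc)
one_hour = timedelta(hours=1)
# 00:30 is before 2018-01-01T00 + PT1H, so the trigger is not yet satisfied.
early = clock_satisfied(point, one_hour, now=point + timedelta(minutes=30))
# 01:00 is exactly the trigger time, so the trigger is satisfied.
on_time = clock_satisfied(point, one_hour, now=point + one_hour)
```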
A zero-offset clock trigger does not need to be declared under the [xtriggers] section:
[scheduling]
initial cycle point = 2018-01-01
[[graph]]
# zero-offset clock trigger:
P1D = "@wall_clock => foo"
[runtime]
[[foo]]
script = run-foo.sh
However, when xtriggers are declared, the label used must adhere to the following rules (class cylc.flow.unicode_rules.XtriggerNameValidator):
can only contain: latin letters and numbers, _
cannot start with: _cylc
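As a rough illustration of these rules, a label could be checked as follows. The function name is hypothetical, and "latin letters" is read here as ASCII letters, which is an assumption; Cylc's own check is the XtriggerNameValidator class named above.

```python
import re

# Assumed reading of the rules: letters, digits and underscores only,
# and the label must not begin with "_cylc".
_ALLOWED = re.compile(r"[A-Za-z0-9_]+")


def is_valid_xtrigger_label(label):
    """Hypothetical check, for illustration only."""
    if label.startswith("_cylc"):
        return False
    return _ALLOWED.fullmatch(label) is not None


checks = {
    "clock_1": is_valid_xtrigger_label("clock_1"),
    "_cylc_x": is_valid_xtrigger_label("_cylc_x"),
    "my-trigger": is_valid_xtrigger_label("my-trigger"),
}
```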
Built-in Workflow State Triggers
These can be used instead of the older workflow state polling tasks described in Triggering Off Of Tasks In Other Workflows for inter-workflow triggering - i.e. to trigger local tasks off of remote task statuses or messages in other workflows.
The workflow state trigger function signature looks like this:
workflow_state(workflow, task, point, offset=None, status='succeeded',
message=None, cylc_run_dir=None, debug=False)
The first three arguments are compulsory; they single out the target workflow name (workflow), task name (task), and cycle point (point). The function arguments mirror the arguments and options of the cylc workflow-state command - see cylc workflow-state --help for documentation.
As a simple example, consider the following “upstream” workflow (which we want to trigger off of):
[scheduler]
cycle point format = %Y
[scheduling]
initial cycle point = 2005
final cycle point = 2015
[[graph]]
P1Y = "foo => bar"
[runtime]
[[bar]]
script = sleep 10
[[foo]]
script = sleep 5; cylc message "data ready"
[[[outputs]]]
x = "data ready"
It must be installed and run under the name up, as referenced in the “downstream” workflow that depends on it:
[scheduler]
cycle point format = %Y
[scheduling]
initial cycle point = 2010
[[xtriggers]]
upstream = workflow_state(workflow=up, task=foo, point=%(point)s, \
message='data ready'):PT10S
clock_0 = wall_clock(offset=PT0H)
[[graph]]
P1Y = """
foo
@clock_0 & @upstream => FAM:succeed-all => blam
"""
[runtime]
[[root]]
script = sleep 5
[[foo, blam]]
[[FAM]]
[[f1,f2,f3]]
inherit = FAM
Try starting the downstream workflow first, then the upstream, and
watch what happens.
In each cycle point the @upstream trigger in the downstream workflow waits on the task foo (with the same cycle point) in the upstream workflow to emit the data ready message.
Some important points to note about this:
The function call interval, which determines how often the scheduler checks the upstream workflow, is optional. Here it is PT10S (i.e. 10 seconds, which is also the default value).
The workflow_state trigger function, like the cylc workflow-state command, must have read-access to the upstream workflow’s public database.
The cycle point argument is supplied by a string template, %(point)s. The string templates available to trigger function arguments are described in Custom Trigger Functions.
The return value of the workflow_state trigger function looks like this:
results = {
'workflow': workflow,
'task': task,
'point': point,
'offset': offset,
'status': status,
'message': message,
'cylc_run_dir': cylc_run_dir
}
return (satisfied, results)
The satisfied variable is boolean (True or False, depending on whether or not the trigger condition was found to be satisfied). The results dictionary contains the names and values of all of the target workflow state parameters. Each item in it gets qualified with the unique trigger label (“upstream” here) and passed to the environment of dependent tasks (the members of the FAM family in this case).
To see this, take a look at the job script for one of the downstream tasks:
% cylc cat-log -f j dn//2011/f2
...
cylc__job__inst__user_env() {
# TASK RUNTIME ENVIRONMENT:
export upstream_workflow upstream_cylc_run_dir upstream_offset \
upstream_message upstream_status upstream_point upstream_task
upstream_workflow="up"
upstream_cylc_run_dir="/home/vagrant/cylc-run"
upstream_offset="None"
upstream_message="data ready"
upstream_status="succeeded"
upstream_point="2011"
upstream_task="foo"
}
...
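The qualification step can be pictured with a short Python sketch. The helper name qualify_results is hypothetical and this is not the scheduler's code; it only mimics the naming pattern visible in the job script above, using the values from that listing.

```python
def qualify_results(label, results):
    """Prefix each result key with the trigger label (illustrative only)."""
    return {f"{label}_{key}": str(value) for key, value in results.items()}


results = {
    "workflow": "up",
    "task": "foo",
    "point": "2011",
    "offset": None,
    "status": "succeeded",
    "message": "data ready",
    "cylc_run_dir": "/home/vagrant/cylc-run",
}
# Every key gains the "upstream_" prefix; values become strings.
env = qualify_results("upstream", results)
```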
Note
The task has to know the name (label) of the external trigger that it depends on - “upstream” in this case - in order to use this information. However the name could be given to the task environment in the workflow configuration.
Custom Trigger Functions
Trigger functions are just normal Python functions, with a few special properties:
they must:
be defined in a module with the same name as the function;
be compatible with the same Python version that runs the scheduler (see Distribution requirements for the latest version specification).
they can be located either:
in <workflow-dir>/lib/python/;
or anywhere in your Python library path.
they can take arbitrary positional and keyword arguments
workflow and task identity, and cycle point, can be passed to trigger functions by using string templates in function arguments (see below)
integer, float, boolean, and string arguments will be recognized and passed to the function as such
if a trigger function depends on files or directories (for example) that might not exist when the function is first called, just return unsatisfied until everything required does exist.
Note
Trigger functions cannot store data Pythonically between invocations because each call is executed in an independent process in the process pool. If necessary the filesystem can be used for this purpose.
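A minimal sketch of keeping state on the filesystem between calls is shown below. The function satisfied_after and its state_file argument are hypothetical, chosen only to illustrate the idea; by the rules above it would live in a module named satisfied_after.py.

```python
import json
from pathlib import Path


def satisfied_after(n_calls, state_file):
    """Hypothetical trigger: satisfied on the n_calls-th invocation.

    Each call runs in a fresh process, so the call count is kept in a
    small JSON file rather than in a Python variable.
    """
    path = Path(state_file)
    count = json.loads(path.read_text())["count"] if path.exists() else 0
    count += 1
    path.write_text(json.dumps({"count": count}))
    if count < n_calls:
        return (False, {})
    return (True, {"calls": str(count)})
```

The state file path should be unique to the trigger (for example, located under the workflow share directory) so that unrelated triggers do not overwrite each other's state.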
The following string templates are available for use in function arguments in the workflow configuration, if the trigger function needs any of this information:
- point
The cycle point of the dependent task.
- debug
True if Cylc is being run in debug mode (--debug, -vv).
- workflow_run_dir
The path to the workflow run directory.
- workflow_share_dir
The path to the workflow share directory.
- id
The ID of the dependent task.
- name
The name of the dependent task.
- user_name
The user account under which the workflow is being run.
- workflow
The workflow ID.
- workflow_name
The workflow ID.
Deprecated since version 8.0.0: Use workflow instead.
- suite_name
The workflow ID.
Deprecated since version 8.0.0: Use workflow instead.
- suite_run_dir
The path to the workflow run directory.
Deprecated since version 8.0.0: Use workflow_run_dir instead.
- suite_share_dir
The path to the workflow share directory.
Deprecated since version 8.0.0: Use workflow_share_dir instead.
For example:
[scheduling]
initial cycle point = now
[[xtriggers]]
my_xtrigger = my_xtrigger_fcn('%(workflow)s', '%(point)s')
For an explanation of the substitution syntax, see String Formatting Operations in the Python documentation.
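The substitution itself is ordinary Python %-formatting with a mapping. The values in the dictionary below are made up for illustration; in a real workflow the scheduler supplies them.

```python
# Values the scheduler might supply for one task instance (made up here).
context = {"workflow": "my-workflow", "point": "20180101T0000Z", "name": "foo"}

# A template as it might be written in an xtrigger argument.
template = "%(workflow)s/%(point)s"
expanded = template % context
```

Note that in Python's %-formatting the conversion character (s) after the closing parenthesis is part of the syntax; a template such as %(point) without it raises ValueError.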
Function return values should be as follows:
if the trigger condition is not satisfied, return (False, {})
if the trigger condition is satisfied, return (True, results)
where results is an arbitrary dictionary of information to be passed to dependent tasks, which in terms of format must:
be flat (non-nested);
contain only keys which are valid as environment variable names.
See Built-in Workflow State Triggers for an example of one such results dictionary and how it gets processed by the workflow.
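Putting the rules together, a custom trigger function might look like the sketch below. The name file_ready and its arguments are hypothetical; the point is only to show the two return forms and a flat results dictionary.

```python
from pathlib import Path


def file_ready(path, point):
    """Hypothetical trigger: satisfied once `path` exists.

    By the rules above this would live in a module named file_ready.py.
    """
    target = Path(path)
    if not target.is_file():
        # Not an error: report "not yet" and wait to be called again.
        return (False, {})
    # Flat dictionary; keys are valid environment variable names.
    return (True, {"path": str(target), "point": str(point)})
```

If this were declared under a label such as data, dependent tasks would see the results as data_path and data_point, following the label qualification described in Built-in Workflow State Triggers.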
The scheduler manages trigger functions as follows:
they are called asynchronously in the process pool (except for clock triggers, which are called from the main process)
they are called repeatedly on a configurable interval, until satisfied
the call interval defaults to PT10S (10 seconds)
repeat calls are not made until the previous call has returned
they are subject to the normal process pool command time out - if they take too long to return, the process will be killed
they are shared for efficiency: a single call will be made for all triggers that share the same function signature - i.e. the same function name and arguments
their return status and results are stored in the workflow DB and persist across workflow restarts
their stdout, if any, is redirected to stderr and will be visible in the workflow log in debug mode (stdout is needed to communicate return values from the sub-process in which the function executes)
Toy Examples
echo
The trivial built-in echo function takes any number of positional and keyword arguments (from the workflow configuration) and simply prints them to stdout, and then returns False (i.e. trigger condition not satisfied). Here it is in its entirety.
def echo(*args, **kwargs):
print("echo: ARGS:", args)
print("echo: KWARGS:", kwargs)
return False, {}
Here’s an example echo trigger workflow:
[scheduling]
initial cycle point = now
[[xtriggers]]
echo_1 = echo(hello, 99, qux=True, point=%(point)s, foo=10)
[[graph]]
PT1H = "@echo_1 => foo"
[runtime]
[[foo]]
script = exit 1
To see the result, run this workflow in debug mode and take a look at the workflow log (or run cylc play --debug --no-detach <workflow> and watch your terminal).
xrandom
The built-in xrandom function sleeps for a configurable amount of time (useful for testing the effect of a long-running trigger function - which should be avoided) and has a configurable random chance of success. The function signature is:
xrandom(percent, secs=0, _=None, debug=False)
The percent argument sets the odds of success in any given call; secs is the number of seconds to sleep before returning; and the _ argument (underscore is a conventional name in Python for a variable that is not used) is provided to allow specialization of the trigger to (for example) task name, task ID, or cycle point (just use the appropriate string templates in the workflow configuration for this).
An example xrandom trigger workflow:
[scheduling]
cycling mode = integer
initial cycle point = 1
final cycle point = 5
[[xtriggers]]
# Called once for all dependent tasks (all cycles).
x1 = xrandom(percent=25, secs=2):PT5S
# Called once per dependent task name (all cycles).
x2 = xrandom(percent=25, secs=2, _=%(name)s):PT5S
# Called once per cycle for all dependent tasks.
x3 = xrandom(percent=25, secs=2, _=%(point)s):PT5S
[[graph]]
P1 = """
# all instances of foo and bar should trigger at once, together
@x1 => foo & bar
# all instances of cat should trigger at once, and separately, all
# instances of dog should trigger at once.
@x2 => cat & dog
# each instance of qux should trigger separately
@x3 => qux
# Result:
# - x1 should return True once, and not be called again.
# - x2 should return True twice, and not be called again.
# - x3 should return True five times, and not be called again.
# i.e. 8 True returns in the 5-cycle workflow run.
"""
[runtime]
[[root]]
script = sleep 5
[[foo, bar, cat, dog, qux]]
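The count of 8 satisfied calls follows from how triggers are shared by function signature. The sketch below reproduces the arithmetic in Python by expanding the template in the _ argument for every dependent task instance and counting the distinct results. It is a model of the sharing rule as described in this section, not scheduler code.

```python
from itertools import product

points = [str(p) for p in range(1, 6)]  # integer cycle points 1..5

# Dependent tasks of each trigger, and the template in its "_" argument.
triggers = {
    "x1": (["foo", "bar"], None),
    "x2": (["cat", "dog"], "%(name)s"),
    "x3": (["qux"], "%(point)s"),
}


def unique_signatures(tasks, template):
    """Collect the distinct "_" values after template substitution."""
    sigs = set()
    for name, point in product(tasks, points):
        ctx = {"name": name, "point": point}
        sigs.add(template % ctx if template else None)
    return sigs


counts = {label: len(unique_signatures(*spec))
          for label, spec in triggers.items()}
total = sum(counts.values())
```

Here counts comes out as one distinct call for x1, two for x2 (one per task name) and five for x3 (one per cycle point).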
Current Limitations
The following issues may be addressed in future Cylc releases:
trigger labels cannot currently be used in conditional (OR) expressions in the graph; attempts to do so will fail validation.
aside from the predefined zero-offset wall_clock trigger, all unique trigger function calls must be declared with all of their arguments under the [xtriggers] section, and referred to by label alone in the graph. It would be convenient (and less verbose, although no more functional) if we could just declare a label against the common arguments, and give remaining arguments (such as different wallclock offsets in clock triggers) as needed in the graph.
we may move away from the string templating method for providing workflow and task attributes to trigger function arguments.
Filesystem Events?
Cylc does not have built-in support for triggering off of filesystem events such as inotify on Linux. There is no cross-platform standard for this, and in any case filesystem events are not very useful in HPC cluster environments where events can only be detected at the specific node on which they were generated.
Continuous Event Watchers?
For some applications a persistent process that continually monitors the external world is better than discrete periodic checking. This would be more difficult to support as a plugin mechanism in Cylc, but we may decide to do it in the future. In the meantime, consider implementing a small daemon process as the watcher (e.g. to watch continuously for filesystem events) and have your Cylc trigger functions interact with it.
Push External Triggers
Note
The external triggering mechanism described here is harder to use than the newer method of External Triggers. The trigger is a task property rather than something the task depends on, it requires the external system to push a message to the scheduler, and it has a less flexible way to pass information to downstream tasks. However, a push mechanism may sometimes be preferred over polling by the scheduler, so we have retained support pending something better in a future Cylc 8 release.
These external triggers are hidden task prerequisites that must be satisfied by using the cylc ext-trigger client command to send a pre-defined message to the workflow along with an ID string that distinguishes one instance of the event from another (the name of the target task and its current cycle point are not required). The event ID is just an arbitrary string to Cylc, but it can be used to identify something associated with the event to the workflow - such as the filename of a new externally-generated dataset. When the scheduler receives the event notification it will trigger the next instance of any task waiting on that trigger (whatever its cycle point) and then broadcast (see Cylc Broadcast) the event ID to the cycle point of the triggered task as $CYLC_EXT_TRIGGER_ID. Downstream tasks with the same cycle point therefore know the new event ID too and can use it, if they need to, to identify the same new dataset. In this way a whole workflow can be associated with each new dataset, and multiple datasets can be processed in parallel if they happen to arrive in quick succession.
An externally-triggered task must register the event it waits on in the workflow scheduling section:
# workflow "sat-proc"
[scheduling]
cycling mode = integer
initial cycle point = 1
[[special tasks]]
external-trigger = get-data("new sat X data avail")
[[graph]]
P1 = get-data => conv-data => products
Then, each time a new dataset arrives the external detection system should notify the workflow like this:
$ cylc ext-trigger sat-proc "new sat X data avail" passX12334a
where “sat-proc” is the workflow name and “passX12334a” is the ID string for the new event. The workflow passphrase must be installed on the triggering account.
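If the external detection system is written in Python, the notification can be assembled as an argument list. The helper name ext_trigger_command is hypothetical; the sketch only builds the same command line shown above and does not run it.

```python
import shlex


def ext_trigger_command(workflow, message, event_id):
    """Build the argument list for `cylc ext-trigger` (not executed here)."""
    return ["cylc", "ext-trigger", workflow, message, event_id]


cmd = ext_trigger_command("sat-proc", "new sat X data avail", "passX12334a")
# shlex.join quotes the message so it is passed as a single argument.
printable = shlex.join(cmd)
```

Passing the list form to subprocess.run(cmd, check=True) avoids shell quoting problems when the message contains spaces.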
Note
Only one task in a workflow can trigger off a particular external message. Other tasks can trigger off the externally triggered task as required, of course.
Here is a working example of a simulated satellite processing workflow:
#!Jinja2
# WARNING: this uses the old-style push external trigger mechanism.
[meta]
title = Real time satellite data processing demo, variant 3 of 3
description = """
Successive cycle points retrieve and process the next arbitrarily timed
and labelled dataset, in parallel if the data comes in quickly. This
variant of the workflow has initial get_data tasks with external triggers:
they do not submit until triggered by an external system.
"""
# Note that the satellite simulator task here that supplies the external event
# trigger happens to be a workflow task - i.e. it is not really "external" - but
# this is only a convenience - an easy route to a self-contained example workflow.
# you can monitor output processing with:
# $ watch -n 1 \
# "find ~/cylc-run/<workflow-id>/share; find ~/cylc-run/<workflow-id>/work"
{% set N_DATASETS = 5 %}
# define shared directories (could use runtime namespaces for this)
{% set DATA_IN_DIR = "$CYLC_WORKFLOW_SHARE_DIR/incoming" %}
{% set PRODUCT_DIR = "$CYLC_WORKFLOW_SHARE_DIR/products" %}
[scheduling]
cycling mode = integer
initial cycle point = 1
final cycle point = {{N_DATASETS}}
[[special tasks]]
external-trigger = get_data("new dataset ready for processing")
[[graph]]
# first cycle
R1 = prep => satsim & get_data
P1 = """
# Processing chain for each dataset
get_data => proc1 => proc2 => products
# As one dataset is retrieved, start waiting on another.
get_data[-P1] => get_data
"""
# last cycle
R1//{{N_DATASETS}} = products => collate
[runtime]
[[prep]]
script = \
rm -rf $CYLC_WORKFLOW_SHARE_DIR $CYLC_WORKFLOW_WORK_DIR
[[[meta]]]
title = clean the workflow output directories
[[satsim]]
pre-script = mkdir -p {{DATA_IN_DIR}}
script = """
COUNT=0
while true; do
((COUNT == {{N_DATASETS}})) && break
# sleep $((RANDOM % 20))
# Generate datasets very quickly to test parallel processing.
DATA_ID=$(date +%s).$((RANDOM % 100))
DATA_FILE=dataset-${DATA_ID}.raw
touch {{DATA_IN_DIR}}/$DATA_FILE
((COUNT += 1))
# (required to distinguish fast-arriving messages).
# Trigger downstream processing in the workflow.
cylc ext-trigger $CYLC_WORKFLOW_ID \
"new dataset ready for processing" $DATA_ID
done
"""
[[[meta]]]
title = simulate a satellite data feed
description = """
Generates {{N_DATASETS}} arbitrarily labelled datasets very
quickly, to show parallel processing streams.
"""
[[WORKDIR]]
# Define a common cycle-point-specific work-directory for all
# processing tasks so that they all work on the same dataset.
work sub-directory = proc-$CYLC_TASK_CYCLE_POINT
post-script = sleep 5
[[[environment]]]
DATASET = dataset-$CYLC_EXT_TRIGGER_ID
[[get_data]]
inherit = WORKDIR
script = mv {{DATA_IN_DIR}}/${DATASET}.raw $PWD
[[[meta]]]
title = retrieve next dataset
description = just do it - we know it exists already
[[proc1]]
inherit = WORKDIR
script = mv ${DATASET}.raw ${DATASET}.proc1
[[[meta]]]
title = convert .raw dataset to .proc1 form
[[proc2]]
inherit = WORKDIR
script = mv ${DATASET}.proc1 ${DATASET}.proc2
[[[meta]]]
title = convert .proc1 dataset to .proc2 form
[[products]]
inherit = WORKDIR
script = """
mkdir -p {{PRODUCT_DIR}}
mv ${DATASET}.proc2 {{PRODUCT_DIR}}/${DATASET}.prod
"""
[[[meta]]]
title = generate products from .proc2 processed dataset
[[collate]]
# Note you might want to use "cylc workflow-state" to check that
# _all_ product tasks have finished before collating results.
script = """
echo PRODUCTS:
ls {{PRODUCT_DIR}}
sleep 20
"""
[[[meta]]]
title = collate all products from the workflow run
External triggers are not normally needed in datetime cycling workflows driven by real time data that comes in at regular intervals. In these cases a data retrieval task can be clock-triggered (and have appropriate retry intervals) to submit at the expected data arrival time, so little time is wasted in polling. However, if the arrival time of the cycle-point-specific data is highly variable, external triggering may be used with the cycle point embedded in the message:
# workflow "data-proc"
[scheduling]
initial cycle point = 20150125T00
final cycle point = 20150126T00
[[special tasks]]
external-trigger = get-data("data arrived for $CYLC_TASK_CYCLE_POINT")
[[graph]]
T00 = init-process => get-data => post-process
Once the variable-length waiting is finished, an external detection system should notify the workflow like this:
$ cylc ext-trigger data-proc "data arrived for 20150126T00" passX12334a
where “data-proc” is the workflow name, the cycle point has replaced the variable in the trigger string, and “passX12334a” is the ID string for the new event. The workflow passphrase must be installed on the triggering account. In this case, the event will trigger for the second cycle point but not the first because of the cycle-point matching.
Triggering Off Of Tasks In Other Workflows
Note
Please read External Triggers before using the older inter-workflow triggering mechanism described in this section.
The cylc workflow-state command interrogates workflow run databases. It has a polling mode that waits for a given task in the target workflow to achieve a given state, or receive a given message. This can be used to make task scripting wait for a remote task to succeed (for example).
Automatic workflow-state polling tasks can be defined in the graph. They get automatically-generated task scripting that uses cylc workflow-state appropriately (it is an error to give your own script item for these tasks).
Here’s how to trigger a task bar off a task foo in a remote workflow called other.workflow:
[scheduling]
[[graph]]
T00, T12 = "my-foo<other.workflow::foo> => bar"
Local task my-foo will poll for the success of foo in workflow other.workflow, at the same cycle point, and will succeed only if and when foo succeeds. Other task states can also be polled:
T00, T12 = "my-foo<other.workflow::foo:fail> => bar"
The default polling parameters (e.g. maximum number of polls and the interval between them) are printed by cylc workflow-state --help and can be configured if necessary under the local polling task runtime section:
[scheduling]
[[graph]]
T00,T12 = "my-foo<other.workflow::foo> => bar"
[runtime]
[[my-foo]]
[[[workflow state polling]]]
max-polls = 100
interval = PT10S
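The effect of the two polling parameters can be sketched as a bounded retry loop. This is an illustration only, not how Cylc implements polling tasks; the function poll and its injectable sleep argument are assumptions made so the example runs without real waiting.

```python
import time


def poll(check, max_polls, interval_secs, sleep=time.sleep):
    """Call check() up to max_polls times, pausing between attempts.

    Returns the attempt number that succeeded, or None if none did.
    """
    for attempt in range(1, max_polls + 1):
        if check():
            return attempt
        if attempt < max_polls:
            sleep(interval_secs)
    return None


# Simulated remote task that reaches the target state on the third poll.
answers = iter([False, False, True])
attempts = poll(lambda: next(answers), max_polls=100, interval_secs=10,
                sleep=lambda secs: None)  # no real waiting in this demo
```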
To poll for the target task to receive a message rather than achieve a state, give the message in the runtime configuration (in which case the task status inferred from the graph syntax will be ignored):
[runtime]
[[my-foo]]
[[[workflow state polling]]]
message = "the quick brown fox"
For workflows owned by others, or those with run databases in non-standard locations, use the --run-dir option, or in-workflow:
[runtime]
[[my-foo]]
[[[workflow state polling]]]
run-dir = /path/to/top/level/cylc/run-directory
If the remote task has a different cycling sequence, just arrange for the local polling task to be on the same sequence as the remote task that it represents. For instance, if local task cat cycles 6-hourly at 0,6,12,18 but needs to trigger off a remote task dog at 3,9,15,21:
[scheduling]
[[graph]]
T03,T09,T15,T21 = "my-dog<other.workflow::dog>"
T00,T06,T12,T18 = "my-dog[-PT3H] => cat"
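The offset arithmetic in this graph can be checked with a few lines of Python. The date used is arbitrary; only the hours matter.

```python
from datetime import datetime, timedelta

local_hours = [0, 6, 12, 18]   # cycle hours of the local task cat
offset = timedelta(hours=-3)   # the [-PT3H] offset in the graph

day = datetime(2020, 1, 1)     # arbitrary date for the calculation
# Hour of the my-dog instance that each cat instance depends on.
polled = [(day + timedelta(hours=h) + offset).hour for h in local_hours]
```

Each cat instance therefore depends on the my-dog instance three hours earlier, which covers the remote sequence 3,9,15,21 (the T00 instance of cat depends on T21 of the previous day).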
For workflow-state polling, the cycle point is automatically converted to the cycle point format of the target workflow.
The remote workflow does not have to be running when polling commences because the command interrogates the workflow run database, not the scheduler.
Note
The graph syntax for workflow polling tasks cannot be combined with cycle point offsets, family triggers, or parameterized task notation. This does not present a problem because workflow polling tasks can be put on the same cycling sequence as the remote-workflow target task (as recommended above), and there is no point in having multiple tasks (family members or parameterized tasks) performing the same polling operation. Task state triggers can be used with workflow polling, e.g. to trigger another task if polling fails after 10 tries at 10 second intervals:
[scheduling]
[[graph]]
R1 = "poller<other-workflow::foo:succeed>:fail => another-task"
[runtime]
[[poller]]
[[[workflow state polling]]]
max-polls = 10
interval = PT10S