External Triggers
Cylc dependencies allow us to trigger tasks off of events/outputs created within the workflow.
[scheduling]
    [[graph]]
        # "bar" will wait until "foo" has succeeded
        P1D = foo:succeeded => bar
External triggers allow us to trigger tasks off of events that are external to the workflow itself, for example the real-world time (aka wallclock time), the arrival of a data set, or the completion of tasks in other workflows.
[scheduling]
    [[graph]]
        # "bar" will wait until the real-world time matches the cycle point
        P1D = @wall_clock => bar
Cylc provides two mechanisms for implementing external triggers in a workflow:
- Xtriggers (pull):
These are Python functions that are routinely “polled” until a condition is satisfied.
- Ext-triggers (push):
These are commands that send messages to the scheduler once a condition has been satisfied.
Xtriggers (pull)
Xtriggers allow tasks to trigger off of arbitrary external conditions. The scheduler periodically calls a Python function to check the condition, until it returns success and thereby satisfies dependent tasks.
You can write Custom Xtrigger Functions, and Cylc has several built in:
- Clock xtriggers - real time trigger relative to task cycle point
- Workflow state xtriggers - trigger off tasks in other workflows
- Toy xtriggers - to facilitate understanding
Periodic Checking
Xtrigger calls commence when the first dependent task enters the active window, repeating at configurable intervals until success is achieved. The default call interval is 10 seconds.
Note
The xtrigger prerequisites of future tasks that have yet to enter the active window will always show as unsatisfied, because the associated xtrigger function has not been called yet.
The scheduler satisfies future tasks when they enter the active window, if they depend on the same xtrigger, without calling the function again - see xtrigger Specificity.
Xtriggers must return quickly, or be killed by the global.cylc[scheduler]process pool timeout.
Warning
Each xtrigger call is made in a new Python subprocess. Consider increasing the call interval if you have many xtriggers, to reduce the associated system load.
Declaring Xtriggers
Xtriggers are declared under flow.cylc[scheduling][xtriggers] by associating a short label with a function name, arguments, and an optional custom check interval.
The label must be prefixed by “@” for use in the graph, and must comply with basic naming rules:
- class cylc.flow.unicode_rules.XtriggerNameValidator[source]
  The rules for valid xtrigger labels:
  - can only contain: latin letters and numbers, _
  - cannot start with: _cylc
The following workflow declares an xtrigger x1 = check_data. The function has one argument, a file path, and will be called every 30 seconds until it succeeds - at which point process_data can trigger:
[scheduling]
    [[xtriggers]]
        x1 = check_data(loc="/path/to/data/source"):PT30S
    [[graph]]
        P1D = "@x1 => process_data"
[runtime]
    [[process_data]]
Argument keywords can be omitted, so long as argument order is preserved:
[scheduling]
    [[xtriggers]]
        x1 = check_data("/path/to/data/source"):PT30S
Note
Trigger labels can be used with & (AND) operators in the graph, but currently not with | (OR) - attempts to do that will fail validation.
Xtrigger Results
Xtrigger functions must return a flat dictionary of results to be broadcast to dependent tasks, via environment variables named as <xtrigger_label>_<dictionary_key>.
For example, if the x1 xtrigger returns this dictionary:
# returned by check_data() on success:
{
"data_path": "/path/to/data",
"data_type": "netcdf"
}
Then the process_data task, which depends on x1, will see the following environment variables:
# job environment of process_data:
x1_data_path="/path/to/data"
x1_data_type="netcdf"
The process_data task would likely run an application that needs this information in terms of its native configuration. You can translate from xtrigger variables to application variables in the workflow configuration:
[runtime]
    [[process_data]]
        script = run-process.py
        [[[environment]]]
            INPUT_DATA_LOCN = $x1_data_path
            INPUT_DATA_TYPE = $x1_data_type
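The label-to-environment-variable mapping can be sketched in plain Python (broadcast_env is a hypothetical helper for illustration, not part of Cylc):

```python
def broadcast_env(label: str, results: dict) -> dict:
    """Illustrate how xtrigger results become task environment variables.

    Each key of the (flat) results dictionary is qualified with the
    xtrigger label, giving <label>_<key> variable names.
    """
    return {f"{label}_{key}": value for key, value in results.items()}

# The x1 example from above:
env = broadcast_env("x1", {"data_path": "/path/to/data", "data_type": "netcdf"})
# env == {"x1_data_path": "/path/to/data", "x1_data_type": "netcdf"}
```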
Task and Cycle Specificity
Cylc makes a call sequence for each unique xtrigger with one or more dependent tasks in the active window. Uniqueness is determined by the function signature, i.e. the function name and arguments.
Depending on the argument list, an xtrigger can be universal - the same for all tasks that depend on it; or specific - to the name and/or cycle point of tasks that depend on it.
Universal xtriggers
If an xtrigger has no arguments that vary as the workflow runs, a single call sequence will satisfy every dependent task in the entire graph.
Below, every cycle point instance of process_data depends on the same xtrigger, which presumably checks data for the entire workflow. Once satisfied it allows every instance of the task to run:
[scheduling]
    cycling mode = integer
    initial cycle point = 1
    final cycle point = 10
    [[xtriggers]]
        x = check_data("/path/to/data")
    [[graph]]
        P1 = "@x => process_data"
[runtime]
    [[process_data]]
Task and Cycle-Specific Xtriggers
Xtrigger arguments can incorporate string templates as placeholders for certain workflow parameters - see Custom Xtrigger Functions. Several of these are specific to the cycle point or name of tasks that depend on the xtrigger:
- %(point)s - cycle point of the dependent task
- %(name)s - name of the dependent task
- %(id)s - identity (point/name) of the dependent task
If these are used, a new call sequence, with new arguments, will commence whenever a dependent task with a new name or cycle point enters the active window.
Below, every instance of process_data depends on a different, cycle point-specific xtrigger, which presumably checks data just for that instance. Each xtrigger, once satisfied, allows just one instance of the task to run:
[scheduling]
    cycling mode = integer
    initial cycle point = 1
    final cycle point = 10
    [[xtriggers]]
        x = check_data(loc="/path/to/data", cycle="%(point)s")
    [[graph]]
        P1 = "@x => process_data"
Note
You have to inspect the function signature to see the cycle and task specificity of xtriggers.
The following working example shows four xtriggers based on the toy “echo” xtrigger, which takes an arbitrary list of arguments:
[scheduling]
    cycling mode = integer
    initial cycle point = 1
    final cycle point = 2
    [[xtriggers]]
        w1 = echo(succeed=True)  # universal
        x2 = echo(succeed=True, task="%(name)s")  # task name specific
        y2 = echo(succeed=True, cycle="%(point)s")  # cycle point specific
        z4 = echo(succeed=True, task="%(name)s", cycle="%(point)s")  # both
    [[graph]]
        P1 = "@w1 & @x2 & @y2 & @z4 => foo & bar"
[runtime]
    [[foo, bar]]
Run this with cylc play --no-detach and watch when each xtrigger is called: w1 will be called once, because it has only static arguments; x2 will be called twice, once for each task name (regardless of cycle point); y2 will be called twice, once for each cycle point (regardless of task name); and z4 will be called four times, once for each task in each cycle point:
$ cylc cat-log xtr | grep 'xtrigger succeeded'
INFO - xtrigger succeeded: w1 = echo(succeed=True)
INFO - xtrigger succeeded: x2 = echo(succeed=True, task=bar)
INFO - xtrigger succeeded: y2 = echo(cycle=1, succeed=True)
INFO - xtrigger succeeded: z4 = echo(cycle=1, succeed=True, task=bar)
INFO - xtrigger succeeded: x2 = echo(succeed=True, task=foo)
INFO - xtrigger succeeded: z4 = echo(cycle=1, succeed=True, task=foo)
INFO - xtrigger succeeded: y2 = echo(cycle=2, succeed=True)
INFO - xtrigger succeeded: z4 = echo(cycle=2, succeed=True, task=bar)
INFO - xtrigger succeeded: z4 = echo(cycle=2, succeed=True, task=foo)
Sequential Xtriggers
Parentless tasks, which often depend on clock or other xtriggers, automatically spawn into the active window, multiple cycles ahead (to the runahead limit). If they depend on xtriggers that will only be satisfied in cycle point order, this causes unnecessary xtrigger checking and UI clutter.
Sequential xtriggers prevent this by delaying the spawning of the next dependent task instance until the current one is satisfied. To do this (in reverse order of precedence):
- set [scheduling]sequential xtriggers = True for all xtriggers in the workflow
- add a sequential=True argument to function definitions (in Python source files)
- add a sequential=True argument to function declarations (in workflow configurations)
Note
The built-in wall_clock xtrigger is sequential by default.
Forcing Xtriggers
You can manually satisfy a task’s xtrigger prerequisites via the GUI or command line, so the task can run even if the xtrigger has not yet succeeded.
This will not affect other tasks that depend on the same xtrigger, but the scheduler will stop checking the xtrigger if no other active tasks depend on it.
# Satisfy @x1 => foo in cycle point 1
$ cylc set --pre=xtrigger/x1 my_workflow//1/foo
# See cylc set --help
For this to work without causing task failure, set appropriate default values for xtrigger results in task scripting.
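For instance, process_data from the earlier example could give its xtrigger-derived variables fallback values in its task scripting, so it still runs correctly when @x1 is forced rather than satisfied (the default values here are illustrative):

```shell
# If @x1 was forced, the x1_* broadcast variables will be unset;
# fall back to sensible defaults so the task can still run.
INPUT_DATA_LOCN="${x1_data_path:-/path/to/default/data}"
INPUT_DATA_TYPE="${x1_data_type:-netcdf}"
echo "processing ${INPUT_DATA_LOCN} as ${INPUT_DATA_TYPE}"
```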
Built-in Clock Triggers
Clock xtriggers succeed when the real (“wall clock”) time reaches some offset from the task’s cycle point value.
Note
These should be used instead of the older task clock triggers documented in Clock Triggers.
The clock xtrigger function signature looks like this:
- cylc.flow.xtriggers.wall_clock.wall_clock(offset='PT0S', sequential=True)[source]
  Trigger at a specific real "wall clock" time relative to the cycle point in the graph.
  Clock triggers, unlike other trigger functions, are executed synchronously in the main process.
  - Parameters:
    - offset (str) – ISO 8601 interval to wait after the cycle point is reached in real time before triggering. May be negative, in which case it will trigger before the real time reaches the cycle point.
    - sequential (bool) – Wall-clock xtriggers are run sequentially by default. See Sequential Xtriggers for more details.
  Changed in version 8.3.0: The sequential argument was added.
Note
Clock xtriggers are cycle-point specific by nature; you don’t need to use function arguments to achieve this.
In the following workflow, task foo has a daily cycle point sequence, and each task instance will trigger when the real time is one hour past its cycle point value.
[scheduling]
    initial cycle point = 2018-01-01
    [[xtriggers]]
        clock_1 = wall_clock(offset=PT1H)
    [[graph]]
        P1D = "@clock_1 => foo"
[runtime]
    [[foo]]
        script = run-foo.sh
Or omitting the argument keyword:
[scheduling]
    [[xtriggers]]
        clock_1 = wall_clock(PT1H)
A zero-offset clock trigger does not need to be declared before use:
[scheduling]
    initial cycle point = 2018-01-01
    [[graph]]
        # zero-offset clock trigger:
        P1D = "@wall_clock => foo"
[runtime]
    [[foo]]
        script = run-foo.sh
Built-in Workflow State Triggers
Workflow-state xtriggers succeed when a given task in another workflow achieves a given state or output.
Note
These should be used instead of the older workflow state polling tasks described in Workflow State Polling (old style).
The workflow state trigger function signature looks like this:
- cylc.flow.xtriggers.workflow_state.workflow_state(workflow_task_id, offset=None, flow_num=None, is_trigger=False, is_message=False, alt_cylc_run_dir=None)[source]
Connect to a workflow DB and check a task status or output.
If the status or output has been achieved, return {True, result}.
- Parameters:
  - workflow_task_id (str) – ID (workflow//point/task:selector) of the target task.
  - offset (str | None) – Offset from cycle point as an ISO8601 or integer duration, e.g. PT1H (1 hour) or P1 (1 integer cycle).
  - flow_num (int | None) – Flow number of the target task.
  - is_trigger (bool) – Interpret the task:selector as a task trigger name rather than a task status.
  - is_message (bool) – Interpret the task:selector as a task output message rather than a task status.
  - alt_cylc_run_dir (str | None) – Alternate cylc-run directory, e.g. for another user.
- Returns:
  (satisfied, result)
  - satisfied: True if satisfied, else False.
  - result: Dict containing the keys: workflow, task, point, offset, status, message, trigger, flow_num, run_dir.
- Return type:
  tuple
Changed in version 8.3.0: The workflow_task_id argument was introduced to replace the separate workflow, point, task, status, and message arguments (which are still supported for backwards compatibility). The flow_num argument was added. The cylc_run_dir argument was renamed to alt_cylc_run_dir.
The first argument identifies the target workflow, cycle, task, and status or output trigger name. The function arguments mirror the arguments and options of the cylc workflow-state command - see cylc workflow-state --help.
As a simple example, consider the following “upstream” workflow (which we want to trigger off of):
[scheduler]
    cycle point format = %Y
[scheduling]
    initial cycle point = 2005
    final cycle point = 2015
    [[graph]]
        P1Y = "foo => bar"
[runtime]
    [[bar]]
        script = sleep 10
    [[foo]]
        script = sleep 5; cylc message "data ready"
        [[[outputs]]]
            x = "data ready"
It must be installed and run under the name up, as referenced in the “downstream” workflow that depends on it:
[scheduler]
    cycle point format = %Y
[scheduling]
    initial cycle point = 2010
    [[xtriggers]]
        upstream = workflow_state("up//%(point)s/foo:x", is_trigger=True):PT10S
        clock_0 = wall_clock(offset=PT0H)
    [[graph]]
        P1Y = """
            foo
            @clock_0 & @upstream => FAM:succeed-all => blam
        """
[runtime]
    [[root]]
        script = sleep 5
    [[foo, blam]]
    [[FAM]]
    [[f1, f2, f3]]
        inherit = FAM
Try starting the downstream workflow first, then the upstream, and watch what happens. In each cycle point the @upstream trigger in the downstream workflow waits for the upstream workflow's task foo (at the same cycle point) to generate the "data ready" message.
Note
The workflow_state trigger function, like the cylc workflow-state command, must have read-access to the upstream workflow's public database. The task cycle point is supplied by the string template %(point)s. See Custom Xtrigger Functions for other string templates available to xtriggers.
The return value of the workflow_state trigger function looks like this:
results = {
    'workflow': workflow_id,
    'task': task_name,
    'point': cycle_point,
    'status': task_status,           # or
    'trigger': task_output_trigger,  # or
    'message': task_output_message,
    'flow_num': flow_num             # if given
}
return (satisfied, results)
The results dictionary contains the names and values of the target workflow state parameters. Each name gets qualified with the unique trigger name ("upstream" here) and passed to the environment of dependent tasks (the members of the FAM family in this case).
To see this, take a look at the job script for one of the downstream tasks:
% cylc cat-log -f j dn//2011/f2
...
cylc__job__inst__user_env() {
    # TASK RUNTIME ENVIRONMENT:
    export upstream_workflow upstream_cylc_run_dir upstream_offset \
        upstream_message upstream_status upstream_point upstream_task
    upstream_workflow="up"
    upstream_task="foo"
    upstream_point="2011"
    upstream_status="succeeded"
}
...
Note
The dependent task has to know the name of the xtrigger that it depends on - “upstream” in this case - in order to use this information. However the name could be given to the task environment in the workflow configuration.
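For example (a sketch, with an assumed variable name), the trigger label could be passed to the task explicitly:

```ini
[runtime]
    [[f1]]
        [[[environment]]]
            # Tell the task which xtrigger label to read results from.
            UPSTREAM_XTRIGGER = upstream
```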
Built-in Toy Xtriggers
echo
The toy echo trigger simply prints any arguments that you give it to stdout, and then fails (trigger condition not met) or succeeds (trigger condition met) according to the value of a succeed=True argument (which defaults to False). On success, it returns all arguments in the result dictionary.
- cylc.flow.xtriggers.echo.echo(*args, **kwargs)[source]
Print arguments to stdout, return kwargs[‘succeed’] and kwargs.
This may be a useful aid to understanding how xtriggers work.
- Parameters:
succeed – Set the success or failure of this xtrigger.
*args – Print to stdout.
**kwargs – Print to stdout, and return as output.
- Returns:
  (True/False, kwargs)
- Return type:
  tuple
Examples
>>> echo('Breakfast Time', succeed=True, egg='poached')
echo: ARGS: ('Breakfast Time',)
echo: KWARGS: {'succeed': True, 'egg': 'poached'}
(True, {'succeed': True, 'egg': 'poached'})
[scheduling]
    initial cycle point = now
    [[xtriggers]]
        echo_1 = echo(succeed=True, hello, 99, point=%(point)s, foo=10)
    [[graph]]
        PT1H = "@echo_1 => foo"
[runtime]
    [[foo]]
        script = "printenv | grep echo_1"
Run this with cylc play --no-detach <workflow> and watch your terminal to see the xtrigger calls. View the task job log with cylc cat-log <workflow_id>//1/foo to confirm that the dependent task received the xtrigger results.
xrandom
The toy xrandom function sleeps for a configurable amount of time (useful for testing the effect of a long-running trigger function - which should be avoided) and has a configurable random chance of success. The function signature is:
- cylc.flow.xtriggers.xrandom.xrandom(percent, secs=0, _=None)[source]
  Random xtrigger, with configurable sleep and percent success.
  Sleep for secs seconds, and report satisfied with percent likelihood. The _ argument is not used in the function code, but can be used to specialize the function signature to cycle point or task.
  - Parameters:
    - percent – likelihood (0-100) that the trigger is reported satisfied.
    - secs – number of seconds to sleep before returning.
    - _ – unused; can specialize the signature to cycle point or task.
  - Returns:
    (satisfied, results)
    - satisfied: True if satisfied, else False.
    - results: A dictionary containing the keys COLOR (a random colour, e.g. red, orange, ...) and SIZE (a random size, e.g. small, medium, ...).
  - Return type:
    tuple
Examples
If the percent is zero, it returns that the trigger condition was not satisfied, and an empty dictionary.
>>> xrandom(0, 0)
(False, {})
If the percent is not zero, but the random percent success is not met, then it also returns that the trigger condition was not satisfied, and an empty dictionary.
>>> import sys
>>> mocked_random = lambda: 0.3
>>> sys.modules[__name__].random = mocked_random
>>> xrandom(15.5, 0)
(False, {})
Finally, if the percent is not zero, and the random percent success is met, then it returns that the trigger condition was satisfied, and a dictionary containing random colour and size as result.
>>> import sys
>>> mocked_random = lambda: 0.9
>>> sys.modules[__name__].random = mocked_random
>>> mocked_randint = lambda x, y: 1
>>> sys.modules[__name__].randint = mocked_randint
>>> xrandom(99.99, 0)
(True, {'COLOR': 'orange', 'SIZE': 'small'})
- cylc.flow.xtriggers.xrandom.validate(args)[source]
  Validate the args that xrandom is called with.
  Cylc calls this function automatically when parsing the workflow.
  The rules for the args are:
  - percent: Must be 0 ≤ x ≤ 100
  - secs: Must be an integer.
An example xrandom trigger workflow:
[scheduling]
    cycling mode = integer
    initial cycle point = 1
    final cycle point = 5
    [[xtriggers]]
        # Called once for all dependent tasks (all cycles).
        x1 = xrandom(percent=25, secs=2):PT5S
        # Called once per dependent task name (all cycles).
        x2 = xrandom(percent=25, secs=2, _=%(name)s):PT5S
        # Called once per cycle for all dependent tasks.
        x3 = xrandom(percent=25, secs=2, _=%(point)s):PT5S
    [[graph]]
        P1 = """
            # all instances of foo and bar should trigger at once, together
            @x1 => foo & bar
            # all instances of cat should trigger at once, and separately, all
            # instances of dog should trigger at once.
            @x2 => cat & dog
            # each instance of qux should trigger separately
            @x3 => qux
            # Result:
            #  - x1 should return True once, and not be called again.
            #  - x2 should return True twice, and not be called again.
            #  - x3 should return True five times, and not be called again.
            # i.e. 8 True returns in the 5-cycle workflow run.
        """
[runtime]
    [[root]]
        script = sleep 5
    [[foo, bar, cat, dog, qux]]
Custom Xtrigger Functions
Xtrigger functions are Python functions with some special requirements.
Requirements:
Xtrigger functions must be compatible with the Python version that runs the scheduler (see Distribution requirements for the latest version specification).
Xtrigger functions must return a Tuple of (Boolean, Dictionary):
- (False, {}) - failed: trigger condition not met
- (True, results) - succeeded: trigger condition met
where results is a flat (non-nested) dictionary of information to be passed to dependent tasks - see Xtrigger Results. Each dictionary key must be valid as an environment variable name.
Xtrigger functions can take arbitrary positional and keyword arguments, except for the keyword sequential, which is reserved for sequential xtriggers.
Xtrigger functions cannot store data between invocations, because each call is executed via a wrapper in a new subprocess. If necessary the filesystem could be used for persistent data.
If xtriggers depend on files (say) that might not exist when the function is first called, just return trigger condition not met (i.e., (False, {})).
Installation:
We recommend using the cylc.xtriggers entry point to install the xtrigger as a Python package - see Developing xtrigger plugins.
Otherwise, e.g., for installing custom xtriggers under your own user account, xtrigger functions must be:
- defined in a module with the same name as the function
- located in <workflow-dir>/lib/python/, or anywhere in your $CYLC_PYTHONPATH
Custom xtrigger modules can also provide a validate function for checking configured arguments - see Xtrigger Validation Functions for details.
Passing Workflow Parameters to Xtrigger Functions
Workflow and task parameters can be passed to function arguments from the workflow configuration via the following string templates. Task parameters affect xtrigger specificity.
- point
  The cycle point of the dependent task.
- debug
  True if Cylc is being run in debug mode (--debug, -vv).
- workflow_run_dir
  The path to the workflow run directory.
- workflow_share_dir
  The path to the workflow share directory.
- id
  The ID of the dependent task.
- name
  The name of the dependent task.
- user_name
  The user account under which the workflow is being run.
- workflow
  The workflow ID.
- workflow_name
  The workflow ID. Deprecated since version 8.0.0: Use workflow instead.
- suite_name
  The workflow ID. Deprecated since version 8.0.0: Use workflow instead.
- suite_run_dir
  The path to the workflow run directory. Deprecated since version 8.0.0: Use workflow_run_dir instead.
- suite_share_dir
  The path to the workflow share directory. Deprecated since version 8.0.0: Use workflow_share_dir instead.
These template variables can be used in function arguments in the workflow configuration, if the trigger function needs any of this information:
[scheduling]
    initial cycle point = now
    [[xtriggers]]
        my_xtrigger = my_xtrigger_fcn('%(workflow)s', '%(point)s')
For an explanation of the substitution syntax, see String Formatting Operations in the Python documentation.
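In short, the templates are filled in with Python's printf-style mapping substitution; this standalone snippet (with made-up parameter values) shows the mechanism:

```python
# Cylc substitutes workflow/task parameters into xtrigger arguments
# using printf-style string formatting with a mapping.
template = "up//%(point)s/foo:x"
params = {"workflow": "dn", "point": "2011"}  # made-up example values
result = template % params  # unused mapping keys are simply ignored
print(result)  # -> up//2011/foo:x
```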
Xtrigger Validation Functions
The arguments you call the xtrigger function with are automatically validated against the function signature. However, you might wish to add extra validation logic to your custom xtrigger, e.g. to check argument types or values.
If a function named validate is present alongside the xtrigger in its module, it will be automatically called with a dictionary of all the arguments passed to the xtrigger function in the workflow config. It should raise a cylc.flow.exceptions.WorkflowConfigError exception if an error is detected.
The cylc.flow.xtriggers.xrandom xtrigger module contains an example of an xtrigger validation function.
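For illustration, a validation function for a hypothetical check_data xtrigger might look like this (in a real module you would raise cylc.flow.exceptions.WorkflowConfigError; a stand-in exception class keeps this sketch self-contained):

```python
class WorkflowConfigError(Exception):
    """Stand-in for cylc.flow.exceptions.WorkflowConfigError."""


def validate(args):
    """Check the arguments a hypothetical check_data xtrigger was given.

    Cylc calls this automatically, with a dict of all configured
    arguments, when parsing the workflow.
    """
    loc = args.get("loc")
    if not isinstance(loc, str) or not loc:
        raise WorkflowConfigError("'loc' must be a non-empty string")
```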
Filesystem Events?
Cylc does not have built-in support for triggering off of filesystem events such as inotify on Linux. There is no cross-platform standard for filesystem events, and on HPC clusters they can only be detected on the node where they are generated.
Persistent Event Watchers?
For some applications a process that continually monitors an external condition might be preferred over periodic checking. This would be more difficult to support as a Cylc plugin, but we may decide to do it in the future. In the meantime, consider implementing a small daemon process as the watcher and have your Cylc xtrigger functions interact with that.
Ext-Triggers (push)
These external triggers are hidden task prerequisites that must be satisfied by
using the cylc ext-trigger
client command to send a pre-defined message to
the workflow along with an ID string that distinguishes one instance of the
event from another (the name of the target task and its current cycle point are
not required). The event ID is just an arbitrary string to Cylc, but it can be
used to identify something associated with the event to the workflow - such as
the filename of a new externally-generated dataset. When the scheduler
receives the event notification it will trigger the next instance of any task
waiting on that trigger (whatever its cycle point) and then broadcast
(see Cylc Broadcast) the event ID to the cycle point of the triggered
task as $CYLC_EXT_TRIGGER_ID
. Downstream tasks with the same cycle
point therefore know the new event ID too and can use it, if they need to, to
identify the same new dataset. In this way a whole workflow can be associated
with each new dataset, and multiple datasets can be processed in parallel if
they happen to arrive in quick succession.
An externally-triggered task must register the event it waits on in the workflow scheduling section:
# workflow "sat-proc"
[scheduling]
    cycling mode = integer
    initial cycle point = 1
    [[special tasks]]
        external-trigger = get-data("new sat X data avail")
    [[graph]]
        P1 = get-data => conv-data => products
Then, each time a new dataset arrives the external detection system should notify the workflow like this:
$ cylc ext-trigger sat-proc "new sat X data avail" passX12334a
where “sat-proc” is the workflow name and “passX12334a” is the ID string for the new event. The workflow passphrase must be installed on the triggering account.
Note
Only one task in a workflow can trigger off a particular external message. Other tasks can trigger off the externally triggered task as required, of course.
Here is a working example of a simulated satellite processing workflow:
#!Jinja2

# WARNING: this uses the old-style push external trigger mechanism.

[meta]
    title = Real time satellite data processing demo, variant 3 of 3
    description = """
        Successive cycle points retrieve and process the next arbitrarily timed
        and labelled dataset, in parallel if the data comes in quickly. This
        variant of the workflow has initial get_data tasks with external triggers:
        they do not submit until triggered by an external system.
    """

# Note that the satellite simulator task here that supplies the external event
# trigger happens to be a workflow task - i.e. it is not really "external" - but
# this is only a convenience - an easy route to a self-contained example workflow.

# You can monitor output processing with:
# $ watch -n 1 \
#     "find ~/cylc-run/<workflow-id>/share; find ~/cylc-run/<workflow-id>/work"

{% set N_DATASETS = 5 %}

# define shared directories (could use runtime namespaces for this)
{% set DATA_IN_DIR = "$CYLC_WORKFLOW_SHARE_DIR/incoming" %}
{% set PRODUCT_DIR = "$CYLC_WORKFLOW_SHARE_DIR/products" %}

[scheduling]
    cycling mode = integer
    initial cycle point = 1
    final cycle point = {{N_DATASETS}}
    [[special tasks]]
        external-trigger = get_data("new dataset ready for processing")
    [[graph]]
        # first cycle
        R1 = prep => satsim & get_data
        P1 = """
            # Processing chain for each dataset
            get_data => proc1 => proc2 => products
            # As one dataset is retrieved, start waiting on another.
            get_data[-P1] => get_data
        """
        # last cycle
        R1/{{N_DATASETS}} = products => collate

[runtime]
    [[prep]]
        script = rm -rf $CYLC_WORKFLOW_SHARE_DIR $CYLC_WORKFLOW_WORK_DIR
        [[[meta]]]
            title = clean the workflow output directories
    [[satsim]]
        pre-script = mkdir -p {{DATA_IN_DIR}}
        script = """
            COUNT=0
            while true; do
                ((COUNT == {{N_DATASETS}})) && break
                # sleep $((RANDOM % 20))
                # Generate datasets very quickly to test parallel processing.
                DATA_ID=$(date +%s).$((RANDOM % 100))
                DATA_FILE=dataset-${DATA_ID}.raw
                touch {{DATA_IN_DIR}}/$DATA_FILE
                ((COUNT += 1))
                # (required to distinguish fast-arriving messages).
                # Trigger downstream processing in the workflow.
                cylc ext-trigger $CYLC_WORKFLOW_ID \
                    "new dataset ready for processing" $DATA_ID
            done
        """
        [[[meta]]]
            title = simulate a satellite data feed
            description = """
                Generates {{N_DATASETS}} arbitrarily labelled datasets very
                quickly, to show parallel processing streams.
            """
    [[WORKDIR]]
        # Define a common cycle-point-specific work-directory for all
        # processing tasks so that they all work on the same dataset.
        work sub-directory = proc-$CYLC_TASK_CYCLE_POINT
        post-script = sleep 5
        [[[environment]]]
            DATASET = dataset-$CYLC_EXT_TRIGGER_ID
    [[get_data]]
        inherit = WORKDIR
        script = mv {{DATA_IN_DIR}}/${DATASET}.raw $PWD
        [[[meta]]]
            title = retrieve next dataset
            description = just do it - we know it exists already
    [[proc1]]
        inherit = WORKDIR
        script = mv ${DATASET}.raw ${DATASET}.proc1
        [[[meta]]]
            title = convert .raw dataset to .proc1 form
    [[proc2]]
        inherit = WORKDIR
        script = mv ${DATASET}.proc1 ${DATASET}.proc2
        [[[meta]]]
            title = convert .proc1 dataset to .proc2 form
    [[products]]
        inherit = WORKDIR
        script = """
            mkdir -p {{PRODUCT_DIR}}
            mv ${DATASET}.proc2 {{PRODUCT_DIR}}/${DATASET}.prod
        """
        [[[meta]]]
            title = generate products from .proc2 processed dataset
    [[collate]]
        # Note you might want to use "cylc workflow-state" to check that
        # _all_ product tasks have finished before collating results.
        script = """
            echo PRODUCTS:
            ls {{PRODUCT_DIR}}
            sleep 20
        """
        [[[meta]]]
            title = collate all products from the workflow run
External triggers are not normally needed in datetime cycling workflows driven by real time data that comes in at regular intervals. In these cases a data retrieval task can be clock-triggered (and have appropriate retry intervals) to submit at the expected data arrival time, so little time is wasted in polling. However, if the arrival time of the cycle-point-specific data is highly variable, external triggering may be used with the cycle point embedded in the message:
# workflow "data-proc"
[scheduling]
    initial cycle point = 20150125T00
    final cycle point = 20150126T00
    [[special tasks]]
        external-trigger = get-data("data arrived for $CYLC_TASK_CYCLE_POINT")
    [[graph]]
        T00 = init-process => get-data => post-process
Once the variable-length waiting is finished, an external detection system should notify the workflow like this:
$ cylc ext-trigger data-proc "data arrived for 20150126T00" passX12334a
where “data-proc” is the workflow name, the cycle point has replaced the variable in the trigger string, and “passX12334a” is the ID string for the new event. The workflow passphrase must be installed on the triggering account. In this case, the event will trigger for the second cycle point but not the first because of the cycle-point matching.
Workflow State Polling (old style)
Warning
Please read Built-in Workflow State Triggers before using the older inter-workflow triggering mechanism described in this section.
The cylc workflow-state command interrogates workflow run databases. It has a polling mode that waits for a given task in the target workflow to achieve a given state, or receive a given message. This can be used to make task scripting wait for a remote task to succeed (for example).
Automatic workflow-state polling tasks can be defined in the graph. They get automatically-generated task scripting that uses cylc workflow-state appropriately (it is an error to give your own script item for these tasks).
Here’s how to trigger a task bar off a task foo in a remote workflow called other.workflow:
[scheduling]
    [[graph]]
        T00, T12 = "my-foo<other.workflow::foo> => bar"
Local task my-foo will poll for the success of foo in workflow other.workflow, at the same cycle point, succeeding only when (or if) it succeeds. Other task states can also be polled:
T00, T12 = "my-foo<other.workflow::foo:fail> => bar"
The default polling parameters (e.g. maximum number of polls and the interval between them) are printed by cylc workflow-state --help and can be configured if necessary under the local polling task runtime section:
[scheduling]
    [[graph]]
        T00, T12 = "my-foo<other.workflow::foo> => bar"
[runtime]
    [[my-foo]]
        [[[workflow state polling]]]
            max-polls = 100
            interval = PT10S
To poll for the target task to receive a message rather than achieve a state, give the message in the runtime configuration (in which case the task status inferred from the graph syntax will be ignored):
[runtime]
    [[my-foo]]
        [[[workflow state polling]]]
            message = "the quick brown fox"
For workflows owned by others, or those with run databases in non-standard locations, use the --run-dir option, or in-workflow:
[runtime]
    [[my-foo]]
        [[[workflow state polling]]]
            run-dir = /path/to/top/level/cylc/run-directory
If the remote task has a different cycling sequence, just arrange for the local polling task to be on the same sequence as the remote task that it represents. For instance, if local task cat cycles 6-hourly at 0,6,12,18 but needs to trigger off a remote task dog at 3,9,15,21:
[scheduling]
    [[graph]]
        T03, T09, T15, T21 = "my-dog<other.workflow::dog>"
        T00, T06, T12, T18 = "my-dog[-PT3H] => cat"
For workflow-state polling, the cycle point is automatically converted to the cycle point format of the target workflow.
The remote workflow does not have to be running when polling commences because the command interrogates the workflow run database, not the scheduler.
Note
The graph syntax for workflow polling tasks cannot be combined with cycle point offsets, family triggers, or parameterized task notation. This does not present a problem because workflow polling tasks can be put on the same cycling sequence as the remote-workflow target task (as recommended above), and there is no point in having multiple tasks (family members or parameterized tasks) performing the same polling operation. Task state triggers can be used with workflow polling, e.g. to trigger another task if polling fails after 10 tries at 10 second intervals:
[scheduling]
    [[graph]]
        R1 = "poller<other-workflow::foo:succeed>:fail => another-task"
[runtime]
    [[poller]]
        [[[workflow state polling]]]
            max-polls = 10
            interval = PT10S