Trigger
Trigger a flow when a new file arrives in an Azure Blob Storage container.
This trigger polls the specified Azure Blob Storage container at every interval. Using the prefix and regExp properties, you can define which file arrivals trigger the flow. Under the hood, the trigger uses the Azure Blob Storage API to list the files in the specified location, downloads them to internal storage, and processes them with the declared action. You can use the action property to move or delete the files from the container after processing, so that the trigger does not fire again for the same files during the next polling interval.
type: "io.kestra.plugin.azure.storage.blob.Trigger"
Run a flow if one or more files arrive in the specified Azure Blob Storage container location. Then, process all files in a for-loop either sequentially or concurrently, depending on the concurrencyLimit property.
id: react_to_files
namespace: company.team

tasks:
  - id: each
    type: io.kestra.plugin.core.flow.ForEach
    concurrencyLimit: 1
    values: "{{ trigger.blobs | jq('.[].uri') }}"
    tasks:
      - id: return
        type: io.kestra.plugin.core.debug.Return
        format: "{{ taskrun.value }}"

triggers:
  - id: watch
    type: io.kestra.plugin.azure.storage.blob.Trigger
    interval: PT5M
    endpoint: "https://yourblob.blob.core.windows.net"
    connectionString: "DefaultEndpointsProtocol=...=="
    container: myBlobContainer
    prefix: yourDirectory/subdirectory
    action: MOVE
    moveTo:
      container: mydata
      name: archive
Run a flow whenever one or more files arrive in the specified Azure Blob Storage container location. Then, process the files and delete them afterwards to avoid re-triggering the flow for the same blobs during the next polling interval.
id: process_and_delete_files
namespace: company.team

tasks:
  - id: each
    type: io.kestra.plugin.core.flow.ForEach
    values: "{{ trigger.blobs | jq('.[].name') }}"
    tasks:
      - id: return
        type: io.kestra.plugin.core.debug.Return
        format: "{{ taskrun.value }}"

      - id: delete
        type: io.kestra.plugin.azure.storage.blob.Delete
        endpoint: "https://yourblob.blob.core.windows.net"
        connectionString: "DefaultEndpointsProtocol=...=="
        container: myBlobContainer
        name: "{{ taskrun.value }}"

triggers:
  - id: watch
    type: io.kestra.plugin.azure.storage.blob.Trigger
    endpoint: "https://yourblob.blob.core.windows.net"
    connectionString: "DefaultEndpointsProtocol=...=="
    container: myBlobContainer
    prefix: yourDirectory/subdirectory
    action: NONE
    moveTo:
      container: myBlobContainer
      name: archive
MOVE
DELETE
NONE
The action to perform on the retrieved files. If using NONE, make sure to handle the files inside your flow to avoid infinite triggering.
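As an illustration of an alternative to NONE, the trigger fragment below is a minimal sketch that deletes each blob once it has been picked up, so no clean-up task is needed inside the flow. The endpoint, connection string, container, and prefix are the placeholder values used in the examples on this page, and the trigger id is hypothetical.

```yaml
triggers:
  - id: watch_and_delete  # hypothetical id
    type: io.kestra.plugin.azure.storage.blob.Trigger
    interval: PT5M
    endpoint: "https://yourblob.blob.core.windows.net"
    connectionString: "DefaultEndpointsProtocol=...=="
    container: myBlobContainer
    prefix: yourDirectory/subdirectory
    # DELETE removes the blobs from the container after processing,
    # so the next polling interval does not pick them up again.
    action: DELETE
```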
The blob container.
List of conditions in order to limit the flow trigger.
Connection string of the Storage Account.
The delimiter for blob hierarchy, "/" for hierarchy based on directories.
The blob service endpoint.
FILES
FILES
DIRECTORY
BOTH
The filter for files or directories.
PT1M
duration
Interval between polling.
The interval between two consecutive polls. A longer interval avoids overloading the remote system with too many calls. For most triggers that depend on external systems, the interval must be at least PT30S. See ISO 8601 durations for more information on available interval values.
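For instance, a trigger that polls every 15 minutes instead of the default PT1M might look like the sketch below. The container and connection string are the placeholder values used elsewhere on this page.

```yaml
triggers:
  - id: watch
    type: io.kestra.plugin.azure.storage.blob.Trigger
    # ISO 8601 duration: PT15M means "every 15 minutes".
    interval: PT15M
    connectionString: "DefaultEndpointsProtocol=...=="
    container: myBlobContainer
    action: DELETE
```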
The destination container and key.
Limits the response to keys that begin with the specified prefix.
A regular expression to filter on the full key.
For example:
regExp: .* matches all files.
regExp: .*2020-01-0.\\.csv matches files dated between January 01 and 09, 2020, ending with .csv.
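Combined with prefix, the regular expression could be set on the trigger as in the sketch below. The pattern is an illustrative variant of the one above: it uses character classes ([1-9] and [.]) inside a single-quoted YAML string, which sidesteps backslash escaping. The container, prefix, and connection string are placeholders.

```yaml
triggers:
  - id: watch
    type: io.kestra.plugin.azure.storage.blob.Trigger
    interval: PT5M
    connectionString: "DefaultEndpointsProtocol=...=="
    container: myBlobContainer
    # Only list keys under this prefix...
    prefix: yourDirectory/subdirectory
    # ...and keep those whose full key contains a date from
    # 2020-01-01 to 2020-01-09 and ends with .csv.
    regExp: '.*2020-01-0[1-9][.]csv'
    action: DELETE
```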
The SAS token to use for authenticating requests.
This string should only be the query parameters (with or without a leading '?') and not a full URL.
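A sketch of SAS-token authentication follows. It assumes the property is named sasToken (the name is not shown in this listing) and that the token is stored as a Kestra secret under the hypothetical key AZURE_SAS_TOKEN. Only the query parameters are stored, not a full URL.

```yaml
triggers:
  - id: watch
    type: io.kestra.plugin.azure.storage.blob.Trigger
    endpoint: "https://yourblob.blob.core.windows.net"
    # Assumed property name; the secret key is hypothetical.
    # The secret holds only the query string of the SAS token.
    sasToken: "{{ secret('AZURE_SAS_TOKEN') }}"
    container: myBlobContainer
    action: DELETE
```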
CREATED
RUNNING
PAUSED
RESTARTED
KILLING
SUCCESS
WARNING
FAILED
KILLED
CANCELLED
QUEUED
RETRYING
RETRIED
SKIPPED
List of execution states after which a trigger should be stopped (a.k.a. disabled).
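As a sketch, disabling the trigger after a failed execution could be expressed as below. It assumes the property is named stopAfter (the name is not shown in this listing); the states are taken from the list above.

```yaml
triggers:
  - id: watch
    type: io.kestra.plugin.azure.storage.blob.Trigger
    connectionString: "DefaultEndpointsProtocol=...=="
    container: myBlobContainer
    action: DELETE
    # Assumed property name: the trigger is disabled once an
    # execution it started ends in one of these states.
    stopAfter:
      - FAILED
      - KILLED
```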
partial-time
SLA daily deadline
Use it only for DAILY_TIME_DEADLINE SLA.
partial-time
SLA daily end time
Use it only for DAILY_TIME_WINDOW SLA.
partial-time
SLA daily start time
Use it only for DAILY_TIME_WINDOW SLA.
DURATION_WINDOW
DAILY_TIME_DEADLINE
DAILY_TIME_WINDOW
DURATION_WINDOW
SLIDING_WINDOW
The type of the SLA
The default SLA is a duration window (DURATION_WINDOW) with a window of 24 hours.
duration
The duration of the window
Use it only for DURATION_WINDOW or SLIDING_WINDOW SLA.
See ISO 8601 durations for more information on available duration values.
The start of the window is always based on midnight unless you set the windowAdvance parameter. For example, with a 10-minute window (PT10M), the first window runs from 00:00 to 00:10, and a new window starts every 10 minutes.
duration
The window advance duration
Use it only for DURATION_WINDOW SLA.
Allows you to specify the start time of the window.
For example, with a window of 6 hours (window=PT6H), the check is done by default between 00:00 and 06:00, 06:00 and 12:00, 12:00 and 18:00, and 18:00 and 00:00.
If you want to check the window between 03:00 and 09:00, 09:00 and 15:00, 15:00 and 21:00, and 21:00 and 03:00, shift the window by 3 hours by setting windowAdvance: PT3H.
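The shifted 6-hour window from this example corresponds to the two properties below. This fragment is only a sketch: it shows window and windowAdvance on their own, and the enclosing condition definition is omitted.

```yaml
# Windows of 6 hours, shifted by 3 hours from midnight:
# 03:00-09:00, 09:00-15:00, 15:00-21:00, 21:00-03:00.
window: PT6H
windowAdvance: PT3H
```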
The flow id.
The namespace of the flow.
uri
The namespace of the flow or the prefix if prefix is true.
false
Whether to match the flow namespace by prefix (checked using startsWith). The prefix is case-sensitive.
The flow id.
The namespace of the flow.
time
The time to test must be after this one.
Must be a valid ISO 8601 time with offset.
time
The time to test must be before this one.
Must be a valid ISO 8601 time with offset.
{{ trigger.date }}
The time to test.
Can be any variable or any valid ISO 8601 time. By default, it will use the trigger date.
List of labels to match in the execution.
{{ trigger.date }}
The date to test.
Can be any variable or any valid ISO 8601 datetime. By default, it will use the trigger date.
1
The list of conditions to validate.
If any condition is true, it will allow the event's execution.
String against which to match the execution namespace depending on the provided comparison.
EQUALS
PREFIX
SUFFIX
Comparison to use when checking if the namespace matches. If not provided, it will use EQUALS by default.
false
Whether to look at the flow namespace by prefix. Shortcut for comparison: PREFIX.
Only used when comparison is not set.
^[a-zA-Z0-9][a-zA-Z0-9_-]*
1
A unique id for the condition
true
Whether to reset the evaluation results of SLA conditions after a first successful evaluation within the given time period.
By default, after a successful evaluation of the set of SLA conditions, the evaluation result is reset, so, the same set of conditions needs to be successfully evaluated again in the same time period to trigger a new execution.
This means that to create multiple executions, the same set of conditions needs to be evaluated to true multiple times.
You can disable this by setting this property to false so that, within the same period, each time one of the conditions is satisfied again after a successful evaluation, it will trigger a new execution.
{
"type": "DURATION_WINDOW"
}
Define the time period (or window) for evaluating preconditions.
You can set the type of sla to one of the following values:
DURATION_WINDOW: this is the default type. It uses a start time (windowAdvance) and end time (window) that move forward to the next interval whenever the evaluation time reaches the end time, based on the defined duration window. For example, with a 1-day window (the default option: window: PT1D), the SLA conditions are always evaluated during 24h starting at midnight (i.e. at time 00:00:00) each day. If you set windowAdvance: PT6H, the window will start at 6 AM each day. If you set windowAdvance: PT6H and you also override the window property to PT6H, the window will start at 6 AM and last for 6 hours; as a result, Kestra will check the SLA conditions during the following time periods: 06:00 to 12:00, 12:00 to 18:00, 18:00 to 00:00, and 00:00 to 06:00, and so on.
SLIDING_WINDOW: this option also evaluates SLA conditions over a fixed time window, but it always goes backward from the current time. For example, a sliding window of 1 hour (window: PT1H) will evaluate executions for the past hour (so between now and one hour before now). It uses a default window of 1 day.
DAILY_TIME_DEADLINE: this option declares that some SLA conditions should be met "before a specific time in a day". With the string property deadline, you can configure a daily cutoff for checking conditions. For example, deadline: "09:00:00" means that the defined SLA conditions should be met from midnight until 9 AM each day; otherwise, the flow will not be triggered.
DAILY_TIME_WINDOW: this option declares that some SLA conditions should be met "within a given time range in a day". For example, a window from startTime: "06:00:00" to endTime: "09:00:00" evaluates executions within that interval each day. This option is particularly useful for declarative definition of freshness conditions when building data pipelines. For example, if you only need one successful execution within a given time range to guarantee that some data has been successfully refreshed in order for you to proceed with the next steps of your pipeline, this option can be more useful than a strict DAG-based approach. Usually, each failure in your flow would block the entire pipeline, whereas with this option, you can proceed with the next steps of the pipeline as soon as the data is successfully refreshed at least once within the given time range.
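The DAILY_TIME_WINDOW option could be written as the following fragment. This is a sketch only: it assumes the window is configured under a timeWindow key (the name used in the deprecation notes of this reference), and the surrounding trigger or condition definition is omitted.

```yaml
timeWindow:
  type: DAILY_TIME_WINDOW
  # Conditions must be met between 06:00 and 09:00 each day.
  startTime: "06:00:00"
  endTime: "09:00:00"
```

For a daily cutoff instead, the same key would take type: DAILY_TIME_DEADLINE together with a deadline value such as "09:00:00".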
duration
The duration of the window
Deprecated, use timeWindow.window instead.
duration
The window advance duration
Deprecated, use timeWindow.windowAdvance instead.
The blob container.
The full blob path on the container.
1
The list of conditions to exclude.
If any condition is true, it will prevent the event's execution.
FIRST
LAST
SECOND
THIRD
FOURTH
Which occurrence of the day in the month to match: the first, second, third, fourth, or last.
MONDAY
TUESDAY
WEDNESDAY
THURSDAY
FRIDAY
SATURDAY
SUNDAY
The day of week.
{{ trigger.date }}
The date to test.
Can be any variable or any valid ISO 8601 datetime. By default, it will use the trigger date.
MONDAY
TUESDAY
WEDNESDAY
THURSDAY
FRIDAY
SATURDAY
SUNDAY
The day of week.
{{ trigger.date }}
The date to test.
Can be any variable or any valid ISO 8601 datetime. By default, it will use the trigger date.
CREATED
RUNNING
PAUSED
RESTARTED
KILLING
SUCCESS
WARNING
FAILED
KILLED
CANCELLED
QUEUED
RETRYING
RETRIED
SKIPPED
List of states that are authorized.
CREATED
RUNNING
PAUSED
RESTARTED
KILLING
SUCCESS
WARNING
FAILED
KILLED
CANCELLED
QUEUED
RETRYING
RETRIED
SKIPPED
List of states that aren't authorized.
date-time
The date to test must be after this one.
Must be a valid ISO 8601 datetime with the zone identifier (use 'Z' for the default zone identifier).
date-time
The date to test must be before this one.
Must be a valid ISO 8601 datetime with the zone identifier (use 'Z' for the default zone identifier).
{{ trigger.date }}
The date to test.
Can be any variable or any valid ISO 8601 datetime. By default, it will use the trigger date.
CREATED
RUNNING
PAUSED
RESTARTED
KILLING
SUCCESS
WARNING
FAILED
KILLED
CANCELLED
QUEUED
RETRYING
RETRIED
SKIPPED
List of states that are authorized.
CREATED
RUNNING
PAUSED
RESTARTED
KILLING
SUCCESS
WARNING
FAILED
KILLED
CANCELLED
QUEUED
RETRYING
RETRIED
SKIPPED
List of states that aren't authorized.
ISO 3166-1 alpha-2 country code. If not set, it uses the country code from the default locale.
It uses the Jollyday library for public holiday calendars, which supports more than 70 countries.
{{ trigger.date }}
The date to test.
Can be any variable or any valid ISO 8601 datetime. By default, it will use the trigger date.
ISO 3166-2 country subdivision (e.g., provinces and states) code.
It uses the Jollyday library for public holiday calendars, which supports more than 70 countries.