Trigger
Run a flow as soon as files are uploaded to Azure Blob Storage
This trigger polls the specified Azure Blob Storage container at every interval. Using the from and regExp properties, you can define which arriving files trigger the flow. Under the hood, we use the Azure Blob Storage API to list the files in the specified location, download them to internal storage, and process them with the declared action. You can use the action property to move or delete the files from the container after processing, so that the trigger does not fire again for the same files during the next polling interval.
type: "io.kestra.plugin.azure.storage.blob.Trigger"
Run a flow if one or more files have arrived in the specified Azure Blob Storage container location. Then process all files in a for-loop, either sequentially or concurrently, depending on the concurrencyLimit property.
id: react_to_files
namespace: company.team

tasks:
  - id: each
    type: io.kestra.plugin.core.flow.ForEach
    concurrencyLimit: 1
    values: "{{ trigger.blobs | jq('.[].uri') }}"
    tasks:
      - id: return
        type: io.kestra.plugin.core.debug.Return
        format: "{{ taskrun.value }}"

triggers:
  - id: watch
    type: io.kestra.plugin.azure.storage.blob.Trigger
    interval: PT5M
    endpoint: "https://yourblob.blob.core.windows.net"
    connectionString: "DefaultEndpointsProtocol=...=="
    container: myBlobContainer
    prefix: yourDirectory/subdirectory
    action: MOVE
    moveTo:
      container: mydata
      name: archive
Run a flow whenever one or more files arrive in the specified Azure Blob Storage container location. Then process the files and delete them afterwards, to avoid re-triggering the flow for the same blob objects during the next polling interval.
id: process_and_delete_files
namespace: company.team

tasks:
  - id: each
    type: io.kestra.plugin.core.flow.ForEach
    values: "{{ trigger.blobs | jq('.[].name') }}"
    tasks:
      - id: return
        type: io.kestra.plugin.core.debug.Return
        format: "{{ taskrun.value }}"

      - id: delete
        type: io.kestra.plugin.azure.storage.blob.Delete
        endpoint: "https://yourblob.blob.core.windows.net"
        connectionString: "DefaultEndpointsProtocol=...=="
        container: myBlobContainer
        name: "{{ taskrun.value }}"

triggers:
  - id: watch
    type: io.kestra.plugin.azure.storage.blob.Trigger
    endpoint: "https://yourblob.blob.core.windows.net"
    connectionString: "DefaultEndpointsProtocol=...=="
    container: myBlobContainer
    prefix: yourDirectory/subdirectory
    action: NONE
    moveTo:
      container: myBlobContainer
      name: archive
The action to perform on the retrieved files. If using NONE, make sure to handle the files inside your flow to avoid infinite triggering.
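As a minimal sketch of the alternative to MOVE and NONE, the flow below lets the trigger itself remove the files once they have been picked up. The flow id is hypothetical, and the DELETE value is an assumption inferred from the "move or delete" wording above; check the plugin's list of allowed action values before relying on it.

```yaml
id: process_then_delete   # hypothetical flow id
namespace: company.team

tasks:
  - id: return
    type: io.kestra.plugin.core.debug.Return
    # Output the names of the blobs that fired the trigger
    format: "{{ trigger.blobs | jq('.[].name') }}"

triggers:
  - id: watch
    type: io.kestra.plugin.azure.storage.blob.Trigger
    interval: PT5M
    endpoint: "https://yourblob.blob.core.windows.net"
    connectionString: "DefaultEndpointsProtocol=...=="
    container: myBlobContainer
    prefix: yourDirectory/subdirectory
    # Assumed enum value: remove the source blobs after retrieval,
    # so the next poll does not see them again
    action: DELETE
```

With this setup no moveTo block and no separate Delete task should be needed, since the cleanup happens as part of the trigger.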
The blob container.
The filter for files or directories.
Connection string of the Storage Account.
The delimiter for the blob hierarchy; use "/" for a hierarchy based on directories.
The blob service endpoint.
Interval between polling.
The interval between two consecutive polls. A longer interval avoids overloading the remote system with too many calls. For most triggers that depend on external systems, the interval must be at least PT30S. See ISO 8601 durations for more information on the available interval values.
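For illustration, the trigger fragment below sets the polling interval to the 30-second minimum mentioned above. It is only a sketch meant to sit under a flow's triggers key; the connection values are the same placeholders used in the examples on this page.

```yaml
triggers:
  - id: watch
    type: io.kestra.plugin.azure.storage.blob.Trigger
    # ISO 8601 duration: PT30S = 30 seconds, PT5M = 5 minutes, PT1H = 1 hour
    interval: PT30S
    endpoint: "https://yourblob.blob.core.windows.net"
    connectionString: "DefaultEndpointsProtocol=...=="
    container: myBlobContainer
    action: MOVE
    moveTo:
      container: mydata
      name: archive
```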
The destination container and key.
Limits the response to keys that begin with the specified prefix.
A regular expression to filter on the full key.
Examples:
regExp: .* to match all files
regExp: .*2020-01-0.\\.csv to match files dated from 01 to 09 of January and ending with .csv
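The sketch below shows how such a filter might be combined with prefix in a complete flow. The flow id is hypothetical. The expression is written in a double-quoted YAML string, where "\\" is read as a single backslash, so the regular expression engine should receive \. and treat it as a literal dot.

```yaml
id: watch_january_csv   # hypothetical flow id
namespace: company.team

tasks:
  - id: return
    type: io.kestra.plugin.core.debug.Return
    # Output the names of the blobs that matched the filter
    format: "{{ trigger.blobs | jq('.[].name') }}"

triggers:
  - id: watch
    type: io.kestra.plugin.azure.storage.blob.Trigger
    interval: PT5M
    endpoint: "https://yourblob.blob.core.windows.net"
    connectionString: "DefaultEndpointsProtocol=...=="
    container: myBlobContainer
    # prefix narrows the listing first; regExp then filters on the full key
    prefix: yourDirectory/subdirectory
    regExp: ".*2020-01-0.\\.csv"
    action: MOVE
    moveTo:
      container: mydata
      name: archive
```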
The SAS token to use for authenticating requests.
This string should only be the query parameters (with or without a leading '?') and not a full URL.
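As a hedged sketch, a SAS token could be supplied in place of a connection string as shown below. The property name sasToken and the secret name AZURE_SAS_TOKEN are assumptions made for illustration; the token value itself would be only the query string, not a full URL.

```yaml
triggers:
  - id: watch
    type: io.kestra.plugin.azure.storage.blob.Trigger
    interval: PT5M
    endpoint: "https://yourblob.blob.core.windows.net"
    # Assumed property name; the secret holds only the query parameters
    # of the SAS URL (with or without the leading '?')
    sasToken: "{{ secret('AZURE_SAS_TOKEN') }}"
    container: myBlobContainer
    action: MOVE
    moveTo:
      container: mydata
      name: archive
```

Reading the token from a secret rather than writing it inline keeps the credential out of the flow source.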
List of execution states after which a trigger should be stopped (a.k.a. disabled).
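For example, the fragment below would disable the trigger after the first failed execution, which can prevent a broken flow from being started again at every poll. The property name stopAfter is an assumption here, since this page lists only the description; FAILED is one of the standard execution states.

```yaml
triggers:
  - id: watch
    type: io.kestra.plugin.azure.storage.blob.Trigger
    interval: PT5M
    endpoint: "https://yourblob.blob.core.windows.net"
    connectionString: "DefaultEndpointsProtocol=...=="
    container: myBlobContainer
    action: MOVE
    moveTo:
      container: mydata
      name: archive
    # Assumed property name: disable this trigger once an execution
    # ends in one of the listed states
    stopAfter:
      - FAILED
```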
The blob container.
The full blob path on the container.