Trigger

yaml
type: "io.kestra.plugin.azure.storage.blob.Trigger"

Run a flow as soon as files are uploaded to Azure Blob Storage

This trigger polls the specified Azure Blob Storage container at every interval. Using the prefix and regexp properties, you can define which file arrivals trigger the flow. Under the hood, the trigger uses the Azure Blob Storage API to list the files in the specified location, downloads them to internal storage, and processes them with the declared action. Use the action property to move or delete the files from the container after processing, so that the trigger does not fire again for the same files during the next polling interval.

Examples

Run a flow when one or more files arrive in the specified Azure Blob Storage container location. Then, process all files in a for-loop, either sequentially or concurrently, depending on the concurrencyLimit property.

yaml
id: react_to_files
namespace: company.team

tasks:
  - id: each
    type: io.kestra.plugin.core.flow.ForEach
    concurrencyLimit: 1
    values: "{{ trigger.blobs | jq('.[].uri') }}"
    tasks:
      - id: return
        type: io.kestra.plugin.core.debug.Return
        format: "{{ taskrun.value }}"

triggers:
  - id: watch
    type: io.kestra.plugin.azure.storage.blob.Trigger
    interval: PT5M
    endpoint: "https://yourblob.blob.core.windows.net"
    connectionString: "DefaultEndpointsProtocol=...=="
    container: myBlobContainer
    prefix: yourDirectory/subdirectory
    action: MOVE
    moveTo:
      container: mydata
      name: archive

Run a flow whenever one or more files arrive in the specified Azure Blob Storage container location. Then, process the files and delete them to avoid re-triggering the flow for the same blobs during the next polling interval.

yaml
id: process_and_delete_files
namespace: company.team

tasks:
  - id: each
    type: io.kestra.plugin.core.flow.ForEach
    values: "{{ trigger.blobs | jq('.[].name') }}"
    tasks:
      - id: return
        type: io.kestra.plugin.core.debug.Return
        format: "{{ taskrun.value }}"

      - id: delete
        type: io.kestra.plugin.azure.storage.blob.Delete
        endpoint: "https://yourblob.blob.core.windows.net"
        connectionString: "DefaultEndpointsProtocol=...=="
        container: myBlobContainer
        name: "{{ taskrun.value }}"

triggers:
  - id: watch
    type: io.kestra.plugin.azure.storage.blob.Trigger
    endpoint: "https://yourblob.blob.core.windows.net"
    connectionString: "DefaultEndpointsProtocol=...=="
    container: myBlobContainer
    prefix: yourDirectory/subdirectory
    # NONE leaves the blobs in place; the delete task above removes them instead
    action: NONE

Properties

action

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Possible Values:
    • MOVE
    • DELETE
    • NONE

The action to perform on the retrieved files. If using NONE, make sure to handle the files inside your flow to avoid infinite triggering.
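
As a minimal sketch of the alternative to NONE, the trigger below deletes each blob once the trigger has retrieved it, so the flow needs no cleanup task of its own. The endpoint, container, and prefix values are the placeholders used in the examples above.

yaml
triggers:
  - id: watch
    type: io.kestra.plugin.azure.storage.blob.Trigger
    interval: PT5M
    endpoint: "https://yourblob.blob.core.windows.net"
    connectionString: "DefaultEndpointsProtocol=...=="
    container: myBlobContainer
    prefix: yourDirectory/subdirectory
    # DELETE removes the retrieved blobs, so the next poll does not pick them up again
    action: DELETE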

container

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

The blob container.

filter

  • Type: string
  • Dynamic:
  • Required: ✔️
  • Default: FILES
  • Possible Values:
    • FILES
    • DIRECTORY
    • BOTH

The filter for files or directories.

conditions

  • Type: array
  • SubType: Condition
  • Dynamic:
  • Required:

List of conditions that restrict when the flow is triggered.

connectionString

  • Type: string
  • Dynamic: ✔️
  • Required:

Connection string of the Storage Account.

delimiter

  • Type: string
  • Dynamic: ✔️
  • Required:

The delimiter for the blob hierarchy; use "/" for a directory-based hierarchy.

endpoint

  • Type: string
  • Dynamic: ✔️
  • Required:

The blob service endpoint.

interval

  • Type: string
  • Dynamic:
  • Required:
  • Default: 60.000000000 (seconds, equivalent to PT1M)
  • Format: duration

Interval between polling.

The interval between two consecutive polls. A longer interval avoids overloading the remote system with too many calls. For most triggers that depend on external systems, the interval should be at least PT30S. See ISO 8601 durations for the available interval values.
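
For illustration, the interval is written as an ISO 8601 duration string. The sketch below polls every 30 seconds, the minimum suggested above; the other values are the placeholders used in the examples above.

yaml
triggers:
  - id: watch
    type: io.kestra.plugin.azure.storage.blob.Trigger
    # PT30S = 30 seconds; PT5M = 5 minutes; PT1H = 1 hour
    interval: PT30S
    endpoint: "https://yourblob.blob.core.windows.net"
    connectionString: "DefaultEndpointsProtocol=...=="
    container: myBlobContainer
    action: DELETE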

moveTo

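The destination container and blob name, used when action is MOVE. See io.kestra.plugin.azure.storage.blob.Copy-CopyObject under Definitions.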

prefix

  • Type: string
  • Dynamic: ✔️
  • Required:

Limits the response to keys that begin with the specified prefix.

regexp

  • Type: string
  • Dynamic: ✔️
  • Required:

A regular expression to filter on the full key.

Examples:

  • regexp: .* matches all files
  • regexp: .*2020-01-0.\\.csv matches files dated 01 to 09 January 2020 that end with .csv
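
A sketch of how prefix and regexp might be combined: the prefix narrows the listing, and the regular expression is then matched against the full key. The directory name and the pattern are illustrative placeholders.

yaml
triggers:
  - id: watch_csv
    type: io.kestra.plugin.azure.storage.blob.Trigger
    interval: PT5M
    endpoint: "https://yourblob.blob.core.windows.net"
    connectionString: "DefaultEndpointsProtocol=...=="
    container: myBlobContainer
    # Only list blobs under this directory
    prefix: yourDirectory/subdirectory
    # Within that listing, keep only keys ending in .csv
    # (the backslash is doubled because the value is a double-quoted YAML string)
    regexp: ".*\\.csv"
    action: DELETE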

sasToken

  • Type: string
  • Dynamic: ✔️
  • Required:

The SAS token to use for authenticating requests.

This string should only be the query parameters (with or without a leading '?') and not a full URL.
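
As a sketch, a SAS token can replace the connection string used in the examples above. It assumes the token is stored as a Kestra secret; the secret name AZURE_SAS_TOKEN is a hypothetical placeholder.

yaml
triggers:
  - id: watch
    type: io.kestra.plugin.azure.storage.blob.Trigger
    interval: PT5M
    endpoint: "https://yourblob.blob.core.windows.net"
    # Query parameters only, not a full URL; the secret name is an assumption
    sasToken: "{{ secret('AZURE_SAS_TOKEN') }}"
    container: myBlobContainer
    action: DELETE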

sharedKeyAccountAccessKey

  • Type: string
  • Dynamic: ✔️
  • Required:

Shared Key access key for authenticating requests.

sharedKeyAccountName

  • Type: string
  • Dynamic: ✔️
  • Required:

Shared Key account name for authenticating requests.
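
The two Shared Key properties are meant to be used together. A minimal sketch, assuming the access key is stored as a Kestra secret under the hypothetical name AZURE_STORAGE_ACCESS_KEY and that the storage account is named yourblob, as in the endpoint placeholder:

yaml
triggers:
  - id: watch
    type: io.kestra.plugin.azure.storage.blob.Trigger
    interval: PT5M
    endpoint: "https://yourblob.blob.core.windows.net"
    # Account name and key are placeholders; keep the key out of the flow source
    sharedKeyAccountName: yourblob
    sharedKeyAccountAccessKey: "{{ secret('AZURE_STORAGE_ACCESS_KEY') }}"
    container: myBlobContainer
    action: DELETE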

stopAfter

  • Type: array
  • SubType: string
  • Dynamic:
  • Required:

List of execution states after which a trigger should be stopped (a.k.a. disabled).
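
For example, the sketch below disables the trigger after the first failed execution, so a broken flow does not keep firing on every poll. FAILED is one of Kestra's execution states; the other values are the placeholders used in the examples above.

yaml
triggers:
  - id: watch
    type: io.kestra.plugin.azure.storage.blob.Trigger
    interval: PT5M
    endpoint: "https://yourblob.blob.core.windows.net"
    connectionString: "DefaultEndpointsProtocol=...=="
    container: myBlobContainer
    action: DELETE
    # Stop (disable) the trigger once an execution ends in one of these states
    stopAfter:
      - FAILED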

Outputs

blobs

  • Type: array
  • SubType: Blob
  • Required:

The list of blobs.
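
Inside the flow, the output is available as trigger.blobs, and each entry exposes the Blob fields listed under Definitions. A sketch of reading them in a task, assuming at least one blob was retrieved (the task id is a placeholder):

yaml
tasks:
  - id: log_first_blob
    type: io.kestra.plugin.core.log.Log
    # container, name, and size are fields of the Blob definition
    message: "First blob: {{ trigger.blobs[0].container }}/{{ trigger.blobs[0].name }} ({{ trigger.blobs[0].size }} bytes)"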

Definitions

io.kestra.plugin.azure.storage.blob.models.Blob

Properties

container
  • Type: string
  • Dynamic:
  • Required:
name
  • Type: string
  • Dynamic:
  • Required:
size
  • Type: integer
  • Dynamic:
  • Required:
uri
  • Type: string
  • Dynamic:
  • Required:
  • Format: uri

io.kestra.plugin.azure.storage.blob.Copy-CopyObject

Properties

container
  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

The blob container.

name
  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

The full blob path on the container.
