Trigger
Trigger a flow on a new file arrival in an S3 bucket.
This trigger polls the S3 bucket at the configured interval. You can list all files in a bucket or under a given prefix, or filter the files with a regexp. Detection is atomic: internally, the trigger lists the objects and interacts only with the files returned by that listing.
Once a file is detected, it is downloaded to internal storage and handled by the declared action, which can move or delete the file from the bucket to avoid detecting it again on the next poll.
type: "io.kestra.plugin.aws.s3.trigger"Examples
Wait for a list of files on an S3 bucket and iterate through the files.
```yaml
id: s3_listen
namespace: company.team

tasks:
  - id: each
    type: io.kestra.plugin.core.flow.ForEach
    values: "{{ trigger.objects | jq('.[].uri') }}"
    tasks:
      - id: return
        type: io.kestra.plugin.core.debug.Return
        format: "{{ taskrun.value }}"

triggers:
  - id: watch
    type: io.kestra.plugin.aws.s3.Trigger
    interval: "PT5M"
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
    region: "eu-central-1"
    bucket: "my-bucket"
    prefix: "sub-dir"
    action: MOVE
    moveTo:
      key: archive
      bucket: "new-bucket"
```
Wait for a list of files on an S3 bucket and iterate through the files, deleting each file manually after processing to prevent infinite triggering.
```yaml
id: s3_listen
namespace: company.team

tasks:
  - id: each
    type: io.kestra.plugin.core.flow.ForEach
    values: "{{ trigger.objects | jq('.[].key') }}"
    tasks:
      - id: return
        type: io.kestra.plugin.core.debug.Return
        format: "{{ taskrun.value }}"
      - id: delete
        type: io.kestra.plugin.aws.s3.Delete
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
        region: "eu-central-1"
        bucket: "my-bucket"
        key: "{{ taskrun.value }}"

triggers:
  - id: watch
    type: io.kestra.plugin.aws.s3.Trigger
    interval: "PT5M"
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
    region: "eu-central-1"
    bucket: "my-bucket"
    prefix: "sub-dir"
    action: NONE
```
Properties
action (string, required)
Possible values: MOVE, DELETE, NONE
The action to perform on the retrieved files. If using NONE, make sure to handle the files inside your flow to avoid infinite triggering.
bucket (string, required)
The S3 bucket from which to download the files.
accessKeyId (string)
Access Key ID used to connect to AWS.
compatibilityMode (boolean or string, default: false)
Enable compatibility mode.
Use it to connect to S3-compatible services that don't support the new transport client.
conditions (array, non-dynamic)
List of conditions to limit the flow trigger. Supported condition types include the following; a usage sketch follows this list.

- Condition to allow events between two specific datetime values.
  - The date to test must be after this one (date-time).
  - The date to test must be before this one (date-time). Must be a valid ISO 8601 datetime with the zone identifier (use 'Z' for the default zone identifier).
  - The date to test (default: {{ trigger.date }}).
- Condition to allow events on a particular day of the week.
  - The day of week. Possible values: MONDAY, TUESDAY, WEDNESDAY, THURSDAY, FRIDAY, SATURDAY, SUNDAY.
  - The date to test (default: {{ trigger.date }}).
- Condition to execute tasks on a specific day of the week relative to the current month (first, last, ...).
  - Are you looking for the first or the last day in the month? Possible values: FIRST, LAST, SECOND, THIRD, FOURTH.
  - The day of week. Possible values: MONDAY, TUESDAY, WEDNESDAY, THURSDAY, FRIDAY, SATURDAY, SUNDAY.
  - The date to test (default: {{ trigger.date }}).
- Condition for a specific flow of an execution.
- Condition that checks labels of an execution.
  - List of labels to match in the execution.
- Condition for an execution namespace.
  - String against which to match the execution namespace depending on the provided comparison.
  - Comparison to use when checking if the namespace matches. If not provided, EQUALS is used by default. Possible values: EQUALS, PREFIX, SUFFIX.
  - Whether to look at the flow namespace by prefix; shortcut for comparison: PREFIX. Only used when comparison is not set (default: false).
- Condition based on the outputs of an upstream execution.
- Condition based on execution status.
  - List of states that are authorized. Possible values: CREATED, SUBMITTED, RUNNING, PAUSED, RESTARTED, KILLING, SUCCESS, WARNING, FAILED, KILLED, CANCELLED, QUEUED, RETRYING, RETRIED, SKIPPED, BREAKPOINT, RESUBMITTED.
  - List of states that aren't authorized (same possible values).
- Condition based on variable expression.
- Condition for a specific flow. Note that this condition is deprecated; use `io.kestra.plugin.core.condition.ExecutionFlow` instead.
  - The flow id.
  - The namespace of the flow.
- Condition for a flow namespace.
  - The namespace of the flow, or the prefix if prefix is true.
  - Whether to look at the flow namespace by prefix, checked using startsWith; the prefix is case sensitive (default: false).
- Condition that matches if any taskRun has retry attempts.
  - List of states that are authorized. Possible values: CREATED, SUBMITTED, RUNNING, PAUSED, RESTARTED, KILLING, SUCCESS, WARNING, FAILED, KILLED, CANCELLED, QUEUED, RETRYING, RETRIED, SKIPPED, BREAKPOINT, RESUBMITTED.
  - List of states that aren't authorized (same possible values).
- Run a flow if the list of preconditions is met in a time window.
  - The list of preconditions to wait for. The key must be unique for a trigger because it is used to store the previous evaluation result.
    - A unique id for the condition (pattern: ^[a-zA-Z0-9][a-zA-Z0-9_-]*, minimum length: 1).
  - Whether to reset the evaluation results of SLA conditions after a first successful evaluation within the given time period (default: true).
    By default, after a successful evaluation of the set of SLA conditions, the evaluation result is reset, so the same set of conditions needs to be successfully evaluated again in the same time period to trigger a new execution. This means that to create multiple executions, the same set of conditions needs to be evaluated to true multiple times. You can disable this by setting this property to false so that, within the same period, each time one of the conditions is satisfied again after a successful evaluation, it triggers a new execution.
  - Define the time period (or window) for evaluating preconditions (default: {"type": "DURATION_WINDOW"}).
    You can set the type of SLA to one of the following values:
    - DURATION_WINDOW: this is the default type. It uses a start time (windowAdvance) and end time (window) that move forward to the next interval whenever the evaluation time reaches the end time, based on the defined duration window. For example, with a 1-day window (the default option, window: P1D), the SLA conditions are always evaluated during the 24 hours starting at midnight (i.e., at time 00:00:00) each day. If you set windowAdvance: PT6H, the window will start at 6 AM each day. If you set windowAdvance: PT6H and you also override the window property to PT6H, the window will start at 6 AM and last for 6 hours; as a result, Kestra will check the SLA conditions during the following time periods: 06:00 to 12:00, 12:00 to 18:00, 18:00 to 00:00, and 00:00 to 06:00, and so on.
    - SLIDING_WINDOW: this option also evaluates SLA conditions over a fixed time window, but it always goes backward from the current time. For example, a sliding window of 1 hour (window: PT1H) will evaluate executions for the past hour (between now and one hour ago). It uses a default window of 1 day.
    - DAILY_TIME_DEADLINE: this option declares that some SLA conditions should be met "before a specific time in a day". With the string property deadline, you can configure a daily cutoff for checking conditions. For example, deadline: "09:00:00" means that the defined SLA conditions should be met from midnight until 9 AM each day; otherwise, the flow will not be triggered.
    - DAILY_TIME_WINDOW: this option declares that some SLA conditions should be met "within a given time range in a day". For example, a window from startTime: "06:00:00" to endTime: "09:00:00" evaluates executions within that interval each day. This option is particularly useful for declarative definition of freshness conditions when building data pipelines. For example, if you only need one successful execution within a given time range to guarantee that some data has been successfully refreshed in order for you to proceed with the next steps of your pipeline, this option can be more useful than a strict DAG-based approach. Usually, each failure in your flow would block the entire pipeline, whereas with this option, you can proceed with the next steps as soon as the data is successfully refreshed at least once within the given time range.
    Type: io.kestra.core.models.triggers.TimeWindow
    - deadline (partial-time): SLA daily deadline. Use it only for the DAILY_TIME_DEADLINE SLA.
    - endTime (partial-time): SLA daily end time. Use it only for the DAILY_TIME_WINDOW SLA.
    - startTime (partial-time): SLA daily start time. Use it only for the DAILY_TIME_WINDOW SLA.
    - type (default: DURATION_WINDOW): the type of the SLA. Possible values: DAILY_TIME_DEADLINE, DAILY_TIME_WINDOW, DURATION_WINDOW, SLIDING_WINDOW. The default SLA is a sliding window (DURATION_WINDOW) with a window of 24 hours.
    - window (duration): use it only for DURATION_WINDOW or SLIDING_WINDOW SLAs. See ISO 8601 durations for more information on available duration values. The start of the window is always based on midnight unless you set the windowAdvance parameter. E.g., with a 10-minute window (PT10M), the first window is 00:00 to 00:10 and a new window starts every 10 minutes.
    - windowAdvance (duration): use it only for the DURATION_WINDOW SLA. Allows specifying the start time of the window. E.g., with a window of 6 hours (window: PT6H), by default the check is done between 00:00 and 06:00, 06:00 and 12:00, 12:00 and 18:00, and 18:00 and 00:00. If you want to check the window between 03:00 and 09:00, 09:00 and 15:00, 15:00 and 21:00, and 21:00 and 03:00, shift the window by 3 hours by setting windowAdvance: PT3H.
  - window (duration): the duration of the window. Deprecated; use timeWindow.window instead.
  - windowAdvance (duration): the window advance duration. Deprecated; use timeWindow.windowAdvance instead.
- Condition to exclude other conditions.
  - The list of conditions to exclude (at least 1). If any condition is true, it will prevent the event's execution.
- Condition to have at least one condition validated.
  - The list of conditions to validate (at least 1). If any condition is true, it will allow the event's execution.
- Condition to allow events on public holidays.
  It uses the Jollyday library for the public holiday calendar, which supports more than 70 countries.
  - ISO 3166-1 alpha-2 country code. If not set, it uses the country code from the default locale.
  - The date to test (default: {{ trigger.date }}).
  - ISO 3166-2 country subdivision (e.g., provinces and states) code.
- Condition to allow events between two specific times.
  - The time to test must be after this one (time).
  - The time to test must be before this one (time). Must be a valid ISO 8601 time with offset.
  - The time to test (default: {{ trigger.date }}). Can be any variable or any valid ISO 8601 time. By default, it uses the trigger date.
- Condition to allow events on weekends.
  - The date to test (default: {{ trigger.date }}). Can be any variable or any valid ISO 8601 datetime. By default, it uses the trigger date.
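To make this concrete, here is a hedged sketch that limits the trigger to Mondays during business hours. It assumes the core condition types io.kestra.plugin.core.condition.DayWeek and io.kestra.plugin.core.condition.TimeBetween and their dayOfWeek, after, and before properties; verify the exact type and property names against your Kestra version.

```yaml
triggers:
  - id: watch
    type: io.kestra.plugin.aws.s3.Trigger
    interval: "PT5M"
    region: "eu-central-1"
    bucket: "my-bucket"
    action: MOVE
    moveTo:
      key: archive
    conditions:
      # Assumed core condition types; names may differ across Kestra versions.
      - type: io.kestra.plugin.core.condition.DayWeek
        dayOfWeek: MONDAY
      - type: io.kestra.plugin.core.condition.TimeBetween
        after: "08:00:00+01:00"
        before: "18:00:00+01:00"
```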
delimiter (string)
A delimiter is a character you use to group keys.
encodingType (string)
The EncodingType property for this object.
endpointOverride (string)
The endpoint with which the SDK should communicate.
This property allows you to use a different S3-compatible storage backend.
expectedBucketOwner (string)
The account ID of the expected bucket owner.
If the bucket is owned by a different account, the request fails with the HTTP status code 403 Forbidden (access denied).
filter (string, default: BOTH)
Possible values: FILES, DIRECTORY, BOTH
The type of objects to filter: files, directory, or both.
forcePathStyle (boolean or string, default: false)
Force path-style access.
Must only be used when compatibilityMode is enabled.
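These properties typically work together when pointing the trigger at an S3-compatible backend such as MinIO. A minimal sketch, with a hypothetical endpoint:

```yaml
triggers:
  - id: watch
    type: io.kestra.plugin.aws.s3.Trigger
    bucket: "my-bucket"
    region: "us-east-1"                               # often ignored by S3-compatible backends
    endpointOverride: "http://minio.internal:9000"    # hypothetical endpoint
    compatibilityMode: true                           # for backends without the new transport client
    forcePathStyle: true                              # path-style URLs; only valid with compatibilityMode
    accessKeyId: "{{ secret('MINIO_ACCESS_KEY') }}"
    secretKeyId: "{{ secret('MINIO_SECRET_KEY') }}"
    action: DELETE
```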
interval (string, duration, non-dynamic, default: PT1M)
Interval between polling.
The interval between two consecutive polls of the schedule, which can avoid overloading the remote system with too many calls. For most triggers that depend on external systems, the interval should be at least PT30S. See ISO 8601 durations for more information on available interval values.
marker (string)
Marker is where you want Amazon S3 to start listing from.
Amazon S3 starts listing after this specified key. Marker can be any key in the bucket.
maxKeys (integer or string, default: 1000)
Sets the maximum number of keys returned in the response.
By default, the action returns up to 1,000 key names. The response might contain fewer keys but will never contain more.
moveTo (object)
The destination bucket and key for the MOVE action.
Type: io.kestra.plugin.aws.s3.Copy-CopyObject
- The bucket name.
- The bucket key.
- KMS Key ARN or Key ID to use when server-side encryption is AWS_KMS.
- Server-side encryption to apply to the target object. Possible values: NONE, AES256, AWS_KMS. Example: AES256 or AWS_KMS.
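A hedged sketch of a MOVE destination with server-side encryption follows. The serverSideEncryption and kmsKeyArn field names are assumptions based on the property descriptions above, not confirmed names; verify them against your plugin version.

```yaml
    action: MOVE
    moveTo:
      bucket: "archive-bucket"
      key: "archive"
      serverSideEncryption: AWS_KMS                                     # assumed field name
      kmsKeyArn: "arn:aws:kms:eu-central-1:111122223333:key/example"    # assumed field name; placeholder ARN
```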
on (string, default: CREATE_OR_UPDATE)
Possible values: CREATE, UPDATE, CREATE_OR_UPDATE
Trigger event type. Defines when the trigger fires:
- CREATE: only for newly discovered entities.
- UPDATE: only when an already-seen entity changes.
- CREATE_OR_UPDATE: fires on either event.
prefix (string)
Limits the response to keys that begin with the specified prefix.
regexp (string)
A regexp to filter on the full key. For example:
- regexp: ".*" matches all files.
- regexp: ".*2020-01-0.\\.csv" matches files from 01 to 09 of January ending with .csv.
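As a sketch, prefix and regexp can be combined to narrow detection to daily CSV drops under a directory; note that the pattern is applied to the full key, including the prefix:

```yaml
triggers:
  - id: watch
    type: io.kestra.plugin.aws.s3.Trigger
    bucket: "my-bucket"
    prefix: "sub-dir"
    regexp: "sub-dir/.*2020-01-0.\\.csv"   # full-key match: 2020-01-01 through 2020-01-09 CSVs
    action: MOVE
    moveTo:
      key: archive
```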
region (string)
AWS region with which the SDK should communicate.
requestPayer (string)
Sets the value of the RequestPayer property for this object.
secretKeyId (string)
Secret Key ID used to connect to AWS.
sessionToken (string)
AWS session token, retrieved from an AWS token service, used to authenticate that this user has received temporary permissions to access a given resource.
If no credentials are defined, the default credentials provider chain is used to fetch credentials.
stateKey (string, default: <namespace>__<flowId>__<triggerId>)
State key.
JSON-type KV key for persisted state.
stateTtl (string, duration)
State TTL.
TTL for persisted state entries (e.g., PT24H, P7D).
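Since update detection relies on persisted state, on, stateKey, and stateTtl often go together. A minimal sketch, assuming the defaults described above:

```yaml
triggers:
  - id: watch
    type: io.kestra.plugin.aws.s3.Trigger
    bucket: "my-bucket"
    action: NONE                   # handle files inside the flow to avoid re-triggering
    on: CREATE_OR_UPDATE           # fire for both new and changed objects
    stateKey: "s3_listen__watch"   # hypothetical custom key; default is <namespace>__<flowId>__<triggerId>
    stateTtl: "P7D"                # drop persisted state entries after 7 days
```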
stopAfter (array, non-dynamic)
Possible values: CREATED, SUBMITTED, RUNNING, PAUSED, RESTARTED, KILLING, SUCCESS, WARNING, FAILED, KILLED, CANCELLED, QUEUED, RETRYING, RETRIED, SKIPPED, BREAKPOINT, RESUBMITTED
List of execution states after which a trigger should be stopped (a.k.a. disabled).
stsEndpointOverride (string)
The AWS STS endpoint with which the SDK client should communicate.
stsRoleArn (string)
AWS STS Role.
The Amazon Resource Name (ARN) of the role to assume. If set, the task will use the StsAssumeRoleCredentialsProvider. If no credentials are defined, the default credentials provider chain is used to fetch credentials.
stsRoleExternalId (string)
AWS STS External Id.
A unique identifier that might be required when you assume a role in another account. This property is only used when an stsRoleArn is defined.
stsRoleSessionDuration (string, duration, default: PT15M)
AWS STS session duration.
The duration of the role session (default: 15 minutes, i.e., PT15M). This property is only used when an stsRoleArn is defined.
stsRoleSessionName (string)
AWS STS session name.
This property is only used when an stsRoleArn is defined.
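For cross-account access, the STS properties combine as sketched below; the role ARN and external ID are placeholders:

```yaml
triggers:
  - id: watch
    type: io.kestra.plugin.aws.s3.Trigger
    bucket: "my-bucket"
    region: "eu-central-1"
    action: DELETE
    stsRoleArn: "arn:aws:iam::111122223333:role/example-s3-watcher"   # placeholder ARN
    stsRoleExternalId: "example-external-id"                          # placeholder
    stsRoleSessionName: "kestra-s3-trigger"
    stsRoleSessionDuration: "PT30M"
```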
Outputs
objects (array)
List of S3 objects that triggered the flow, each with its change type.
Item type: io.kestra.plugin.aws.s3.Trigger-TriggeredObject
Each item carries a change type (CREATE or UPDATE), a date-time, an owner (io.kestra.plugin.aws.s3.models.Owner), and a uri.
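For instance, to act only on newly created objects, you could filter the output array in a downstream task. The changeType field name is an assumption based on the CREATE/UPDATE change type listed above; verify it against your plugin version.

```yaml
tasks:
  - id: each_new
    type: io.kestra.plugin.core.flow.ForEach
    # 'changeType' is an assumed field name on each triggered object
    values: "{{ trigger.objects | jq('[.[] | select(.changeType == \"CREATE\")] | map(.uri)') }}"
    tasks:
      - id: log
        type: io.kestra.plugin.core.debug.Return
        format: "{{ taskrun.value }}"
```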