Script
Script (Certified)

yaml
# Plugin type identifier; it must match the class name used in the flow
# examples (`io.kestra.plugin.scripts.python.Script`), including its case.
type: "io.kestra.plugin.scripts.python.Script"
yaml
# Minimal Python script: fetch a web page with `requests` and publish the
# HTTP status code and response body through Kestra.outputs().
id: "python_demo"
namespace: "company.team"

tasks:
  - id: "python"
    type: "io.kestra.plugin.scripts.python.Script"
    # Python packages made available to the script; `kestra` provides the
    # Kestra helper imported on the first line of the script.
    dependencies:
      - "requests"
      - "kestra"
    script: |
      from kestra import Kestra
      import requests

      response = requests.get('https://kestra.io')
      print(response.status_code)

      Kestra.outputs({'status': response.status_code, 'text': response.text})

yaml
# Declare `requests` as a script dependency, call the GitHub REST API root
# endpoint and print the parsed JSON response.
id: pip_packages_docker
namespace: company.team

tasks:
  - id: run_python
    type: io.kestra.plugin.scripts.python.Script
    dependencies:
      - requests
    script: |
      import requests

      response = requests.get("https://api.github.com")
      data = response.json()
      print(data)

yaml
# Emit one log line at each level (debug, info, warning, error, critical)
# through the logger returned by Kestra.logger().
id: python_logs
namespace: company.team

tasks:
  - id: python_logger
    type: io.kestra.plugin.scripts.python.Script
    allowFailure: true
    # Python packages for the script; `kestra` provides Kestra.logger().
    dependencies:
      - kestra
    script: |
      import time
      from kestra import Kestra

      logger = Kestra.logger()

      logger.debug("DEBUG is used for diagnostic info.")
      time.sleep(0.5)

      logger.info("INFO confirms normal operation.")
      time.sleep(0.5)

      logger.warning("WARNING signals something unexpected.")
      time.sleep(0.5)

      logger.error("ERROR indicates a serious issue.")
      time.sleep(0.5)

      logger.critical("CRITICAL means a severe failure.")

yaml
# Download a CSV in one task, then read and print it from a Python script
# in the next task.
id: "pass_data_between_tasks"
namespace: "company.team"

tasks:
  - id: "download"
    type: "io.kestra.plugin.core.http.Download"
    uri: "https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv"

  - id: "python"
    type: "io.kestra.plugin.scripts.python.Script"
    # The Pebble expression renders to the `uri` output of the `download` task.
    script: |
      with open('{{ outputs.download.uri }}', 'r') as f:
        print(f.read())

yaml
# Write a text file from Python and publish it as a task output file.
id: python_output_file
namespace: company.team

tasks:
  - id: python
    type: io.kestra.plugin.scripts.python.Script
    # Files named here are published as outputs of this task.
    outputFiles:
      - "myfile.txt"
    # The `with` block closes (and flushes) the file even if write() raises.
    script: |
      with open("myfile.txt", "a") as f:
          f.write("Hello from a Kestra task!")

yaml
# Clean a messy CSV with pandas, publish the cleaned file, and print its
# first lines from a shell task.
id: "python_outputs"
namespace: "company.team"

tasks:
  - id: "clean_dataset"
    type: "io.kestra.plugin.scripts.python.Script"
    containerImage: "ghcr.io/kestra-io/pydata:latest"
    # The CSV written by the script is exposed to later tasks through
    # outputs.clean_dataset.outputFiles.
    outputFiles:
      - "clean_dataset.csv"
    dependencies:
      - "pandas"
    script: |
      import pandas as pd
      df = pd.read_csv("https://huggingface.co/datasets/kestra/datasets/raw/main/csv/messy_dataset.csv")

      # Replace non-numeric age values with NaN
      df["Age"] = pd.to_numeric(df["Age"], errors="coerce")

      # mean imputation: fill NaN values with the mean age
      mean_age = int(df["Age"].mean())
      print(f"Filling NULL values with mean: {mean_age}")
      df["Age"] = df["Age"].fillna(mean_age)
      df.to_csv("clean_dataset.csv", index=False)

  - id: "read_file_from_python"
    type: "io.kestra.plugin.scripts.shell.Commands"
    # Uses the Process task runner instead of a container-based runner.
    taskRunner:
      type: "io.kestra.plugin.core.runner.Process"
    commands:
      - "head -n 10 {{ outputs.clean_dataset.outputFiles['clean_dataset.csv'] }}"

yaml
# Fetch a Pokemon from PokeAPI and compare its base experience with an
# age supplied as a flow input.
id: python_use_input_in_inline
namespace: company.team

inputs:
  - id: pokemon
    type: STRING
    defaults: pikachu

  - id: your_age
    type: INT
    defaults: 25

tasks:
  - id: inline_script
    type: io.kestra.plugin.scripts.python.Script
    description: Fetch the pokemon detail and compare its experience
    containerImage: ghcr.io/kestra-io/pydata:latest
    dependencies:
      - requests
    # Pebble expressions ({{ ... }}) are rendered into the script text
    # before Python runs. The URL lower-cases the input because PokeAPI
    # resource names are lowercase.
    script: |
      import requests

      url = "https://pokeapi.co/api/v2/pokemon/{{ inputs.pokemon | lower }}"
      response = requests.get(url)

      if response.status_code == 200:
          pokemon = response.json()
          base_experience = pokemon.get('base_experience')
          print(f"Base experience of {{ inputs.pokemon }} is {base_experience}")
          # .get() yields None when base_experience is missing or null;
          # comparing None with an int would raise TypeError.
          if base_experience is None:
              print("{{ inputs.pokemon }} has no base experience to compare")
          elif base_experience > int("{{ inputs.your_age }}"):
              print("{{ inputs.pokemon }} has more base experience than your age")
          else:
              print("{{ inputs.pokemon }} is too young!")
      else:
          print(f"Failed to retrieve the webpage. Status code: {response.status_code}")

yaml
# Download a CSV, make it available to the Python task as input.csv, and
# print how many rows it contains.
id: "python_input_file"
namespace: "company.team"

tasks:
  - id: "download_file"
    type: "io.kestra.plugin.core.http.Download"
    uri: "https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv"

  - id: "get_total_rows"
    type: "io.kestra.plugin.scripts.python.Script"
    dependencies:
      - "pandas"
    # Maps a file name the script opens to the downloaded file's URI.
    inputFiles:
      "input.csv": "{{ outputs.download_file.uri }}"
    script: |
      import pandas as pd

      # Path to your CSV file
      csv_file_path = "input.csv"

      # Read the CSV file using pandas
      df = pd.read_csv(csv_file_path)

      # Get the number of rows
      num_rows = len(df)

      print(f"Number of rows: {num_rows}")

yaml
# Compute totals in Python, publish them via Kestra.outputs(), and log them
# from a second task.
id: "python_generate_outputs"
namespace: "company.team"

tasks:
  - id: "generate_output"
    type: "io.kestra.plugin.scripts.python.Script"
    packageManager: "PIP"
    dependencies:
      - "kestra"
    script: |
      from kestra import Kestra

      marks = [79, 91, 85, 64, 82]
      Kestra.outputs({"total_marks": sum(marks),"average_marks": sum(marks)/len(marks)})

  # Keys passed to Kestra.outputs() are read back as
  # outputs.generate_output.vars.<key>.
  - id: "log_result"
    type: "io.kestra.plugin.core.log.Log"
    message:
      - 'Total Marks: {{ outputs.generate_output.vars.total_marks }}'
      - 'Average Marks: {{ outputs.generate_output.vars.average_marks }}'
Properties
SubType: string
Default: python:3.13-slim
SubType: string
Default: true
SubType: string
Default: true
SubType: string
Default: ["/bin/sh","-c"]
Definitions
enabled: boolean | string
Default: true
exclude: array
SubType: string
folderPerNamespace: boolean | string
Default: false
ifExists: string
Default: OVERWRITE
Possible Values
OVERWRITE | FAIL | WARN | IGNORE
include: array
SubType: string
namespaces: array
SubType: string
Default: ["{{flow.namespace}}"]
SubType: string
Default: UV
Possible Values
PIP | UV
Default: AUTO
Possible Values
LINUX | WINDOWS | AUTO
Definitions
type (required): object
config: string | object
cpu
cpus: number | string
credentials
auth: string
identityToken: string
password: string
registry: string
registryToken: string
username: string
delete: boolean | string
Default: true
deviceRequests: array
capabilities: array
SubType: array
count: integer | string
deviceIds: array
SubType: string
driver: string
options: object
SubType: string
entryPoint: array
SubType: string
Default: [ "" ]
extraHosts: array
SubType: string
fileHandlingStrategy: string
Default: VOLUME
Possible Values
MOUNT | VOLUME
host: string
killGracePeriod: string
Default: PT0S
Format: duration
memory
kernelMemory: string
memory: string
memoryReservation: string
memorySwap: string
memorySwappiness: string
oomKillDisable: boolean | string
networkMode: string
portBindings: array
SubType: string
privileged: boolean | string
pullPolicy: object
resume: boolean | string
Default: true
shmSize: string
user: string
version: string
volumes: array
SubType: string
wait: boolean | string
Default: true
type (required): object
version: string
region (required): string
type (required): object
bucket: string
completionCheckInterval: string
Default: PT5S
Format: duration
computeResource
bootDisk: string
cpu: string
memory: string
delete: boolean | string
Default: true
entryPoint: array
SubType: string
impersonatedServiceAccount: string
lifecyclePolicies: array
action: string
Possible Values
ACTION_UNSPECIFIED | RETRY_TASK | FAIL_TASK | UNRECOGNIZED
actionCondition
exitCodes: array
SubType: integer
machineType: string
Default: e2-medium
maxCreateJobRetryCount: integer | string
Default: 2
maxRetryCount: integer
Minimum: >= 0
Maximum: <= 10
networkInterfaces: array
network (required): string
subnetwork: string
projectId: string
reservation: string
resume: boolean | string
Default: true
scopes: array
SubType: string
Default: ["https://www.googleapis.com/auth/cloud-platform"]
serviceAccount: string
syncWorkingDirectory: boolean | string
Default: false
version: string
waitForLogInterval: string
Default: PT5S
Format: duration
waitUntilCompletion: string
Default: PT1H
Format: duration
region (required): string
type (required): object
bucket: string
completionCheckInterval: string
Default: PT5S
Format: duration
delete: boolean | string
Default: true
impersonatedServiceAccount: string
maxRetries: integer | string
Default: 3
projectId: string
resume: boolean | string
Default: true
scopes: array
SubType: string
Default: ["https://www.googleapis.com/auth/cloud-platform"]
serviceAccount: string
syncWorkingDirectory: boolean | string
version: string
vpcAccessConnector: string
vpcEgress: string
Possible Values
VPC_EGRESS_UNSPECIFIED | ALL_TRAFFIC | PRIVATE_RANGES_ONLY | UNRECOGNIZED
waitForLogInterval: string
Default: PT5S
Format: duration
waitUntilCompletion: string
Default: PT1H
Format: duration
account (required): string
endpoint (required): string
poolId (required): string
type (required): object
accessKey: string
blobStorage
containerName (required): string
connectionString: string
endpoint: string
sharedKeyAccountAccessKey: string
sharedKeyAccountName: string
completionCheckInterval: string
Default: PT5S
Format: duration
delete: boolean | string
Default: true
registry
identityReference
resourceId: string
password: string
registryServer: string
userName: string
resume: boolean | string
Default: true
streamLogs: boolean | string
Default: false
syncWorkingDirectory: boolean | string
Default: false
version: string
waitUntilCompletion: string
Default: PT1H
Format: duration
type (required): object
config
apiVersion: string
Default: v1
caCertData: string
caCertFile: string
clientCertData: string
clientCertFile: string
clientKeyAlgo: string
Default: RSA
clientKeyData: string
clientKeyFile: string
clientKeyPassphrase: string
disableHostnameVerification: boolean | string
keyStoreFile: string
keyStorePassphrase: string
masterUrl: string
Default: https://kubernetes.default.svc
namespace: string
oauthToken: string
oauthTokenProvider
output: string
task
password: string
trustCerts: boolean | string
trustStoreFile: string
trustStorePassphrase: string
username: string
containerDefaultSpec: object
containerSpec: object
delete: boolean | string
Default: true
fileSideCarSpec: object
fileSidecar
Default: { "image": "busybox" }
defaultSpec: object
image: string
Default: busybox
resources: object
killed
Default: false
labels: object
namespace: string
Default: default
nodeSelector: object
podSpec: object
pullPolicy: string
Default: ALWAYS
Possible Values
IF_NOT_PRESENT | ALWAYS | NEVER
resources
limit
cpu: string
memory: string
request
cpu: string
memory: string
resume: boolean | string
Default: true
serviceAccountName: string
syncWorkingDirectory: boolean | string
Default: false
version: string
waitForLogs: string
Default: PT30S
Format: duration
waitUntilCompletion: string
Default: PT1H
Format: duration
waitUntilRunning: string
Default: PT10M
Format: duration
computeEnvironmentArn (required): string
region (required): string
type (required): object
accessKeyId: string
bucket: string
completionCheckInterval: string
Default: PT5S
Format: duration
delete: boolean | string
Default: true
endpointOverride: string
executionRoleArn: string
jobQueueArn: string
resources
Default: { "request": { "memory": "2048", "cpu": "1" } }
request (required)
cpu (required): string
memory (required): string
resume: boolean | string
Default: true
secretKeyId: string
sessionToken: string
sidecarResources
request (required)
cpu (required): string
memory (required): string
stsEndpointOverride: string
stsRoleArn: string
stsRoleExternalId: string
stsRoleSessionDuration: string
Default: PT15M
Format: duration
stsRoleSessionName: string
syncWorkingDirectory: boolean | string
Default: false
taskRoleArn: string
version: string
waitUntilCompletion: string
Default: PT1H
Format: duration
Default: 0
SubType: string
Definitions