PythonSubmit PythonSubmit

yaml
type: "io.kestra.plugin.spark.PythonSubmit"

Submit a PySpark job to a remote cluster.

Examples

yaml
id: "python_submit"
type: "io.kestra.plugin.spark.PythonSubmit"
runner: DOCKER
docker:
  networkMode: host
  user: root
master: spark://localhost:7077
args:
- "10"
mainScript: |
  import sys
  from random import random
  from operator import add
  from pyspark.sql import SparkSession


  if __name__ == "__main__":
      spark = SparkSession \
          .builder \
          .appName("PythonPi") \
          .getOrCreate()

      partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
      n = 100000 * partitions

      def f(_: int) -> float:
          x = random() * 2 - 1
          y = random() * 2 - 1
          return 1 if x ** 2 + y ** 2 <= 1 else 0

      count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
      print("Pi is roughly %f" % (4.0 * count / n))

      spark.stop()

Properties

mainScript

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

The main Python script.

master

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

Spark master hostname for the application.

Spark master URL formats.

appFiles

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required:

Adds a file to be submitted with the application.

Must be an internal storage URI.

args

  • Type: array
  • SubType: string
  • Dynamic: ✔️
  • Required:

Command line arguments for the application.

configurations

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required:

Configuration properties for the application.

containerImage

  • Type: string
  • Dynamic: ✔️
  • Required:
  • Default: bitnami/spark

The task runner container image, only used if the task runner is container-based.

deployMode

  • Type: string
  • Dynamic: ✔️
  • Required:
  • Possible Values:
    • CLIENT
    • CLUSTER

Deploy mode for the application.

docker

Deprecated, use 'taskRunner' instead

env

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required:

Additional environment variables for the current process.

name

  • Type: string
  • Dynamic: ✔️
  • Required:

Spark application name.

pythonFiles

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required:

Adds a Python file/zip/egg package to be submitted with the application.

Must be an internal storage URI.

runner

  • Type: string
  • Dynamic:
  • Required:
  • Possible Values:
    • PROCESS
    • DOCKER

Script runner to use.

Deprecated - use 'taskRunner' instead.

sparkSubmitPath

  • Type: string
  • Dynamic: ✔️
  • Required:
  • Default: spark-submit

The spark-submit binary path.

taskRunner

  • Type: TaskRunner
  • Dynamic:
  • Required:
  • Default: {type=io.kestra.plugin.scripts.runner.docker.Docker}

The task runner to use.

Task runners are provided by plugins, each have their own properties.

verbose

  • Type: boolean
  • Dynamic:
  • Required:
  • Default: false

Enables verbose reporting.

Outputs

exitCode

  • Type: integer
  • Dynamic:
  • Required: ✔️
  • Default: 0

The exit code of the entire flow execution.

outputFiles

  • Type: object
  • SubType: string
  • Dynamic:
  • Required:

The output files' URIs in Kestra's internal storage.

vars

  • Type: object
  • Dynamic:
  • Required:

The value extracted from the output of the executed commands.

Definitions

io.kestra.plugin.scripts.runner.docker.Cpu

Properties

cpus
  • Type: integer
  • Dynamic:
  • Required:

The maximum amount of CPU resources a container can use.

Make sure to set that to a numeric value e.g. cpus: "1.5" or cpus: "4" or For instance, if the host machine has two CPUs and you set cpus: "1.5", the container is guaranteed at most one and a half of the CPUs.

io.kestra.core.models.tasks.runners.TaskRunner

Properties

io.kestra.plugin.scripts.runner.docker.Memory

Properties

kernelMemory
  • Type: string
  • Dynamic: ✔️
  • Required:

The maximum amount of kernel memory the container can use.

The minimum allowed value is 4MB. Because kernel memory cannot be swapped out, a container which is starved of kernel memory may block host machine resources, which can have side effects on the host machine and on other containers. See the kernel-memory docs for more details.

memory
  • Type: string
  • Dynamic: ✔️
  • Required:

The maximum amount of memory resources the container can use.

Make sure to use the format number + unit (regardless of the case) without any spaces. The unit can be KB (kilobytes), MB (megabytes), GB (gigabytes), etc.

Given that it's case-insensitive, the following values are equivalent:

  • "512MB"
  • "512Mb"
  • "512mb"
  • "512000KB"
  • "0.5GB"

It is recommended that you allocate at least 6MB.

memoryReservation
  • Type: string
  • Dynamic: ✔️
  • Required:

Allows you to specify a soft limit smaller than memory which is activated when Docker detects contention or low memory on the host machine.

If you use memoryReservation, it must be set lower than memory for it to take precedence. Because it is a soft limit, it does not guarantee that the container doesn’t exceed the limit.

memorySwap
  • Type: string
  • Dynamic: ✔️
  • Required:

The total amount of memory and swap that can be used by a container.

If memory and memorySwap are set to the same value, this prevents containers from using any swap. This is because memorySwap includes both the physical memory and swap space, while memory is only the amount of physical memory that can be used.

memorySwappiness
  • Type: string
  • Dynamic: ✔️
  • Required:

A setting which controls the likelihood of the kernel to swap memory pages.

By default, the host kernel can swap out a percentage of anonymous pages used by a container. You can set memorySwappiness to a value between 0 and 100 to tune this percentage.

oomKillDisable
  • Type: boolean
  • Dynamic:
  • Required:

By default, if an out-of-memory (OOM) error occurs, the kernel kills processes in a container.

To change this behavior, use the oomKillDisable option. Only disable the OOM killer on containers where you have also set the memory option. If the memory flag is not set, the host can run out of memory, and the kernel may need to kill the host system’s processes to free the memory.

io.kestra.plugin.scripts.exec.scripts.models.DockerOptions

Properties

image
  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Min length: 1

Docker image to use.

config
  • Type:
    • string
    • object
  • Dynamic: ✔️
  • Required:

Docker configuration file.

Docker configuration file that can set access credentials to private container registries. Usually located in ~/.docker/config.json.

cpu
  • Type: Cpu
  • Dynamic:
  • Required:

Limits the CPU usage to a given maximum threshold value.

By default, each container’s access to the host machine’s CPU cycles is unlimited. You can set various constraints to limit a given container’s access to the host machine’s CPU cycles.

credentials
deviceRequests

A list of device requests to be sent to device drivers.

entryPoint
  • Type: array
  • SubType: string
  • Dynamic: ✔️
  • Required:

Docker entrypoint to use.

extraHosts
  • Type: array
  • SubType: string
  • Dynamic: ✔️
  • Required:

Extra hostname mappings to the container network interface configuration.

host
  • Type: string
  • Dynamic: ✔️
  • Required:

Docker API URI.

memory
  • Type: Memory
  • Dynamic:
  • Required:

Limits memory usage to a given maximum threshold value.

Docker can enforce hard memory limits, which allow the container to use no more than a given amount of user or system memory, or soft limits, which allow the container to use as much memory as it needs unless certain conditions are met, such as when the kernel detects low memory or contention on the host machine. Some of these options have different effects when used alone or when more than one option is set.

networkMode
  • Type: string
  • Dynamic: ✔️
  • Required:

Docker network mode to use e.g. host, none, etc.

pullPolicy
  • Type: string
  • Dynamic:
  • Required:
  • Default: ALWAYS
  • Possible Values:
    • IF_NOT_PRESENT
    • ALWAYS
    • NEVER

The image pull policy for a container image and the tag of the image, which affect when Docker attempts to pull (download) the specified image.

shmSize
  • Type: string
  • Dynamic: ✔️
  • Required:

Size of /dev/shm in bytes.

The size must be greater than 0. If omitted, the system uses 64MB.

user
  • Type: string
  • Dynamic: ✔️
  • Required:

User in the Docker container.

volumes
  • Type: array
  • SubType: string
  • Dynamic: ✔️
  • Required:

List of volumes to mount.

Must be a valid mount expression as string, example : /home/user:/app.

Volumes mount are disabled by default for security reasons; you must enable them on server configuration by setting kestra.tasks.scripts.docker.volume-enabled to true.

io.kestra.plugin.scripts.runner.docker.Credentials

Properties

auth
  • Type: string
  • Dynamic: ✔️
  • Required:

The registry authentication.

The auth field is a base64-encoded authentication string of username:password or a token.

identityToken
  • Type: string
  • Dynamic: ✔️
  • Required:

The identity token.

password
  • Type: string
  • Dynamic: ✔️
  • Required:

The registry password.

registry
  • Type: string
  • Dynamic: ✔️
  • Required:

The registry URL.

If not defined, the registry will be extracted from the image name.

registryToken
  • Type: string
  • Dynamic: ✔️
  • Required:

The registry token.

username
  • Type: string
  • Dynamic: ✔️
  • Required:

The registry username.

io.kestra.plugin.scripts.runner.docker.DeviceRequest

Properties

capabilities
  • Type: array
  • SubType: array
  • Dynamic:
  • Required:

A list of capabilities; an OR list of AND lists of capabilities.

count
  • Type: integer
  • Dynamic:
  • Required:
deviceIds
  • Type: array
  • SubType: string
  • Dynamic: ✔️
  • Required:
driver
  • Type: string
  • Dynamic: ✔️
  • Required:
options
  • Type: object
  • SubType: string
  • Dynamic:
  • Required:

Driver-specific options, specified as key/value pairs.

These options are passed directly to the driver.

Was this page helpful?