PythonSubmit
type: "io.kestra.plugin.spark.PythonSubmit"
Submit a PySpark job to a remote cluster.
Examples
id: "python_submit"
type: "io.kestra.plugin.spark.PythonSubmit"
runner: DOCKER
docker:
  networkMode: host
  user: root
master: spark://localhost:7077
args:
  - "10"
mainScript: |
  import sys
  from random import random
  from operator import add
  from pyspark.sql import SparkSession
  if __name__ == "__main__":
      spark = SparkSession \
          .builder \
          .appName("PythonPi") \
          .getOrCreate()
      partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
      n = 100000 * partitions
      def f(_: int) -> float:
          x = random() * 2 - 1
          y = random() * 2 - 1
          return 1 if x ** 2 + y ** 2 <= 1 else 0
      count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
      print("Pi is roughly %f" % (4.0 * count / n))
      spark.stop()
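To check the sampling logic of the example script before submitting it to a cluster, the same Monte Carlo estimate can be run locally without Spark. The following is a minimal sketch, not part of the plugin: the function names `inside_unit_circle` and `estimate_pi` are hypothetical, and the built-in `sum` and `map` stand in for the distributed `parallelize(...).map(f).reduce(add)` call.

```python
from random import random, seed


def inside_unit_circle(_: int) -> int:
    """Draw one point in the square [-1, 1] x [-1, 1]; return 1 if it falls in the unit circle."""
    x = random() * 2 - 1
    y = random() * 2 - 1
    return 1 if x ** 2 + y ** 2 <= 1 else 0


def estimate_pi(partitions: int = 2, samples_per_partition: int = 100000) -> float:
    """Local stand-in for the Spark job: same sample count, no cluster."""
    n = samples_per_partition * partitions
    count = sum(map(inside_unit_circle, range(1, n + 1)))
    return 4.0 * count / n


if __name__ == "__main__":
    seed(42)  # fixed seed so repeated local runs give the same estimate
    print("Pi is roughly %f" % estimate_pi(partitions=2))
```

The fraction of points that land inside the circle approximates pi / 4, which is why the count is multiplied by 4.0.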
Properties
mainScript
- Type: string
- Dynamic: ✔️
- Required: ✔️
The main Python script.
master
- Type: string
- Dynamic: ✔️
- Required: ✔️
Spark master URL for the application, e.g. spark://localhost:7077.
See the Spark documentation for the supported master URL formats.
appFiles
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ❌
Adds a file to be submitted with the application.
Must be an internal storage URI.
args
- Type: array
- SubType: string
- Dynamic: ✔️
- Required: ❌
Command line arguments for the application.
configurations
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ❌
Configuration properties for the application.
containerImage
- Type: string
- Dynamic: ✔️
- Required: ❌
- Default: bitnami/spark
The task runner container image, only used if the task runner is container-based.
deployMode
- Type: string
- Dynamic: ✔️
- Required: ❌
- Possible Values: CLIENT, CLUSTER
Deploy mode for the application.
docker
⚠ Deprecated
- Type: DockerOptions
- Dynamic: ❌
- Required: ❌
Deprecated, use 'taskRunner' instead
env
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ❌
Additional environment variables for the current process.
name
- Type: string
- Dynamic: ✔️
- Required: ❌
Spark application name.
pythonFiles
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ❌
Adds a Python file/zip/egg package to be submitted with the application.
Must be an internal storage URI.
runner
- Type: string
- Dynamic: ❌
- Required: ❌
- Possible Values: PROCESS, DOCKER
Script runner to use.
Deprecated - use 'taskRunner' instead.
sparkSubmitPath
- Type: string
- Dynamic: ✔️
- Required: ❌
- Default: spark-submit
The spark-submit binary path.
taskRunner
- Type: TaskRunner
- Dynamic: ❌
- Required: ❌
- Default: {type=io.kestra.plugin.scripts.runner.docker.Docker}
The task runner to use.
Task runners are provided by plugins; each has its own properties.
verbose
- Type: boolean
- Dynamic: ❌
- Required: ❌
- Default: false
Enables verbose reporting.
Outputs
exitCode
- Type: integer
- Dynamic: ❓
- Required: ✔️
- Default: 0
The exit code of the entire flow execution.
outputFiles
- Type: object
- SubType: string
- Dynamic: ❌
- Required: ❌
The output files' URIs in Kestra's internal storage.
vars
- Type: object
- Dynamic: ❓
- Required: ❌
The value extracted from the output of the executed commands.
Definitions
io.kestra.plugin.scripts.runner.docker.Cpu
Properties
cpus
- Type: integer
- Dynamic: ❌
- Required: ❌
The maximum amount of CPU resources a container can use.
Make sure to set that to a numeric value, e.g. cpus: "1.5" or cpus: "4". For instance, if the host machine has two CPUs and you set cpus: "1.5", the container is guaranteed at most one and a half of the CPUs.
io.kestra.core.models.tasks.runners.TaskRunner
Properties
io.kestra.plugin.scripts.runner.docker.Memory
Properties
kernelMemory
- Type: string
- Dynamic: ✔️
- Required: ❌
The maximum amount of kernel memory the container can use.
The minimum allowed value is 4MB. Because kernel memory cannot be swapped out, a container which is starved of kernel memory may block host machine resources, which can have side effects on the host machine and on other containers. See the kernel-memory docs for more details.
memory
- Type: string
- Dynamic: ✔️
- Required: ❌
The maximum amount of memory resources the container can use.
Make sure to use the format number+unit (regardless of the case) without any spaces. The unit can be KB (kilobytes), MB (megabytes), GB (gigabytes), etc.
Given that it's case-insensitive, the following values are equivalent:
- "512MB"
- "512Mb"
- "512mb"
- "512000KB"
- "0.5GB"
It is recommended that you allocate at least 6MB.
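The number+unit rule can be checked before a flow is deployed. The sketch below is a hypothetical pre-flight check, not the validation that Kestra or Docker performs: the helper name `looks_like_memory_value` and the accepted unit set (B, KB, MB, GB, TB) are assumptions, since the description only names KB, MB, GB and "etc.".

```python
import re

# Assumed unit set; the property description only lists KB, MB, GB and "etc."
_MEMORY_PATTERN = re.compile(r"\d+(?:\.\d+)?(?:B|KB|MB|GB|TB)", re.IGNORECASE)


def looks_like_memory_value(value: str) -> bool:
    """Return True if value is a number immediately followed by a unit, with no spaces."""
    return _MEMORY_PATTERN.fullmatch(value) is not None


if __name__ == "__main__":
    for candidate in ["512MB", "512mb", "0.5GB", "512 MB", "MB"]:
        print(candidate, looks_like_memory_value(candidate))
```

The check only covers the shape of the string; it does not convert units or compare sizes.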
memoryReservation
- Type: string
- Dynamic: ✔️
- Required: ❌
Allows you to specify a soft limit smaller than memory which is activated when Docker detects contention or low memory on the host machine.
If you use memoryReservation, it must be set lower than memory for it to take precedence. Because it is a soft limit, it does not guarantee that the container doesn’t exceed the limit.
memorySwap
- Type: string
- Dynamic: ✔️
- Required: ❌
The total amount of memory and swap that can be used by a container.
If memory and memorySwap are set to the same value, this prevents containers from using any swap. This is because memorySwap includes both the physical memory and swap space, while memory is only the amount of physical memory that can be used.
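The relationship between the two settings is simple arithmetic: the swap a container may use is memorySwap minus memory. A minimal sketch, assuming both values have already been converted to megabytes (the helper name `swap_allowance_mb` is hypothetical and not part of the plugin):

```python
def swap_allowance_mb(memory_mb: int, memory_swap_mb: int) -> int:
    """Return the swap (in MB) left once physical memory is subtracted from memorySwap."""
    if memory_swap_mb < memory_mb:
        # memorySwap counts memory plus swap, so it cannot be smaller than memory.
        raise ValueError("memorySwap must be at least as large as memory")
    return memory_swap_mb - memory_mb


if __name__ == "__main__":
    print(swap_allowance_mb(512, 512))   # equal values: no swap available
    print(swap_allowance_mb(512, 1024))  # 512 MB of swap on top of 512 MB of memory
```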
memorySwappiness
- Type: string
- Dynamic: ✔️
- Required: ❌
A setting which controls the likelihood of the kernel to swap memory pages.
By default, the host kernel can swap out a percentage of anonymous pages used by a container. You can set memorySwappiness to a value between 0 and 100 to tune this percentage.
oomKillDisable
- Type: boolean
- Dynamic: ❌
- Required: ❌
By default, if an out-of-memory (OOM) error occurs, the kernel kills processes in a container.
To change this behavior, use the oomKillDisable option. Only disable the OOM killer on containers where you have also set the memory option. If the memory flag is not set, the host can run out of memory, and the kernel may need to kill the host system’s processes to free the memory.
io.kestra.plugin.scripts.exec.scripts.models.DockerOptions
Properties
image
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Min length: 1
Docker image to use.
config
- Type: string or object
- Dynamic: ✔️
- Required: ❌
Docker configuration file.
Docker configuration file that can set access credentials to private container registries. Usually located in ~/.docker/config.json.
cpu
- Type: Cpu
- Dynamic: ❌
- Required: ❌
Limits the CPU usage to a given maximum threshold value.
By default, each container’s access to the host machine’s CPU cycles is unlimited. You can set various constraints to limit a given container’s access to the host machine’s CPU cycles.
credentials
- Type: Credentials
- Dynamic: ✔️
- Required: ❌
deviceRequests
- Type: array
- SubType: DeviceRequest
- Dynamic: ❌
- Required: ❌
A list of device requests to be sent to device drivers.
entryPoint
- Type: array
- SubType: string
- Dynamic: ✔️
- Required: ❌
Docker entrypoint to use.
extraHosts
- Type: array
- SubType: string
- Dynamic: ✔️
- Required: ❌
Extra hostname mappings to the container network interface configuration.
host
- Type: string
- Dynamic: ✔️
- Required: ❌
Docker API URI.
memory
- Type: Memory
- Dynamic: ❌
- Required: ❌
Limits memory usage to a given maximum threshold value.
Docker can enforce hard memory limits, which allow the container to use no more than a given amount of user or system memory, or soft limits, which allow the container to use as much memory as it needs unless certain conditions are met, such as when the kernel detects low memory or contention on the host machine. Some of these options have different effects when used alone or when more than one option is set.
networkMode
- Type: string
- Dynamic: ✔️
- Required: ❌
Docker network mode to use, e.g. host, none, etc.
pullPolicy
- Type: string
- Dynamic: ❌
- Required: ❌
- Default: ALWAYS
- Possible Values: IF_NOT_PRESENT, ALWAYS, NEVER
The image pull policy for a container image and the tag of the image, which affect when Docker attempts to pull (download) the specified image.
shmSize
- Type: string
- Dynamic: ✔️
- Required: ❌
Size of /dev/shm in bytes.
The size must be greater than 0. If omitted, the system uses 64MB.
user
- Type: string
- Dynamic: ✔️
- Required: ❌
User in the Docker container.
volumes
- Type: array
- SubType: string
- Dynamic: ✔️
- Required: ❌
List of volumes to mount.
Must be a valid mount expression as a string, for example /home/user:/app.
Volume mounts are disabled by default for security reasons; you must enable them in the server configuration by setting kestra.tasks.scripts.docker.volume-enabled to true.
io.kestra.plugin.scripts.runner.docker.Credentials
Properties
auth
- Type: string
- Dynamic: ✔️
- Required: ❌
The registry authentication.
The auth field is a base64-encoded authentication string of username:password or a token.
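A value for the auth field can be produced with the Python standard library. This is a minimal sketch; the helper name `encode_registry_auth` and the sample credentials are placeholders, not values from any real registry.

```python
import base64


def encode_registry_auth(username: str, password: str) -> str:
    """Base64-encode "username:password" for use as the auth value."""
    raw = f"{username}:{password}".encode("utf-8")
    return base64.b64encode(raw).decode("ascii")


if __name__ == "__main__":
    # Placeholder credentials for illustration only.
    print(encode_registry_auth("user", "pass"))
```

Base64 is an encoding, not encryption, so the resulting string should be handled as carefully as the password itself.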
identityToken
- Type: string
- Dynamic: ✔️
- Required: ❌
The identity token.
password
- Type: string
- Dynamic: ✔️
- Required: ❌
The registry password.
registry
- Type: string
- Dynamic: ✔️
- Required: ❌
The registry URL.
If not defined, the registry will be extracted from the image name.
registryToken
- Type: string
- Dynamic: ✔️
- Required: ❌
The registry token.
username
- Type: string
- Dynamic: ✔️
- Required: ❌
The registry username.
io.kestra.plugin.scripts.runner.docker.DeviceRequest
Properties
capabilities
- Type: array
- SubType: array
- Dynamic: ❌
- Required: ❌
A list of capabilities; an OR list of AND lists of capabilities.
count
- Type: integer
- Dynamic: ❌
- Required: ❌
deviceIds
- Type: array
- SubType: string
- Dynamic: ✔️
- Required: ❌
driver
- Type: string
- Dynamic: ✔️
- Required: ❌
options
- Type: object
- SubType: string
- Dynamic: ❌
- Required: ❌
Driver-specific options, specified as key/value pairs.
These options are passed directly to the driver.