PipelinewiseRedshift
type: "io.kestra.plugin.singer.targets.PipelinewiseRedshift"
A Singer target that loads data into a Redshift database.
Full documentation can be found here
Properties
defaultTargetSchema
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Min length:
1
Name of the schema where the tables will be created.
If schema_mapping is not defined then every stream sent by the tap is loaded into this schema.
exitOnFailed
- Type: boolean
- Dynamic: ❌
- Required: ✔️
- Default:
true
Exit if any statement returns a non-true value
This tells bash that it should exit the script if any statement returns a non-true return value. The benefit of using -e is that it prevents errors snowballing into serious issues when they could have been caught earlier.
from
- Type: string
- Dynamic: ❓
- Required: ✔️
The raw data from a tap
host
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Min length:
1
The database hostname
interpreter
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
/bin/sh
- Min length:
1
Interpreter to use
port
- Type: integer
- Dynamic: ❌
- Required: ✔️
The database port
pythonPath
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Default:
python
- Min length:
1
The python interpreter to use
Set the python interpreter path to use
runner
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
PROCESS
- Possible Values:
PROCESS
DOCKER
Runner to use
s3Bucket
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Min length:
1
The S3 bucket name
stateName
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
singer-state
The name of the Singer state file
username
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Min length:
1
The database user
warningOnStdErr
- Type: boolean
- Dynamic: ❌
- Required: ✔️
- Default:
true
Use WARNING state if any stdErr is sent
accessKeyId
- Type: string
- Dynamic: ✔️
- Required: ❌
S3 Access Key Id.
Used for S3 and Redshift copy operations.
addMetadataColumns
- Type: boolean
- Dynamic: ❌
- Required: ❌
- Default:
false
Add metadata columns.
Metadata columns add extra row-level information about data ingestion (i.e. when the row was read from the source, when it was inserted or deleted in Redshift, etc.). Metadata columns are created automatically by adding extra columns to the tables with the column prefix _SDC_. The metadata columns are documented here. Enabling metadata columns flags deleted rows by setting the _SDC_DELETED_AT metadata column. Without the addMetadataColumns option, rows deleted by Singer taps will not be recognisable in Redshift.
args
- Type: array
- SubType: string
- Dynamic: ✔️
- Required: ❌
Python command args
Arguments list to pass to main python script
batchSizeRows
- Type: integer
- Dynamic: ❌
- Required: ❌
- Default:
100000
Maximum number of rows in each batch.
At the end of each batch, the rows in the batch are loaded into Redshift.
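The batching behaviour described for batchSizeRows can be pictured with a minimal sketch. The function name batches and the in-memory lists are assumptions made for illustration only; the actual target buffers rows and loads them through S3 and COPY rather than yielding Python lists.

```python
from typing import Iterable, Iterator, List


def batches(rows: Iterable[dict], batch_size_rows: int = 100000) -> Iterator[List[dict]]:
    """Yield lists of at most batch_size_rows rows (hypothetical helper).

    Each yielded list stands for one point at which a load would be triggered.
    """
    if batch_size_rows < 1:
        raise ValueError("batch_size_rows must be at least 1")
    batch: List[dict] = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size_rows:
            yield batch
            batch = []
    if batch:
        # Flush the final, partially filled batch.
        yield batch
```

A smaller batchSizeRows means more frequent but smaller loads; a larger value means fewer loads that each hold more rows in memory.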
command
- Type: string
- Dynamic: ✔️
- Required: ❌
Override default singer command
compression
- Type: string
- Dynamic: ❌
- Required: ❌
- Default:
bzip2
- Possible Values:
gzip
bzip2
The compression method to use when writing files to S3 and running Redshift COPY.
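As a rough illustration of the two compression values, the sketch below compresses a CSV payload with Python's standard gzip and bz2 modules. The function name compress_payload is hypothetical, and how the target actually writes and names its files on S3 is not covered here.

```python
import bz2
import gzip


def compress_payload(data: bytes, compression: str = "bzip2") -> bytes:
    """Compress data with the method named by the compression property (sketch)."""
    if compression == "gzip":
        return gzip.compress(data)
    if compression == "bzip2":
        return bz2.compress(data)
    raise ValueError(f"unsupported compression: {compression!r}")
```

Both formats round-trip losslessly, so the choice mainly trades compression ratio against CPU time.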
copyOptions
- Type: string
- Dynamic: ✔️
- Required: ❌
COPY options.
Parameters to use in the COPY command when loading data to Redshift. Some basic file formatting parameters are fixed values, and overriding them with custom values is not recommended. These are: CSV GZIP DELIMITER ',' REMOVEQUOTES ESCAPE.
dataFlatteningMaxLevel
- Type: integer
- Dynamic: ❌
- Required: ❌
- Default:
0
Object type RECORD items from taps can be transformed to flattened columns by creating columns automatically.
When the value is 0 (default), flattening is turned off.
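Flattening RECORD objects into columns, as controlled by dataFlatteningMaxLevel, can be sketched as follows. This is an illustration only: the function name flatten_record and the "__" separator between nested keys are assumptions, and it assumes that a level of 0 leaves nested objects untouched.

```python
from typing import Any, Dict


def flatten_record(
    record: Dict[str, Any],
    max_level: int = 0,
    sep: str = "__",  # assumed separator, not confirmed by this page
    _prefix: str = "",
    _level: int = 0,
) -> Dict[str, Any]:
    """Flatten nested dicts into prefixed keys, descending at most max_level levels."""
    flat: Dict[str, Any] = {}
    for key, value in record.items():
        name = f"{_prefix}{sep}{key}" if _prefix else key
        if isinstance(value, dict) and _level < max_level:
            # Still within the allowed depth: expand the nested object.
            flat.update(flatten_record(value, max_level, sep, name, _level + 1))
        else:
            # Depth exhausted or scalar value: keep it as a single column.
            flat[name] = value
    return flat
```

With max_level=1, a record such as {"address": {"city": "Oslo"}} would become {"address__city": "Oslo"}, while deeper objects stay nested.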
dbName
- Type: string
- Dynamic: ✔️
- Required: ❌
The database name
defaultTargetSchemaSelectPermissions
- Type: string
- Dynamic: ✔️
- Required: ❌
Grant USAGE privilege on newly created schemas and grant SELECT privilege on newly created tables to a specific list of users or groups.
If schemaMapping is not defined, then every stream sent by the tap is granted accordingly.
disableTableCache
- Type: boolean
- Dynamic: ❌
- Required: ❌
- Default:
false
Disable table cache.
By default the connector caches the available table structures in Redshift at startup. This way it doesn't need to run additional queries when ingesting data to check whether the target tables must be altered. With the disable_table_cache option you can turn off this caching. You will always see the most recent table structures, at the cost of extra query runtime.
dockerOptions
- Type: DockerOptions
- Dynamic: ❌
- Required: ❌
Docker options when using the DOCKER runner
env
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ❌
Additional environment variables to add for the current process.
files
🔒 Deprecated
- Type: array
- SubType: string
- Dynamic: ✔️
- Required: ❌
The list of files that will be uploaded to internal storage; use the outputFiles property instead.
flushAllStreams
- Type: boolean
- Dynamic: ❌
- Required: ❌
- Default:
false
Flush and load every stream into Redshift when one batch is full.
Warning: This may trigger the COPY command to use files with a low number of records.
hardDelete
- Type: boolean
- Dynamic: ❌
- Required: ❌
- Default:
false
Delete rows on Redshift.
When the hardDelete option is true, DELETE SQL commands are performed in Redshift to delete rows in tables. This is achieved by continuously checking the _SDC_DELETED_AT metadata column sent by the Singer tap. Because deleting rows requires metadata columns, the hardDelete option automatically enables the addMetadataColumns option as well.
inputFiles
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ❌
Input files are extra files that will be available in the script working directory.
You can define the files as a map or a JSON string. Each file can be defined inline or can reference a file from Kestra's internal storage.
interpreterArgs
- Type: array
- SubType: string
- Dynamic: ❌
- Required: ❌
- Default:
[-c]
Interpreter args used
maxParallelism
- Type: integer
- Dynamic: ❌
- Required: ❌
- Default:
16
Max number of parallel threads to use when flushing tables.
outputDirs
- Type: array
- SubType: string
- Dynamic: ❌
- Required: ❌
Output dirs list that will be uploaded to internal storage
List of keys that will generate temporary directories.
In the command, you can use the special variable named outputDirs.key.
If you add a directory with ["myDir"], you can use the special vars echo 1 >> {{ outputDirs.myDir }}/file1.txt and echo 2 >> {{ outputDirs.myDir }}/file2.txt, and both files will be uploaded to internal storage. You can then use them in other tasks.
outputFiles
- Type: array
- SubType: string
- Dynamic: ❌
- Required: ❌
Output file list that will be uploaded to internal storage
List of keys that will generate temporary files.
In the command, you can use the special variable named outputFiles.key.
If you add a file with ["first"], you can use the special vars echo 1 >> {{ outputFiles.first }}, and you can then use the file in other tasks.
outputsFiles
🔒 Deprecated
- Type: array
- SubType: string
- Dynamic: ❌
- Required: ❌
Deprecated output files property; use outputFiles instead
parallelism
- Type: integer
- Dynamic: ❌
- Required: ❌
- Default:
0
The number of threads used to flush tables.
0 will create a thread for each stream, up to parallelism_max. -1 will create a thread for each CPU core. Any other positive number will create that number of threads, up to parallelism_max.
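The three cases above can be written out as a small function. This is a sketch of the rule as described on this page, not the target's actual code; the function name flush_thread_count and the cpu_count parameter are hypothetical and exist only to make the example testable.

```python
import os
from typing import Optional


def flush_thread_count(
    parallelism: int,
    parallelism_max: int,
    stream_count: int,
    cpu_count: Optional[int] = None,
) -> int:
    """Return the number of flush threads implied by the parallelism setting."""
    if parallelism == 0:
        # One thread per stream, capped at parallelism_max.
        return min(stream_count, parallelism_max)
    if parallelism == -1:
        # One thread per CPU core.
        return cpu_count if cpu_count is not None else (os.cpu_count() or 1)
    if parallelism > 0:
        # Explicit thread count, capped at parallelism_max.
        return min(parallelism, parallelism_max)
    raise ValueError("parallelism must be -1, 0, or a positive integer")
```

For example, with the defaults (parallelism 0, maximum 16), 5 streams would use 5 threads and 40 streams would use 16.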
password
- Type: string
- Dynamic: ✔️
- Required: ❌
The database user's password
pipPackages
- Type: array
- SubType: string
- Dynamic: ✔️
- Required: ❌
Override default pip packages to use a specific version
primaryKeyRequired
- Type: boolean
- Dynamic: ❌
- Required: ❌
- Default:
true
Log based and Incremental replications on tables with no Primary Key cause duplicates when merging UPDATE events.
When set to true, stop loading data if no Primary Key is defined.
redshiftCopyRoleArn
- Type: string
- Dynamic: ✔️
- Required: ❌
AWS Redshift COPY role ARN.
AWS Role ARN to be used for the Redshift COPY operation. Used instead of the given AWS keys for the COPY operation if provided - the keys are still used for other S3 operations.
requirements
- Type: array
- SubType: string
- Dynamic: ✔️
- Required: ❌
Requirements are python dependencies to add to the python execution process
List of Python dependencies to set up in the virtualenv, in the same format as requirements.txt
s3Acl
- Type: string
- Dynamic: ✔️
- Required: ❌
AWS S3 ACL.
S3 Object ACL.
s3KeyPrefix
- Type: string
- Dynamic: ✔️
- Required: ❌
S3 Key Prefix.
A static prefix before the generated S3 key names. Using prefixes you can upload files into specific directories in the S3 bucket. Default(None).
schema_mapping
- Type: string
- Dynamic: ❌
- Required: ❌
Schema mapping.
Useful if you want to load multiple streams from one tap to multiple Redshift schemas. If the tap sends the stream_id in <schema_name>-<table_name> format, then this option overwrites the default_target_schema value. Note that using schema_mapping you can overwrite the default_target_schema_select_permissions value to grant SELECT permissions to different groups per schema, or optionally create indices automatically for the replicated tables.
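How a stream_id in <schema_name>-<table_name> format might be resolved to a target schema can be sketched as below. The shape of the mapping (source schema name mapped to an object with a target_schema key) and the function name resolve_target_schema are assumptions for illustration; check the target's own documentation for the exact structure it expects.

```python
from typing import Dict, Optional


def resolve_target_schema(
    stream_id: str,
    default_target_schema: str,
    schema_mapping: Optional[Dict[str, Dict[str, str]]] = None,
) -> str:
    """Pick the target schema for a stream (hypothetical helper).

    Falls back to default_target_schema when the stream_id has no schema
    prefix or the prefix is not present in the mapping.
    """
    mapping = schema_mapping or {}
    source_schema, separator, _table = stream_id.partition("-")
    if separator and source_schema in mapping:
        return mapping[source_schema]["target_schema"]
    return default_target_schema
```

With a mapping of {"public": {"target_schema": "repl_public"}}, the stream public-orders would land in repl_public, while any unmapped stream would land in the default schema.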
secretAccessKey
- Type: string
- Dynamic: ✔️
- Required: ❌
S3 Secret Access Key.
Used for S3 and Redshift copy operations.
sessionToken
- Type: string
- Dynamic: ✔️
- Required: ❌
AWS S3 Session Token.
S3 AWS STS token for temporary credentials.
skipUpdates
- Type: boolean
- Dynamic: ❌
- Required: ❌
- Default:
false
Do not update existing records when Primary Key is defined.
Useful to improve performance when records are immutable, e.g. events.
slices
- Type: integer
- Dynamic: ❌
- Required: ❌
- Default:
1
Number of slices to split files into prior to running COPY on Redshift.
This should be set to the number of Redshift slices. The number of slices per node depends on the node size of the cluster - run SELECT COUNT(DISTINCT slice) slices FROM stv_slices to calculate this.
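Splitting a batch into one part per slice can be illustrated with a round-robin sketch. The function name split_into_slices is hypothetical and the rows are plain Python values; the real target splits files rather than in-memory lists, and its exact splitting strategy is not described on this page.

```python
from typing import Any, List, Sequence


def split_into_slices(rows: Sequence[Any], slices: int = 1) -> List[List[Any]]:
    """Distribute rows round-robin over the given number of slices."""
    if slices < 1:
        raise ValueError("slices must be at least 1")
    parts: List[List[Any]] = [[] for _ in range(slices)]
    for index, row in enumerate(rows):
        parts[index % slices].append(row)
    return parts
```

Round-robin keeps the parts within one row of each other in size, so each Redshift slice would receive a similar share of the load.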
validateRecords
- Type: boolean
- Dynamic: ❌
- Required: ❌
- Default:
false
Validate every single record message to the corresponding JSON schema.
This option is disabled by default, and invalid RECORD messages will fail only at load time in Redshift. Enabling this option will detect invalid records earlier but could cause performance degradation.
Outputs
state
- Type: string
URI of the state file
Definitions
DockerOptions-Memory
kernelMemory
- Type: string
- Dynamic: ✔️
- Required: ❌
The maximum amount of kernel memory the container can use.
The minimum allowed value is 4m. Because kernel memory cannot be swapped out, a container which is starved of kernel memory may block host machine resources, which can have side effects on the host machine and on other containers. See --kernel-memory details.
memory
- Type: string
- Dynamic: ✔️
- Required: ❌
The maximum amount of memory the container can use.
If you set this option, the minimum allowed value is 6 megabytes.
memoryReservation
- Type: string
- Dynamic: ✔️
- Required: ❌
Allows you to specify a soft limit smaller than --memory, which is activated when Docker detects contention or low memory on the host machine.
If you use memoryReservation, it must be set lower than memory for it to take precedence. Because it is a soft limit, it does not guarantee that the container doesn’t exceed the limit.
memorySwap
- Type: string
- Dynamic: ✔️
- Required: ❌
The amount of memory this container is allowed to swap to disk
If memory and memorySwap are set to the same value, this prevents containers from using any swap. This is because memorySwap is the amount of combined memory and swap that can be used, while memory is only the amount of physical memory that can be used.
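The relationship between memory and memorySwap reduces to a subtraction, sketched below. The helper names parse_size and swap_allowance are hypothetical, and the sketch assumes values use the b, k, m, or g suffixes with binary multiples.

```python
# Binary multiples for the size suffixes assumed by this sketch.
_UNITS = {"b": 1, "k": 1024, "m": 1024 ** 2, "g": 1024 ** 3}


def parse_size(value: str) -> int:
    """Convert a size string such as '300m' or '1g' to a number of bytes."""
    text = value.strip().lower()
    if text and text[-1] in _UNITS:
        return int(text[:-1]) * _UNITS[text[-1]]
    return int(text)


def swap_allowance(memory: str, memory_swap: str) -> int:
    """Return the swap (in bytes) left once physical memory is subtracted."""
    physical = parse_size(memory)
    combined = parse_size(memory_swap)
    if combined < physical:
        raise ValueError("memorySwap must not be smaller than memory")
    return combined - physical
```

Setting both properties to "300m" gives an allowance of 0 bytes, which matches the statement that equal values prevent any swap use.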
memorySwappiness
- Type: string
- Dynamic: ✔️
- Required: ❌
The percentage of anonymous pages the host kernel can swap out for this container
By default, the host kernel can swap out a percentage of anonymous pages used by a container. You can set memorySwappiness to a value between 0 and 100 to tune this percentage.
oomKillDisable
- Type: boolean
- Dynamic: ❌
- Required: ❌
By default, if an out-of-memory (OOM) error occurs, the kernel kills processes in a container.
To change this behavior, use the oomKillDisable option. Only disable the OOM killer on containers where you have also set the memory option. If the memory flag is not set, the host can run out of memory and the kernel may need to kill the host system’s processes to free memory.
DockerOptions
image
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Min length:
1
Docker image to use
cpu
- Type: DockerOptions-Cpu
- Dynamic: ❌
- Required: ❌
Limits cpu usage.
By default, each container’s access to the host machine’s CPU cycles is unlimited. You can set various constraints to limit a given container’s access to the host machine’s CPU cycles.
deviceRequests
- Type: array
- SubType: DockerOptions-DeviceRequest
- Dynamic: ❌
- Required: ❌
A list of requests for devices to be sent to device drivers
dockerConfig
- Type: string
- Dynamic: ✔️
- Required: ❌
Docker config file
Full file that can be used to configure private registries, ...
dockerHost
- Type: string
- Dynamic: ✔️
- Required: ❌
Docker API URI
entryPoint
- Type: array
- SubType: string
- Dynamic: ✔️
- Required: ❌
Docker entrypoint to use
extraHosts
- Type: array
- SubType: string
- Dynamic: ✔️
- Required: ❌
Docker extra host to use
memory
- Type: DockerOptions-Memory
- Dynamic: ❌
- Required: ❌
Limits memory usage.
Docker can enforce hard memory limits, which allow the container to use no more than a given amount of user or system memory, or soft limits, which allow the container to use as much memory as it needs unless certain conditions are met, such as when the kernel detects low memory or contention on the host machine. Some of these options have different effects when used alone or when more than one option is set.
networkMode
- Type: string
- Dynamic: ✔️
- Required: ❌
Docker network mode to use
pullImage
- Type: boolean
- Dynamic: ❌
- Required: ❌
- Default:
true
Whether the image must be pulled before starting the container
Mostly used for local image with registry
user
- Type: string
- Dynamic: ✔️
- Required: ❌
Docker user to use
volumes
- Type: array
- SubType: string
- Dynamic: ✔️
- Required: ❌
List of volumes to mount
Must be a valid mount expression as a string, for example: /home/user:/app
Volume mounts are disabled by default for security reasons; you must enable them in the server configuration by setting kestra.tasks.scripts.docker.volume-enabled to true.
DockerOptions-Cpu
cpus
- Type: integer
- Dynamic: ❓
- Required: ❌
Specify how much of the available CPU resources a container can use.
For instance, if the host machine has two CPUs and you set cpus:"1.5", the container is guaranteed at most one and a half of the CPUs.
DockerOptions-DeviceRequest
capabilities
- Type: array
- SubType: array
- Dynamic: ❓
- Required: ❌
A list of capabilities; an OR list of AND lists of capabilities.
count
- Type: integer
- Dynamic: ❓
- Required: ❌
A request for devices to be sent to device drivers
deviceIds
- Type: array
- SubType: string
- Dynamic: ❓
- Required: ❌
A request for devices to be sent to device drivers
driver
- Type: string
- Dynamic: ❓
- Required: ❌
A request for devices to be sent to device drivers
options
- Type: object
- SubType: string
- Dynamic: ❓
- Required: ❌
Driver-specific options, specified as key/value pairs.
These options are passed directly to the driver.