SubmitSql SubmitSql
SubmitSql Certified

yaml
# Task type identifier for the SubmitSql task, as used in the flows' `type` field.
type: io.kestra.plugin.flink.SubmitSql
yaml
# Example flow: submit an INSERT ... SELECT statement with the Flink runtime
# mode set to streaming.
id: flink-sql-streaming
namespace: company.team

tasks:
  - id: run-sql
    type: io.kestra.plugin.flink.SubmitSql
    # URL of the Flink SQL Gateway (host and port are example values).
    gatewayUrl: 'http://flink-sql-gateway:8083'
    # Session settings: catalog and database names, plus Flink configuration
    # key/value overrides.
    sessionConfig:
      catalog: 'default_catalog'
      database: 'default_database'
      configuration:
        execution.runtime-mode: 'streaming'
        execution.checkpointing.interval: '30s'
    # Joins orders with customers and writes the result into enriched_orders.
    statement: |
      INSERT INTO enriched_orders
      SELECT o.order_id, o.customer_id, c.name, o.amount, o.order_time
      FROM orders o
      JOIN customers c ON o.customer_id = c.id

yaml
# Example flow: submit a CREATE TABLE ... AS SELECT statement with the Flink
# runtime mode set to batch.
id: flink-sql-batch
namespace: company.team

tasks:
  - id: run-batch-sql
    type: io.kestra.plugin.flink.SubmitSql
    # URL of the Flink SQL Gateway (host and port are example values).
    gatewayUrl: 'http://flink-sql-gateway:8083'
    # Only the runtime mode is overridden here; no catalog or database is set.
    sessionConfig:
      configuration:
        execution.runtime-mode: 'batch'
    # Aggregates orders from 2024-01-01 onward into one row per order date
    # (order count and total amount) and stores them in daily_summary.
    statement: |
      CREATE TABLE daily_summary AS
      SELECT DATE(order_time) as order_date,
             COUNT(*) as order_count,
             SUM(amount) as total_amount
      FROM orders
      WHERE order_time >= '2024-01-01'
      GROUP BY DATE(order_time)
Properties
SubType: string
Default: 30
Definitions
catalog: string
configuration: object
SubType: string
database: string
Default: 300