Query data from Druid and load it to Redshift

About this blueprint

Tags: AWS, Ingest, S3, SQL

This flow will:

  1. Query data from a Druid server
  2. Convert the result to a CSV file
  3. Upload the CSV file to S3
  4. Create a table in Redshift if it does not exist
  5. Load the CSV file from S3 into the Redshift table with a COPY command

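To set up Apache Druid locally, follow the instructions in the official Druid documentation. This flow uses Druid's example wikipedia dataset. Replace the placeholders in angle brackets (bucket name, access key, secret key) and the Redshift cluster URL with your own values before running it.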

yaml
id: druid_to_redshift
namespace: blueprint

tasks:
  - id: query_druid
    type: io.kestra.plugin.jdbc.druid.Query
    url: jdbc:avatica:remote:url=http://localhost:8888/druid/v2/sql/avatica/;transparent_reconnection=true
    sql: |
      SELECT __time AS edit_time, channel, page, "user", delta, added, deleted
      FROM wikipedia
    store: true

  - id: write_to_csv
    type: io.kestra.plugin.serdes.csv.CsvWriter
    from: "{{ outputs.query_druid.uri }}"

  - id: upload
    type: io.kestra.plugin.aws.s3.Upload
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
    region: "eu-central-1"
    from: "{{ outputs.write_to_csv.uri }}"
    bucket: "<bucket-name>"
    key: "wikipedia/input/wikipedia.csv"

  - id: create_table
    type: io.kestra.plugin.jdbc.redshift.Query
    url: "jdbc:redshift://123456789.eu-central-1.redshift.amazonaws.com:5439/dev"
    username: "{{ secret('REDSHIFT_USER') }}"
    password: "{{ secret('REDSHIFT_PASSWORD') }}"
    sql: |
      create table if not exists wikipedia
      (
          edit_time    varchar(100),
          channel      varchar(100),
          page         varchar(1000),
          wiki_user    varchar(100),
          edit_delta   integer,
          added        integer,
          deleted      integer
      );

  - id: insert_into_redshift
    type: io.kestra.plugin.jdbc.redshift.Query
    url: "jdbc:redshift://123456789.eu-central-1.redshift.amazonaws.com:5439/dev"
    username: "{{ secret('REDSHIFT_USER') }}"
    password: "{{ secret('REDSHIFT_PASSWORD') }}"
    sql: |
      COPY wikipedia
      FROM 's3://<bucket-name>/wikipedia/input/wikipedia.csv'
      credentials
      'aws_access_key_id=<access-key>;aws_secret_access_key=<secret-key>'
      IGNOREHEADER 1
      CSV;
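
The COPY statement above embeds the AWS credentials as literal placeholders. As a minimal sketch, assuming the same AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY secrets used by the upload task are also allowed to read the bucket, and assuming the sql property is rendered as a template like the other properties in this flow, the last task could reference those secrets instead of hardcoded keys:

```yaml
  # Hypothetical variant of the insert_into_redshift task.
  # Assumption: the secrets below are the same ones used by the upload task
  # and grant read access to <bucket-name>.
  - id: insert_into_redshift
    type: io.kestra.plugin.jdbc.redshift.Query
    url: "jdbc:redshift://123456789.eu-central-1.redshift.amazonaws.com:5439/dev"
    username: "{{ secret('REDSHIFT_USER') }}"
    password: "{{ secret('REDSHIFT_PASSWORD') }}"
    sql: |
      COPY wikipedia
      FROM 's3://<bucket-name>/wikipedia/input/wikipedia.csv'
      credentials
      'aws_access_key_id={{ secret('AWS_ACCESS_KEY_ID') }};aws_secret_access_key={{ secret('AWS_SECRET_ACCESS_KEY') }}'
      IGNOREHEADER 1
      CSV;
```

COPY maps CSV fields to table columns by position, so the column order in the Druid query has to match the order in the create table statement; the differing names (user and wiki_user, delta and edit_delta) do not matter because IGNOREHEADER 1 skips the header row.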
