
Getting started with Kestra — a Data Warehouse and Analytics workflow example

Source

yaml
id: dwh-and-analytics
namespace: tutorial
description: Data Warehouse and Analytics

tasks:
  - id: dbt
    type: io.kestra.plugin.core.flow.WorkingDirectory
    tasks:
      - id: clone_repository
        type: io.kestra.plugin.git.Clone
        url: https://github.com/kestra-io/dbt-demo
        branch: main

      - id: dbt_build
        type: io.kestra.plugin.dbt.cli.DbtCLI
        taskRunner:
          type: io.kestra.plugin.scripts.runner.docker.Docker
        containerImage: ghcr.io/kestra-io/dbt-duckdb:latest
        commands:
          - dbt deps
          - dbt build
        profiles: |
          jaffle_shop:
            outputs:
              dev:
                type: duckdb
                path: dbt.duckdb
                extensions:
                  - parquet
                fixed_retries: 1
                threads: 16
                timeout_seconds: 300
            target: dev

      - id: python
        type: io.kestra.plugin.scripts.python.Script
        outputFiles:
          - "*.csv"
        taskRunner:
          type: io.kestra.plugin.scripts.runner.docker.Docker
        containerImage: ghcr.io/kestra-io/duckdb:latest
        script: |
          import duckdb
          import pandas as pd  # pandas must be installed for fetchdf()

          # Open the DuckDB file produced by the dbt_build task
          conn = duckdb.connect(database='dbt.duckdb', read_only=False)

          tables_query = "SELECT table_name FROM information_schema.tables WHERE table_schema = 'main';"

          tables = conn.execute(tables_query).fetchall()

          # Export each table to CSV, excluding tables that start with 'raw' or 'stg'
          for table_name in tables:
              table_name = table_name[0]
              # Skip tables with names starting with 'raw' or 'stg'
              if not table_name.startswith('raw') and not table_name.startswith('stg'):
                  query = f"SELECT * FROM {table_name}"
                  df = conn.execute(query).fetchdf()
                  df.to_csv(f"{table_name}.csv", index=False)

          conn.close()

About this blueprint

Tags: Getting Started, Docker, Python, Data, Git, dbt, DuckDB

This flow is a simple example of a data warehouse and analytics use case. It clones a dbt repository, builds the dbt project, and exports the data to CSV files. The three tasks run inside a WorkingDirectory task, so they share the same files:

1. The first task clones the dbt repository.
2. The second task runs dbt deps and dbt build, which builds the dbt models and runs their tests against DuckDB.
3. The third task exports the transformed tables to CSV files, skipping tables whose names start with raw or stg.
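
The export filter used by the third task can be isolated as a small pure function, which makes it easy to check which tables would be exported before running the flow. This is only an illustrative sketch: `select_export_tables` and `SKIP_PREFIXES` are hypothetical names that do not appear in the blueprint, and the sample table names are invented for the example.

```python
# Hypothetical helper mirroring the export filter in the blueprint's Python task.
SKIP_PREFIXES = ("raw", "stg")  # prefixes the blueprint's script skips


def select_export_tables(table_names, skip_prefixes=SKIP_PREFIXES):
    """Return the table names that do not start with any skipped prefix."""
    # str.startswith accepts a tuple, so one call checks every prefix.
    prefixes = tuple(skip_prefixes)
    return [name for name in table_names if not name.startswith(prefixes)]


if __name__ == "__main__":
    # Invented sample names, not taken from the dbt-demo project.
    sample = ["raw_orders", "stg_orders", "orders", "customers"]
    print(select_export_tables(sample))
```

Keeping the rule in one function means the prefixes can be changed in a single place if the dbt project uses a different naming scheme for its raw and staging models.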

Plugins used: Working Directory, Clone, Dbt CLI, Docker, Script
