
Getting started with Kestra — a Data Warehouse and Analytics workflow example

Source

yaml
id: dwh-and-analytics
namespace: tutorial
labels:
  name: Data Warehouse and Analytics

tasks:
  - id: dbt
    type: io.kestra.plugin.core.flow.WorkingDirectory
    description: Run the dbt demo project in a dedicated working directory.
    tasks:
      - id: clone_repository
        type: io.kestra.plugin.git.Clone
        description: Clone the dbt demo repository.
        url: https://github.com/kestra-io/dbt-demo
        branch: main

      - id: dbt_build
        type: io.kestra.plugin.dbt.cli.DbtCLI
        description: Install dependencies and build the dbt project with DuckDB.
        taskRunner:
          type: io.kestra.plugin.scripts.runner.docker.Docker
        containerImage: ghcr.io/kestra-io/dbt-duckdb:latest
        commands:
          - dbt deps
          - dbt build
        profiles: |
          jaffle_shop:
            outputs:
              dev:
                type: duckdb
                path: dbt.duckdb
                extensions: 
                  - parquet
                fixed_retries: 1
                threads: 16
                timeout_seconds: 300
            target: dev      

      - id: python
        type: io.kestra.plugin.scripts.python.Script
        description: Export the dbt tables to CSV files for analysis.
        outputFiles:
          - "*.csv"
        taskRunner:
          type: io.kestra.plugin.scripts.runner.docker.Docker
        containerImage: ghcr.io/kestra-io/duckdb:latest
        script: |
          import duckdb
          import pandas as pd

          conn = duckdb.connect(database='dbt.duckdb', read_only=False)

          tables_query = "SELECT table_name FROM information_schema.tables WHERE table_schema = 'main';"

          tables = conn.execute(tables_query).fetchall()

          # Export each table to CSV, excluding tables that start with 'raw' or 'stg'

          for table_name in tables:
              table_name = table_name[0]
              # Skip tables with names starting with 'raw' or 'stg'
              if not table_name.startswith('raw') and not table_name.startswith('stg'):
                  query = f"SELECT * FROM {table_name}"
                  df = conn.execute(query).fetchdf()
                  df.to_csv(f"{table_name}.csv", index=False)

          conn.close()

description: |
  # Flow Description
  **Use case:** Data warehouse and analytics demo that builds a dbt project and exports modeled tables.
  **Highlights:**
  - Clone the dbt demo repository into an isolated working directory.
  - Run `dbt deps` and `dbt build` against DuckDB using the provided container image and profile.
  - Export modeled tables (excluding raw/stg) to CSV files via Python for downstream analysis.

About this blueprint


This flow is a simple example of a data warehouse and analytics use case. It clones a dbt repository, builds the dbt project, and exports the data to CSV files.

The flow has three tasks:

  1. The first task clones the dbt demo repository.
  2. The second task runs `dbt build`, which builds the dbt models and runs their tests against DuckDB.
  3. The third task exports the transformed tables to CSV files, skipping raw and staging tables.
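The third task's export pattern can be sketched outside the flow: list the database's tables, filter out raw and staging ones, and write each remaining table to CSV. This minimal sketch substitutes the stdlib `sqlite3` module and a hypothetical toy schema for the flow's `dbt.duckdb` file (the real task uses the `duckdb` package and pandas), but the structure is the same.

```python
import csv
import sqlite3

# In-memory SQLite database as a stand-in for the flow's dbt.duckdb file;
# the table names here are illustrative, not from the dbt demo project.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE raw_orders (id INTEGER);
    CREATE TABLE stg_orders (id INTEGER);
    CREATE TABLE customers (id INTEGER, name TEXT);
    INSERT INTO customers VALUES (1, 'Alice'), (2, 'Bob');
""")

# List user tables (SQLite's equivalent of querying information_schema.tables)
tables = [row[0] for row in conn.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table'")]

exported = []
for table_name in tables:
    # Skip raw source tables and dbt staging models, as the flow's script does
    if table_name.startswith(("raw", "stg")):
        continue
    cursor = conn.execute(f"SELECT * FROM {table_name}")
    with open(f"{table_name}.csv", "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow([col[0] for col in cursor.description])  # header row
        writer.writerows(cursor)
    exported.append(table_name)

conn.close()
print(exported)  # → ['customers']
```

In the actual task, DuckDB's `fetchdf()` returns a pandas DataFrame and `df.to_csv()` handles the file writing; the prefix filter is what keeps intermediate models out of the exported files.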

Plugins used

Working Directory, Git Clone, dbt CLI, Docker Task Runner, Python Script
