
Getting started with Kestra — a Data Warehouse and Analytics workflow example

About this blueprint

Tags: Docker, DuckDB, Getting Started, Git, Python, dbt

This flow is a simple example of a data warehouse and analytics use case. It clones a dbt repository, builds the dbt project, and exports the data to CSV files.

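The flow wraps three tasks in a single WorkingDirectory task so that they share the same files:

  1. The first task clones the dbt repository from GitHub.
  2. The second task runs dbt deps and dbt build in a Docker container to build and test the dbt models with DuckDB.
  3. The third task runs a Python script that exports the transformed tables to CSV files, skipping tables whose names start with raw or stg.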
yaml
id: dwh_and_analytics
namespace: tutorial

tasks:
  - id: dbt
    type: io.kestra.core.tasks.flows.WorkingDirectory
    tasks:
    - id: clone_repository
      type: io.kestra.plugin.git.Clone
      url: https://github.com/kestra-io/dbt-demo
      branch: main

    - id: dbt_build
      type: io.kestra.plugin.dbt.cli.DbtCLI
      runner: DOCKER
      docker:
        image: ghcr.io/kestra-io/dbt-duckdb:latest
      commands:
        - dbt deps
        - dbt build
      profiles: |
        jaffle_shop:
          outputs:
            dev:
              type: duckdb
              path: dbt.duckdb
              extensions:
                - parquet
              fixed_retries: 1
              threads: 16
              timeout_seconds: 300
          target: dev

    - id: python
      type: io.kestra.plugin.scripts.python.Script
      outputFiles:
        - "*.csv"
      docker:
        image: ghcr.io/kestra-io/duckdb:latest
      script: |
        import duckdb
        import pandas as pd

        conn = duckdb.connect(database='dbt.duckdb', read_only=False)

        tables_query = "SELECT table_name FROM information_schema.tables WHERE table_schema = 'main';"
        tables = conn.execute(tables_query).fetchall()

        # Export each table to CSV, skipping tables whose names start with 'raw' or 'stg'
        for (table_name,) in tables:
            if not table_name.startswith(('raw', 'stg')):
                query = f"SELECT * FROM {table_name}"
                df = conn.execute(query).fetchdf()
                df.to_csv(f"{table_name}.csv", index=False)

        conn.close()
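
The table selection in the export script can be tried out without DuckDB or Docker. The following is a minimal sketch, not part of the blueprint: the helper names (select_export_tables, quote_identifier, build_export_plan) and the sample table names are hypothetical, and quoting the table name is an optional hardening step that the flow's script does not perform.

```python
from typing import Iterable, List, Tuple

# Same prefixes that the flow's Python script skips.
SKIP_PREFIXES: Tuple[str, ...] = ("raw", "stg")


def select_export_tables(
    table_names: Iterable[str],
    skip_prefixes: Tuple[str, ...] = SKIP_PREFIXES,
) -> List[str]:
    """Return the table names that the export step would write to CSV."""
    # str.startswith accepts a tuple and matches any of its prefixes.
    return [name for name in table_names if not name.startswith(skip_prefixes)]


def quote_identifier(name: str) -> str:
    """Wrap a table name in double quotes, doubling any embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_export_plan(table_names: Iterable[str]) -> List[Tuple[str, str]]:
    """Pair each exported table's SELECT statement with its CSV file name."""
    return [
        (f"SELECT * FROM {quote_identifier(name)}", f"{name}.csv")
        for name in select_export_tables(table_names)
    ]


if __name__ == "__main__":
    # Hypothetical table names, for illustration only.
    example = ["raw_orders", "stg_orders", "orders", "customers"]
    for query, csv_name in build_export_plan(example):
        print(csv_name, "<-", query)
```

Keeping the selection separate from the database calls makes the skip rule easy to check on its own; in the flow, each query would still be passed to conn.execute(...).fetchdf() and written out with to_csv.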

Plugins used: Working Directory, Clone, Dbt CLI, Script
