Orchestrate dbt Projects with SQLMesh in Kestra


Use SQLMesh to run dbt projects with Kestra.

SQLMesh is an open-source Python data transformation and modeling framework. It automates everything needed to run a scalable data transformation platform and works with a variety of engines and orchestrators, enabling data teams to efficiently run and deploy data transformations written in SQL or Python.

This guide shows how to run dbt projects on BigQuery using SQLMesh with Kestra.

Example

Our flow will perform the following steps:

  1. Download orders.csv using the HTTP Download task.
  2. Create the orders table in BigQuery.
  3. Load the data from the CSV file into the BigQuery table.
  4. Create a dbt project that builds a BigQuery view on top of the orders table.
  5. Create a SQLMeshCLI task that runs the dbt project.

SQLMesh integrates with a variety of tools such as Airflow, dbt, and dlt. One of its common use cases is running dbt projects.

You can pull your dbt project from a Git repository, as described in the How-to guide on dbt, or create Namespace Files for the project. This guide builds the complete project from namespace files, step by step. You can later push all the namespace files to a GitHub repository using PushNamespaceFiles, as sketched below.
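
A minimal sketch of that push, assuming a hypothetical repository URL and a GITHUB_ACCESS_TOKEN secret (check the PushNamespaceFiles plugin documentation for the full list of properties):

id: push_dbt_project
namespace: company.team

tasks:
  - id: push_files
    type: io.kestra.plugin.git.PushNamespaceFiles
    url: https://github.com/your-org/dbt-sqlmesh-demo   # hypothetical repository
    branch: main
    username: your-github-username                      # hypothetical username
    password: "{{ secret('GITHUB_ACCESS_TOKEN') }}"     # assumes this secret exists
    commitMessage: Add dbt project files for SQLMesh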

Creating the flow with the SQLMeshCLI task

Create tasks for each step:

id: sqlmesh_transform
namespace: company.team

tasks:
  - id: orders_http_download
    type: io.kestra.plugin.core.http.Download
    description: Download orders.csv using HTTP Download
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv

  - id: create_orders_table
    type: io.kestra.plugin.gcp.bigquery.CreateTable
    description: Create orders table in BigQuery
    serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    projectId: <gcp-project-id>
    dataset: ecommerce
    table: orders
    tableDefinition:
      type: TABLE
      schema:
        fields:
          - name: order_id
            type: INT64
          - name: customer_name
            type: STRING
          - name: customer_email
            type: STRING
          - name: product_id
            type: INT64
          - name: price
            type: FLOAT64
          - name: quantity
            type: INT64
          - name: total
            type: FLOAT64

  - id: load_orders_table
    type: io.kestra.plugin.gcp.bigquery.Load
    description: Load orders table with data from orders.csv
    from: "{{ outputs.orders_http_download.uri }}"
    projectId: <gcp-project-id>
    serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    destinationTable: "<gcp-project-id>.ecommerce.orders"
    format: CSV
    csvOptions:
      fieldDelimiter: ","
      skipLeadingRows: 1

  - id: sqlmesh_transform
    type: io.kestra.plugin.sqlmesh.cli.SQLMeshCLI
    description: Use SQLMesh to run the dbt project
    inputFiles:
      sa.json: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    namespaceFiles:
      enabled: true
    beforeCommands:
      - pip install "sqlmesh[bigquery]"
      - pip install dbt-bigquery
    commands:
      - sqlmesh init -t dbt
      - sqlmesh plan --auto-apply

It’s important that the following properties are configured:

  • The namespaceFiles property has enabled set to true so that the task has access to your namespace files, i.e. the dbt project created below.
  • The inputFiles property writes the GCP service account JSON into the task's working directory as sa.json, which the keyfile property in profiles.yml references. This lets the task connect to your GCP account and access BigQuery. See the dedicated guide on how to add the secret, or the sketch after this list.
  • The beforeCommands install the sqlmesh[bigquery] and dbt-bigquery dependencies, which allow SQLMesh and dbt to perform operations on BigQuery.
  • The commands first run sqlmesh init -t dbt, which initializes SQLMesh inside the existing dbt project, then sqlmesh plan --auto-apply, which computes and applies the SQLMesh plan without prompting for confirmation.
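
In the open-source edition of Kestra, secrets can be supplied as environment variables prefixed with SECRET_ whose values are base64-encoded. A minimal docker-compose sketch, assuming you run Kestra with Docker Compose (the service definition is illustrative):

# docker-compose.yml (excerpt) - service name and image tag are illustrative
services:
  kestra:
    image: kestra/kestra:latest
    environment:
      # Becomes available as secret('GCP_SERVICE_ACCOUNT_JSON') in flows;
      # the value must be the base64-encoded service account JSON.
      SECRET_GCP_SERVICE_ACCOUNT_JSON: <base64-encoded service account JSON>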

Once the tasks are created and configured correctly, save the flow.

Creating the dbt project
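
The dbt project consists of four namespace files; by the end of this section, the namespace will contain:

profiles.yml
dbt_project.yml
models/
├── sources.yml
└── stg_orders.sql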

Now go to the Editor and create a new file called profiles.yml with the following content:

bq_dbt_project:
  outputs:
    dev:
      type: bigquery
      method: service-account
      dataset: ecommerce
      project: <gcp-project-id>
      keyfile: sa.json
      location: US
      priority: interactive
      threads: 16
      timeout_seconds: 300
      fixed_retries: 1
  target: dev
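
If you later promote the project beyond development, you could add a second output alongside dev. A sketch, assuming a hypothetical prod_ecommerce dataset (dbt selects an output with its --target flag):

bq_dbt_project:
  outputs:
    dev:
      # ... as above
    prod:
      type: bigquery
      method: service-account
      dataset: prod_ecommerce   # hypothetical production dataset
      project: <gcp-project-id>
      keyfile: sa.json
      location: US
      threads: 16
  target: dev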

Next, create dbt_project.yml with the following content. Note the +start date under the models configuration: SQLMesh requires a start date for dbt projects to determine the beginning of the backfill interval:

name: 'bq_dbt_project'
version: '1.0.0'
config-version: 2

profile: 'bq_dbt_project'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets:
  - "target"
  - "dbt_packages"

models:
  bq_dbt_project:
    example:
      +materialized: view
      +start: Nov 10 2024

Now create a folder called models in the namespace. In the models folder, create sources.yml to define the source tables:

version: 2

sources:
  - name: ecommerce
    database: <gcp-project-id>
    schema: ecommerce
    tables:
      - name: orders

Lastly, create stg_orders.sql in the models folder to materialize the stg_orders view on top of the orders table:

{{ config(materialized="view") }}

select
    order_id,
    customer_name,
    customer_email,
    product_id,
    price,
    quantity,
    total
from {{ source('ecommerce', 'orders') }}

That's it! We are now ready to run the flow.

Once the flow runs successfully, go to the BigQuery console and confirm that the stg_orders view has been created.
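
You can also verify the result from Kestra itself. A minimal sketch using the BigQuery Query task, assuming the view lands in the ecommerce dataset configured in profiles.yml (the flow and task ids are illustrative):

id: validate_stg_orders
namespace: company.team

tasks:
  - id: validate_view
    type: io.kestra.plugin.gcp.bigquery.Query
    serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    projectId: <gcp-project-id>
    sql: SELECT COUNT(*) AS row_count FROM `<gcp-project-id>.ecommerce.stg_orders`
    fetchOne: true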

This is how you can run a dbt project with the SQLMeshCLI task. These instructions can also help you integrate the SQLMeshCLI task with other SQLMesh integrations and execution engines.
