Using SQLMesh to run dbt Projects
Available on: >= 0.18.0
Using SQLMesh to run dbt projects with Kestra.
What is SQLMesh?
SQLMesh is an open-source Python data transformation and modeling framework. It automates everything needed to run a scalable data transformation platform. SQLMesh works with a variety of engines and orchestrators.
SQLMesh enables data teams to efficiently run and deploy data transformations written in SQL or Python.
In this guide, we will learn how to run dbt projects based on BigQuery using SQLMesh with Kestra.
Example
Our flow will perform the following steps:

- Download `orders.csv` using the HTTP Download task.
- Create the table in BigQuery.
- Upload the data from the CSV file into the BigQuery table.
- Create a dbt project that builds a BigQuery view on top of the BigQuery table.
- Create a SQLMeshCLI task that runs the dbt project.
SQLMesh integrates with a variety of tools such as Airflow, dbt, and dlt. One of its most common use cases is running dbt projects.
You can choose to pull your dbt project from a Git repository as mentioned in the How-to guide on dbt, or create the files directly in your namespace. Here, we will build the complete project from namespace files, creating each one as we go. You can later push all of the namespace files to a GitHub repository using PushNamespaceFiles.
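For reference, pushing the finished files to GitHub could look like the sketch below; the repository URL, username, and secret name are placeholders for your own values:

```yaml
id: push_dbt_project_files
namespace: company.team

tasks:
  - id: push_namespace_files
    type: io.kestra.plugin.git.PushNamespaceFiles
    namespace: company.team                         # namespace whose files get pushed
    url: https://github.com/<your-org>/<your-repo>  # placeholder repository
    branch: main
    username: <your-github-username>
    password: "{{ secret('GITHUB_ACCESS_TOKEN') }}" # personal access token stored as a Kestra secret
    commitMessage: "Push dbt project files from Kestra"
```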
Creating our Flow with SQLMeshCLI Task
Based on the steps mentioned in the description, let's create tasks for each step.
```yaml
id: sqlmesh_transform
namespace: company.team

tasks:
  - id: orders_http_download
    type: io.kestra.plugin.core.http.Download
    description: Download orders.csv using HTTP Download
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv

  - id: create_orders_table
    type: io.kestra.plugin.gcp.bigquery.CreateTable
    description: Create orders table in BigQuery
    serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    projectId: <gcp-project-id>
    dataset: ecommerce
    table: orders
    tableDefinition:
      type: TABLE
      schema:
        fields:
          - name: order_id
            type: INT64
          - name: customer_name
            type: STRING
          - name: customer_email
            type: STRING
          - name: product_id
            type: INT64
          - name: price
            type: FLOAT64
          - name: quantity
            type: INT64
          - name: total
            type: FLOAT64

  - id: load_orders_table
    type: io.kestra.plugin.gcp.bigquery.Load
    description: Load orders table with data from orders.csv
    from: "{{ outputs.orders_http_download.uri }}"
    projectId: <gcp-project-id>
    serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    destinationTable: "<gcp-project-id>.ecommerce.orders"
    format: CSV
    csvOptions:
      fieldDelimiter: ","
      skipLeadingRows: 1

  - id: sqlmesh_transform
    type: io.kestra.plugin.sqlmesh.cli.SQLMeshCLI
    description: Use SQLMesh to run the dbt project
    inputFiles:
      sa.json: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    namespaceFiles:
      enabled: true
    beforeCommands:
      - pip install "sqlmesh[bigquery]"
      - pip install dbt-bigquery
    commands:
      - sqlmesh init -t dbt
      - sqlmesh plan --auto-apply
```
It's important that we have the following properties configured:

- The `namespaceFiles` property has `enabled` set to `true` to ensure that the task has access to your namespace files (see the sketch after this list for optionally narrowing the selection).
- Provide the GCP service account JSON file so that the task can connect to your GCP account and access BigQuery. Check out the dedicated guide on how to add it. This file is referenced as the `keyfile` in the dbt project's `profiles.yml`.
- Install the `sqlmesh[bigquery]` and `dbt-bigquery` dependencies with `beforeCommands`. These allow SQLMesh and dbt to perform operations on BigQuery.
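If your namespace also contains files unrelated to this dbt project, you can optionally narrow what the task sees with the `include` property of `namespaceFiles`. A minimal sketch, assuming the file layout we create later in this guide:

```yaml
namespaceFiles:
  enabled: true
  include:                 # only expose the dbt project files to the task
    - profiles.yml
    - dbt_project.yml
    - models/**
```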
Once the task is created and configured correctly, save the flow.
Creating the dbt Project
Now go to the Editor and create a new file called `profiles.yml` with the following content:
```yaml
bq_dbt_project:
  outputs:
    dev:
      type: bigquery
      method: service-account
      dataset: ecommerce
      project: <gcp-project-id>
      keyfile: sa.json
      location: US
      priority: interactive
      threads: 16
      timeout_seconds: 300
      fixed_retries: 1
  target: dev
```
Next, we will create `dbt_project.yml` with the following content:
```yaml
name: 'bq_dbt_project'
version: '1.0.0'
config-version: 2

profile: 'bq_dbt_project'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets:
  - "target"
  - "dbt_packages"

models:
  bq_dbt_project:
    example:
      +materialized: view
      +start: Nov 10 2024
```
Note that SQLMesh requires models to have a start date for backfilling data, which is set with the `start` configuration parameter.
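SQLMesh also accepts the start date in an individual model's config block instead of the project-level setting; a minimal sketch (the date is only an example):

```sql
-- hypothetical per-model configuration; overrides the project-level +start
{{ config(materialized="view", start="Nov 10 2024") }}

select *
from {{ source('ecommerce', 'orders') }}
```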
Now create a folder called `models` in the namespace. In the `models` folder, we will create `sources.yml`, which defines the source tables, with the following content:
```yaml
version: 2

sources:
  - name: ecommerce
    database: <gcp-project-id>
    schema: ecommerce
    tables:
      - name: orders
```
Lastly, we will create `stg_orders.sql`, which materializes the `stg_orders` view on top of the `orders` table:
```sql
{{ config(materialized="view") }}

select
    order_id,
    customer_name,
    customer_email,
    product_id,
    price,
    quantity,
    total
from {{ source('ecommerce', 'orders') }}
```
That's it! We are now ready to run the flow.
Once the flow runs successfully, you can go to the BigQuery console and verify that the `stg_orders` view has been created.
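As an optional alternative to checking the console manually, you could append a query task to the flow itself; a minimal sketch, assuming the same service account secret as before:

```yaml
- id: check_stg_orders_view
  type: io.kestra.plugin.gcp.bigquery.Query
  description: Sanity-check that the stg_orders view returns rows
  serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
  projectId: <gcp-project-id>
  fetchOne: true               # expose the single result row as a task output
  sql: |
    SELECT COUNT(*) AS row_count
    FROM `<gcp-project-id>.ecommerce.stg_orders`
```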
This is how you can use the SQLMeshCLI task to run a dbt project. These instructions can also help you integrate the SQLMeshCLI task with other SQLMesh integrations and execution engines.