Source
```yaml
id: dlt-pipedrive-to-bigquery
namespace: company.team

tasks:
  - id: dlt_pipeline
    type: io.kestra.plugin.scripts.python.Script
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.Docker
    containerImage: python:3.11
    beforeCommands:
      - pip install "dlt[bigquery]"
      - dlt --non-interactive init pipedrive bigquery
    env:
      DESTINATION__BIGQUERY__CREDENTIALS__PROJECT_ID: "{{ secret('BIGQUERY_PROJECT_ID') }}"
      DESTINATION__BIGQUERY__CREDENTIALS__PRIVATE_KEY: "{{ secret('BIGQUERY_PRIVATE_KEY') }}"
      DESTINATION__BIGQUERY__CREDENTIALS__CLIENT_EMAIL: "{{ secret('BIGQUERY_CLIENT_EMAIL') }}"
      SOURCES__PIPEDRIVE__CREDENTIALS__PIPEDRIVE_API_KEY: "{{ secret('PIPEDRIVE_API_KEY') }}"
    script: |
      import dlt
      from pipedrive import pipedrive_source

      # Configure the pipeline to load into the `pipedrive` dataset in BigQuery
      pipeline = dlt.pipeline(
          pipeline_name="pipedrive_pipeline",
          destination="bigquery",
          dataset_name="pipedrive",
      )

      # Extract, normalize, and load all Pipedrive resources
      load_info = pipeline.run(pipedrive_source())
      print(load_info)

triggers:
  - id: hourly
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "@hourly"
```
About this blueprint
This flow ingests data from Pipedrive CRM and loads it into BigQuery using dlt (data load tool), running automatically on an hourly schedule.
All ingestion logic is implemented in a single Python script that:
- Connects to the Pipedrive API using an API key
- Extracts and normalizes CRM data with dlt
- Loads the data into BigQuery with schema management handled automatically
API credentials for both Pipedrive and BigQuery are securely managed using Kestra Secrets. This setup is well suited for keeping CRM data continuously up to date in BigQuery for analytics, reporting, or downstream transformations.
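If you want to validate the extraction locally before scheduling it in Kestra, a minimal sketch is shown below. It assumes the same `pipedrive` verified source scaffolded by `dlt init`, swaps BigQuery for DuckDB so no cloud credentials are needed, and uses illustrative resource names (`deals`, `persons`); adapt them to the resources you actually need.

```python
import dlt
from pipedrive import pipedrive_source  # scaffolded by `dlt init pipedrive duckdb`

# Local smoke test: load a couple of resources into DuckDB instead of BigQuery.
# The Pipedrive API key can be supplied via .dlt/secrets.toml or the
# SOURCES__PIPEDRIVE__CREDENTIALS__PIPEDRIVE_API_KEY environment variable.
pipeline = dlt.pipeline(
    pipeline_name="pipedrive_pipeline",
    destination="duckdb",
    dataset_name="pipedrive",
)

# with_resources() restricts the run to a subset of the source's resources;
# "deals" and "persons" here are illustrative.
load_info = pipeline.run(pipedrive_source().with_resources("deals", "persons"))
print(load_info)  # summarizes load packages, tables, and row counts
```

Once the local run looks right, the same script body can be dropped into the flow above with `destination="bigquery"`, leaving credentials to the Kestra Secrets shown in the `env` block.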