Source
id: azure-blob-to-bigquery
namespace: company.team
tasks:
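  # Load each newly detected blob into BigQuery; concurrencyLimit: 0 removes the concurrency limit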
  - id: each
    type: io.kestra.plugin.core.flow.ForEach
    concurrencyLimit: 0
    values: "{{ trigger.blobs | jq('.[].uri') }}"
    tasks:
      - id: upload_from_file
        type: io.kestra.plugin.gcp.bigquery.Load
        destinationTable: gcpProject.dataset.table
        from: "{{ taskrun.value }}"
        writeDisposition: WRITE_APPEND
        projectId: yourGcpProject
        serviceAccount: "{{ secret('GCP_CREDS') }}"
        ignoreUnknownValues: true
        autodetect: true
        format: CSV
        csvOptions:
          allowJaggedRows: true
          encoding: UTF-8
          fieldDelimiter: ","
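  # After all files are loaded, trigger a dbt Cloud job and wait for it to finish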
  - id: dbt_cloud_job
    type: io.kestra.plugin.dbt.cloud.TriggerRun
    accountId: "{{ secret('DBT_CLOUD_ACCOUNT_ID') }}"
    token: "{{ secret('DBT_CLOUD_API_TOKEN') }}"
    jobId: "366381"
    wait: true
triggers:
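  # Check the container every 30 seconds for new files under the prefix; processed files are moved to the archive path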
  - id: watch
    type: io.kestra.plugin.azure.storage.blob.Trigger
    interval: PT30S
    endpoint: https://kestra.blob.core.windows.net
    connectionString: "{{ secret('AZURE_CONNECTION_STRING') }}"
    container: stage
    prefix: marketplace/
    action: MOVE
    moveTo:
      container: stage
      name: archive/marketplace/
About this blueprint
Tags: dbt, GCP, Azure
When a new file arrives in Azure Blob Storage, this flow loads it into BigQuery. If multiple files are uploaded to the Azure Blob Storage container, the flow processes them in parallel.
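The parallelism comes from the ForEach task: concurrencyLimit: 0 removes the concurrency limit, so all detected files are loaded at once. To cap the load on BigQuery, you can set an explicit limit instead, for example:

  - id: each
    type: io.kestra.plugin.core.flow.ForEach
    concurrencyLimit: 5  # process at most 5 files at a time (0 = no limit, 1 = one at a time)
    values: "{{ trigger.blobs | jq('.[].uri') }}"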
Then, the flow triggers a dbt Cloud job and waits for its completion. Kestra retrieves the dbt Cloud job's artifacts and dynamically parses them into the Gantt view, so you can inspect how long each dbt model and test took to run.
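The artifact parsing happens as part of the TriggerRun task. In recent versions of the dbt Cloud plugin this behavior can reportedly be toggled with a parseRunResults flag (an assumption here; check your plugin version), which is useful if your dbt Cloud plan does not expose run artifacts:

  - id: dbt_cloud_job
    type: io.kestra.plugin.dbt.cloud.TriggerRun
    accountId: "{{ secret('DBT_CLOUD_ACCOUNT_ID') }}"
    token: "{{ secret('DBT_CLOUD_API_TOKEN') }}"
    jobId: "366381"
    wait: true
    parseRunResults: false  # assumed property name; skips fetching run_results.json artifacts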
This flow assumes that sensitive values such as the Azure connection string, the Google service account, and the dbt Cloud API token are stored as secrets. For testing purposes, you can paste those values directly into the flow instead.
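In the open-source edition of Kestra, one way to provide these secrets is via base64-encoded environment variables prefixed with SECRET_ on the Kestra server. A minimal sketch, assuming a Docker Compose deployment (all values below are placeholders):

services:
  kestra:
    image: kestra/kestra:latest
    environment:
      # values must be base64-encoded, e.g. `echo -n "my-connection-string" | base64`
      SECRET_AZURE_CONNECTION_STRING: "PGJhc2U2NC1wbGFjZWhvbGRlcj4="  # placeholder
      SECRET_GCP_CREDS: "PGJhc2U2NC1wbGFjZWhvbGRlcj4="                # placeholder
      SECRET_DBT_CLOUD_ACCOUNT_ID: "PGJhc2U2NC1wbGFjZWhvbGRlcj4="     # placeholder
      SECRET_DBT_CLOUD_API_TOKEN: "PGJhc2U2NC1wbGFjZWhvbGRlcj4="      # placeholder

The flow then references each value with the secret() function, e.g. {{ secret('AZURE_CONNECTION_STRING') }}.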
