Blueprints

Extract multiple tables from Postgres using SQL queries and process those as Pandas dataframes on schedule

About this blueprint

Local files Parallel Postgres Python SQL

This flow uses the Parallel task to run multiple tasks concurrently - the limit of how many tasks will run at the same time is defined using the concurrent property.

The flow extracts data from a Postgres database. That data is then passed to a Python task using LocalFiles. The Python task reads the local files, and performs operations on the data using Pandas.

yaml
id: postgresToPandasDataframes
namespace: blueprint

variables:
  db_host: host.docker.internal

tasks:
  - id: getTables
    type: io.kestra.core.tasks.flows.Parallel
    concurrent: 2
    tasks:
      - id: products
        type: io.kestra.plugin.jdbc.postgresql.CopyOut
        sql: SELECT * FROM products
      
      - id: orders
        type: io.kestra.plugin.jdbc.postgresql.CopyOut
        sql: SELECT * FROM orders
  
  - id: wdir
    type: io.kestra.core.tasks.flows.WorkingDirectory
    tasks:
      - id: inputs
        type: io.kestra.core.tasks.storages.LocalFiles
        inputs:
          products.csv: "{{ outputs.products.uri }}"
          orders.csv: "{{ outputs.orders.uri }}"
      
      - id: pandas
        type: io.kestra.plugin.scripts.python.Script
        warningOnStdErr: false
        docker:
          image: ghcr.io/kestra-io/pydata:latest
        script: |
          import pandas as pd
          products = pd.read_csv("products.csv")
          orders = pd.read_csv("orders.csv")
          df = orders.merge(products, on="product_id", how="left")

          top = (
              df.groupby("product_name", as_index=False)["total"]
              .sum()
              .sort_values("total", ascending=False)
              .head(10)
          )
          
          top.to_json("{{ outputDir }}/bestsellers_pandas.json", orient="records")

taskDefaults:
  - type: io.kestra.plugin.jdbc.postgresql.CopyOut
    values:
      url: jdbc:postgresql://{{ vars.db_host }}:5432/
      username: postgres
      password: "{{ secret('DB_PASSWORD') }}"
      format: CSV
      header: true
      delimiter: ","

triggers:
  - id: everyMorning
    type: io.kestra.core.models.triggers.types.Schedule
    cron: 0 9 * * *

Parallel

Copy Out

Working Directory

Local Files

Script

New to Kestra?

Use blueprints to kickstart your first workflows.

Get started with Kestra