Extract multiple tables from Postgres using SQL queries and process them as Pandas DataFrames on a schedule
Source
yaml
id: postgres-to-pandas-dataframes
namespace: company.team
variables:
  db_host: host.docker.internal
tasks:
  - id: get_tables
    type: io.kestra.plugin.core.flow.Parallel
    concurrent: 2
    tasks:
      - id: products
        type: io.kestra.plugin.jdbc.postgresql.CopyOut
        sql: SELECT * FROM products
      - id: orders
        type: io.kestra.plugin.jdbc.postgresql.CopyOut
        sql: SELECT * FROM orders
  - id: pandas
    type: io.kestra.plugin.scripts.python.Script
    warningOnStdErr: false
    inputFiles:
      products.csv: "{{ outputs.products.uri }}"
      orders.csv: "{{ outputs.orders.uri }}"
    outputFiles:
      - bestsellers_pandas.json
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.Docker
    containerImage: ghcr.io/kestra-io/pydata:latest
    script: |
      import pandas as pd
      products = pd.read_csv("products.csv")
      orders = pd.read_csv("orders.csv")
      df = orders.merge(products, on="product_id", how="left")
      top = (
          df.groupby("product_name", as_index=False)["total"]
          .sum()
          .sort_values("total", ascending=False)
          .head(10)
      )
      top.to_json("bestsellers_pandas.json", orient="records")
pluginDefaults:
  - type: io.kestra.plugin.jdbc.postgresql.CopyOut
    values:
      url: jdbc:postgresql://{{ vars.db_host }}:5432/
      username: postgres
      password: "{{ secret('DB_PASSWORD') }}"
      format: CSV
      header: true
      delimiter: ","
triggers:
  - id: every_morning
    type: io.kestra.plugin.core.trigger.Schedule
    cron: 0 9 * * *
About this blueprint
Parallel Postgres Python SQL
This flow uses the Parallel task to run multiple tasks concurrently. The concurrent property sets the maximum number of tasks that run at the same time.
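As a rough analogy only (this is not how Kestra implements the Parallel task), the effect of a concurrency limit can be sketched in plain Python with a bounded thread pool. The extract function and the extra table names customers and payments are hypothetical placeholders; the sketch sleeps instead of querying a database.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

CONCURRENT = 2  # mirrors `concurrent: 2` in the flow above

lock = threading.Lock()
running = 0  # tasks currently executing
peak = 0     # highest number of tasks seen executing at once


def extract(table: str) -> str:
    """Hypothetical stand-in for an extraction task; sleeps instead of querying."""
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.05)  # simulate work
    with lock:
        running -= 1
    return f"{table}.csv"


# "customers" and "payments" are made-up tables, added so that there are
# more tasks than available slots.
tables = ["products", "orders", "customers", "payments"]

# The pool never runs more than CONCURRENT tasks at the same time;
# the remaining tasks wait until a slot frees up.
with ThreadPoolExecutor(max_workers=CONCURRENT) as pool:
    results = list(pool.map(extract, tables))

print(results, "peak concurrency:", peak)
```

With four tasks and a limit of two, the tasks should run in two waves rather than all at once, which is the behaviour the concurrent property is described as controlling.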
The flow extracts data from a Postgres database and passes it to a Python task using inputFiles. The Python task reads the input files and processes the data with Pandas.
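To try the Pandas step locally without a database, the same merge and aggregation can be run on a few in-memory rows. This is a minimal sketch: the column names product_id, product_name and total come from the script in the flow, while the sample rows, the order_id column, and the top_products helper are made up for illustration.

```python
import io

import pandas as pd

# Hypothetical sample data standing in for the CSV files exported from Postgres.
PRODUCTS_CSV = """product_id,product_name
1,Keyboard
2,Mouse
3,Monitor
"""

ORDERS_CSV = """order_id,product_id,total
100,1,50.0
101,2,20.0
102,1,30.0
103,3,200.0
104,2,5.0
"""


def top_products(products: pd.DataFrame, orders: pd.DataFrame, n: int = 10) -> pd.DataFrame:
    """Join orders to products and rank products by their summed order total."""
    df = orders.merge(products, on="product_id", how="left")
    return (
        df.groupby("product_name", as_index=False)["total"]
        .sum()
        .sort_values("total", ascending=False)
        .head(n)
        .reset_index(drop=True)
    )


products = pd.read_csv(io.StringIO(PRODUCTS_CSV))
orders = pd.read_csv(io.StringIO(ORDERS_CSV))
top = top_products(products, orders)

# Same serialisation as in the flow, printed instead of written to a file.
print(top.to_json(orient="records"))
```

In the flow itself, the two DataFrames are read from products.csv and orders.csv, which Kestra places in the task's working directory through inputFiles.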