Extract multiple tables from Postgres using SQL queries and process them as Pandas DataFrames on a schedule
About this blueprint
Tags: Local files, Parallel, Postgres, Python, SQL
This flow uses the Parallel task to run multiple tasks concurrently. The concurrent property sets the maximum number of tasks that run at the same time.
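As a rough analogy for what a concurrency limit means, the sketch below runs several stand-in extraction jobs through a thread pool capped at two workers. It is not how Kestra implements the Parallel task; the `extract` function, the sleep that replaces a real query, and the `customers` and `payments` table names are assumptions made for illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_CONCURRENT = 2  # plays the role of `concurrent: 2` in the flow

running = 0  # jobs currently executing
peak = 0     # highest number of jobs seen executing at once
lock = threading.Lock()


def extract(table: str) -> str:
    """Stand-in for an extraction task; sleeps instead of querying Postgres."""
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.05)  # simulate the time a query would take
    with lock:
        running -= 1
    return f"{table}.csv"


# "customers" and "payments" are hypothetical extra tables for the demo.
tables = ["products", "orders", "customers", "payments"]

# The pool never runs more than MAX_CONCURRENT jobs at the same time.
with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as pool:
    files = list(pool.map(extract, tables))

print(files)
print("peak concurrency:", peak)
```

With four jobs and a cap of two, the remaining jobs wait until a worker is free, which is the behavior the concurrent property controls in the flow.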
The flow extracts data from a Postgres database and passes it to a Python task using LocalFiles. The Python task reads the local files with Pandas, joins the orders to the products, and writes the ten products with the highest order totals to a JSON file.
yaml
id: postgresToPandasDataframes
namespace: blueprint

variables:
  db_host: host.docker.internal

tasks:
  - id: getTables
    type: io.kestra.core.tasks.flows.Parallel
    concurrent: 2
    tasks:
      - id: products
        type: io.kestra.plugin.jdbc.postgresql.CopyOut
        sql: SELECT * FROM products
      - id: orders
        type: io.kestra.plugin.jdbc.postgresql.CopyOut
        sql: SELECT * FROM orders

  - id: wdir
    type: io.kestra.core.tasks.flows.WorkingDirectory
    tasks:
      - id: inputs
        type: io.kestra.core.tasks.storages.LocalFiles
        inputs:
          products.csv: "{{ outputs.products.uri }}"
          orders.csv: "{{ outputs.orders.uri }}"
      - id: pandas
        type: io.kestra.plugin.scripts.python.Script
        warningOnStdErr: false
        docker:
          image: ghcr.io/kestra-io/pydata:latest
        script: |
          import pandas as pd
          products = pd.read_csv("products.csv")
          orders = pd.read_csv("orders.csv")
          df = orders.merge(products, on="product_id", how="left")
          top = (
              df.groupby("product_name", as_index=False)["total"]
              .sum()
              .sort_values("total", ascending=False)
              .head(10)
          )
          top.to_json("{{ outputDir }}/bestsellers_pandas.json", orient="records")

taskDefaults:
  - type: io.kestra.plugin.jdbc.postgresql.CopyOut
    values:
      url: jdbc:postgresql://{{ vars.db_host }}:5432/
      username: postgres
      password: "{{ secret('DB_PASSWORD') }}"
      format: CSV
      header: true
      delimiter: ","

triggers:
  - id: everyMorning
    type: io.kestra.core.models.triggers.types.Schedule
    cron: 0 9 * * *
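To try the aggregation from the pandas task outside Kestra, the sketch below applies the same merge and group-by to a few in-memory rows. The sample data and the `order_id` column are made up for illustration; only the `product_id`, `product_name`, and `total` columns come from the script in the flow, and the real tables may contain more columns.

```python
import io

import pandas as pd

# Hypothetical sample rows standing in for the CSV files exported by CopyOut.
products_csv = io.StringIO(
    "product_id,product_name\n"
    "1,Keyboard\n"
    "2,Mouse\n"
    "3,Monitor\n"
)
orders_csv = io.StringIO(
    "order_id,product_id,total\n"
    "100,1,50.0\n"
    "101,2,20.0\n"
    "102,1,75.0\n"
    "103,3,300.0\n"
)

products = pd.read_csv(products_csv)
orders = pd.read_csv(orders_csv)

# Attach the product name to every order, keeping orders with no match.
df = orders.merge(products, on="product_id", how="left")

# Sum order totals per product and keep the ten largest.
top = (
    df.groupby("product_name", as_index=False)["total"]
    .sum()
    .sort_values("total", ascending=False)
    .head(10)
)

# The flow writes this JSON to a file; here it is printed instead.
print(top.to_json(orient="records"))
```

Because the merge uses `how="left"`, an order whose `product_id` has no match in the products table gets a missing `product_name` and is dropped by the group-by, so it does not appear in the result.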