Source
yaml
id: postgres-s3-python-script
namespace: company.team

tasks:
  - id: api_to_postgres
    type: io.kestra.plugin.scripts.python.Script
    beforeCommands:
      - pip install requests pandas psycopg2 sqlalchemy > /dev/null
    warningOnStdErr: false
    outputFiles:
      - users.json
    script: |
      import pandas as pd
      import requests
      from sqlalchemy import create_engine

      # Extract: fetch users from the public API and fail fast on HTTP errors
      URL = "https://gorest.co.in/public/v2/users"
      req = requests.get(url=URL)
      req.raise_for_status()
      res = req.json()

      # Tag each row so its origin is visible in Postgres
      df_users = pd.DataFrame(res)
      df_users["inserted_from"] = "kestra"
      df_users.head()

      # Load: append the rows to the "users" table
      password = "{{secret('DB_PASSWORD')}}"
      host = "host.docker.internal"
      engine = create_engine(f"postgresql://postgres:{password}@{host}:5432")
      df_users.to_sql("users", engine, if_exists="append", index=False)

      # Write the file that the next task uploads to S3
      df_users.to_json("users.json")

  - id: s3_upload
    type: io.kestra.plugin.aws.s3.Upload
    from: "{{ outputs.api_to_postgres.outputFiles['users.json'] }}"
    key: users.json
    bucket: kestraio
    region: eu-central-1
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
About this blueprint
Tags: S3, Ingest, pip, Postgres, Python
This flow runs a Python script that:
- Extracts data from an API
- Loads the extracted data into Postgres and writes it to a local JSON file, which the following task uploads to S3.
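The script builds the Postgres connection URL by interpolating the password directly into a string. That works for simple passwords, but a password containing characters such as @ or / would likely be misparsed as part of the URL. A minimal sketch of one way to guard against this, using only the standard library, is shown below. The helper name build_postgres_url and the sample password are hypothetical and not part of the blueprint.

```python
from urllib.parse import quote


def build_postgres_url(user, password, host, port=5432, database=None):
    """Return a postgresql:// URL with the credentials percent-encoded.

    Hypothetical helper for illustration; it is not part of the blueprint.
    """
    # safe="" also encodes "/" so that it cannot be read as a path separator
    auth = f"{quote(user, safe='')}:{quote(password, safe='')}"
    url = f"postgresql://{auth}@{host}:{port}"
    if database:
        url += f"/{quote(database, safe='')}"
    return url


if __name__ == "__main__":
    # Made-up password, chosen only because it contains URL-reserved characters
    print(build_postgres_url("postgres", "p@ss/word", "host.docker.internal"))
```

The resulting string could be passed to create_engine in place of the f-string used in the flow, assuming the secret is rendered into the script the same way as before.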
The Python task runs in a Docker container. Before starting the script, Kestra will install custom package dependencies, as defined by the beforeCommands property.
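Because the pip output in this flow is redirected to /dev/null, a failed or partial install may be hard to spot in the logs. As a sketch under that assumption, the script could begin with a quick check that the expected modules are importable. The function name missing_modules is hypothetical, and the module list simply mirrors the packages installed in the flow.

```python
import importlib.util


def missing_modules(names):
    """Return the top-level module names that cannot be found for import."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    # Import names matching the packages installed in the flow
    required = ["requests", "pandas", "psycopg2", "sqlalchemy"]
    print("Missing modules:", missing_modules(required))
```

An empty list suggests the install step succeeded; any names listed point to packages that did not install into the container's Python environment.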