Source
yaml
# Flow: download a monthly Divvy trip-data ZIP archive over HTTP, extract it,
# and convert the extracted CSV to a Parquet file with pandas.
id: zip-to-parquet
namespace: company.team

variables:
  # Execution start date shifted back three months, formatted as yyyyMM.
  # Reused to build the archive URL and the CSV / Parquet file names.
  file_id: "{{ execution.startDate | dateAdd(-3, 'MONTHS') | date('yyyyMM') }}"

tasks:
  # Download "<file_id>-divvy-tripdata.zip".
  # vars.file_id holds a template expression, so it is passed through render()
  # before being embedded in the URL.
  - id: get_zipfile
    type: io.kestra.plugin.core.http.Download
    uri: "https://divvy-tripdata.s3.amazonaws.com/{{ render(vars.file_id) }}-divvy-tripdata.zip"

  # Extract the downloaded archive; the extracted files are consumed by the
  # next task through outputs.unzip.files.
  - id: unzip
    type: io.kestra.plugin.compress.ArchiveDecompress
    algorithm: ZIP
    from: "{{ outputs.get_zipfile.uri }}"

  # Read the extracted CSV with pandas and write it back out as Parquet.
  - id: parquet_output
    type: io.kestra.plugin.scripts.python.Script
    # NOTE(review): presumably keeps stderr output from flagging the task run
    # as a warning - confirm against the plugin documentation.
    warningOnStdErr: false
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.Docker
    # NOTE(review): placed at task level (sibling of taskRunner); the original
    # nesting was lost in the flattened source - confirm.
    containerImage: ghcr.io/kestra-io/pydata:latest
    env:
      # Same yyyyMM identifier used in the download URL; read by the script
      # below via os.environ.
      FILE_ID: "{{ render(vars.file_id) }}"
    inputFiles: "{{ outputs.unzip.files }}"
    # The script expects the archive to contain "<FILE_ID>-divvy-tripdata.csv"
    # and writes "<FILE_ID>.parquet" next to it.
    script: |
      import os
      import pandas as pd
      file_id = os.environ["FILE_ID"]
      file = f"{file_id}-divvy-tripdata.csv"
      df = pd.read_csv(file)
      df.to_parquet(f"{file_id}.parquet")
    # Files matching this glob are captured as task outputs.
    outputFiles:
      - "*.parquet"
About this blueprint
Tags: Variables, Ingest, Python
This flow downloads a Divvy trip-data archive (ZIP) from an HTTP endpoint for the month three months before the execution start date, unzips it, and converts the extracted CSV file to Parquet in Python using pandas.