Getting started with Kestra — a File Processing workflow example

About this blueprint

This flow is a simple example of a file processing use case. It downloads a zip file, unzips it, reads a CSV file, and writes the data to a Parquet file, ensuring that the data follows a specific schema.

The flow has four tasks:

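  1. The first task (get_zipfile) downloads a zip file.
  2. The second task (unzip) decompresses the archive.
  3. The third task (csv_to_ion) reads the CSV file and converts it to Ion.
  4. The fourth task (to_parquet) writes the data to a file that follows the declared schema. As listed here, its type is IonToAvro, so the output is Avro rather than Parquet.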

id: file_processing
namespace: tutorial
description: File Processing

variables:
  file_id: "{{ execution.startDate | dateAdd(-3, 'MONTHS') | date('yyyyMM') }}"

tasks:
  - id: get_zipfile
    type: io.kestra.plugin.core.http.Download
    uri: "{{ render(vars.file_id) }}"

  - id: unzip
    type: io.kestra.plugin.compress.ArchiveDecompress
    algorithm: ZIP
    from: "{{ outputs.get_zipfile.uri }}"

  - id: csv_to_ion
    type: io.kestra.plugin.serdes.csv.CsvToIon
    from: "{{ outputs.unzip.files[render(vars.file_id) ~ '-divvy-tripdata.csv'] }}"

  - id: to_parquet
    type: io.kestra.plugin.serdes.avro.IonToAvro
    from: "{{ outputs.csv_to_ion.uri }}"
    datetimeFormat: "yyyy-MM-dd' 'HH:mm:ss"
    schema: |
      {
        "type": "record",
        "name": "Ride",
        "namespace": "com.example.bikeshare",
        "fields": [
          {"name": "ride_id", "type": "string"},
          {"name": "rideable_type", "type": "string"},
          {"name": "started_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
          {"name": "ended_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
          {"name": "start_station_name", "type": "string"},
          {"name": "start_station_id", "type": "string"},
          {"name": "end_station_name", "type": "string"},
          {"name": "end_station_id", "type": "string"},
          {"name": "start_lat", "type": "double"},
          {"name": "start_lng", "type": "double"},
          {
            "name": "end_lat",
            "type": ["null", "double"],
            "default": null
          },
          {
            "name": "end_lng",
            "type": ["null", "double"],
            "default": null
          },
          {"name": "member_casual", "type": "string"}
        ]
      }
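
The description mentions Parquet output, while the final task in the flow uses the IonToAvro type. If Parquet is what you want, one option is to swap the task type. The fragment below is a minimal sketch, not part of the original blueprint. It assumes that the installed serdes plugin provides `io.kestra.plugin.serdes.parquet.IonToParquet` and that this task accepts the same `from`, `datetimeFormat`, and Avro-style `schema` properties as the task it replaces; check the plugin documentation for your Kestra version before relying on it.

```yaml
  # Hypothetical drop-in replacement for the last task in the flow above.
  # Assumption: IonToParquet takes the same properties as IonToAvro.
  - id: to_parquet
    type: io.kestra.plugin.serdes.parquet.IonToParquet
    from: "{{ outputs.csv_to_ion.uri }}"
    datetimeFormat: "yyyy-MM-dd' 'HH:mm:ss"
    # Same "Ride" schema as in the flow above, unchanged.
    schema: |
      {
        "type": "record",
        "name": "Ride",
        "namespace": "com.example.bikeshare",
        "fields": [
          {"name": "ride_id", "type": "string"},
          {"name": "rideable_type", "type": "string"},
          {"name": "started_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
          {"name": "ended_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
          {"name": "start_station_name", "type": "string"},
          {"name": "start_station_id", "type": "string"},
          {"name": "end_station_name", "type": "string"},
          {"name": "end_station_id", "type": "string"},
          {"name": "start_lat", "type": "double"},
          {"name": "start_lng", "type": "double"},
          {"name": "end_lat", "type": ["null", "double"], "default": null},
          {"name": "end_lng", "type": ["null", "double"], "default": null},
          {"name": "member_casual", "type": "string"}
        ]
      }
```

Only the `type` line differs from the original task, so the rest of the flow, including the `csv_to_ion` output it reads from, stays as it is.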


