Process partitions in parallel in Python, reporting outputs and metrics (row counts and processing time) for every partitioned file

Source

```yaml
id: python-partitions-metrics
namespace: company.team
description: Process partitions in parallel

tasks:
  - id: get_partitions
    type: io.kestra.plugin.scripts.python.Script
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.Docker
    containerImage: ghcr.io/kestra-io/pydata:latest
    script: |
      from kestra import Kestra
      partitions = [f"file_{nr}.parquet" for nr in range(1, 10)]
      Kestra.outputs({'partitions': partitions})

  - id: process_partitions
    type: io.kestra.plugin.core.flow.ForEach
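    # concurrencyLimit: 0 removes the concurrency limit, so all
    # partitions are processed in parallel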
    concurrencyLimit: 0
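    # the list of partition file names emitted by the get_partitions task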
    values: "{{ outputs.get_partitions.vars.partitions }}"
    tasks:
      - id: partition
        type: io.kestra.plugin.scripts.python.Script
        taskRunner:
          type: io.kestra.plugin.scripts.runner.docker.Docker
        containerImage: ghcr.io/kestra-io/pydata:latest
        script: |
          import random
          import time
          from kestra import Kestra

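          # Kestra templating injects the current ForEach item below,
          # so each run of this script processes one partition file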
          filename = '{{ taskrun.value }}'
          print(f"Reading and processing partition {filename}")
          nr_rows = random.randint(1, 1000)
          processing_time = random.randint(1, 20)
          time.sleep(processing_time)
          Kestra.counter('nr_rows', nr_rows, {'partition': filename})
          Kestra.timer('processing_time', processing_time, {'partition': filename})
```

About this blueprint


This flow extracts a list of partitions and then processes each partition in parallel in isolated Python scripts, each running in a separate Docker container. The flow then tracks the number of rows and the processing time for each partition, which you can inspect in the Metrics tab.
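If you want to try the output and metric helpers outside of a flow, the sketch below exercises the same `Kestra.outputs`, `Kestra.counter`, and `Kestra.timer` calls in isolation. The values are placeholders, and it assumes the `kestra` pip package is available (it ships with the `ghcr.io/kestra-io/pydata` image used above).

```python
# A minimal sketch of the output and metric helpers used in this
# blueprint, assuming the `kestra` pip package (pip install kestra).
from kestra import Kestra

# Expose a value to downstream tasks; it becomes available in later
# task definitions as {{ outputs.<task_id>.vars.partitions }}.
Kestra.outputs({"partitions": ["file_1.parquet", "file_2.parquet"]})

# Counter metric: a name, a numeric value, and optional tags
# that you can filter on in the Metrics tab.
Kestra.counter("nr_rows", 123, {"partition": "file_1.parquet"})

# Timer metric: a name, a duration in seconds (as in the flow above),
# and optional tags.
Kestra.timer("processing_time", 4.2, {"partition": "file_1.parquet"})
```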

Plugins used in this blueprint: Script, Docker, For Each
