Blueprints
Process partitions in parallel in Python, report outputs and metrics about number of rows and processing time for all partitioned files
Source
yaml
id: python-partitions-metrics
namespace: company.team
description: Process partitions in parallel
tasks:
- id: get_partitions
type: io.kestra.plugin.scripts.python.Script
taskRunner:
type: io.kestra.plugin.scripts.runner.docker.Docker
containerImage: ghcr.io/kestra-io/pydata:latest
script: |
from kestra import Kestra
partitions = [f"file_{nr}.parquet" for nr in range(1, 10)]
Kestra.outputs({'partitions': partitions})
- id: process_partitions
type: io.kestra.plugin.core.flow.ForEach
concurrencyLimit: 0
values: "{{ outputs.get_partitions.vars.partitions }}"
tasks:
- id: partition
type: io.kestra.plugin.scripts.python.Script
taskRunner:
type: io.kestra.plugin.scripts.runner.docker.Docker
containerImage: ghcr.io/kestra-io/pydata:latest
script: |
import random
import time
from kestra import Kestra
filename = '{{ taskrun.value }}'
print(f"Reading and processing partition {filename}")
nr_rows = random.randint(1, 1000)
processing_time = random.randint(1, 20)
time.sleep(processing_time)
Kestra.counter('nr_rows', nr_rows, {'partition': filename})
Kestra.timer('processing_time', processing_time, {'partition':
filename})
About this blueprint
Parallel Metrics Python Outputs
This flow extracts a list of partitions and then processes each partition in parallel in isolated Python scripts, each running in a separate Docker container. The flow will then track the number of rows and the processing time for each partition, which you can inspect in the Metrics tab.