Blueprints

Read a CSV file and load each row into RabbitMQ

About this blueprint

This blueprint has two flows: read_orders and produce_to_rabbitmq.

  1. read_orders reads the CSV file from a URL, converts it into ION, and generates an execution of produce_to_rabbitmq flow for each row of the ION file.
  2. produce_to_rabbitmq publishes the record into RabbitMQ.

Here is the code of the parent flow read_orders:

yaml
id: read_orders
namespace: blueprint

tasks:
  - id: csv
    type: io.kestra.plugin.fs.http.Download
    uri: "https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv"

  - id: csv_to_ion
    type: io.kestra.plugin.serdes.csv.CsvToIon
    from: "{{ outputs.csv.uri }}"

  - id: each
    type: io.kestra.core.tasks.flows.ForEachItem
    items: "{{ outputs.csv_to_ion.uri }}"
    batch:
      rows: 1
    namespace: prod
    flowId: orders
    wait: true # wait for the subflow execution
    transmitFailed: true # fail the task run if the subflow execution fails
    inputs:
      order: "{{ taskrun.items }}"

Execute the read_orders flow. This flow execution will trigger the produce_to_rabbitmq flow for each record.

You can run RabbitMQ locally using Docker with the following command:

bash
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:latest

You can open the RabbitMQ UI locally on http://localhost:15672/ and login using guest/guest.

yaml
id: produce_to_rabbitmq
namespace: company.team

inputs:
  - id: order
    type: STRING

tasks:
  - id: publish_to_rabbitmq
    type: "io.kestra.plugin.amqp.Publish"
    url: amqp://guest:guest@localhost:5672/
    exchange: test-queue
    from:
    -  data: "{{ read(inputs.order) }}"

Publish

New to Kestra?

Use blueprints to kickstart your first workflows.

Get started with Kestra