Source
yaml
id: produce_to_rabbitmq
namespace: company.team

inputs:
  - id: order
    type: STRING

tasks:
  - id: publish_to_rabbitmq
    type: io.kestra.plugin.amqp.Publish
    url: amqp://guest:guest@localhost:5672/
    exchange: test-queue
    from:
      - data: "{{ read(inputs.order) }}"

About this blueprint
This blueprint has two flows: read_orders and produce_to_rabbitmq.
read_orders reads a CSV file from a URL, converts it into ION, and generates an execution of the produce_to_rabbitmq flow for each row of the ION file.
produce_to_rabbitmq publishes the record into RabbitMQ.
Here is the code of the parent flow read_orders:
yaml
id: read_orders
namespace: company.team

tasks:
  - id: csv
    type: io.kestra.plugin.fs.http.Download
    uri: "https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv"

  - id: csv_to_ion
    type: io.kestra.plugin.serdes.csv.CsvToIon
    from: "{{ outputs.csv.uri }}"

  - id: each
    type: io.kestra.core.tasks.flows.ForEachItem
    items: "{{ outputs.csv_to_ion.uri }}"
    batch:
      rows: 1 # one subflow execution per row
    namespace: company.team
    flowId: produce_to_rabbitmq
    wait: true # wait for the subflow execution
    transmitFailed: true # fail the task run if the subflow execution fails
    inputs:
      order: "{{ taskrun.items }}"

Execute the read_orders flow. This flow execution will trigger the produce_to_rabbitmq flow for each record.
You can run RabbitMQ locally using Docker with the following command. The management image tag is used here so that the web UI on port 15672 is available:
bash
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
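A freshly started broker has no exchange named test-queue, and publishing to an exchange that does not exist fails. The flow below is a minimal sketch of how the exchange could be declared and bound to a queue before running read_orders. The flow id setup_rabbitmq and the queue name orders-queue are hypothetical, and the task types and property names are written as recalled from the Kestra AMQP plugin, so check them against the plugin version you have installed. The exchange type and routing key are left at the plugin defaults, which is an assumption about how the messages will be routed.

```yaml
id: setup_rabbitmq # hypothetical flow id, not part of the blueprint
namespace: company.team

tasks:
  # Declare the exchange that produce_to_rabbitmq publishes to
  - id: declare_exchange
    type: io.kestra.plugin.amqp.DeclareExchange
    url: amqp://guest:guest@localhost:5672/
    name: test-queue

  # Create a queue to hold the published records (name is an assumption)
  - id: create_queue
    type: io.kestra.plugin.amqp.CreateQueue
    url: amqp://guest:guest@localhost:5672/
    name: orders-queue

  # Bind the queue to the exchange so that published messages are routed to it
  - id: bind_queue
    type: io.kestra.plugin.amqp.QueueBind
    url: amqp://guest:guest@localhost:5672/
    exchange: test-queue
    queue: orders-queue
```

Running this once before read_orders should be enough while the container stays up. Because the Docker command uses --rm, the exchange and queue are discarded when the container stops.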
You can open the RabbitMQ UI locally at http://localhost:15672/ and log in using guest/guest.
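Besides the UI, the published records can be checked from Kestra itself. The following is a sketch only: it assumes a queue named orders-queue (a hypothetical name) already exists and is bound to the test-queue exchange, and it uses the Consume task of the AMQP plugin with property names as recalled from the plugin documentation. The flow id consume_from_rabbitmq is also an assumption.

```yaml
id: consume_from_rabbitmq # hypothetical flow id, not part of the blueprint
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.amqp.Consume
    url: amqp://guest:guest@localhost:5672/
    queue: orders-queue # hypothetical queue bound to the test-queue exchange
    maxRecords: 10 # stop after reading at most 10 messages
```

If the messages were routed as expected, the task outputs should show the consumed records after read_orders has finished.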