Source
yaml
id: produce-kafka-message
namespace: company.team

tasks:
  - id: api
    type: io.kestra.plugin.core.http.Request
    uri: https://dummyjson.com/products

  - id: produce
    type: io.kestra.plugin.kafka.Produce
    from:
      key: mykey
      value: "{{ outputs.api.body }}"
      timestamp: "{{ execution.startDate }}"
      headers:
        x-header: some value
    keySerializer: STRING
    valueSerializer: JSON
    topic: mytopic
    properties:
      bootstrap.servers: my.kafka.k8s.com:9094
About this blueprint
Variables Queue API
This flow shows how to extract data from an HTTP API and send it to a Kafka topic using the Kafka plugin. It assumes that you have a Kafka cluster running and that you have created a topic named mytopic. Make sure to replace the bootstrap.servers value with your Kafka cluster URL.
The from argument expects a map or a list of maps with key-value pairs. The allowed keys are: key, value, partition, timestamp, and headers.
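As a minimal sketch of the list form, the task below passes two maps to from, so that each map becomes one message. The task id, keys, values, and partition number are hypothetical, and the sketch assumes that mytopic has a partition 0. The value serializer is set to STRING here because the illustrative values are plain strings, not JSON.

```yaml
tasks:
  - id: produce_many                      # hypothetical task id
    type: io.kestra.plugin.kafka.Produce
    from:
      # Each list item is one message, built from the allowed keys
      - key: first-key                    # illustrative key
        value: first message              # illustrative value
        partition: 0                      # assumes partition 0 exists on the topic
      - key: second-key
        value: second message
        headers:
          x-header: some value
    keySerializer: STRING
    valueSerializer: STRING               # plain string values in this sketch
    topic: mytopic
    properties:
      bootstrap.servers: my.kafka.k8s.com:9094
```

Keys that are omitted from a map, such as partition or timestamp in the second message, are left for the producer to fill in.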
In this example, we're using the outputs.api.body value, which is the JSON-formatted response body from the api task. This is why the valueSerializer argument is set to JSON.