RealtimeTrigger
Consume a message in real time from a Kafka topic and create one execution per message.
If you would like to consume multiple messages within a given time frame and process them as a batch, use the io.kestra.plugin.kafka.Trigger instead (see the sketch below).
type: "io.kestra.plugin.kafka.RealtimeTrigger"
Consume a message from a Kafka topic in real time.
```yaml
id: kafka_realtime_trigger
namespace: company.team

tasks:
  - id: log
    type: io.kestra.plugin.core.log.Log
    message: "{{ trigger.value }}"

triggers:
  - id: realtime_trigger
    type: io.kestra.plugin.kafka.RealtimeTrigger
    topic: test_kestra
    properties:
      bootstrap.servers: localhost:9092
    serdeProperties:
      schema.registry.url: http://localhost:8085
    keyDeserializer: STRING
    valueDeserializer: AVRO
    groupId: kafkaConsumerGroupId
```
Use Kafka Realtime Trigger to push events into MongoDB
```yaml
id: kafka_realtime_trigger
namespace: company.team

tasks:
  - id: insert_into_mongodb
    type: io.kestra.plugin.mongodb.InsertOne
    connection:
      uri: mongodb://mongoadmin:secret@localhost:27017/?authSource=admin
    database: kestra
    collection: products
    document: |
      {
        "product_id": "{{ trigger.value | jq('.product_id') | first }}",
        "product_name": "{{ trigger.value | jq('.product_name') | first }}",
        "category": "{{ trigger.value | jq('.product_category') | first }}",
        "brand": "{{ trigger.value | jq('.brand') | first }}"
      }

triggers:
  - id: realtime_trigger
    type: io.kestra.plugin.kafka.RealtimeTrigger
    topic: products
    properties:
      bootstrap.servers: localhost:9092
    valueDeserializer: JSON
    groupId: kestraConsumer
```
groupId
Kafka consumer group ID. When a consumer group is used, only records that haven't yet been consumed by that group are fetched.
properties
Kafka connection properties. The bootstrap.servers property is the minimal configuration required to connect to a Kafka topic. This property can reference any valid Consumer Configs or Producer Configs as key-value pairs. If you want to pass a truststore or a keystore, you must provide a base64-encoded string for ssl.keystore.location and ssl.truststore.location.
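For instance, a minimal sketch of an SSL connection, assuming the keystore and truststore contents are stored base64-encoded in Kestra secrets (the secret names and topic are hypothetical):

```yaml
triggers:
  - id: secure_realtime_trigger
    type: io.kestra.plugin.kafka.RealtimeTrigger
    topic: payments
    properties:
      bootstrap.servers: broker:9093
      security.protocol: SSL
      # base64-encoded keystore/truststore content, not file paths
      ssl.keystore.location: "{{ secret('KAFKA_KEYSTORE_B64') }}"
      ssl.keystore.password: "{{ secret('KAFKA_KEYSTORE_PASSWORD') }}"
      ssl.truststore.location: "{{ secret('KAFKA_TRUSTSTORE_B64') }}"
      ssl.truststore.password: "{{ secret('KAFKA_TRUSTSTORE_PASSWORD') }}"
```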
valueDeserializer
The deserializer used for the value.
Default: STRING. Possible values: STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, JSON.
keyDeserializer
The deserializer used for the key.
Default: STRING. Possible values: STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, JSON.
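For illustration, a sketch that keeps the default string keys but hands the value over as raw, unparsed bytes (the topic and ids are hypothetical):

```yaml
triggers:
  - id: raw_bytes_trigger
    type: io.kestra.plugin.kafka.RealtimeTrigger
    topic: binary_events
    properties:
      bootstrap.servers: localhost:9092
    keyDeserializer: STRING        # the default, shown here for clarity
    valueDeserializer: BYTE_ARRAY  # deliver the value as raw bytes
    groupId: rawConsumer
```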
partitions
Topic partitions to consume messages from. Manually assigns a list of partitions to the consumer instead of relying on dynamic assignment.
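A sketch pinning the consumer to two specific partitions (the topic name is hypothetical):

```yaml
triggers:
  - id: pinned_partitions_trigger
    type: io.kestra.plugin.kafka.RealtimeTrigger
    topic: orders
    partitions:
      - 0
      - 1
    properties:
      bootstrap.servers: localhost:9092
```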
serdeProperties
Serializer configuration.
Default: {}. Configuration that will be passed to the serializer or deserializer. The avro.use.logical.type.converters property is always passed with a value of true.
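For example, a sketch pointing the Avro deserializer at a secured Confluent Schema Registry. The basic.auth.* keys are standard schema-registry client settings, and the URL and credentials shown are placeholders:

```yaml
triggers:
  - id: avro_trigger
    type: io.kestra.plugin.kafka.RealtimeTrigger
    topic: products
    properties:
      bootstrap.servers: localhost:9092
    serdeProperties:
      schema.registry.url: https://registry.example.com
      basic.auth.credentials.source: USER_INFO
      basic.auth.user.info: "registry_user:registry_password"
    valueDeserializer: AVRO
```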
since
Timestamp of a message to start consuming messages from.
By default, all messages are consumed from the topics, either with no consumer group or according to the auto.offset.reset property. You can instead provide an arbitrary start time. This property is ignored if a consumer group is used. It must be a valid ISO 8601 date.
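For instance, to replay everything produced since a given instant (the date is illustrative):

```yaml
triggers:
  - id: replay_trigger
    type: io.kestra.plugin.kafka.RealtimeTrigger
    topic: test_kestra
    properties:
      bootstrap.servers: localhost:9092
    # no groupId here: since is ignored when a consumer group is used
    since: "2024-06-01T00:00:00Z"
```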
stopAfter
List of execution states after which a trigger should be stopped (a.k.a. disabled).
Possible values: CREATED, RUNNING, PAUSED, RESTARTED, KILLING, SUCCESS, WARNING, FAILED, KILLED, CANCELLED, QUEUED, RETRYING, RETRIED, SKIPPED.
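A sketch disabling the trigger after the first failed execution:

```yaml
triggers:
  - id: realtime_trigger
    type: io.kestra.plugin.kafka.RealtimeTrigger
    topic: test_kestra
    properties:
      bootstrap.servers: localhost:9092
    stopAfter:
      - FAILED
```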
topic
Kafka topic(s) to consume messages from. It can be a string or a list of strings to consume from one or multiple topics.
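Both forms in a sketch (topic names are illustrative):

```yaml
# single topic
topic: test_kestra

# multiple topics
topic:
  - orders
  - payments
```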
topicPattern
Kafka topic pattern to consume messages from. The consumer will subscribe to all topics matching the specified pattern in order to get dynamically assigned partitions.
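A sketch subscribing to every topic whose name starts with product. Note that pattern-based subscription in Kafka requires a consumer group, hence the groupId:

```yaml
triggers:
  - id: pattern_trigger
    type: io.kestra.plugin.kafka.RealtimeTrigger
    topicPattern: "product.*"
    properties:
      bootstrap.servers: localhost:9092
    groupId: kestraConsumer
```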