RealtimeTrigger

```yaml
type: "io.kestra.plugin.gcp.pubsub.RealtimeTrigger"
```

Consume messages in real time from a Pub/Sub topic and create one execution per message.

To consume multiple messages received within a given time frame and process them as a batch, use io.kestra.plugin.gcp.pubsub.Trigger instead.

Examples

Consume a message from a Pub/Sub topic in real time.

```yaml
id: realtime-pubsub
namespace: company.team

tasks:
  - id: log
    type: io.kestra.plugin.core.log.Log
    message: "Received: {{ trigger.data }}"

triggers:
  - id: trigger
    type: io.kestra.plugin.gcp.pubsub.RealtimeTrigger
    projectId: test-project-id
    topic: test-topic
    subscription: test-subscription
```

Properties

serdeType

  • Type: string
  • Dynamic:
  • Required: ✔️
  • Default: STRING
  • Possible Values:
    • STRING
    • JSON

The serializer/deserializer to use.
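
As an illustration, the sketch below sets serdeType to JSON so that the message payload is exposed as an object rather than a plain string. It assumes each message body is a JSON document containing a hypothetical 'orderId' field; the project, topic, and subscription names are placeholders.

```yaml
id: realtime-pubsub-json
namespace: company.team

tasks:
  - id: log
    type: io.kestra.plugin.core.log.Log
    # orderId is a hypothetical field of the JSON payload
    message: "Order received: {{ trigger.data.orderId }}"

triggers:
  - id: trigger
    type: io.kestra.plugin.gcp.pubsub.RealtimeTrigger
    projectId: test-project-id
    topic: test-topic
    subscription: test-subscription
    serdeType: JSON
```

With the default STRING value, trigger.data would instead hold the raw message text.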

topic

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

The Pub/Sub topic. It must exist before the trigger starts.

autoCreateSubscription

  • Type: boolean
  • Dynamic:
  • Required:
  • Default: true

Whether the Pub/Sub subscription should be created if it does not exist.
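
A minimal sketch of turning this behavior off, for cases where the subscription is managed outside Kestra. Only the triggers section of a flow is shown, and 'existing-subscription' is a placeholder name for a subscription assumed to already exist in the project.

```yaml
triggers:
  - id: trigger
    type: io.kestra.plugin.gcp.pubsub.RealtimeTrigger
    projectId: test-project-id
    topic: test-topic
    # Placeholder name; the subscription is assumed to exist already
    subscription: existing-subscription
    # Do not create the subscription automatically
    autoCreateSubscription: false
```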

conditions

  • Type: array
  • SubType: Condition
  • Dynamic:
  • Required:

List of conditions that limit when the flow trigger fires.

impersonatedServiceAccount

  • Type: string
  • Dynamic: ✔️
  • Required:

The GCP service account to impersonate.

interval

  • Type: string
  • Dynamic:
  • Required:
  • Default: 60.000000000 (seconds, i.e. PT1M)
  • Format: duration

maxDuration

  • Type: string
  • Dynamic:
  • Required:
  • Format: duration

Maximum duration, in ISO 8601 duration format, after which the task ends.

maxRecords

  • Type: integer
  • Dynamic:
  • Required:

Maximum number of records; once this number is reached, the task ends.

projectId

  • Type: string
  • Dynamic: ✔️
  • Required:

The GCP project ID.

scopes

  • Type: array
  • SubType: string
  • Dynamic: ✔️
  • Required:
  • Default: [https://www.googleapis.com/auth/cloud-platform]

The GCP scopes to be used.

serviceAccount

  • Type: string
  • Dynamic: ✔️
  • Required:

The GCP service account.
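
One possible way to supply credentials without hard-coding them is to read them from a Kestra secret. The sketch below assumes that the property accepts the service account key content and that a secret with the hypothetical name 'GCP_SERVICE_ACCOUNT_JSON' has been configured in your Kestra instance. Only the triggers section of a flow is shown.

```yaml
triggers:
  - id: trigger
    type: io.kestra.plugin.gcp.pubsub.RealtimeTrigger
    projectId: test-project-id
    topic: test-topic
    subscription: test-subscription
    # GCP_SERVICE_ACCOUNT_JSON is a hypothetical secret name
    serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
```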

stopAfter

  • Type: array
  • SubType: string
  • Dynamic:
  • Required:

List of execution states after which a trigger should be stopped (a.k.a. disabled).
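
For example, the following sketch disables the trigger as soon as one of its executions ends in the FAILED state, which can help avoid a stream of failing executions. The identifiers are placeholders, and only the triggers section of a flow is shown.

```yaml
triggers:
  - id: trigger
    type: io.kestra.plugin.gcp.pubsub.RealtimeTrigger
    projectId: test-project-id
    topic: test-topic
    subscription: test-subscription
    # Stop (disable) the trigger after a failed execution
    stopAfter:
      - FAILED
```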

subscription

  • Type: string
  • Dynamic: ✔️
  • Required:

The Pub/Sub subscription. It is created automatically if it does not exist and 'autoCreateSubscription' is enabled.

Outputs

attributes

  • Type: object
  • SubType: string
  • Required:

The message attributes map

data

  • Type: object
  • Required:

The message data. It must be a string if the serde type is 'STRING'; otherwise, it must be a JSON object.

If it is a string, it can be a dynamic property; otherwise, it cannot.

messageId

  • Type: string
  • Required:

The message identifier

orderingKey

  • Type: string
  • Required:

The message ordering key
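
The outputs above are available to tasks through the trigger variable. The sketch below logs each of them; 'source' is a hypothetical attribute name, and the '??' fallbacks are an assumption meant to guard against messages that carry no such attribute or no ordering key.

```yaml
id: realtime-pubsub-outputs
namespace: company.team

tasks:
  - id: log
    type: io.kestra.plugin.core.log.Log
    message: |
      id: {{ trigger.messageId }}
      ordering key: {{ trigger.orderingKey ?? 'none' }}
      source attribute: {{ trigger.attributes.source ?? 'unknown' }}
      data: {{ trigger.data }}

triggers:
  - id: trigger
    type: io.kestra.plugin.gcp.pubsub.RealtimeTrigger
    projectId: test-project-id
    topic: test-topic
    subscription: test-subscription
```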
