# Flow: consume Kestra audit logs from a Kafka topic, flatten each record,
# convert the result to Avro, and load it into a BigQuery table on a schedule.
id: auditlogs-to-bigquery
# NOTE(review): placeholder namespace, added because a flow needs one; replace
# it with the namespace used in your Kestra instance.
namespace: company.team

tasks:
  # Reads at most 500 records from the kestra_auditlogs topic, deserializing
  # each record value as JSON. Kafka client settings go under `properties`.
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    properties:
      auto.offset.reset: earliest
      bootstrap.servers: local:9092
    topic: kestra_auditlogs
    valueDeserializer: JSON
    maxRecords: 500

  # Runs the script against the file produced by `consume`. The script drops
  # the Kafka headers, copies selected fields of the record value (and of its
  # nested `detail` object) to top-level columns, and replaces `value` with
  # its JSON string form.
  # NOTE(review): `value['detail'][...]` assumes every record has a non-null
  # `detail` object; confirm against the audit log format.
  - id: transform
    type: io.kestra.plugin.scripts.nashorn.FileTransform
    from: "{{ outputs.consume.uri }}"
    script: |
      var jacksonMapper = Java.type('io.kestra.core.serializers.JacksonMapper');
      delete row['headers'];

      var value = row['value']

      row['id'] = value['id']
      row['type'] = value['type']
      row['detail'] = value['detail']
      row['date'] = value['date']
      row['deleted'] = value['deleted']
      row['value'] = jacksonMapper.ofJson().writeValueAsString(value)
      row['detail_type'] = value['detail']['type']
      row['detail_cls'] = value['detail']['cls']
      row['detail_permission'] = value['detail']['permission']
      row['detail_id'] = value['detail']['id']
      row['detail_namespace'] = value['detail']['namespace']
      row['detail_flowId'] = value['detail']['flowId']
      row['detail_executionId'] = value['detail']['executionId']

  # Converts the transformed file to Avro. The schema is an Avro record whose
  # fields are all nullable strings, one per column set by the script above.
  - id: avro
    type: io.kestra.plugin.serdes.avro.IonToAvro
    from: "{{ outputs.transform.uri }}"
    description: convert the file from Kestra internal storage to avro.
    schema: |
      {
        "type": "record",
        "name": "Root",
        "fields": [
          { "name": "id", "type": ["null", "string"] },
          { "name": "type", "type": ["null", "string"] },
          { "name": "detail", "type": ["null", "string"] },
          { "name": "date", "type": ["null", "string"] },
          { "name": "deleted", "type": ["null", "string"] },
          { "name": "value", "type": ["null", "string"] },
          { "name": "detail_type", "type": ["null", "string"] },
          { "name": "detail_cls", "type": ["null", "string"] },
          { "name": "detail_permission", "type": ["null", "string"] },
          { "name": "detail_id", "type": ["null", "string"] },
          { "name": "detail_namespace", "type": ["null", "string"] },
          { "name": "detail_flowId", "type": ["null", "string"] },
          { "name": "detail_executionId", "type": ["null", "string"] }
        ]
      }

  # Loads the Avro file into BigQuery. WRITE_TRUNCATE is BigQuery's
  # overwrite-the-table write disposition.
  # NOTE(review): "autditlogs" looks like a typo for "auditlogs"; confirm the
  # intended table name before changing it.
  - id: load
    type: io.kestra.plugin.gcp.bigquery.Load
    avroOptions:
      useAvroLogicalTypes: true
    destinationTable: your_gcp_project.dwh.autditlogs
    format: AVRO
    from: "{{outputs.avro.uri }}"
    writeDisposition: WRITE_TRUNCATE
    serviceAccount: "{{ secret('GCP_CREDS') }}"
    projectId: your_gcp_project

triggers:
  # Cron fields: minute 0, hour 10, every day of month, month, and weekday.
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 10 * * *"
About this blueprint
Trigger Ingest BigQuery Outputs
This flow shows how to stream Kestra audit logs to BigQuery. The audit logs
are stored in the `kestra_auditlogs` Kafka topic. The flow consumes the
audit logs from the Kafka topic, transforms the data, and loads it into
BigQuery.
The flow is triggered every day at 10 AM UTC. You can customize the trigger by changing the cron expression, timezone, and more. For more information about cron expressions, see the Kestra Schedule trigger documentation.