Load time series data into InfluxDB

Source (YAML)

id: influxdb-load-query-data
namespace: company.team

variables:
  influxdb_url: "{{ secret('INFLUXDB_URL') }}"
  api_token: "{{ secret('INFLUXDB_TOKEN') }}"

inputs:
  - id: city
    type: STRING
    defaults: "Paris"

tasks:
  - id: geocoding
    type: io.kestra.plugin.core.http.Request
    uri: "https://geocoding-api.open-meteo.com/v1/search?name={{ inputs.city }}&count=1&language=en&format=json"

  - id: geo_output
    type: io.kestra.plugin.core.output.OutputValues
    values:
      # Fall back to 52, 13 (roughly Berlin) when the lookup returns no coordinates
      latitude: '{{ json(outputs.geocoding.body).results[0].latitude ?? 52 }}'
      longitude: '{{ json(outputs.geocoding.body).results[0].longitude ?? 13 }}'

  - id: download_weather_data
    type: io.kestra.plugin.core.http.Download
    uri: "https://archive-api.open-meteo.com/v1/archive?latitude={{ outputs.geo_output.values.latitude }}&longitude={{ outputs.geo_output.values.longitude }}&start_date=2024-01-01&end_date=2024-01-01&hourly=temperature_2m"

  - id: normalize_data
    type: io.kestra.plugin.jdbc.duckdb.Query
    store: true
    inputFiles:
      data.json: "{{ outputs.download_weather_data.uri }}"
    sql: |
      INSTALL json;
      LOAD json;
      COPY(
        SELECT
          unnest(hourly.time) AS time,
          unnest(hourly.temperature_2m) AS temperature_2m,
          latitude AS latitude,
          longitude AS longitude,
          '{{ inputs.city }}' AS city
        FROM '{{workingDir}}/data.json'
      ) TO '{{outputFiles["result.csv"]}}' (HEADER, DELIMITER ',');
    outputFiles:
      - result.csv

  - id: csv_to_ion
    type: io.kestra.plugin.serdes.csv.CsvToIon
    from: "{{ outputs.normalize_data.outputFiles['result.csv'] }}"

  - id: load
    type: io.kestra.plugin.influxdb.Load
    connection:
      url: "{{ secret('INFLUXDB_URL') }}"
      token: "{{ secret('INFLUXDB_TOKEN') }}"
    org: "test"
    bucket: "test"
    from: "{{ outputs.csv_to_ion.uri }}"
    measurement: "temperature_2m"

About this blueprint

Database

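This flow geocodes a city with the Open-Meteo API, downloads that location's hourly temperature data for 2024-01-01, normalizes it to CSV with DuckDB, converts the CSV to Ion, and loads it into InfluxDB.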
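The flow declares `influxdb_url` and `api_token` under `variables`, but the `load` task looks the secrets up again instead of using them. A possible variant of that task that reuses the variables is sketched below. It is a fragment, not a complete flow, and it assumes a Kestra version in which a variable holding a Pebble expression has to be wrapped in `render()` to be evaluated; check this against the version you run.

```yaml
# Sketch only: replaces the `load` task in the flow above.
# Assumption: `render()` is needed to evaluate the expressions stored in `vars`.
tasks:
  - id: load
    type: io.kestra.plugin.influxdb.Load
    connection:
      url: "{{ render(vars.influxdb_url) }}"
      token: "{{ render(vars.api_token) }}"
    org: "test"
    bucket: "test"
    from: "{{ outputs.csv_to_ion.uri }}"
    measurement: "temperature_2m"
```

Keeping the connection details in one place means the secret names only have to change under `variables` if they are renamed.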

Tasks used: Request, Output Values, Download, Query, Csv To Ion, Load
