Source
yaml
# Kestra flow: look up a city's coordinates, download one day of hourly
# temperature data for that location, flatten it to CSV with DuckDB, convert
# the CSV to Ion and load the rows into InfluxDB.
id: influxdb-load-query-data.yaml
namespace: company.team

# NOTE(review): these variables are not referenced by any task below; the
# `load` task reads the same secrets directly. Kept for compatibility.
variables:
  influxdb_url: "{{ secrets('INFLUXDB_URL') }}"
  api_token: "{{ secrets('INFLUXDB_TOKEN') }}"

inputs:
  # Name of the city to geocode; also written to the `city` column.
  - id: city
    type: STRING
    defaults: "Paris"

tasks:
  # Resolve the city name to coordinates (first match only, count=1).
  # The city is URL-encoded so names with spaces or non-ASCII characters
  # produce a valid query string.
  - id: geocoding
    type: io.kestra.plugin.core.http.Request
    uri: "https://geocoding-api.open-meteo.com/v1/search?name={{ inputs.city | urlencode }}&count=1&language=en&format=json"

  # Expose the coordinates of the first geocoding result as flow outputs,
  # falling back to 52 / 13 when the response carries no value.
  - id: geo_output
    type: io.kestra.plugin.core.output.OutputValues
    values:
      latitude: '{{ json(outputs.geocoding.body).results[0].latitude ?? 52 }}'
      longitude: '{{ json(outputs.geocoding.body).results[0].longitude ?? 13 }}'

  # Download hourly temperature_2m readings for 2024-01-01 at the resolved
  # coordinates.
  - id: download_weather_data
    type: io.kestra.plugin.core.http.Download
    uri: "https://archive-api.open-meteo.com/v1/archive?latitude={{ outputs.geo_output.values.latitude }}&longitude={{ outputs.geo_output.values.longitude }}&start_date=2024-01-01&end_date=2024-01-01&hourly=temperature_2m"

  # Flatten the downloaded JSON into one CSV row per hourly reading.
  - id: normalize_data
    type: io.kestra.plugin.jdbc.duckdb.Query
    store: true
    inputFiles:
      data.json: "{{ outputs.download_weather_data.uri }}"
    # Single quotes in the city name are doubled so the value stays a valid
    # SQL string literal (e.g. "L'Aquila") and cannot terminate the literal.
    sql: |
      INSTALL json;
      LOAD json;
      COPY(
        SELECT
          unnest(hourly.time) AS time,
          unnest(hourly.temperature_2m) AS temperature_2m,
          latitude AS latitude,
          longitude AS longitude,
          '{{ inputs.city | replace({"'": "''"}) }}' AS city
        FROM '{{workingDir}}/data.json'
      ) TO
      '{{outputFiles["result.csv"]}}' (HEADER, DELIMITER ',');
    outputFiles:
      - result.csv

  # Convert the CSV produced above into Ion for the InfluxDB load task.
  - id: csv_to_ion
    type: io.kestra.plugin.serdes.csv.CsvToIon
    from: "{{ outputs.normalize_data.outputFiles['result.csv'] }}"

  # Write the Ion records to InfluxDB. Connection URL and token come from
  # secrets; org and bucket are hard-coded to "test".
  - id: load
    type: io.kestra.plugin.influxdb.Load
    connection:
      url: "{{ secrets('INFLUXDB_URL') }}"
      token: "{{ secrets('INFLUXDB_TOKEN') }}"
    org: "test"
    bucket: "test"
    from: "{{ outputs.csv_to_ion.uri }}"
    measurement: "temperature_2m"
About this blueprint
Database
This flow geocodes a city with the Open-Meteo API, downloads hourly temperature time series data for that location, normalizes it to CSV with DuckDB, and loads it into InfluxDB.
More Related Blueprints