Query data from Dremio and process it in Polars with a Python script

About this blueprint

This flow queries data from a Dremio lakehouse and passes the query results to a downstream Python task that transforms the fetched data with Polars. The only required inputs are the Dremio project ID and a Dremio token.

The project ID can be found in your Dremio URL. For example, if your Dremio URL is https://app.dremio.cloud/sonar/ead79cc0-9e93-4d50-b364-77639a56d4a6, then your project ID is the last path segment, ead79cc0-9e93-4d50-b364-77639a56d4a6.
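
If you prefer to derive the project ID programmatically, the rule above (take the last path segment of the URL) can be sketched in a few lines of Python. The helper name `project_id_from_url` is hypothetical and not part of Kestra or Dremio. The sketch assumes the URL has the same shape as the example.

```python
from urllib.parse import urlparse


def project_id_from_url(dremio_url: str) -> str:
    """Return the last non-empty path segment of a Dremio project URL.

    Hypothetical helper for illustration; assumes the project ID is the
    final path segment, as in the example URL above.
    """
    path = urlparse(dremio_url).path
    segments = [segment for segment in path.split("/") if segment]
    if not segments:
        raise ValueError(f"No path segments found in URL: {dremio_url!r}")
    return segments[-1]


url = "https://app.dremio.cloud/sonar/ead79cc0-9e93-4d50-b364-77639a56d4a6"
print(project_id_from_url(url))
```

The returned value is what goes into the `project_id` variable of the flow.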

To create a Dremio token, go to your Dremio account settings and open the "Personal Access Token" section. There you can create the token, copy it, and store it as a Kestra secret so that it is not exposed directly in your flow. For a more detailed description of how to create a token, see the Dremio documentation.
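
How the secret is stored depends on your Kestra deployment. As a minimal sketch, assuming the open-source convention of supplying secrets as base64-encoded environment variables prefixed with `SECRET_` (verify this against your own setup), the line to add to your environment could be produced as follows. The helper name `kestra_secret_env_line` and the token value are placeholders, not real credentials or Kestra APIs.

```python
import base64


def kestra_secret_env_line(key: str, value: str) -> str:
    """Build an environment-variable line for a secret.

    Assumption: secrets are read from variables named SECRET_<KEY>
    whose values are base64-encoded. Hypothetical helper for illustration.
    """
    encoded = base64.b64encode(value.encode("utf-8")).decode("ascii")
    return f"SECRET_{key}={encoded}"


# "my-dremio-token" is a placeholder; substitute your personal access token.
print(kestra_secret_env_line("DREMIO_TOKEN", "my-dremio-token"))
```

With such a variable in place, the flow below would read the token through `secret('DREMIO_TOKEN')`.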

If you are new to Dremio, the easiest way to familiarize yourself with the platform is to create a Dremio Test Drive account, which provides you with a Dremio instance and sample datasets to query. You can reproduce this flow without any changes using the Test Drive account.

yaml
id: dremio_sql_python
namespace: blueprint

variables:
  project_id: ead79cc0-9e93-4d50-b364-77639a56d4a6

tasks:
  - id: query
    type: io.kestra.plugin.jdbc.dremio.Query
    disabled: false
    url: "jdbc:dremio:direct=sql.dremio.cloud:443;ssl=true;PROJECT_ID={{vars.project_id}};schema=postgres.public"
    username: $token
    password: "{{ secret('DREMIO_TOKEN') }}"
    sql: SELECT first_name, last_name, hire_date, salary FROM postgres.public.employees LIMIT 100;
    store: true

  - id: python
    type: io.kestra.plugin.scripts.python.Script
    warningOnStdErr: false
    docker:
      image: ghcr.io/kestra-io/polars:latest
    script: |
      import polars as pl
      import amazon.ion.simpleion as ion
      from amazon.ion.simple_types import IonPyDecimal, IonPyNull


      def convert_ion_types(value):
          # Map Ion-specific values to plain Python types that Polars can ingest
          if isinstance(value, IonPyNull):
              return None
          elif isinstance(value, IonPyDecimal):
              return float(value)
          else:
              return value

      uri = "{{outputs.query.uri}}"
      with open(uri, 'rb') as f:
          ion_content = f.read()

      ion_data = ion.loads(ion_content, single_value=False)
      list_of_dicts = [dict(record) for record in ion_data]
      list_of_dicts = [
          {k: convert_ion_types(v) for k, v in record.items()}
          for record in list_of_dicts
      ]

      polars_df = pl.DataFrame(list_of_dicts)
      # glimpse() prints the preview itself and returns None, so no print() is needed
      polars_df.glimpse()
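
The type-conversion step in the script can be tried locally without a Dremio connection or the Ion library. The following is a minimal sketch under stated assumptions: `decimal.Decimal` stands in for `IonPyDecimal`, the class `FakeIonNull` is a hypothetical stand-in for `IonPyNull`, and the two sample rows are made up for illustration.

```python
from decimal import Decimal


class FakeIonNull:
    """Hypothetical stand-in for amazon.ion's IonPyNull."""


def convert_value(value):
    # Same shape as convert_ion_types in the flow, using the stand-in types
    if isinstance(value, FakeIonNull):
        return None
    if isinstance(value, Decimal):
        return float(value)
    return value


# Made-up rows that mimic the shape of the query result
records = [
    {"first_name": "Ada", "salary": Decimal("5200.50")},
    {"first_name": "Grace", "salary": FakeIonNull()},
]

clean = [{k: convert_value(v) for k, v in row.items()} for row in records]
print(clean)
```

After this step every value is a plain Python type, which is the form the flow hands to `pl.DataFrame`.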
