Blueprints

Fetch data from Couchbase and transform it with Pandas in Python

About this blueprint

Ingest Python SQL

This flow extracts data from a Couchbase database and then performs transformation (aggregation) on the fetched results using Pandas in a Python script.

The sample bucket travel-sample is used in this blueprint.

You can setup Couchbase locally on Docker using:

bash
docker run -d --name db -p 8091-8096:8091-8096 -p 11210-11211:11210-11211 couchbase

Once the container is up and running, go to http://localhost:8091/ in your browser and select "Setup New Cluster". Set up the new cluster by providing the appropriate Cluster Name, Admin Username, and Password.

Once the cluster is created, load data from Sample Buckets present under the Settings tab. The data from the sample bucket travel-sample is used in this blueprint.

yaml
id: couchbase_to_pandas
namespace: blueprint

tasks:
  - id: "query_couchbase"
    type: "io.kestra.plugin.couchbase.Query"
    connectionString: couchbase://10.57.233.41
    username: admin
    password: admin_password
    query: |
      SELECT id, country, name, type, iata, icao 
      FROM `travel-sample`.`inventory`.`airline`
    fetchType: FETCH
    
  - id: pandas
    type: io.kestra.plugin.scripts.python.Script
    beforeCommands:
      - pip install pandas > /dev/null
    warningOnStdErr: false
    script: |
      import pandas as pd

      data = {{ outputs.query_couchbase.rows }}
      df = pd.DataFrame(data)
      agg_df = df.groupby('country')['country'].count().reset_index(name="count")
      print(agg_df.head())
      agg_df.to_csv("{{ outputDir }}/final.csv", index=False)

Query

Script

New to Kestra?

Use blueprints to kickstart your first workflows.

Get started with Kestra