Blueprints

Extract data from Cassandra into a CSV file, and load it to BigQuery

About this blueprint

BigQuery Ingest SQL

This flow extracts data from a Cassandra table, writes it to a CSV file, and then loads the CSV data into BigQuery. It uses data from the following public dataset.

The GCP credentials are extracted from the environment variable GCP_SERVICE_ACCOUNT_JSON. You need to create the table in BigQuery prior to executing this flow, or you can choose to create it via this flow by adding the schema to the BigQuery task.

You can set up Cassandra locally with Docker using: docker run -p 9042:9042 cassandra.

  • Use docker ps to get the container ID where Cassandra is running.
  • Then, cqlsh into Cassandra using docker exec -it <contianer_id> cqlsh.
  • Finally, run the following commands to create the table in Cassandra:
sql
CREATE KEYSPACE test;
USE test;

CREATE TABLE salary(
  salary_id int primary key,
  work_year int,
  experience_level text,
  employment_type text,
  job_title text,
  salary int,
  salary_currency text,
  salary_in_usd int,
  employee_residence text,
  remote_ratio int,
  company_location text,
  company_size text
);

Then, either use a COPY command: COPY salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size) FROM '/yourfile.csv' WITH HEADER = TRUE;

Or run a few INSERT commands:

sql
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (1, 2023,'EN','FT','Data Analyst',75000,'USD',75000,'US',100,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (2, 2023,'EN','FT','Data Analyst',60000,'USD',60000,'US',100,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (3, 2023,'MI','FT','Analytics Engineer',185700,'USD',185700,'US',0,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (4, 2023,'MI','FT','Analytics Engineer',165000,'USD',165000,'US',0,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (5, 2023,'SE','FT','Data Engineer',160000,'USD',160000,'US',100,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (6, 2023,'SE','FT','Data Engineer',130000,'USD',130000,'US',100,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (7, 2023,'SE','FT','Data Analyst',169000,'USD',169000,'US',0,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (8, 2023,'SE','FT','Data Analyst',110600,'USD',110600,'US',0,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (9, 2023,'SE','FT','Data Operations Engineer',193000,'USD',193000,'US',100,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (10, 2023,'SE','FT','Data Operations Engineer',136850,'USD',136850,'US',100,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (11, 2023,'SE','FT','Machine Learning Engineer',139500,'USD',139500,'US',0,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (12, 2023,'SE','FT','Machine Learning Engineer',109400,'USD',109400,'US',0,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (13, 2023,'SE','FT','Data Engineer',276000,'USD',276000,'US',100,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (14, 2023,'SE','FT','Data Engineer',178500,'USD',178500,'US',100,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (15, 2023,'MI','FT','Data Scientist',55000,'EUR',59188,'ES',0,'ES','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (16, 2023,'MI','FT','Data Scientist',45000,'EUR',48426,'ES',0,'ES','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (17, 2023,'MI','FT','Data Engineer',70000,'EUR',75330,'SI',100,'SI','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (18, 2023,'MI','FT','Data Engineer',45000,'EUR',48426,'SI',100,'SI','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (19, 2023,'SE','FT','Machine Learning Engineer',161000,'GBP',195940,'GB',0,'GB','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (20, 2023,'SE','FT','Machine Learning Engineer',83300,'GBP',101378,'GB',0,'GB','M');
yaml
id: cassandra_to_bigquery
namespace: blueprint

tasks:
  # The data from https://gist.githubusercontent.com/Ben8t/f182c57f4f71f350a54c65501d30687e/raw/940654a8ef6010560a44ad4ff1d7b24c708ebad4/salary-data.csv has been pushed into Cassandra for this example. 
  - id: query_cassandra
    type: io.kestra.plugin.cassandra.Query
    session:
      endpoints:
        - hostname: localhost
          port: 9042
      localDatacenter: datacenter1
    cql: |
      SELECT salary_id, work_year, experience_level, employment_type, 
      job_title, salary, salary_currency, salary_in_usd, employee_residence,
      remote_ratio, company_location, company_size
      FROM test.salary
    store: true

  - id: write_to_csv
    type: io.kestra.plugin.serdes.csv.CsvWriter
    from: "{{ outputs.query_cassandra.uri }}" 

  - id: load_bigquery
    type: "io.kestra.plugin.gcp.bigquery.Load"
    from: "{{ outputs.write_to_csv.uri }}"
    destinationTable: "my_project.my_dataset.my_table"
    serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    projectId: "my_project"
    format: CSV
    csvOptions:
      fieldDelimiter: ","
      skipLeadingRows: 1

Query

Csv Writer

Load

More Related Blueprints

New to Kestra?

Use blueprints to kickstart your first workflows.

Get started with Kestra