Source
yaml
id: cassandra-to-bigquery
namespace: company.team
tasks:
- id: query_cassandra
type: io.kestra.plugin.cassandra.Query
session:
endpoints:
- hostname: localhost
port: 9042
localDatacenter: datacenter1
cql: |
SELECT salary_id, work_year, experience_level, employment_type,
job_title, salary, salary_currency, salary_in_usd, employee_residence,
remote_ratio, company_location, company_size
FROM test.salary
store: true
- id: write_to_csv
type: io.kestra.plugin.serdes.csv.IonToCsv
from: "{{ outputs.query_cassandra.uri }}"
- id: load_bigquery
type: io.kestra.plugin.gcp.bigquery.Load
from: "{{ outputs.write_to_csv.uri }}"
destinationTable: my_project.my_dataset.my_table
serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
projectId: my_project
format: CSV
csvOptions:
fieldDelimiter: ","
skipLeadingRows: 1
About this blueprint
Ingest BigQuery SQL
This flow extracts data from a Cassandra table, writes it to a CSV file, and then loads the CSV data into BigQuery. It uses data from the following public dataset.
The GCP credentials are extracted from the environment variable GCP_SERVICE_ACCOUNT_JSON.
You need to create the table in BigQuery prior to executing this flow, or you can choose to create it via this flow by adding the schema to the BigQuery task.
You can set up Cassandra locally with Docker using: docker run -p 9042:9042 cassandra
.
- Use
docker ps
to get the container ID where Cassandra is running. - Then,cqlsh
into Cassandra usingdocker exec -it <contianer_id> cqlsh
. - Finally, run the following commands to create the table in Cassandra:
sql
USE test;
CREATE TABLE salary(
salary_id int primary key,
work_year int,
experience_level text,
employment_type text,
job_title text,
salary int,
salary_currency text,
salary_in_usd int,
employee_residence text,
remote_ratio int,
company_location text,
company_size text
); ```
Then, either use a COPY command: `COPY salary (salary_id, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size) FROM '/yourfile.csv' WITH HEADER = TRUE;`
Or run a few INSERT commands:
```sql insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (1, 2023,'EN','FT','Data Analyst',75000,'USD',75000,'US',100,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (2, 2023,'EN','FT','Data Analyst',60000,'USD',60000,'US',100,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (3, 2023,'MI','FT','Analytics Engineer',185700,'USD',185700,'US',0,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (4, 2023,'MI','FT','Analytics Engineer',165000,'USD',165000,'US',0,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (5, 2023,'SE','FT','Data Engineer',160000,'USD',160000,'US',100,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (6, 2023,'SE','FT','Data Engineer',130000,'USD',130000,'US',100,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (7, 2023,'SE','FT','Data Analyst',169000,'USD',169000,'US',0,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (8, 2023,'SE','FT','Data Analyst',110600,'USD',110600,'US',0,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (9, 2023,'SE','FT','Data Operations Engineer',193000,'USD',193000,'US',100,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (10, 2023,'SE','FT','Data Operations Engineer',136850,'USD',136850,'US',100,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (11, 2023,'SE','FT','Machine Learning Engineer',139500,'USD',139500,'US',0,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (12, 2023,'SE','FT','Machine Learning Engineer',109400,'USD',109400,'US',0,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (13, 2023,'SE','FT','Data Engineer',276000,'USD',276000,'US',100,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (14, 2023,'SE','FT','Data Engineer',178500,'USD',178500,'US',100,'US','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (15, 2023,'MI','FT','Data Scientist',55000,'EUR',59188,'ES',0,'ES','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (16, 2023,'MI','FT','Data Scientist',45000,'EUR',48426,'ES',0,'ES','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (17, 2023,'MI','FT','Data Engineer',70000,'EUR',75330,'SI',100,'SI','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (18, 2023,'MI','FT','Data Engineer',45000,'EUR',48426,'SI',100,'SI','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (19, 2023,'SE','FT','Machine Learning Engineer',161000,'GBP',195940,'GB',0,'GB','M');
insert into salary(salary_id, work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size) values (20, 2023,'SE','FT','Machine Learning Engineer',83300,'GBP',101378,'GB',0,'GB','M'); ```
More Related Blueprints