IonToAvro

Convert an ION file into Avro.

yaml
type: "io.kestra.plugin.serdes.avro.IonToAvro"

Convert a CSV file to the Avro format.

yaml
id: divvy_tripdata
namespace: company.team

variables:
  file_id: "{{ execution.startDate | dateAdd(-3, 'MONTHS') | date('yyyyMM') }}"

tasks:
  - id: get_zipfile
    type: io.kestra.plugin.core.http.Download
    uri: "https://divvy-tripdata.s3.amazonaws.com/{{ render(vars.file_id) }}-divvy-tripdata.zip"

  - id: unzip
    type: io.kestra.plugin.compress.ArchiveDecompress
    algorithm: ZIP
    from: "{{ outputs.get_zipfile.uri }}"

  - id: convert
    type: io.kestra.plugin.serdes.csv.CsvToIon
    from: "{{ outputs.unzip.files[render(vars.file_id) ~ '-divvy-tripdata.csv'] }}"

  - id: to_avro
    type: io.kestra.plugin.serdes.avro.IonToAvro
    from: "{{ outputs.convert.uri }}"
    datetimeFormat: "yyyy-MM-dd' 'HH:mm:ss"
    schema: |
      {
        "type": "record",
        "name": "Ride",
        "namespace": "com.example.bikeshare",
        "fields": [
          {"name": "ride_id", "type": "string"},
          {"name": "rideable_type", "type": "string"},
          {"name": "started_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
          {"name": "ended_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
          {"name": "start_station_name", "type": "string"},
          {"name": "start_station_id", "type": "string"},
          {"name": "end_station_name", "type": "string"},
          {"name": "end_station_id", "type": "string"},
          {"name": "start_lat", "type": "double"},
          {"name": "start_lng", "type": "double"},
          {
            "name": "end_lat",
            "type": ["null", "double"],
            "default": null
          },
          {
            "name": "end_lng",
            "type": ["null", "double"],
            "default": null
          },
          {"name": "member_casual", "type": "string"}
        ]
      }
Properties

Source file URI

Default: yyyy-MM-dd[XXX]

Format to use when parsing date

Default: yyyy-MM-dd'T'HH:mm[:ss][.SSSSSS][XXX]

Format to use when parsing datetime

Default value is yyyy-MM-dd'T'HH:mm[:ss][.SSSSSS][XXX]

Default: .

Character to recognize as decimal point (e.g. use ‘,’ for European data).

Default value is '.'

SubType: string
Default: ["f","false","disabled","0","off","no",""]

Values to consider as False

Default: false

Try to infer all fields

If true, all fields are inferred using trueValues, falseValues, and nullValues. If false, booleans and nulls are inferred only on fields declared in the schema as null or bool.
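As an illustration of the inference rule above, the sketch below overrides the three lists on a task whose schema declares a nullable boolean field. The task id, the upstream `convert` task, the record and field names, and the list values are all hypothetical; only the `trueValues`, `falseValues`, and `nullValues` property names come from this page.

```yaml
  - id: to_avro_flags                      # hypothetical task id
    type: io.kestra.plugin.serdes.avro.IonToAvro
    from: "{{ outputs.convert.uri }}"      # assumes an upstream task named convert
    trueValues: ["Y", "yes"]               # illustrative values, not the defaults
    falseValues: ["N", "no"]
    nullValues: ["", "N/A"]
    schema: |
      {
        "type": "record",
        "name": "Flag",
        "fields": [
          {"name": "is_member", "type": ["null", "boolean"], "default": null}
        ]
      }
```

With these settings, a source value of `Y` in `is_member` would be expected to convert to true and `N/A` to null, because the field is declared as a nullable boolean in the schema.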

SubType: string
Default: ["","#N/A","#N/A N/A","#NA","-1.#IND","-1.#QNAN","-NaN","1.#IND","1.#QNAN","NA","n/a","nan","null"]

Values to consider as null

Default: 100

Number of rows that will be scanned while inferring. The more rows scanned, the more precise the output schema will be.

Only used when the 'schema' property is empty.

Default: ERROR
Possible Values: ERROR, WARN, SKIP

How to handle bad records (e.g., null values in non-nullable fields or type mismatches).

Can be one of: ERROR, WARN or SKIP.
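A minimal sketch of a task that skips bad records instead of failing. This page does not show the name of the property, so `onBadLines` below is an assumption to check against the plugin schema; the task id, upstream task, and record definition are also hypothetical.

```yaml
  - id: to_avro_lenient                    # hypothetical task id
    type: io.kestra.plugin.serdes.avro.IonToAvro
    from: "{{ outputs.convert.uri }}"      # assumes an upstream task named convert
    onBadLines: SKIP                       # assumed property name, not stated on this page
    schema: |
      {
        "type": "record",
        "name": "Ride",
        "fields": [
          {"name": "ride_id", "type": "string"},
          {"name": "start_lat", "type": "double"}
        ]
      }
```

Here a record with a null `start_lat` is a bad record, because the field is declared as a non-nullable double.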

The Avro schema associated with the data

If empty, the task will try to infer the schema from the current data; use the 'numberOfRowsToScan' property if needed
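For comparison with the explicit-schema example earlier on this page, the sketch below leaves `schema` out so that the task infers it, and raises `numberOfRowsToScan` above its default of 100. The task id, the upstream `convert` task, and the value 1000 are illustrative assumptions.

```yaml
  - id: to_avro_inferred                   # hypothetical task id
    type: io.kestra.plugin.serdes.avro.IonToAvro
    from: "{{ outputs.convert.uri }}"      # assumes an upstream task named convert
    # no schema property: the task infers one from the data
    numberOfRowsToScan: 1000               # illustrative value; the default is 100
```

Scanning more rows costs more time before conversion starts, but reduces the chance that a field that is only null or only integer in the first rows is given too narrow a type.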

Default: false

Whether to consider a field present in the data but not declared in the schema as an error

Default value is false

Default: HH:mm[:ss][.SSSSSS][XXX]

Format to use when parsing time

Default: Etc/UTC

Timezone to use when no timezone can be parsed on the source.

If null, the timezone defaults to UTC. Default value is the system timezone
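The fallback timezone matters when the datetime pattern carries no offset, as with the `yyyy-MM-dd' 'HH:mm:ss` format used in the example flow. The sketch below pins it explicitly. The property name `timeZoneId` does not appear on this page and is an assumption, as are the task id, the zone `America/Chicago`, and the reduced schema.

```yaml
  - id: to_avro_local_time                 # hypothetical task id
    type: io.kestra.plugin.serdes.avro.IonToAvro
    from: "{{ outputs.convert.uri }}"      # assumes an upstream task named convert
    datetimeFormat: "yyyy-MM-dd' 'HH:mm:ss"
    timeZoneId: "America/Chicago"          # assumed property name; zone chosen for illustration
    schema: |
      {
        "type": "record",
        "name": "Ride",
        "fields": [
          {"name": "started_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
        ]
      }
```

Setting the zone explicitly keeps the resulting `timestamp-millis` values the same regardless of where the task runs.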

SubType: string
Default: ["t","true","enabled","1","on","yes"]

Values to consider as True

Format: uri

URI of a temporary result file

Number of records converted