Here are the instructions for developing a new task.

Runnable Task

Here is a simple Runnable Task that reverses a string.

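A minimal sketch of the complete class, assembled from the fragments discussed in the rest of this section (the package name and import paths are assumptions and may differ depending on your Kestra version):

java
package io.kestra.plugin.templates;

import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.apache.commons.lang3.StringUtils;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public class ReverseString extends Task implements RunnableTask<ReverseString.Output> {
    // dynamic property, rendered at runtime via runContext.render(...)
    private Property<String> string;

    @Override
    public ReverseString.Output run(RunContext runContext) throws Exception {
        // render the dynamic property, then reverse it
        String render = runContext.render(string).as(String.class).orElse(null);

        return Output.builder()
            .reversed(StringUtils.reverse(render))
            .build();
    }

    @Builder
    @Getter
    public static class Output implements io.kestra.core.models.tasks.Output {
        @Schema(
            title = "The reversed string"
        )
        private final String reversed;
    }
}
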
Let's look at this one more deeply:

Class annotations

java
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor

These annotations are required to make your plugin work with Kestra. They are Lombok annotations that allow Kestra and its internal serialization to work properly.

Class declaration

java
public class ReverseString extends Task implements RunnableTask<ReverseString.Output>
  • ReverseString is the name of your task, it can be used in Kestra with type: io.kestra.plugin.templates.ReverseString (aka: {{package}}.{{className}}).
  • The task class must extend Task, the base class for all tasks.
  • The task class must implement RunnableTask, as it's a task that runs on a Worker; it must also declare its output type, here ReverseString.Output.

Properties

java
    private Property<String> string;

All task properties must be declared as task class attributes. They will be passed to the task by the flow at execution time. If you want your attribute to be dynamic, you need to wrap the type of your attribute into the Property type. Dynamic properties are explained later.

For example, the following is valid YAML for using this task. It uses an output from a previous task as its property, which is possible thanks to dynamic property rendering inside the task via runContext.render(string).as(String.class).orElse(null):

yaml
type: io.kestra.plugin.templates.ReverseString
string: "{{ outputs.previousTask.name }}"

You can declare as many properties as you want.

You can use any type that Jackson can serialize for your properties (e.g. Double, boolean, ...). You can also use your own classes, as long as they are serializable.
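
For instance, a couple of hypothetical properties using other Jackson-serializable types:

java
// hypothetical properties using other Jackson-serializable types
private Property<Double> threshold;
private Property<Boolean> enabled;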

Properties validation

Properties can be validated using jakarta.validation.constraints.* annotations. When the user creates a flow, your task properties will be validated before insertion, preventing any invalid flow definition from being saved.

The default available annotations are:

  • @AssertFalse
  • @AssertTrue
  • @Max
  • @Min
  • @Negative
  • @NegativeOrZero
  • @Positive
  • @PositiveOrZero
  • @NotBlank
  • @NotNull
  • @Null
  • @NotEmpty
  • @Past
  • @PastOrPresent
  • @Future
  • @FutureOrPresent

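For example, marking a property as required with one of the annotations above might look like this (a minimal sketch):

java
// a required property: flow validation fails when it is missing
@NotNull
private Property<String> string;
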
You can also create your own custom validation. To do so, first define the annotation as follows:

java
@Retention(RetentionPolicy.RUNTIME)
@Constraint(validatedBy = { })
public @interface CronExpression {
    String message() default "invalid cron expression ({validatedValue})";
}

Then, define a factory to inject the validator:

java
@Factory
public class ValidationFactory {
    private static final CronParser CRON_PARSER = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.UNIX));

    @Singleton
    ConstraintValidator<CronExpression, CharSequence> cronExpressionValidator() {
        return (value, annotationMetadata, context) -> {
            if (value == null) {
                return true;
            }

            try {
                Cron parse = CRON_PARSER.parse(value.toString());
                parse.validate();
            } catch (IllegalArgumentException e) {
                return false;
            }

            return true;
        };
    }
}
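
Once the validator is registered, the annotation can be applied to a property like any built-in constraint; a hypothetical example:

java
// hypothetical: the custom constraint applied to a property
@CronExpression
@NotNull
private String cron;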

Run

java
    @Override
    public ReverseString.Output run(RunContext runContext) throws Exception {
      Logger logger = runContext.logger();

      String render = runContext.render(string).as(String.class).orElse(null);
      logger.debug(render);

      return Output.builder()
          .reversed(StringUtils.reverse(render))
          .build();
    }

The run method is where your task's main logic does its work. You can use any Java code here, with any required libraries, as long as they are declared in the Gradle configuration.

Log

java
Logger logger = runContext.logger();

You can access a logger via the run context; it provides a logger for the current execution. Do not create your own logger, so that logs can be tracked inside Kestra.
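
For example, a hypothetical log line using the SLF4J placeholder syntax:

java
logger.info("Reversing string {}", render);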

Dynamic properties rendering

java
String rendered = runContext.render(string).as(String.class).orElse(null);

Kestra supports expressions as task parameters. To use them, your task attribute must be wrapped in the Property carrier type.

A dynamic property must be rendered before usage; this uses our templating engine, Pebble, to render the property into the target type. Rendering properties via the Property carrier type and the run context is null-safe: it returns an empty Optional, or an empty collection for lists and maps.

Dynamic properties support all Java types that can be serialized by Jackson. For example, to use a Duration:

java
// property definition
private Property<Duration> duration;

@Override
public Output run(RunContext runContext) throws Exception {
  Duration rendered = runContext.render(duration).as(Duration.class).orElse(null);
  // [...]
}

You can provide a default value at property definition time via Property.of():

java
private Property<Duration> duration = Property.of(Duration.ofSeconds(10));

Lists are supported via Property<List<String>>, for example:

java
// property definition
private Property<List<String>> list;

@Override
public Output run(RunContext runContext) throws Exception {
  List<String> rendered = runContext.render(list).asList(String.class);
  // [...]
}

Maps are supported via Property<Map<String, String>>, for example:

java
// property definition
private Property<Map<String, String>> map;

@Override
public Output run(RunContext runContext) throws Exception {
  Map<String, String> rendered = runContext.render(map).asMap(String.class, String.class);
  // [...]
}

Kestra uses a special type to carry data in a flexible way: Data. Data can be built from three different properties: a URI (pointing to a Kestra internal storage file), a list of maps (for multiple items), or a map (for a single item). Thanks to this, the task user can pass data in a very flexible way; we strongly encourage you to use this type when it fits your needs.

Here is an example that defines a Data attribute of type Message; at run time, you will need to render this property and map each message from a Map. It uses a Project Reactor Flux under the hood to process items one by one in a reactive manner, which allows handling an arbitrary number of items. When coupled with our internal storage files, it can process files containing billions of items if needed:

java
// property definition
@NotNull
private Data<Message> data;

@Override
public Output run(RunContext runContext) throws Exception {
  List<Message> outputMessages = data.flux(runContext, Message.class, message -> Message.fromMap(message))
      .collectList()
      .block();
  // [...]
}

At flow definition time, the user chooses how to provide the data property.

To use it with an internal storage file (in the ION format), which allows processing the file line by line:

yaml
type: io.kestra.plugin.myplugin.MyTask
data:
  fromURI: "{{inputs.file}}"

To use it with a single item as a map:

yaml
type: io.kestra.plugin.myplugin.MyTask
data:
  fromMap:
    prop1: "{{inputs.value1}}"
    prop2: "{{inputs.value2}}"

To use it with a list of items as a list of maps:

yaml
type: io.kestra.plugin.myplugin.MyTask
data:
  fromList:
    - prop1: "{{inputs.value1}}"
      prop2: "{{inputs.value2}}"
    - prop1: "{{inputs.value3}}"
      prop2: "{{inputs.value4}}"

Static properties or the old @PluginProperty annotation

If you don't want to, or cannot, use dynamic properties via the Property carrier type, you can define your task property like this:

java
    @PluginProperty
    private String string;

Always use @PluginProperty when you don't use the Property carrier type, so that the task JSON Schema and documentation are properly generated. You can also use @PluginProperty(dynamic = true) and render the property in other ways not covered here; this was the old way to handle dynamic properties, and it is now strongly discouraged.

Kestra internal storage

The run context object has a storage() method that allows accessing the Kestra internal storage. It also has a workingDir() method for managing files inside the task working directory. Use it for all file manipulations: for security reasons, a task must not go outside its working directory, and the WorkingDir object protects your task against that.

Example of reading a file from the internal storage:

java
final URI from = new URI(runContext.render(this.from).as(String.class).orElseThrow());
final InputStream inputStream = runContext.storage().getFile(from);

It will return an InputStream that will read the file from the internal storage.

You can also write files to Kestra's internal storage using runContext.storage().putFile(File file). The local file must be created inside the working directory and will be deleted after being put inside the internal storage.

java
File file = runContext.workingDir().createFile("items.csv");
// [...] -> fill the file
URI uri = runContext.storage().putFile(file);
// return the uri inside an Output so it can be used by other tasks

If a file with the same name already exists, the call to createFile() will fail. To avoid that, you can use runContext.workingDir().createTempFile(".csv") instead, which generates a unique file name for you.

runContext.storage().putFile() returns a URI pointing to the file inside the internal storage. For this file to be usable by other tasks and available inside the execution outputs, you must return it as one of your task outputs.

Outputs

java
public class ReverseString extends Task implements RunnableTask<ReverseString.Output> {
    @Override
    public ReverseString.Output run(RunContext runContext) throws Exception {
        // render the dynamic `string` property before reversing it
        String render = runContext.render(string).as(String.class).orElse(null);

        return Output.builder()
            .reversed(StringUtils.reverse(render))
            .build();
    }

    @Builder
    @Getter
    public static class Output implements io.kestra.core.models.tasks.Output {
        @Schema(
            title = "The reversed string"
        )
        private final String reversed;
    }
}

Each task must return an object implementing io.kestra.core.models.tasks.Output, with the output properties that will be available to downstream tasks and as execution outputs.

You can add as many properties as you want; just keep in mind that outputs need to be serializable. At execution time, outputs can be accessed by downstream tasks by leveraging output expressions, e.g. {{ outputs.task_id.output_attribute }}.
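
For example, a downstream task could reference the reversed output of a task with id reverse (a hypothetical flow snippet; the Log task type used here is an assumption):

yaml
- id: reverse
  type: io.kestra.plugin.templates.ReverseString
  string: "Kestra"

- id: print
  type: io.kestra.plugin.core.log.Log
  message: "{{ outputs.reverse.reversed }}"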

Task outputs are stored inside the execution context; they are designed to hold task execution metadata, not data. To store data, use an internal storage file and return the file URI inside the output.
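
For example, a hypothetical output class that carries a stored file URI instead of the data itself:

java
@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
    @Schema(
        title = "The URI of the file inside the internal storage"
    )
    private final URI uri;
}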

If your task doesn't have any outputs, you can use io.kestra.core.models.tasks.VoidOutput and return null:

java
public class NoOutput extends Task implements RunnableTask<VoidOutput> {
    @Override
    public VoidOutput run(RunContext runContext) throws Exception {
        return null;
    }
}

Exception

In the run method, you can throw any Java exception; it will be caught by Kestra and will fail the execution. We advise you to throw any exception that can fail your task as soon as possible.

Metrics

You can expose metrics to add observability to your task. Metrics will be recorded within the execution, and can be accessed via the UI or as Prometheus metrics.

There are two kinds of metrics available:

  • Counter: Counter.of("your.counter", count, tags); with args
    • String name: The name of the metric
    • Double|Long|Integer|Float count: the associated counter
    • String... tags: a list of tags associated with your metric
  • Timer: Timer.of("your.duration", duration, tags);
    • String name: The name of the metric
    • Duration duration: the recorded duration
    • String... tags: a list of tags associated with your metric

To save metrics with the execution, you need to use runContext.metric(metric).
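
For example, inside the run method you might record metrics like this (metric names and tags are arbitrary; count and duration are assumed to have been computed by the task):

java
// record a counter and a timer so they appear in the execution metrics
runContext.metric(Counter.of("records.processed", count, "zone", "EU"));
runContext.metric(Timer.of("requests.duration", duration, "zone", "EU"));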

Name

Must be lowercase, with words separated by dots.

Tags

Must be pairs of tag key and value. An example of two valid tags (zone and location) is:

java
Counter.of("your.counter", count, "zone", "EU", "location", "France");

Documentation

Remember to document your tasks. For this, we provide a set of annotations explained in the Document each plugin section.

Flowable Task

Flowable tasks are the most complex tasks to develop, and will usually be available from Kestra core. You will rarely need to create a flowable task by yourself.

Keep in mind that a flowable task will be evaluated very frequently inside the Executor and must have low CPU usage; no I/O should be done by this kind of task.

In the future, complete documentation will be available here. In the meantime, you can find all the existing Flowable tasks here for inspiration when developing Sequential or Parallel tasks.
