Here are the instructions to develop a new task.

Runnable Task

Here is a simple Runnable Task that reverses a string:

java
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
    title = "Reverse a string",
    description = "Reverse all letters from a string"
)
public class ReverseString extends Task implements RunnableTask<ReverseString.Output> {
    @Schema(
        title = "The base string you want to reverse"
    )
    @PluginProperty(dynamic = true)
    private String format;

    @Override
    public ReverseString.Output run(RunContext runContext) throws Exception {
        Logger logger = runContext.logger();

        String render = runContext.render(format);
        logger.debug(render);

        return Output.builder()
            .reverse(StringUtils.reverse(render))
            .build();
    }

    @Builder
    @Getter
    public static class Output implements io.kestra.core.models.tasks.Output {
        @Schema(
            title = "The reverse string "
        )
        private final String reverse;
    }
}

Let's look at each part in detail:

Class annotations

java
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor

These Lombok annotations are required for your plugin to work with Kestra: they allow Kestra and its internal serialization to work properly.

Class declaration

java
public class ReverseString extends Task implements RunnableTask<ReverseString.Output>
  • ReverseString is the name of your task; it can be used in Kestra with type: io.kestra.plugin.templates.ReverseString (i.e. {{package}}.{{className}}).
  • The class must extend Task to be usable.
  • implements RunnableTask<ReverseString.Output>: the class must implement RunnableTask to be discovered, and must declare the output type of the task, here ReverseString.Output.

Properties

java
    @PluginProperty(dynamic = true)
    private String format;

Declare all the properties that can be passed to the task in a flow. For example, the following is valid YAML for this task:

yaml
type: io.kestra.plugin.templates.ReverseString
format: "{{outputs.previousTask.name}}"

You can declare as many properties as you want; all of them will be populated by the Kestra executors.

You can use any type that Jackson can serialize for your properties (e.g. Double, boolean, ...). You can also create your own classes, as long as they are Serializable.

Properties validation

Properties can be validated using javax.validation.constraints.* annotations. When a user creates a flow, the task properties are validated before the flow is saved, which prevents invalid definitions from being inserted.

The default available annotations are:

  • @AssertFalse
  • @AssertTrue
  • @Max
  • @Min
  • @Negative
  • @NegativeOrZero
  • @Positive
  • @PositiveOrZero
  • @NotBlank
  • @NotNull
  • @Null
  • @NotEmpty
  • @Past
  • @PastOrPresent
  • @Future
  • @FutureOrPresent

You can also create your own custom validation. First, define the annotation:

java
@Retention(RetentionPolicy.RUNTIME)
@Constraint(validatedBy = { })
public @interface CronExpression {
    String message() default "invalid cron expression ({validatedValue})";
}

Then add a factory that provides the validation method:

java
@Factory
public class ValidationFactory {
    private static final CronParser CRON_PARSER = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.UNIX));

    @Singleton
    ConstraintValidator<CronExpression, CharSequence> cronExpressionValidator() {
        return (value, annotationMetadata, context) -> {
            if (value == null) {
                return true;
            }

            try {
                Cron parse = CRON_PARSER.parse(value.toString());
                parse.validate();
            } catch (IllegalArgumentException e) {
                return false;
            }

            return true;
        };
    }
}

Run

java
    @Override
    public ReverseString.Output run(RunContext runContext) throws Exception {
        Logger logger = runContext.logger();

        String render = runContext.render(format);
        logger.debug(render);

        return Output.builder()
            .reverse(StringUtils.reverse(render))
            .build();
    }

The run method contains the main logic of your task. You can use any Java code here, with any libraries you need, as long as you have declared them in the Gradle configuration.
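One way to keep run short and easy to test is to put the actual transformation in a plain static method and call it from run. The sketch below is an illustration only: ReverseLogic is a hypothetical helper, not part of Kestra, and it uses the JDK's StringBuilder so that it has no external dependency (assuming the StringUtils used above is the Apache Commons Lang one, the null handling is the same).

```java
// Hypothetical helper, not part of Kestra: the string reversal used by the
// example task, written with the JDK only so it can be unit-tested in isolation.
final class ReverseLogic {
    private ReverseLogic() {
    }

    // Returns the characters of the input in reverse order,
    // or null when the input itself is null.
    static String reverse(String value) {
        if (value == null) {
            return null;
        }
        return new StringBuilder(value).reverse().toString();
    }
}
```

Inside run, the task would then call ReverseLogic.reverse(render) on the rendered property and pass the result to the Output builder.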

Log

java
Logger logger = runContext.logger();

Use this instruction to get a logger. It provides a logger bound to the current execution, so that messages are recorded properly. Don't create your own logger; otherwise, the logs cannot be tracked in the UI.

Render variables

java
String render = runContext.render(format);

To use dynamic expressions, you need to render them, i.e. transform the properties with Pebble. Don't forget to render variables if you need to pass in outputs from previous tasks.

You also need to add the annotation @PluginProperty(dynamic = true) so that the documentation states that the property accepts dynamic variables. Provide a @PluginProperty annotation for every property, even if you don't set any of its attributes; otherwise, the generated documentation will not be accurate.

Kestra storage

You can read any file from Kestra storage using the method runContext.uriToInputStream():

java
final URI from = new URI(runContext.render(this.from));
final InputStream inputStream = runContext.uriToInputStream(from);

You will get an InputStream in order to read the file from Kestra storage (coming from inputs or task outputs).

You can also write files to Kestra storage using runContext.putTempFile(File file). The local file will be deleted, so you must use a temporary file.

java
File tempFile = File.createTempFile("concat_", "");
runContext.putTempFile(tempFile);

Don't forget to expose the link returned by putTempFile in your Outputs so that other tasks can use it.
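Between reading from storage and writing back to it, the work is ordinary Java I/O. As a minimal sketch using only the JDK, the helper below copies an InputStream into a temporary file. StorageSketch and copyToTempFile are hypothetical names introduced for illustration; in a real task the stream would come from runContext.uriToInputStream and the resulting file would be handed to runContext.putTempFile.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

// Hypothetical helper, not part of Kestra.
final class StorageSketch {
    private StorageSketch() {
    }

    // Copies the whole stream into a new temporary file and closes the stream.
    static File copyToTempFile(InputStream input) throws IOException {
        File tempFile = File.createTempFile("concat_", "");
        try (InputStream in = input) {
            // createTempFile already created the file, so it must be replaced.
            Files.copy(in, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
        }
        return tempFile;
    }
}
```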

Outputs

java
public class ReverseString extends Task implements RunnableTask<ReverseString.Output> {
    @Override
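    public ReverseString.Output run(RunContext runContext) throws Exception {
        // Render the dynamic property first (format is the property declared on the task)
        String render = runContext.render(format);

        return Output.builder()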
            .reverse(StringUtils.reverse(render))
            .build();
    }

    @Builder
    @Getter
    public static class Output implements io.kestra.core.models.tasks.Output {
        @Schema(
            title = "The reverse string "
        )
        private final String reverse;
    }
}

Each task must return a class instance containing output values that can be used by subsequent tasks. The class must implement io.kestra.core.models.tasks.Output. You can add as many properties as you want; just keep in mind that outputs need to be serializable. At execution time, downstream tasks can access outputs through output expressions, e.g. {{ outputs.task_id.output_attribute }}.

If your task doesn't provide any outputs (which should be rare), use io.kestra.core.models.tasks.VoidOutput:

java
public class NoOutput extends Task implements RunnableTask<VoidOutput> {
    @Override
    public VoidOutput run(RunContext runContext) throws Exception {
        return null;
    }
}

Exception

In the run method, you can throw any Exception; it will be caught by Kestra and will fail the execution. We advise you to throw an Exception as soon as possible for anything that can break your task.
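Failing early usually means checking inputs at the top of run, before any expensive work starts. A minimal sketch of such a check, using only the JDK; FailFast and requireNonBlank are hypothetical names, not Kestra API:

```java
// Hypothetical helper, not part of Kestra.
final class FailFast {
    private FailFast() {
    }

    // Returns the rendered value unchanged, or throws when it is null or
    // contains only whitespace, so the task fails before doing any work.
    static String requireNonBlank(String rendered, String propertyName) {
        if (rendered == null || rendered.trim().isEmpty()) {
            throw new IllegalArgumentException("Property '" + propertyName + "' must not be blank");
        }
        return rendered;
    }
}
```

Called right after rendering a property, this makes the execution fail with a clear message instead of failing later with a less obvious error.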

Metrics

You can expose metrics to add observability to your task. Metrics will be recorded with the execution and can be accessed via the UI or as Prometheus metrics.

There are two kinds of metrics available:

  • Counter: Counter.of("your.counter", count, tags); with arguments:
    • String name: The name of the metric
    • Double|Long|Integer|Float count: The value of the counter
    • String... tags: A list of tags associated with your metric
  • Timer: Timer.of("your.duration", duration, tags); with arguments:
    • String name: The name of the metric
    • Duration duration: The recorded duration
    • String... tags: A list of tags associated with your metric

To save metrics with the execution, you need to use runContext.metric(metric).
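A Timer needs a java.time.Duration. As a minimal sketch using only the JDK, the helper below measures how long a piece of work takes; DurationSketch and measure are hypothetical names introduced for illustration, not Kestra API.

```java
import java.time.Duration;

// Hypothetical helper, not part of Kestra.
final class DurationSketch {
    private DurationSketch() {
    }

    // Runs the given work and returns the elapsed time, measured with the
    // monotonic System.nanoTime() clock.
    static Duration measure(Runnable work) {
        long start = System.nanoTime();
        work.run();
        return Duration.ofNanos(System.nanoTime() - start);
    }
}
```

The returned Duration could then be passed to Timer.of("your.duration", duration, tags) and recorded with runContext.metric.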

Name

Metric names must be lowercase, with words separated by dots.
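A naming rule like this is easy to check in a unit test. The sketch below encodes one interpretation of the rule (segments of lowercase letters and digits, joined by single dots); this pattern is an assumption, Kestra's own validation may differ, and MetricNames is a hypothetical helper.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kestra.
final class MetricNames {
    // Assumed rule: one or more segments, each starting with a lowercase
    // letter followed by lowercase letters or digits, separated by dots.
    private static final Pattern VALID = Pattern.compile("[a-z][a-z0-9]*(\\.[a-z][a-z0-9]*)*");

    private MetricNames() {
    }

    static boolean isValid(String name) {
        return name != null && VALID.matcher(name).matches();
    }
}
```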

Tags

Tags must be passed as pairs of tag key and value. An example with two valid tags (zone and location):

java
Counter.of("your.counter", count, "zone", "EU", "location", "France");
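Because tags are passed as a flat list, an odd number of values means a key has no value. The following sketch shows how such a flat list maps to key/value pairs and how the mistake can be caught early; TagPairs is a hypothetical helper written with the JDK only, not Kestra API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Kestra.
final class TagPairs {
    private TagPairs() {
    }

    // Interprets the arguments as key1, value1, key2, value2, ... and
    // rejects an odd number of values.
    static Map<String, String> toMap(String... tags) {
        if (tags.length % 2 != 0) {
            throw new IllegalArgumentException(
                "Tags must be key/value pairs, got " + tags.length + " values");
        }
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i < tags.length; i += 2) {
            result.put(tags[i], tags[i + 1]);
        }
        return result;
    }
}
```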

Documentation

Remember to document your tasks. For this, we provide a set of annotations, explained in the "Document each plugin" section.

Flowable Task

Flowable tasks are the most complex tasks to develop, and will usually be available from the Kestra core. You will rarely need to create a flowable task yourself.

Keep in mind that a flowable task is evaluated very frequently inside the Executor, so it must use little CPU; no I/O should be done by this kind of task.

In the future, complete documentation will be available here. In the meantime, you can look at the existing Flowable tasks, such as Sequential or Parallel, for inspiration.