​Develop your ​Runnable​Task

Here are the instructions to develop the most common Runnable Task.

Here is a simple task example that reverse a string:

java
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
    title = "Reverse a string",
    description = "Reverse all letters from a string"
)
public class ReverseString extends Task implements RunnableTask<ReverseString.Output> {
    @Schema(
        title = "The base string you want to reverse"
    )
    @PluginProperty(dynamic = true)
    private String format;

    @Override
    public Example.Output run(RunContext runContext) throws Exception {
        Logger logger = runContext.logger();

        String render = runContext.render(format);
        logger.debug(render);

        return Output.builder()
            .reverse(StringUtils.reverse(render))
            .build();
    }

    @Builder
    @Getter
    public static class Output implements io.kestra.core.models.tasks.Output {
        @Schema(
            title = "The reverse string "
        )
        private final String reverse;
    }
}

Let look at these one deeply:

Class annotations

java
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor

These are required in order to make your plugin works with Kestra, these are Lombok annotations that allow Kestra and its internal serialization to work properly.

Class declaration

java
public class ReverseString extends Task implements RunnableTask<Example.Output>
  • ReverseString is the name of your task and can be use on Kestra with type: io.kestra.plugin.templates.ReverseString (aka: {{package}}.{{className}}).
  • Class must extends Task to be usable.
  • implements RunnableTask<ReverseString.Output>: must implements RunnableTask to be discovered and must declared the output of the task ReverseString.Output.

Properties

java
    @PluginProperty(dynamic = true)
    private String format;

Declare all the properties that you can pass to the current task on a flow. For example, this will be a valid yaml for this task:

yaml
type: io.kestra.plugin.templates.ReverseString
format: "{{outputs.previousTask.name}}"

You can declare as many properties as you want, all of these will be filled by Kestra executors. You can use any serializable by Jackson for your properties (ex: Double, boolean, ...). You can create any class as long as the class is Serializable.

Properties validation

Properties can be validated using javax.validation.constraints.* annotations. When the user creates a flow, your task properties will be validated before insertion and prevent wrong definition to be inserted.

The default available annotations are:

  • @Positive
  • @AssertFalse
  • @AssertTrue
  • @Max
  • @Min
  • @Negative
  • @NegativeOrZero
  • @Positive
  • @PositiveOrZero
  • @NotBlank
  • @NotNull
  • @Null
  • @NotEmpty
  • @Past
  • @PastOrPresent
  • @Future
  • @FutureOrPresent

You can also create your custom validation, you must defined the annotation:

java
@Retention(RetentionPolicy.RUNTIME)
@Constraint(validatedBy = { })
public @interface CronExpression {
    String message() default "invalid cron expression ({validatedValue})";
}

and a factory to inject the validation method:

java
@Factory
public class ValidationFactory {
private static final CronParser CRON_PARSER = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.UNIX));

    @Singleton
    ConstraintValidator<CronExpression, CharSequence> cronExpressionValidator() {
        return (value, annotationMetadata, context) -> {
            if (value == null) {
                return true;
            }

            try {
                Cron parse = CRON_PARSER.parse(value.toString());
                parse.validate();
            } catch (IllegalArgumentException e) {
                return false;
            }

            return true;
        };
    }
}

Run

java
    @Override
    public Example.Output run(RunContext runContext) throws Exception {
        Logger logger = runContext.logger();

        String render = runContext.render(format);
        logger.debug(render);

        return Output.builder()
            .reverse(StringUtils.reverse(render))
            .build();
    }

The run method is where the main logic of your task will do all the works needed. You can used any Java code here with any libs you need until you have declared it in the Gradle configuration.

Log

java
Logger logger = runContext.logger();

To have a logger, you need to use this instruction, this will provide a logger for the current execution and will be log properly, don't have your own logger in order to track log on the UI.

Render variables

java
String render = runContext.render(format);

In order to use variables, you need to render the variables, aka: transform the properties with Pebble. Just don't forget to render variables if you need to pass some output from previous variables.

You also need to add the annotation @PluginProperty(dynamic = true) in order to explain in the documentation that you can pass some dynamic variables. Provide a @PluginProperty annotation even if you didn't set any of its attributes for all variables or the generated documentation will not be accurate.

Kestra storage

You can read any files from Kestra storage using the method runContext.uriToInputStream()

java
final URI from = new URI(runContext.render(this.from));
final InputStream inputStream = runContext.uriToInputStream(from);

You will get an InputStream in order to read the file from Kestra storage (coming from inputs or task outputs).

You can also write files to Kestra storage using runContext.putTempFile(File file). The local file will be deleted, so you must use a temporary file.

java
File tempFile = File.createTempFile("concat_", "");
runContext.putTempFile(tempFile)

Don't forgot to provide Outputs with the link generated by putTempFile in order to be usable by others tasks.

Outputs

java
public class ReverseString extends Task implements RunnableTask<ReverseString.Output> {
    @Override
    public Example.Output run(RunContext runContext) throws Exception {
        return Output.builder()
            .reverse(StringUtils.reverse(render))
            .build();
    }

    @Builder
    @Getter
    public static class Output implements io.kestra.core.models.tasks.Output {
        @Schema(
            title = "The reverse string "
        )
        private final String reverse;
    }
}

Each task must return a class instance with outputs values that can be used from next tasks. You must return a class that implements io.kestra.core.models.tasks.Output. You can add as many properties as you want, but keep in mind that all theses must be serializable. All outputs will be available for next tasks through variables.

If your task don't provide any outputs (mostly never), you use io.kestra.core.models.tasks.VoidOutput:

java
public class NoOutput extends Task implements FlowableTask<VoidOutput> {
    @Override
    public VoidOutput run(RunContext runContext) throws Exception {
        return null;
    }
}

Exception

In the run method, you can throw any Exception that will be catch by Kestra and will fail the execution. We advise you to throw any Exception that can break your task as soon as possible.

Metrics

You can send metrics to add some observability on your task. Metrics will be recorded with the execution and will be shown on the UI.

There is two kinds of metrics :

  • Counter: Counter.of("your.counter", count, tags); with args
    • String name: The name of the metrics
    • Double|Long|Integer|Float count: the associated counter
    • String... tags: a list of tags associated with your metrics
  • Timer: Timer.of("your.duration", duration, tags);
    • String name: The name of the metrics
    • Duration duration: the recorded duration
    • String... tags: a list of tags associated with your metrics

To save metrics with the execution, you need to use runContext.metric(metrics).

Name

Must be lowercase separated by dots.

Tags

Must be a key and value of tags. Example of a valid tags are :

java
Counter.of("your.counter", count, "zone", "EU", "location", "france");

Documentation

Remember to document your tasks. For this, we provide a set of annotations explained in the Document each plugin section.