Here are the instructions to develop a new task.

Runnable Task

Here is a simple Runnable Task that reverses a string.

Let's look at it in more detail:

Class annotations

java
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor

These Lombok annotations are required for your plugin to work with Kestra. They allow Kestra and its internal serialization to work properly.

Class declaration

java
public class ReverseString extends Task implements RunnableTask<ReverseString.Output>
  • ReverseString is the name of your task. It can be used in Kestra with type: io.kestra.plugin.templates.ReverseString (i.e. {{package}}.{{className}}).
  • The class must extend Task to be usable.
  • implements RunnableTask<ReverseString.Output>: the class must implement RunnableTask to be discovered, and must declare the output type of the task, ReverseString.Output.

Properties

java
    @PluginProperty(dynamic = true)
    private String format;

Declare all the properties that can be passed to the task in a flow. For example, the following is valid YAML for this task:

yaml
type: io.kestra.plugin.templates.ReverseString
format: "{{ outputs.previousTask.name }}"

You can declare as many properties as you want. All of these will be filled by Kestra executors.

You can use any type that Jackson can serialize for your properties (e.g. Double, boolean, ...). You can also create your own classes, as long as they are serializable.

Properties validation

Properties can be validated using javax.validation.constraints.* annotations. When a user creates a flow, your task properties are validated before insertion, which prevents an invalid definition from being saved.

The default available annotations are:

  • @AssertFalse
  • @AssertTrue
  • @Max
  • @Min
  • @Negative
  • @NegativeOrZero
  • @Positive
  • @PositiveOrZero
  • @NotBlank
  • @NotNull
  • @Null
  • @NotEmpty
  • @Past
  • @PastOrPresent
  • @Future
  • @FutureOrPresent

You can also create your own custom validation. You must define the annotation as follows:

java
@Retention(RetentionPolicy.RUNTIME)
@Constraint(validatedBy = { })
public @interface CronExpression {
    String message() default "invalid cron expression ({validatedValue})";
}

And you must also define a factory to inject the validation method:

java
@Factory
public class ValidationFactory {
    private static final CronParser CRON_PARSER = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.UNIX));

    @Singleton
    ConstraintValidator<CronExpression, CharSequence> cronExpressionValidator() {
        return (value, annotationMetadata, context) -> {
            if (value == null) {
                return true;
            }

            try {
                Cron parse = CRON_PARSER.parse(value.toString());
                parse.validate();
            } catch (IllegalArgumentException e) {
                return false;
            }

            return true;
        };
    }
}

Run

java
    @Override
    public ReverseString.Output run(RunContext runContext) throws Exception {
        Logger logger = runContext.logger();

        String render = runContext.render(format);
        logger.debug(render);

        return Output.builder()
            .reverse(StringUtils.reverse(render))
            .build();
    }

The run method contains the main logic of your task. You can use any Java code here, with any libraries you need, as long as you have declared them in the Gradle configuration.

Log

java
Logger logger = runContext.logger();

Use this instruction to get a logger. It provides a logger bound to the current execution, so messages are logged appropriately. To keep logs tracked in the UI, do not create your own custom logger.

Render variables

java
String render = runContext.render(format);

To use dynamic expressions, you need to render them, i.e. transform the properties with Pebble. Do not forget to render variables if you need to pass in an output from previous tasks.

You also need to add the @PluginProperty(dynamic = true) annotation so that the documentation states that the property accepts dynamic variables. Provide a @PluginProperty annotation on every property, even if you do not set any of its attributes; otherwise the generated documentation will not be accurate.

Kestra storage

You can read any file from Kestra storage using the method runContext.uriToInputStream():

java
final URI from = new URI(runContext.render(this.from));
final InputStream inputStream = runContext.uriToInputStream(from);

You will get an InputStream in order to read the file from Kestra storage (coming from inputs or task outputs).

You can also write files to Kestra's internal storage using runContext.putTempFile(File file). The local file will be deleted, so you must use a temporary file.

java
File tempFile = File.createTempFile("concat_", "");
runContext.putTempFile(tempFile);

Do not forget to provide Outputs with the link generated by putTempFile in order for it to be usable by other tasks.
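As a minimal sketch of the write path, the plain-Java helper below prepares the temporary file that would then be handed to runContext.putTempFile. The names TempFileSketch and writeTemp are hypothetical and not part of Kestra. The putTempFile call itself appears only in a comment, since it needs a real RunContext.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of Kestra: builds the local file to store.
class TempFileSketch {
    // Writes the given content to a fresh temporary file and returns it.
    static File writeTemp(String content) {
        try {
            Path path = Files.createTempFile("concat_", "");
            Files.write(path, content.getBytes(StandardCharsets.UTF_8));
            return path.toFile();
        } catch (IOException e) {
            // Rethrown unchecked so the failure still surfaces to the caller.
            throw new UncheckedIOException(e);
        }
    }

    // Inside a task's run method, the file would then be stored with:
    //   runContext.putTempFile(TempFileSketch.writeTemp("hello"));
    // and the generated link exposed through the task's Output.
}
```

Because the local file is deleted once it has been stored, the helper always creates a new temporary file instead of reusing an existing path.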

Outputs

java
public class ReverseString extends Task implements RunnableTask<ReverseString.Output> {
    @Override
    public ReverseString.Output run(RunContext runContext) throws Exception {
        // Render the dynamic property before reversing it
        String render = runContext.render(format);

        return Output.builder()
            .reverse(StringUtils.reverse(render))
            .build();
    }

    @Builder
    @Getter
    public static class Output implements io.kestra.core.models.tasks.Output {
        @Schema(
            title = "The reversed string"
        )
        private final String reverse;
    }
}

Each task must return a class instance with output values that can be used by subsequent tasks. The class must implement io.kestra.core.models.tasks.Output. You can add as many properties as you want, but keep in mind that outputs need to be serializable. At execution time, downstream tasks can access outputs with output expressions, e.g. {{ outputs.task_id.output_attribute }}.

If your task doesn't provide any outputs (which is rare), use io.kestra.core.models.tasks.VoidOutput:

java
public class NoOutput extends Task implements RunnableTask<VoidOutput> {
    @Override
    public VoidOutput run(RunContext runContext) throws Exception {
        return null;
    }
}

Exception

In the run method, you can throw any Exception that will be caught by Kestra and will fail the execution. We advise you to throw any Exception that can break your task as soon as possible.
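The "as soon as possible" advice can be sketched in plain Java as follows. FailFastSketch is a hypothetical stand-in for the body of a run method, not Kestra code, and the non-empty check is an assumed example of a condition that would break the task.

```java
// Hypothetical stand-in for the body of a task's run method.
class FailFastSketch {
    // Validates the rendered input first, then does the actual work.
    static String reverse(String rendered) {
        if (rendered == null || rendered.isEmpty()) {
            // Thrown before any work starts. In a real task, Kestra would
            // catch this exception and fail the execution.
            throw new IllegalArgumentException("format must render to a non-empty string");
        }
        return new StringBuilder(rendered).reverse().toString();
    }
}
```

Checking inputs at the top of the method keeps the failure close to its cause, instead of letting a bad value travel further into the task.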

Metrics

You can expose metrics to add observability to your task. Metrics will be recorded with the execution and can be accessed via the UI or as Prometheus metrics.

There are two kinds of metrics available:

  • Counter: Counter.of("your.counter", count, tags); with arguments:
    • String name: the name of the metric
    • Double|Long|Integer|Float count: the value of the counter
    • String... tags: a list of tags associated with your metric
  • Timer: Timer.of("your.duration", duration, tags); with arguments:
    • String name: the name of the metric
    • Duration duration: the recorded duration
    • String... tags: a list of tags associated with your metric

To save metrics with the execution, you need to use runContext.metric(metric).
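One possible way to obtain the Duration for a Timer is sketched below in plain Java. TimingSketch and timed are hypothetical names, not part of Kestra. The Timer.of and runContext.metric calls are shown only in a comment, since they need a real RunContext.

```java
import java.time.Duration;

// Hypothetical helper, not part of Kestra: measures how long a step takes.
class TimingSketch {
    // Runs the step and returns the elapsed time.
    static Duration timed(Runnable step) {
        long start = System.nanoTime();
        step.run();
        return Duration.ofNanos(System.nanoTime() - start);
    }

    // In a task's run method, the result could then be recorded with:
    //   Duration elapsed = TimingSketch.timed(() -> doWork());
    //   runContext.metric(Timer.of("your.duration", elapsed));
    // where doWork() stands for whatever the task actually does.
}
```

System.nanoTime() is used rather than the wall clock because it is intended for measuring elapsed time and is unaffected by system clock adjustments.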

Name

Metric names must be lowercase, with segments separated by dots.

Tags

Tags must be given as pairs of tag key and value. An example of two valid tags (zone and location) is:

java
Counter.of("your.counter", count, "zone", "EU", "location", "France");
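The naming and tagging rules can be illustrated with a small plain-Java checker. MetricRulesSketch is hypothetical and not part of Kestra. The exact character set accepted for names is an assumption here (lowercase letters and digits), and Kestra may apply different checks internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical checker, not part of Kestra: mirrors the two rules above.
class MetricRulesSketch {
    // Assumed pattern: lowercase letters or digits in dot-separated segments.
    private static final Pattern NAME = Pattern.compile("[a-z0-9]+(\\.[a-z0-9]+)*");

    static boolean isValidName(String name) {
        return name != null && NAME.matcher(name).matches();
    }

    // Groups the flat tag arguments into key/value pairs, rejecting odd counts.
    static Map<String, String> pairTags(String... tags) {
        if (tags.length % 2 != 0) {
            throw new IllegalArgumentException("tags must be key/value pairs");
        }
        Map<String, String> pairs = new LinkedHashMap<>();
        for (int i = 0; i < tags.length; i += 2) {
            pairs.put(tags[i], tags[i + 1]);
        }
        return pairs;
    }
}
```

With this sketch, "your.counter" passes the name check while "Your.Counter" does not, and the four tag arguments from the example group into the two pairs zone=EU and location=France.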

Documentation

Remember to document your tasks. For this, we provide a set of annotations explained in the Document each plugin section.

Flowable Task

Flowable tasks are the most complex tasks to develop, and will usually be available from the Kestra core. You will rarely need to create a flowable task by yourself.

Keep in mind that a flowable task will be evaluated very frequently inside the Executor and must have low CPU usage; no I/O should be done by this kind of task.

In the future, complete documentation will be available here. In the meantime, you can find all the existing Flowable tasks here and use them as inspiration for developing Sequential or Parallel tasks.
