Develop a Task
Here are the instructions to develop a new task.
Runnable Task
Lets look at this one more deeply:
Class annotations
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
These are required in order to make your plugin work with Kestra. These are Lombok annotations that allow Kestra and its internal serialization to work properly.
Class declaration
public class ReverseString extends Task implements RunnableTask<ReverseString.Output>
ReverseString
is the name of your task, and it can be used on Kestra withtype: io.kestra.plugin.templates.ReverseString
(aka:{{package}}.{{className}}
).- Class must extend
Task
to be usable. implements RunnableTask<ReverseString.Output>
: must implementRunnableTask
to be discovered and must declare the output of the taskReverseString.Output
.
Properties
@PluginProperty(dynamic = true)
private String format;
Declare all the properties that you can pass to the current task in a flow. For example, this will be a valid yaml for this task:
type: io.kestra.plugin.templates.ReverseString
format: "{{ outputs.previousTask.name }}"
You can declare as many properties as you want. All of these will be filled by Kestra executors.
You can use any serializable by Jackson for your properties (ex: Double, boolean, ...). You can create any class as long as the class is Serializable.
Properties validation
Properties can be validated using javax.validation.constraints.*
annotations. When the user creates a flow, your task properties will be validated before insertion and prevent wrong definition to be inserted.
The default available annotations are:
@Positive
@AssertFalse
@AssertTrue
@Max
@Min
@Negative
@NegativeOrZero
@Positive
@PositiveOrZero
@NotBlank
@NotNull
@Null
@NotEmpty
@Past
@PastOrPresent
@Future
@FutureOrPresent
You can also create your own custom validation. You must defined the annotation as follows:
@Retention(RetentionPolicy.RUNTIME)
@Constraint(validatedBy = { })
public @interface CronExpression {
String message() default "invalid cron expression ({validatedValue})";
}
And you must also define a factory to inject the validation method:
@Factory
public class ValidationFactory {
private static final CronParser CRON_PARSER = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.UNIX));
@Singleton
ConstraintValidator<CronExpression, CharSequence> cronExpressionValidator() {
return (value, annotationMetadata, context) -> {
if (value == null) {
return true;
}
try {
Cron parse = CRON_PARSER.parse(value.toString());
parse.validate();
} catch (IllegalArgumentException e) {
return false;
}
return true;
};
}
}
Run
@Override
public ReverseString.Output run(RunContext runContext) throws Exception {
Logger logger = runContext.logger();
String render = runContext.render(format);
logger.debug(render);
return Output.builder()
.reverse(StringUtils.reverse(render))
.build();
}
The run
method is where the main logic of your task will do all the work needed. You can use any Java code here with any required libraries as long as you have declared them in the Gradle configuration.
Log
Logger logger = runContext.logger();
To have a logger, you need to use this instruction. This will provide a logger for the current execution and will log appropriately. Do not create your own custom logger in order to track logs on the UI.
Render variables
String render = runContext.render(format);
In order to use dynamic expressions, you need to render them i.e. transform the properties with Pebble. Do not forget to render variables if you need to pass an output from previous variables.
You also need to add the annotation @PluginProperty(dynamic = true)
in order to explain in the documentation that you can pass some dynamic variables.
Provide a @PluginProperty
annotation even if you didn't set any of its attributes for all variables or the generated documentation will not be accurate.
Kestra storage
You can read any files from Kestra storage using the method runContext.uriToInputStream()
final URI from = new URI(runContext.render(this.from));
final InputStream inputStream = runContext.uriToInputStream(from);
You will get an InputStream
in order to read the file from Kestra storage (coming from inputs or task outputs).
You can also write files to Kestra's internal storage using runContext.putTempFile(File file)
. The local file will be deleted, so you must use a temporary file.
File tempFile = File.createTempFile("concat_", "");
runContext.putTempFile(tempFile)
Do not forget to provide Outputs with the link generated by putTempFile
in order for it to be usable by other tasks.
Outputs
public class ReverseString extends Task implements RunnableTask<ReverseString.Output> {
@Override
public ReverseString.Output run(RunContext runContext) throws Exception {
return Output.builder()
.reverse(StringUtils.reverse(render))
.build();
}
@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
@Schema(
title = "The reversed string"
)
private final String reverse;
}
}
Each task must return a class instance with output values that can be used in the next tasks.
You must return a class that implements io.kestra.core.models.tasks.Output
.
You can add as many properties as you want, just keep in mind that outputs need to be serializable. At execution time, outputs can be accessed by downstream tasks by leveraging outputs expressions e.g. {{ outputs.task_id.output_attribute }}
.
If your task doesn't provide any outputs (mostly never), you use io.kestra.core.models.tasks.VoidOutput
:
public class NoOutput extends Task implements FlowableTask<VoidOutput> {
@Override
public VoidOutput run(RunContext runContext) throws Exception {
return null;
}
}
Exception
In the run
method, you can throw any Exception
that will be caught by Kestra and will fail the execution.
We advise you to throw any Exception that can break your task as soon as possible.
Metrics
You can expose metrics to add observability to your task. Metrics will be recorded with the execution and can be accessed via the UI or as Prometheus metrics.
There are two kinds of metrics available:
Counter
:Counter.of("your.counter", count, tags);
with argsString name
: The name of the metricDouble|Long|Integer|Float count
: the associated counterString... tags
: a list of tags associated with your metric
Timer
:Timer.of("your.duration", duration, tags);
String name
: The name of the metricDuration duration
: the recorded durationString... tags
: a list of tags associated with your metric
To save metrics with the execution, you need to use runContext.metric(metric)
.
Name
Must be lowercase separated by dots.
Tags
Must be pairs of tag key and value. An example of two valid tags (zone
and location
) is:
Counter.of("your.counter", count, "zone", "EU", "location", "France");
Documentation
Remember to document your tasks. For this, we provide a set of annotations explained in the Document each plugin section.
Flowable Task
Flowable tasks are the most complex tasks to develop, and will usually be available from the Kestra core. You will rarely need to create a flowable task by yourself.
When developing such tasks, you must make it fault-tolerant as an exception thrown by a flowable task can endanger the Kestra instance and lead to inconsistencies in the flow execution.
Keep in mind that a flowable task will be evaluated very frequently inside the Executor and must have low CPU usage; no I/O should be done by this kind of task.
In the future, complete documentation will be available here. In the meantime, you can find all the actual Flowable tasks here to have some inspiration for Sequential or Parallel tasks development.
Was this page helpful?