Develop a Task
Here are the instructions to develop a new task.
Runnable Task
Let's look at this one more deeply:
Class annotations
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
These annotations are required to make your plugin work with Kestra. They are Lombok annotations that allow Kestra and its internal serialization to work properly.
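Put together, the top of a task class typically looks like this (a minimal sketch based on the annotations above; the class body is elided):
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public class ReverseString extends Task implements RunnableTask<ReverseString.Output> {
    // task properties and the run() method go here
}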
Class declaration
public class ReverseString extends Task implements RunnableTask<ReverseString.Output>
- ReverseString is the name of your task; it can be used in Kestra with type: io.kestra.plugin.templates.ReverseString (aka: {{package}}.{{className}}).
- The task class must extend Task, which is the base class for all tasks.
- The task class must implement RunnableTask as it's a task that must run on the Worker, and it must declare its output, which is here of type ReverseString.Output.
Properties
private Property<String> string;
All task properties must be declared as task class attributes. They will be passed to the task by the flow at execution time.
If you want your attribute to be dynamic, you need to wrap its type into the Property carrier type. Dynamic properties are explained later.
For example, the following is valid YAML for using this task. It uses an output from a previous task as its property, which is possible thanks to dynamic property rendering inside the task via runContext.render(string).as(String.class).orElse(null):
type: io.kestra.plugin.templates.ReverseString
string: "{{ outputs.previousTask.name }}"
You can declare as many properties as you want.
You can use any type that Jackson can serialize for your properties (e.g. Double, boolean, ...). You can also create your own class, as long as it is serializable.
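For instance, an enum can be used as a property type (a hedged sketch; the Format enum and format property are hypothetical and assume enum values are rendered like any other Jackson-serializable type):
// Hypothetical enum used as a property type
public enum Format {
    CSV,
    JSON
}

private Property<Format> format;

// inside run():
// Format rendered = runContext.render(format).as(Format.class).orElse(Format.CSV);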
Properties validation
Properties can be validated using jakarta.validation.constraints.* annotations. When the user creates a flow, your task properties will be validated before insertion, preventing any invalid flow definition from being saved.
The default available annotations are:
@AssertFalse
@AssertTrue
@Max
@Min
@Negative
@NegativeOrZero
@Positive
@PositiveOrZero
@NotBlank
@NotNull
@Null
@NotEmpty
@Past
@PastOrPresent
@Future
@FutureOrPresent
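For example, a constraint can be placed directly on a property field (an illustrative sketch; the url property is hypothetical):
@NotNull
private Property<String> url;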
You can also create your own custom validation. To do so, first define the annotation as follows:
@Retention(RetentionPolicy.RUNTIME)
@Constraint(validatedBy = { })
public @interface CronExpression {
    String message() default "invalid cron expression ({validatedValue})";
}
Then, define a factory to inject the validator:
@Factory
public class ValidationFactory {
    private static final CronParser CRON_PARSER = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.UNIX));

    @Singleton
    ConstraintValidator<CronExpression, CharSequence> cronExpressionValidator() {
        return (value, annotationMetadata, context) -> {
            if (value == null) {
                return true;
            }

            try {
                Cron parse = CRON_PARSER.parse(value.toString());
                parse.validate();
            } catch (IllegalArgumentException e) {
                return false;
            }

            return true;
        };
    }
}
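Once the validator is registered, the custom annotation can be used like any built-in constraint (an illustrative sketch; the cron property is hypothetical):
@CronExpression
@NotNull
private String cron;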
Run
@Override
public ReverseString.Output run(RunContext runContext) throws Exception {
    Logger logger = runContext.logger();

    String render = runContext.render(string).as(String.class).orElse(null);
    logger.debug(render);

    return Output.builder()
        .reversed(StringUtils.reverse(render))
        .build();
}
The run method is where the main logic of your task does its work. You can use any Java code here, with any required libraries, as long as they are declared in the Gradle configuration.
Log
Logger logger = runContext.logger();
You can access a logger via the run context; it provides a logger for the current execution. Do not create your own logger, so that logs can be tracked inside Kestra.
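A minimal usage sketch (the message and the itemCount variable are illustrative):
Logger logger = runContext.logger();
logger.info("Processed {} items", itemCount);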
Dynamic properties rendering
String rendered = runContext.render(string).as(String.class).orElse(null);
Kestra supports expressions as task parameters. To use them, your task attribute must be encapsulated into the Property carrier type.
A dynamic property must be rendered before usage; this uses our templating engine, Pebble, to render the property into the target type. Rendering properties using the Property carrier type via the run context is null-safe: it returns an empty Optional, or an empty collection for lists and maps.
Dynamic properties support all Java types that can be serialized via Jackson. For example, to use a Duration you can do:
// property definition
private Property<Duration> duration;

@Override
public Output run(RunContext runContext) throws Exception {
    Duration rendered = runContext.render(duration).as(Duration.class).orElse(null);
    // [...]
}
You can provide a default value at property definition time via Property.of():
private Property<Duration> duration = Property.of(Duration.ofSeconds(10));
Lists are supported via Property<List<String>>, for example:
// property definition
private Property<List<String>> list;

@Override
public Output run(RunContext runContext) throws Exception {
    List<String> rendered = runContext.render(list).asList(String.class);
    // [...]
}
Maps are supported via Property<Map<String, String>>, for example:
// property definition
private Property<Map<String, String>> map;

@Override
public Output run(RunContext runContext) throws Exception {
    Map<String, String> rendered = runContext.render(map).asMap(String.class, String.class);
    // [...]
}
Kestra uses a special type to carry data in a flexible way: Data.
Data can be built from three different properties: a URI (which points to a Kestra internal storage file), a list of maps (for defining multiple items), or a map (for a single item). Thanks to this, the task user can pass data to the task in a very flexible way; we strongly encourage you to use this type when it fits your needs.
Here is an example that defines a Data attribute of type Message; at run time you will need to render this property and map each message from a Map. It uses a Project Reactor Flux under the hood to process items one by one in a reactive manner, allowing an arbitrary number of items to be processed. When coupled with our internal storage files, it can process files containing billions of items if needed:
// property definition
@NotNull
private Data<Message> data;

@Override
public Output run(RunContext runContext) throws Exception {
    List<Message> outputMessages = data.flux(runContext, Message.class, message -> Message.fromMap(message))
        .collectList()
        .block();
    // [...]
}
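For reference, the Message type used above could look like the following (a hedged sketch, since the actual class is not shown in this guide; the key and value fields and the manual mapping are illustrative assumptions):
@Builder
@Getter
public static class Message {
    private String key;
    private String value;

    // Maps one item (a Map produced by the Data property) to a Message.
    public static Message fromMap(Map<String, Object> map) {
        return Message.builder()
            .key((String) map.get("key"))
            .value((String) map.get("value"))
            .build();
    }
}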
In the flow definition, the user chooses how to populate the data property.
To use it with an internal storage file (in the ION format), which allows processing the file line by line:
type: io.kestra.plugin.myplugin.MyTask
data:
  fromURI: "{{inputs.file}}"
To use it with a single item as a map:
type: io.kestra.plugin.myplugin.MyTask
data:
  fromMap:
    prop1: "{{inputs.value1}}"
    prop2: "{{inputs.value2}}"
To use it with a list of items as a list of maps:
type: io.kestra.plugin.myplugin.MyTask
data:
  fromList:
    - prop1: "{{inputs.value1}}"
      prop2: "{{inputs.value2}}"
    - prop1: "{{inputs.value3}}"
      prop2: "{{inputs.value4}}"
Static properties or the old @PluginProperty annotation
If you don't want to, or cannot, use dynamic properties via the Property carrier type, you can define your task property like this:
@PluginProperty
private String string;
Always use @PluginProperty when you don't use the Property carrier type, so that the task JSON Schema and documentation are properly generated. You can even use @PluginProperty(dynamic = true) and render the property manually; this was the old way to deal with dynamic properties, but it is now strongly discouraged and not covered here.
Kestra internal storage
The run context object has a storage() method that allows accessing the Kestra internal storage. It also has a workingDir() method that allows managing files inside the task working directory. It is important to use it for all file manipulations: for security reasons, a task must not go outside of its working directory, and the WorkingDir object protects your task against that.
Example of reading a file from the internal storage:
final URI from = new URI(runContext.render(this.from).as(String.class).orElseThrow());
final InputStream inputStream = runContext.storage().getFile(from);
It will return an InputStream that will read the file from the internal storage.
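The stream can then be consumed like any other InputStream, for example to log each line (an illustrative sketch; imports are elided as in the other snippets):
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
    // process the file line by line
    reader.lines().forEach(logger::debug);
}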
You can also write files to Kestra's internal storage using runContext.storage().putFile(File file)
. The local file must be created inside the working directory and will be deleted after being put inside the internal storage.
File file = runContext.workingDir().createFile("items.csv");
// [...] -> fill the file
URI uri = runContext.storage().putFile(file);
// return the uri inside an Output so it can be used by other tasks
If a file with the same name already exists, the call to createFile() will fail. To avoid that, you can use runContext.workingDir().createTempFile(".csv") instead, which will generate a unique file name for you.
runContext.storage().putFile() will return a URI pointing to the file inside the internal storage. For this file to be used by other tasks and be available inside the execution outputs, you must return this URI as one of your task outputs.
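For example, an output class exposing the stored file's URI could look like this (an illustrative sketch; the uri output name is an assumption):
@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
    @Schema(
        title = "The URI of the file inside the internal storage"
    )
    private final URI uri;
}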
Outputs
public class ReverseString extends Task implements RunnableTask<ReverseString.Output> {
    @Override
    public ReverseString.Output run(RunContext runContext) throws Exception {
        String render = runContext.render(string).as(String.class).orElse(null);

        return Output.builder()
            .reversed(StringUtils.reverse(render))
            .build();
    }

    @Builder
    @Getter
    public static class Output implements io.kestra.core.models.tasks.Output {
        @Schema(
            title = "The reversed string"
        )
        private final String reversed;
    }
}
Each task must return an object implementing io.kestra.core.models.tasks.Output, with the output properties that will be available to the next tasks and as execution outputs.
You can add as many properties as you want, just keep in mind that outputs need to be serializable. At execution time, outputs can be accessed by downstream tasks by leveraging output expressions, e.g. {{ outputs.task_id.output_attribute }}.
Task outputs are stored inside the execution context; they are designed to hold task execution metadata, not data. To store data, use an internal storage file and return the file URI inside the output.
If your task doesn't have any outputs, you can use io.kestra.core.models.tasks.VoidOutput and return null:
public class NoOutput extends Task implements RunnableTask<VoidOutput> {
    @Override
    public VoidOutput run(RunContext runContext) throws Exception {
        return null;
    }
}
Exception
In the run method, you can throw any Java Exception; it will be caught by Kestra and will fail the execution.
We advise you to throw, as soon as possible, any exception that should fail your task.
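For example, you can validate required values early and fail fast (an illustrative sketch; the url property is hypothetical):
String url = runContext.render(this.url).as(String.class).orElse(null);
if (url == null || url.isBlank()) {
    // fail early rather than letting the task continue with an unusable value
    throw new IllegalArgumentException("The 'url' property must be set");
}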
Metrics
You can expose metrics to add observability to your task. Metrics will be recorded within the execution, and can be accessed via the UI or as Prometheus metrics.
There are two kinds of metrics available:
- Counter: Counter.of("your.counter", count, tags); with args:
  - String name: the name of the metric
  - Double|Long|Integer|Float count: the associated counter
  - String... tags: a list of tags associated with your metric
- Timer: Timer.of("your.duration", duration, tags); with args:
  - String name: the name of the metric
  - Duration duration: the recorded duration
  - String... tags: a list of tags associated with your metric
To save metrics with the execution, you need to use runContext.metric(metric).
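A minimal sketch of recording both kinds of metrics inside run() (the metric names, the count and duration variables, and the tags are illustrative):
runContext.metric(Counter.of("records.processed", count, "zone", "EU"));
runContext.metric(Timer.of("fetch.duration", duration, "zone", "EU"));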
Name
Metric names must be lowercase and separated by dots.
Tags
Must be pairs of tag key and value. An example of two valid tags (zone and location) is:
Counter.of("your.counter", count, "zone", "EU", "location", "France");
Documentation
Remember to document your tasks. For this, we provide a set of annotations explained in the Document each plugin section.
Flowable Task
Flowable tasks are the most complex tasks to develop, and will usually be available from Kestra core. You will rarely need to create a flowable task by yourself.
When developing such tasks, you must make them fault-tolerant, as an exception thrown by a flowable task can endanger the Kestra instance and lead to inconsistencies in the flow execution.
Keep in mind that a flowable task will be evaluated very frequently inside the Executor and must have low CPU usage; no I/O should be done by this kind of task.
In the future, complete documentation will be available here. In the meantime, you can look at the existing Flowable tasks to get some inspiration for developing Sequential or Parallel tasks.