Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import io.serverlessworkflow.api.types.ListenTask;
import io.serverlessworkflow.api.types.ListenTaskConfiguration;
import io.serverlessworkflow.api.types.ListenTo;
import io.serverlessworkflow.api.types.TaskTimeout;
import io.serverlessworkflow.api.types.Timeout;
import io.serverlessworkflow.api.types.TimeoutAfter;
import java.util.function.Consumer;

public abstract class AbstractListenTaskBuilder<
Expand Down Expand Up @@ -63,6 +66,27 @@ public AbstractListenTaskBuilder<T, F> to(Consumer<F> c) {
return this;
}

public ListenToConfigurerBuilder<T> to() {
return new ListenToConfigurerBuilder<>(this, taskItemListBuilder);
}

void applyTo(ListenTo listenTo) {
this.config.setTo(listenTo);
}

public AbstractListenTaskBuilder<T, F> timeout(String durationExpression) {
this.listenTask.setTimeout(
new TaskTimeout()
.withTaskTimeoutDefinition(
new Timeout()
.withAfter(new TimeoutAfter().withDurationExpression(durationExpression))));
return this;
}

protected ListenTask getListenTask() {
return listenTask;
}

public ListenTask build() {
return listenTask;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.fluent.spec;

import java.util.function.Consumer;

public class ListenToConfigurerBuilder<T extends BaseTaskItemListBuilder<T>> {

private final AbstractListenTaskBuilder<T, ?> listenTaskBuilder;
private final T taskItemListBuilder;
private final ListenToBuilder listenToBuilder;
private boolean strategySet;

public ListenToConfigurerBuilder(
AbstractListenTaskBuilder<T, ?> listenTaskBuilder, T taskItemListBuilder) {
this.listenTaskBuilder = listenTaskBuilder;
this.taskItemListBuilder = taskItemListBuilder;
this.listenToBuilder = new ListenToBuilder();
}

public ListenToConfigurerBuilder<T> one(Consumer<EventFilterBuilder> filter) {
listenToBuilder.one(filter);
strategySet = true;
return this;
}

public ListenToConfigurerBuilder<T> all(Consumer<EventFilterBuilder>... filters) {
listenToBuilder.all(filters);
strategySet = true;
return this;
}

public ListenToConfigurerBuilder<T> any(Consumer<EventFilterBuilder>... filters) {
listenToBuilder.any(filters);
strategySet = true;
return this;
}

public ListenToConfigurerBuilder<T> until(String expression) {
listenToBuilder.until(expression);
return this;
}

public ListenToConfigurerBuilder<T> forEach(
String item, Consumer<SubscriptionIteratorBuilder<T>> tasksConsumer) {
final SubscriptionIteratorBuilder<T> iteratorBuilder =
new SubscriptionIteratorBuilder<>(this.taskItemListBuilder);
tasksConsumer.accept(iteratorBuilder);
listenTaskBuilder.getListenTask().setForeach(iteratorBuilder.build());
return this;
}

public AbstractListenTaskBuilder<T, ?> apply() {
listenTaskBuilder.applyTo(listenToBuilder.build());
return listenTaskBuilder;
}

public boolean isStrategySet() {
return strategySet;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,15 @@ public SELF jsonData(Map<String, Object> data) {
addPropertyStep(e -> e.data(data));
return JSON();
}

/**
* Sets the event data as an expression without setting content type.
*
* @param expr the data expression (e.g., "${.temperature}")
* @return self
*/
public SELF data(String expr) {
addPropertyStep(e -> e.data(expr));
return self();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,15 @@ public SELF jsonData(String expr) {
addPropertyStep(e -> e.data(expr));
return JSON();
}

/**
* Sets the event data filter expression without setting content type.
*
* @param expr the data filter expression (e.g., "${ .temperature > 38 }")
* @return self
*/
public SELF data(String expr) {
addPropertyStep(e -> e.data(expr));
return self();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static io.serverlessworkflow.fluent.spec.dsl.DSL.cases;
import static io.serverlessworkflow.fluent.spec.dsl.DSL.doTasks;
import static io.serverlessworkflow.fluent.spec.dsl.DSL.emit;
import static io.serverlessworkflow.fluent.spec.dsl.DSL.event;
import static io.serverlessworkflow.fluent.spec.dsl.DSL.forEach;
import static io.serverlessworkflow.fluent.spec.dsl.DSL.fork;
import static io.serverlessworkflow.fluent.spec.dsl.DSL.http;
Expand Down Expand Up @@ -703,4 +704,53 @@ void testDoTaskRunWorkflow() {
RunTaskConfiguration.ProcessReturnType.NONE, runTask.getRun().getRunWorkflow().getReturn());
assertEquals(false, runTask.getRun().getRunWorkflow().isAwait());
}

@Test
void testListenWithConfigurerBuilder() {
Workflow wf =
WorkflowBuilder.workflow("listen-with-configurer", "test", "0.1.0")
.tasks(
doTasks(
listen(
"waitForEvent",
l ->
l.to()
.any(
event().type("com.example.event.A"),
event().type("com.example.event.B"))
.until("$.count > 0")
.forEach(
"item",
f ->
f.tasks(
set("processed", s -> s.put("eventType", "processed"))))
.apply())))
.build();

List<TaskItem> items = wf.getDo();
assertNotNull(items, "Do list must not be null");
assertEquals(1, items.size(), "There should be one listen task");

TaskItem item = items.get(0);
assertEquals("waitForEvent", item.getName(), "TaskItem name should match");
ListenTask lt = item.getTask().getListenTask();
assertNotNull(lt, "ListenTask should be present");

// Verify strategy
var strategy = lt.getListen().getTo().getAnyEventConsumptionStrategy();
assertNotNull(strategy, "Any strategy should be set");
assertEquals(2, strategy.getAny().size(), "Should have 2 event filters");

// Verify until condition
var until = strategy.getUntil();
assertNotNull(until, "Until should be set");
assertEquals("$.count > 0", until.getAnyEventUntilCondition(), "Until expression should match");

// Verify foreach
var foreach = lt.getForeach();
assertNotNull(foreach, "Foreach should be set");
assertNotNull(foreach.getDo(), "Foreach do tasks should be set");
assertEquals(1, foreach.getDo().size(), "Foreach do should have 1 task");
assertEquals("processed", foreach.getDo().get(0).getName(), "Foreach task name should match");
}
}
Loading