Activity execution - Java SDK
Start an Activity Execution
Calls to spawn Activity Executions are written within a Workflow Definition. The call to spawn an Activity Execution generates the ScheduleActivityTask Command. This results in the set of three Activity Task related Events (ActivityTaskScheduled, ActivityTaskStarted, and ActivityTask[Closed])in your Workflow Execution Event History.
A single instance of the Activities implementation is shared across multiple simultaneous Activity invocations. Activity implementation code should be idempotent.
The values passed to Activities through invocation parameters or returned through a result value are recorded in the Execution history. The entire Execution history is transferred from the Temporal service to Workflow Workers when a Workflow state needs to recover. A large Execution history can thus adversely impact the performance of your Workflow.
Therefore, be mindful of the amount of data you transfer through Activity invocation parameters or Return Values. Otherwise, no additional limitations exist on Activity implementations.
Activities are remote procedure calls that must be invoked from within a Workflow using ActivityStub.
Activities are not executable on their own. You cannot start an Activity Execution by itself.
Note that before an Activity Execution is invoked:
- Activity options (either
setStartToCloseTimeoutorScheduleToCloseTimeoutare required) must be set for the Activity. For details, see How to set Activity timeouts. - The Activity must be registered with a Worker. See Worker Program
- Activity code must be thread-safe.
Activities should only be instantiated using stubs from within a Workflow.
An ActivityStub returns a client-side stub that implements an Activity interface.
You can invoke Activities using Workflow.newActivityStub(type-safe) or Workflow.newUntypedActivityStub (untyped).
Calling a method on the Activity interface schedules the Activity invocation with the Temporal service, and generates an ActivityTaskScheduled Event.
Activities can be invoked synchronously or asynchronously.
Invoking Activities Synchronously
In the following example, we use the type-safe Workflow.newActivityStub within the "FileProcessingWorkflow" Workflow implementation to create a client-side stub of the FileProcessingActivities class. We also define ActivityOptions and set setStartToCloseTimeout option to one hour.
public class FileProcessingWorkflowImpl implements FileProcessingWorkflow {
private final FileProcessingActivities activities;
public FileProcessingWorkflowImpl() {
this.activities = Workflow.newActivityStub(
FileProcessingActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofHours(1))
.build());
}
@Override
public void processFile(Arguments args) {
String localName = null;
String processedName = null;
try {
localName = activities.download(args.getSourceBucketName(), args.getSourceFilename());
processedName = activities.processFile(localName);
activities.upload(args.getTargetBucketName(), args.getTargetFilename(), processedName);
} finally {
if (localName != null) {
activities.deleteLocalFile(localName);
}
if (processedName != null) {
activities.deleteLocalFile(processedName);
}
}
}
// ...
}
A Workflow can have multiple Activity stubs. Each Activity stub can have its own ActivityOptions defined.
The following example shows a Workflow implementation with two typed Activity stubs.
public FileProcessingWorkflowImpl() {
ActivityOptions options1 = ActivityOptions.newBuilder()
.setTaskQueue("taskQueue1")
.setStartToCloseTimeout(Duration.ofMinutes(10))
.build();
this.store1 = Workflow.newActivityStub(FileProcessingActivities.class, options1);
ActivityOptions options2 = ActivityOptions.newBuilder()
.setTaskQueue("taskQueue2")
.setStartToCloseTimeout(Duration.ofMinutes(5))
.build();
this.store2 = Workflow.newActivityStub(FileProcessingActivities.class, options2);
}
To invoke Activities inside Workflows without referencing the interface it implements, use an untyped Activity stub Workflow.newUntypedActivityStub.
This is useful when the Activity type is not known at compile time, or to invoke Activities implemented in different programming languages.
// Workflow code
ActivityOptions activityOptions =
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(3))
.setTaskQueue("simple-queue-node")
.build();
ActivityStub activity = Workflow.newUntypedActivityStub(activityOptions);
activity.execute("ComposeGreeting", String.class, "Hello World", "Spanish");
Invoking Activities Asynchronously
Sometimes Workflows need to perform certain operations in parallel.
The Temporal Java SDK provides the Async class which includes static methods used to invoke any Activity asynchronously.
The calls return a result of type Promise which is similar to the Java Future and CompletionStage.
When invoking Activities, use Async.function for Activities that return a result, and Async.procedure for Activities that return void.
In the following asynchronous Activity invocation, the method reference is passed to Async.function followed by Activity arguments.
Promise<String> localNamePromise = Async.function(activities::download, sourceBucket, sourceFile);
The following example shows how to call two Activity methods, "download" and "upload", in parallel on multiple files.
public void processFile(Arguments args) {
List<Promise<String>> localNamePromises = new ArrayList<>();
List<String> processedNames = null;
try {
// Download all files in parallel.
for (String sourceFilename : args.getSourceFilenames()) {
Promise<String> localName =
Async.function(activities::download, args.getSourceBucketName(), sourceFilename);
localNamePromises.add(localName);
}
List<String> localNames = new ArrayList<>();
for (Promise<String> localName : localNamePromises) {
localNames.add(localName.get());
}
processedNames = activities.processFiles(localNames);
// Upload all results in parallel.
List<Promise<Void>> uploadedList = new ArrayList<>();
for (String processedName : processedNames) {
Promise<Void> uploaded =
Async.procedure(
activities::upload,
args.getTargetBucketName(),
args.getTargetFilename(),
processedName);
uploadedList.add(uploaded);
}
// Wait for all uploads to complete.
Promise.allOf(uploadedList).get();
} finally {
for (Promise<String> localNamePromise : localNamePromises) {
// Skip files that haven't completed downloading.
if (localNamePromise.isCompleted()) {
activities.deleteLocalFile(localNamePromise.get());
}
}
if (processedNames != null) {
for (String processedName : processedNames) {
activities.deleteLocalFile(processedName);
}
}
}
}
Activity Execution Context
ActivityExecutionContext is a context object passed to each Activity implementation by default.
You can access it in your Activity implementations via Activity.getExecutionContext().
It provides getters to access information about the Workflow that invoked the Activity.
Note that the Activity context information is stored in a thread-local variable.
Therefore, calls to getExecutionContext() succeed only within the thread that invoked the Activity function.
Following is an example of using the ActivityExecutionContext:
public class FileProcessingActivitiesImpl implements FileProcessingActivities {
@Override
public String download(String bucketName, String remoteName, String localName) {
ActivityExecutionContext ctx = Activity.getExecutionContext();
ActivityInfo info = ctx.getInfo();
log.info("namespace=" + info.getActivityNamespace());
log.info("workflowId=" + info.getWorkflowId());
log.info("runId=" + info.getRunId());
log.info("activityId=" + info.getActivityId());
log.info("activityTimeout=" + info.getStartToCloseTimeout();
return downloadFileFromS3(bucketName, remoteName, localDirectory + localName);
}
...
}
For details on getting the results of an Activity Execution, see Activity Execution Result.
Set required Activity Timeouts
Activity Execution semantics rely on several parameters. The only required value that needs to be set is either a Schedule-To-Close Timeout or a Start-To-Close Timeout. These values are set in the Activity Options.
Set your Activity Timeout from the ActivityOptions.Builder class.
Available timeouts are:
- ScheduleToCloseTimeout()
- ScheduleToStartTimeout()
- StartToCloseTimeout()
You can set Activity Options using an ActivityStub within a Workflow implementation, or per-Activity using WorkflowImplementationOptions within a Worker.
The following uses ActivityStub.
GreetingActivities activities = Workflow.newActivityStub(GreetingActivities.class,
ActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofSeconds(5))
// .setStartToCloseTimeout(Duration.ofSeconds(2)
// .setScheduletoCloseTimeout(Duration.ofSeconds(20))
.build());
The following uses WorkflowImplementationOptions.
WorkflowImplementationOptions options =
WorkflowImplementationOptions.newBuilder()
.setActivityOptions(
ImmutableMap.of(
"GetCustomerGreeting",
// Set Activity Execution timeout
ActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofSeconds(5))
// .setStartToCloseTimeout(Duration.ofSeconds(2))
// .setScheduleToStartTimeout(Duration.ofSeconds(5))
.build()))
.build();
If you define options per-Activity Type options with WorkflowImplementationOptions.setActivityOptions(), setting them again specifically with ActivityStub in a Workflow will override this setting.
Java ActivityOptions reference
Use ActivityOptions to configure how to invoke an Activity Execution.
You can set Activity Options using an ActivityStub within a Workflow implementation, or per-Activity using WorkflowImplementationOptions within a Worker.
Note that if you define options per-Activity Type options with WorkflowImplementationOptions.setActivityOptions(), setting them again specifically with ActivityStub in a Workflow will override this setting.
The following table lists all ActivityOptions that can be configured for an Activity invocation.
| Option | Required | Type |
|---|---|---|
setScheduleToCloseTimeout | Yes (if StartToCloseTimeout is not specified) | Duration |
setScheduleToStartTimeout | No | Duration |
setStartToCloseTimeout | Yes (if ScheduleToCloseTimeout is not specified) | Duration |
setHeartbeatTimeout | No | Duration |
setTaskQueue | No | String |
setRetryOptions | No | RetryOptions |
setCancellationType | No | ActivityCancellationType |
ScheduleToCloseTimeout
To set a Schedule-To-Close Timeout, use ActivityOptions.newBuilder.setScheduleToCloseTimeout.
This or StartToCloseTimeout must be set.
- Type:
Duration - Default: Unlimited.
Note that if
WorkflowRunTimeoutand/orWorkflowExecutionTimeoutare defined in the Workflow, all Activity retries will stop when either or both of these timeouts are reached.
You can set Activity Options using an ActivityStub within a Workflow implementation, or per-Activity using WorkflowImplementationOptions within a Worker.
Note that if you define options per-Activity Type options with WorkflowImplementationOptions.setActivityOptions(), setting them again specifically with ActivityStub in a Workflow will override this setting.
-
With
ActivityStubGreetingActivities activities = Workflow.newActivityStub(GreetingActivities.class,
ActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofSeconds(5))
.build()); -
With
WorkflowImplementationOptionsWorkflowImplementationOptions options =
WorkflowImplementationOptions.newBuilder()
.setActivityOptions(
ImmutableMap.of(
"GetCustomerGreeting",
ActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofSeconds(5))
.build()))
.build();
ScheduleToStartTimeout
To set a Schedule-To-Start Timeout, use ActivityOptions.newBuilder.setScheduleToStartTimeout.
- Type:
Duration - Default: Unlimited. This timeout is non-retryable.
You can set Activity Options using an ActivityStub within a Workflow implementation, or per-Activity using WorkflowImplementationOptions within a Worker.
Note that if you define options per-Activity Type options with WorkflowImplementationOptions.setActivityOptions(), setting them again specifically with ActivityStub in a Workflow will override this setting.
-
With
ActivityStubGreetingActivities activities = Workflow.newActivityStub(GreetingActivities.class,
ActivityOptions.newBuilder()
.setScheduleToStartTimeout(Duration.ofSeconds(5))
// note that either StartToCloseTimeout or ScheduleToCloseTimeout are
// required when setting Activity options.
.setScheduletoCloseTimeout(Duration.ofSeconds(20))
.build()); -
With
WorkflowImplementationOptionsWorkflowImplementationOptions options =
WorkflowImplementationOptions.newBuilder()
.setActivityOptions(
ImmutableMap.of(
"GetCustomerGreeting",
ActivityOptions.newBuilder()
.setScheduleToStartTimeout(Duration.ofSeconds(5))
.build()))
.build();
StartToCloseTimeout
To set a Start-To-Close Timeout, use ActivityOptions.newBuilder.setStartToCloseTimeout.
This or ScheduleToClose must be set.
- Type:
Duration - Default: Defaults to
ScheduleToCloseTimeoutvalue
You can set Activity Options using an ActivityStub within a Workflow implementation, or per-Activity using WorkflowImplementationOptions within a Worker.
Note that if you define options per-Activity Type options with WorkflowImplementationOptions.setActivityOptions(), setting them again specifically with ActivityStub in a Workflow will override this setting.
-
With
ActivityStubGreetingActivities activities = Workflow.newActivityStub(GreetingActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(2))
.build()); -
With
WorkflowImplementationOptionsWorkflowImplementationOptions options =
WorkflowImplementationOptions.newBuilder()
.setActivityOptions(
ImmutableMap.of(
"EmailCustomerGreeting",
ActivityOptions.newBuilder()
// Set Activity Execution timeout (single run)
.setStartToCloseTimeout(Duration.ofSeconds(2))
.build()))
.build();
HeartbeatTimeout
To set a Heartbeat Timeout, use ActivityOptions.newBuilder.setHeartbeatTimeout.
- Type:
Duration - Default: None
You can set Activity Options using an ActivityStub within a Workflow implementation, or per-Activity using WorkflowImplementationOptions within a Worker.
Note that if you define options per-Activity Type options with WorkflowImplementationOptions.setActivityOptions(), setting them again specifically with ActivityStub in a Workflow will override this setting.
-
With
ActivityStubprivate final GreetingActivities activities =
Workflow.newActivityStub(
GreetingActivities.class,
ActivityOptions.newBuilder()
// note that either StartToCloseTimeout or ScheduleToCloseTimeout are
// required when setting Activity options.
.setStartToCloseTimeout(Duration.ofSeconds(5))
.setHeartbeatTimeout(Duration.ofSeconds(2))
.build()); -
With
WorkflowImplementationOptionsWorkflowImplementationOptions options =
WorkflowImplementationOptions.newBuilder()
.setActivityOptions(
ImmutableMap.of(
"EmailCustomerGreeting",
ActivityOptions.newBuilder()
// note that either StartToCloseTimeout or ScheduleToCloseTimeout are
// required when setting Activity options.
.setStartToCloseTimeout(Duration.ofSeconds(5))
.setHeartbeatTimeout(Duration.ofSeconds(2))
.build()))
.build();
TaskQueue
-
Type:
String -
Default: Defaults to the Task Queue that the Workflow was started with.
-
With
ActivityStubGreetingActivities activities = Workflow.newActivityStub(GreetingActivities.class,
ActivityOptions.newBuilder()
// note that either StartToCloseTimeout or ScheduleToCloseTimeout are required when
// setting Activity options.
.setStartToCloseTimeout(Duration.ofSeconds(5))
.setTaskQueue("yourTaskQueue")
.build()); -
With
WorkflowImplementationOptionsWorkflowImplementationOptions options =
WorkflowImplementationOptions.newBuilder()
.setActivityOptions(
ImmutableMap.of(
"EmailCustomerGreeting",
ActivityOptions.newBuilder()
// note that either StartToCloseTimeout or ScheduleToCloseTimeout are
// required when setting Activity options.
.setStartToCloseTimeout(Duration.ofSeconds(5))
.setTaskQueue("yourTaskQueue")
.build()))
.build();
See Task Queue
RetryOptions
To set a Retry Policy, known as the Retry Options in Java, use ActivityOptions.newBuilder.setRetryOptions().
-
Type:
RetryOptions -
Default: Server-defined Activity Retry policy.
-
With
ActivityStubprivate final ActivityOptions options =
ActivityOptions.newBuilder()
// note that either StartToCloseTimeout or ScheduleToCloseTimeout are
// required when setting Activity options.
.setStartToCloseTimeout(Duration.ofSeconds(5))
.setRetryOptions(
RetryOptions.newBuilder()
.setInitialInterval(Duration.ofSeconds(1))
.setMaximumInterval(Duration.ofSeconds(10))
.build())
.build(); -
With
WorkflowImplementationOptionsWorkflowImplementationOptions options =
WorkflowImplementationOptions.newBuilder()
.setActivityOptions(
ImmutableMap.of(
"EmailCustomerGreeting",
ActivityOptions.newBuilder()
// note that either StartToCloseTimeout or ScheduleToCloseTimeout are
// required when setting Activity options.
.setStartToCloseTimeout(Duration.ofSeconds(5))
.setRetryOptions(
RetryOptions.newBuilder()
.setDoNotRetry(NullPointerException.class.getName())
.build())
.build()))
.build();
setCancellationType
-
Type:
ActivityCancellationType -
Default:
ActivityCancellationType.TRY_CANCEL -
With
ActivityStubprivate final GreetingActivities activities =
Workflow.newActivityStub(
GreetingActivities.class,
ActivityOptions.newBuilder()
.setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED)
.build()); -
With
WorkflowImplementationOptionsWorkflowImplementationOptions options =
WorkflowImplementationOptions.newBuilder()
.setActivityOptions(
ImmutableMap.of(
"EmailCustomerGreeting",
ActivityOptions.newBuilder()
.setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED)
.build()))
.build();
Get the result of an Activity Execution
The call to spawn an Activity Execution generates the ScheduleActivityTask Command and provides the Workflow with an Awaitable. Workflow Executions can either block progress until the result is available through the Awaitable or continue progressing, making use of the result when it becomes available.
To get the results of an asynchronously invoked Activity method, use the Promise get method to block until the Activity method result is available.
Sometimes an Activity Execution lifecycle goes beyond a synchronous method invocation. For example, a request can be put in a queue and later a reply comes and is picked up by a different Worker process. The whole request-reply interaction can be modeled as a single Activity.
To indicate that an Activity should not be completed upon its method return, call ActivityExecutionContext.doNotCompleteOnReturn() from the original Activity thread.
Then later, when replies come, complete the Activity using the ActivityCompletionClient.
To correlate Activity invocation with completion, use either a TaskToken or Workflow and Activity Ids.
Following is an example of using ActivityExecutionContext.doNotCompleteOnReturn():
public class FileProcessingActivitiesImpl implements FileProcessingActivities {
public String download(String bucketName, String remoteName, String localName) {
ActivityExecutionContext ctx = Activity.getExecutionContext();
// Used to correlate reply
byte[] taskToken = ctx.getInfo().getTaskToken();
asyncDownloadFileFromS3(taskToken, bucketName, remoteName, localDirectory + localName);
ctx.doNotCompleteOnReturn();
// Return value is ignored when doNotCompleteOnReturn was called.
return "ignored";
}
...
}
When the download is complete, the download service potentially can complete the Activity, or fail it from a different process, for example:
public <R> void completeActivity(byte[] taskToken, R result) {
completionClient.complete(taskToken, result);
}
public void failActivity(byte[] taskToken, Exception failure) {
completionClient.completeExceptionally(taskToken, failure);
}