Namespace: activity
This package's main export is Context. Get the current Activity's context with
Context.current():
import { Context } from '@temporalio/activity';

export async function myActivity() {
  const context = Context.current();
}
Any function can be used as an Activity as long as its parameters and return value are serializable using a DataConverter.
Cancellation
Activity Cancellation:
- lets the Activity know it doesn't need to keep doing work, and
- gives the Activity time to clean up any resources it has created.
Activities can only receive Cancellation if they emit heartbeats or are Local Activities (which can't heartbeat but receive Cancellation anyway).
An Activity may receive Cancellation if:
- The Workflow scope containing the Activity call was requested to be Cancelled and
  ActivityOptions.cancellationType was not set to ActivityCancellationType.ABANDON. The scope can
  be cancelled in either of the following ways:
  - The entire Workflow was Cancelled (via WorkflowHandle.cancel).
  - CancellationScope.cancel() was called from inside a Workflow.
- The Worker has started to shut down. Shutdown is initiated by either:
  - One of the RuntimeOptions.shutdownSignals was sent to the process.
  - Worker.shutdown() was called.
- The Activity was considered failed by the Server because one of the Activity timeouts was triggered (for example,
  the Server didn't receive a heartbeat within the ActivityOptions.heartbeatTimeout). The
  CancelledFailure will have message: 'TIMED_OUT'.
- An Activity sends a heartbeat with Context.current().heartbeat() and the heartbeat details can't be converted by the Worker's configured DataConverter.
- The Workflow Run reached a Closed state, in which case the CancelledFailure will have message: 'NOT_FOUND'.
The reason for the Cancellation is available at CancelledFailure.message or Context.cancellationSignal.reason.
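As a rough illustration of branching on that reason, the sketch below maps the two message values named in this section to log-friendly descriptions. `describeCancellation` is a hypothetical helper, not part of `@temporalio/activity`, and any message value not listed here falls through to a generic branch.

```typescript
// Hypothetical helper (not an SDK function). It only recognizes the two
// message values mentioned in this section; everything else is generic.
function describeCancellation(message: string): string {
  switch (message) {
    case 'TIMED_OUT':
      return 'the Server considered the Activity failed because a timeout was triggered';
    case 'NOT_FOUND':
      return 'the Workflow Run already reached a Closed state';
    default:
      return `cancelled (${message})`;
  }
}
```

In an Activity, the input would typically be `err.message` taken from a caught CancelledFailure.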
Activity implementations should opt in and subscribe to cancellation using one of the following methods:
- await on Context.current().cancelled or Context.current().sleep(), which each throw a CancelledFailure.
- Pass the context's AbortSignal at Context.current().cancellationSignal to a library that supports it.
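One way to use the first option is to race long-running work against the cancellation promise, so the Activity stops waiting as soon as cancellation is requested. The sketch below is a minimal illustration under stated assumptions: `raceWithCancellation` and `deferredCancellation` are hypothetical names, and a locally created promise stands in for Context.current().cancelled so the code runs without a Worker.

```typescript
// Hypothetical helper: settles with the work's result, or rejects as soon as
// the cancellation promise rejects. In an Activity, `cancelled` would be
// Context.current().cancelled (assumption; a stand-in is used here).
async function raceWithCancellation<T>(work: Promise<T>, cancelled: Promise<never>): Promise<T> {
  return Promise.race([work, cancelled]);
}

// Stand-in for the SDK's cancellation promise so the sketch is self-contained.
function deferredCancellation(): { cancelled: Promise<never>; cancel: (reason: string) => void } {
  let cancel!: (reason: string) => void;
  const cancelled = new Promise<never>((_resolve, reject) => {
    cancel = (reason) => reject(new Error(reason));
  });
  // Attach a no-op handler so a cancel that nobody awaits does not surface
  // as an unhandled rejection.
  cancelled.catch(() => undefined);
  return { cancelled, cancel };
}
```

Note that racing only stops the waiting: the underlying work keeps running unless it is also told to stop, for example through the AbortSignal described in the second option.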
Examples
An Activity that sends progress heartbeats and can be Cancelled
import { activityInfo, log, sleep, CancelledFailure, heartbeat } from '@temporalio/activity';

export async function fakeProgress(sleepIntervalMs = 1000): Promise<void> {
  try {
    // allow for resuming from heartbeat
    const startingPoint = activityInfo().heartbeatDetails || 1;
    log.info('Starting activity at progress', { startingPoint });
    for (let progress = startingPoint; progress <= 100; ++progress) {
      log.info('Progress', { progress });
      // simple utility to sleep in activity for given interval or throw if Activity is cancelled
      // don't confuse with Workflow.sleep which is only used in Workflow functions!
      await sleep(sleepIntervalMs);
      heartbeat(progress);
    }
  } catch (err) {
    if (err instanceof CancelledFailure) {
      log.warn('Fake progress activity cancelled', { message: err.message });
      // Cleanup
    }
    throw err;
  }
}
An Activity that makes a cancellable HTTP request
It passes the AbortSignal to fetch: fetch(url, { signal: Context.current().cancellationSignal }).
import fetch from 'node-fetch';
import { cancellationSignal, heartbeat } from '@temporalio/activity';
import type { AbortSignal as FetchAbortSignal } from 'node-fetch/externals';

export async function cancellableFetch(url: string): Promise<Uint8Array> {
  // The request is aborted if the Activity is cancelled
  const response = await fetch(url, { signal: cancellationSignal() as FetchAbortSignal });
  const contentLengthHeader = response.headers.get('Content-Length');
  if (contentLengthHeader === null) {
    throw new Error('expected Content-Length header to be set');
  }
  const contentLength = parseInt(contentLengthHeader, 10);
  let bytesRead = 0;
  const chunks: Buffer[] = [];
  for await (const chunk of response.body) {
    if (!(chunk instanceof Buffer)) {
      throw new TypeError('Expected Buffer');
    }
    bytesRead += chunk.length;
    chunks.push(chunk);
    // Report the fraction of the body downloaded so far
    heartbeat(bytesRead / contentLength);
  }
  return Buffer.concat(chunks);
}
Classes
Interfaces
References
ActivityFunction
Re-exports ActivityFunction
ActivityInterface
Re-exports ActivityInterface
ApplicationFailure
Re-exports ApplicationFailure
CancelledFailure
Re-exports CancelledFailure
UntypedActivities
Re-exports UntypedActivities
Variables
asyncLocalStorage
• Const asyncLocalStorage: AsyncLocalStorage<Context>
log
• Const log: Logger
The logger for this Activity.
This is a shortcut for Context.current().log (see Context.log).
metricMeter
• Const metricMeter: MetricMeter
Get the metric meter for the current Activity, with Activity-specific tags.
To add custom tags, register an ActivityOutboundCallsInterceptor that
intercepts the getMetricTags() method.
This is a shortcut for Context.current().metricMeter (see Context.metricMeter).
Functions
activityInfo
▸ activityInfo(): Info
Information about the current Activity.
Returns
Info
cancellationDetails
▸ cancellationDetails(): ActivityCancellationDetails | undefined
Return the cancellation details for this Activity, if any.
Returns
ActivityCancellationDetails | undefined
An object with boolean properties that describes the reason for cancellation, or undefined if not cancelled.
Activity cancellation details include usage of experimental features such as Activity pause, and may be subject to change.
cancellationSignal
▸ cancellationSignal(): AbortSignal
Return an AbortSignal that can be used to react to Activity cancellation.
This can be passed to libraries such as fetch to abort an in-progress request, to child_process to abort a child process, and to other built-in Node modules and modules found on npm.
Note that to get notified of cancellation, an Activity must also call Context.heartbeat.
This is a shortcut for Context.current().cancellationSignal (see Context.cancellationSignal).
Returns
AbortSignal
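To illustrate handing the signal to a built-in Node module, here is a minimal sketch that uses `node:timers/promises`, which accepts a `signal` option. `waitWithCleanup` is a hypothetical helper, and the `signal` argument is an assumption standing in for the value that cancellationSignal() would return inside an Activity; an AbortController can supply it when trying the code outside a Worker.

```typescript
import { setTimeout as delay } from 'node:timers/promises';

// Hypothetical helper: waits `ms` milliseconds unless `signal` aborts first.
// On abort it runs `cleanup` and rethrows, so the caller still sees the error.
// In an Activity, `signal` would come from cancellationSignal() (assumption).
async function waitWithCleanup(ms: number, signal: AbortSignal, cleanup: () => void): Promise<void> {
  try {
    await delay(ms, undefined, { signal });
  } catch (err) {
    // node:timers/promises rejects with an error named 'AbortError' on abort.
    if (err instanceof Error && err.name === 'AbortError') {
      cleanup();
    }
    throw err;
  }
}
```

Rethrowing after cleanup keeps the abort visible to the caller instead of silently swallowing it.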
cancelled
▸ cancelled(): Promise<never>
Return a Promise that fails with a CancelledFailure when cancellation of this Activity is requested. The promise is guaranteed to never successfully resolve. Await this promise in an Activity to get notified of cancellation.
Note that to get notified of cancellation, an Activity must also call Context.heartbeat.
This is a shortcut for Context.current().cancelled (see Context.cancelled).
Returns
Promise<never>
getClient
▸ getClient(): Client
A Temporal Client, bound to the same Temporal Namespace as the Worker executing this Activity.
May throw an IllegalStateError if the Activity is running inside a MockActivityEnvironment
that was created without a Client.
This is a shortcut for Context.current().client (see Context.client).
Client support over NativeConnection is experimental. Error handling may be
incomplete or different from what would be observed using a Connection
instead. Client doesn't support cancellation through a Signal.
Returns
Client
heartbeat
▸ heartbeat(details?): void
Send a heartbeat from an Activity.
If an Activity times out, then during the next retry, the last value of details is available at
Info.heartbeatDetails. This acts as a periodic checkpoint mechanism for the progress of an Activity.
If an Activity times out on the final retry (relevant in cases in which RetryPolicy.maximumAttempts is
set), the Activity function call in the Workflow code will throw an ActivityFailure with the cause
attribute set to a TimeoutFailure, which has the last value of details available at
TimeoutFailure.lastHeartbeatDetails.
This is a shortcut for Context.current().heartbeat(details) (see Context.heartbeat).
Parameters
| Name | Type |
|---|---|
| details? | unknown |
Returns
void
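The checkpoint behaviour described for heartbeat can be sketched as a loop that records how many items are finished and resumes from that count on the next attempt. This is an illustration under stated assumptions, not SDK code: `HeartbeatFn`, `resumeIndex`, and `processItems` are hypothetical names, and the heartbeat function and previous details are passed in as parameters so the sketch runs without a Worker. In an Activity they would correspond to heartbeat and Info.heartbeatDetails.

```typescript
// Hypothetical stand-in for the shape of heartbeat(details?).
type HeartbeatFn = (details?: unknown) => void;

// Interpret the previous attempt's details as "number of items completed".
// Anything that is not a non-negative integer means "start from the beginning".
function resumeIndex(heartbeatDetails: unknown): number {
  return typeof heartbeatDetails === 'number' && Number.isInteger(heartbeatDetails) && heartbeatDetails >= 0
    ? heartbeatDetails
    : 0;
}

// Process items in order, checkpointing after each one. Returns how many
// items this attempt handled.
async function processItems(
  items: readonly string[],
  heartbeatDetails: unknown,
  heartbeat: HeartbeatFn,
  handle: (item: string) => Promise<void>,
): Promise<number> {
  const start = resumeIndex(heartbeatDetails);
  let completed = start;
  for (const item of items.slice(start)) {
    await handle(item);
    completed++;
    heartbeat(completed); // checkpoint: items completed so far
  }
  return completed - start;
}
```

Because the checkpoint is written only after an item finishes, an item that was in flight when the attempt ended is handled again on retry, so `handle` should tolerate being repeated.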
sleep
▸ sleep(ms): Promise<void>
Helper function for sleeping in an Activity.
This is a shortcut for Context.current().sleep(ms) (see Context.sleep).
Parameters
| Name | Type | Description |
|---|---|---|
| ms | Duration | Sleep duration: number of milliseconds or ms-formatted string |
Returns
Promise<void>
A Promise that either resolves after ms has elapsed or rejects when the Activity is cancelled.