Worker Implementation
This guide explains how to implement workers using temporal-contract.
Overview
The @temporal-contract/worker package provides functions for implementing Temporal workers with full type safety:
declareActivitiesHandler - Implements all activities (global + workflow-specific)
declareWorkflow - Implements individual workflows with typed context
Workflow Execution Flow
Activities Handler
Create a handler for all activities using the Future/Result pattern:
import { declareActivitiesHandler, ActivityError } from "@temporal-contract/worker/activity";
import { Future } from "@swan-io/boxed";
import { myContract } from "./contract";

// emailService and paymentGateway are application services assumed to be defined elsewhere
export const activities = declareActivitiesHandler({
  contract: myContract,
  activities: {
    // Global activities - use Future/Result for explicit error handling
    sendEmail: ({ to, subject, body }) => {
      return Future.fromPromise(emailService.send({ to, subject, body }))
        .mapError(
          (error) =>
            new ActivityError(
              "EMAIL_FAILED",
              error instanceof Error ? error.message : "Failed to send email",
              error,
            ),
        )
        .mapOk(() => ({ sent: true }));
    },
    // Workflow-specific activities
    processPayment: ({ customerId, amount }) => {
      return Future.fromPromise(paymentGateway.charge(customerId, amount))
        .mapError(
          (error) =>
            new ActivityError(
              "PAYMENT_FAILED",
              error instanceof Error ? error.message : "Payment failed",
              error,
            ),
        )
        .mapOk((txId) => ({ transactionId: txId, success: true }));
    },
  },
});
Workflow Implementation
Implement workflows with typed context. Activities called from workflows return plain values (Result is unwrapped internally):
import { declareWorkflow } from "@temporal-contract/worker/workflow";
import { myContract } from "./contract";

export const processOrder = declareWorkflow({
  workflowName: "processOrder",
  contract: myContract,
  implementation: async ({ activities }, input) => {
    // activities is fully typed
    // Activities return plain values (Result is unwrapped by the framework)
    const payment = await activities.processPayment({
      customerId: input.customerId,
      amount: 100,
    });
    await activities.sendEmail({
      to: input.customerId,
      subject: "Order Confirmed",
      body: "Your order has been processed",
    });
    // Return plain object (not Result)
    return {
      status: payment.success ? "success" : "failed",
      transactionId: payment.transactionId,
    };
  },
});
Worker Setup
Set up the Temporal worker:
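import { Worker } from "@temporalio/worker";
import { activities } from "./activities";

// Top-level await is not available in CommonJS modules (where require.resolve is used),
// so startup is wrapped in an async function
async function run() {
  const worker = await Worker.create({
    workflowsPath: require.resolve("./workflows"),
    activities,
    taskQueue: "my-task-queue", // or myContract.taskQueue
  });
  await worker.run();
}

run().catch((err) => {
  console.error(err);
  process.exit(1);
});
Type Safety Features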
Input Validation
All activity and workflow inputs are automatically validated:
// ✅ Valid - matches schema
await context.activities.processPayment({
  customerId: "CUST-123",
  amount: 100,
});
// ❌ Invalid - throws validation error
await context.activities.processPayment({
  customerId: 123, // Should be string
  amount: -10, // Should be positive
});
Output Validation
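Validation of this kind amounts to checking a value at the boundary before it is accepted. The sketch below illustrates the idea for the processPayment output with a hand-rolled check. It is not how temporal-contract implements validation; `PaymentResultShape`, `Checked`, and `validatePaymentOutput` are hypothetical names introduced only for this illustration.

```typescript
// Hand-rolled illustration of output validation (hypothetical names throughout).
type PaymentResultShape = { transactionId: string; success: boolean };

type Checked<T> =
  | { valid: true; value: T }
  | { valid: false; issues: string[] };

function validatePaymentOutput(candidate: unknown): Checked<PaymentResultShape> {
  if (typeof candidate !== "object" || candidate === null) {
    return { valid: false, issues: ["expected an object"] };
  }
  const record = candidate as Record<string, unknown>;
  const transactionId = record["transactionId"];
  const success = record["success"];

  // Collect every problem instead of stopping at the first one.
  const issues: string[] = [];
  if (typeof transactionId !== "string") {
    issues.push("transactionId must be a string");
  }
  if (typeof success !== "boolean") {
    issues.push("success must be a boolean");
  }
  if (typeof transactionId === "string" && typeof success === "boolean") {
    return { valid: true, value: { transactionId, success } };
  }
  return { valid: false, issues };
}
```

With this sketch, `{ transactionId: "TXN-123", success: true }` passes, while `{ txId: "TXN-123" }` is rejected with one issue per missing field.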
Return values are validated against output schemas:
// ✅ Valid
return { transactionId: "TXN-123", success: true };
// ❌ Invalid - TypeScript error + runtime validation
return { txId: "TXN-123" }; // Wrong field name
Typed Context
The workflow context is fully typed based on your contract:
implementation: async ({ activities }, input) => {
  // TypeScript knows all available activities
  activities.processPayment; // ✅ Available
  activities.unknownActivity; // ❌ TypeScript error
  // Full autocomplete for parameters
  await activities.processPayment({
    // IDE shows: customerId: string, amount: number
  });
};
Child Workflows
Execute child workflows with a type-safe Future/Result pattern. Child workflows can come from the same contract or from a different contract (cross-worker communication).
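To make the Result half of that pattern concrete, here is a simplified stand-in showing what it means to branch on a successful or failed outcome. This is a sketch, not the @swan-io/boxed implementation: `SimpleResult`, `ok`, `err`, `match`, `ChildOutput`, and `describeOutcome` are hypothetical names, and the real Result exposes this behaviour as methods such as `match` and `isError`.

```typescript
// Simplified stand-in for a Result value (hypothetical, for illustration only).
type SimpleResult<A, E> =
  | { tag: "Ok"; value: A }
  | { tag: "Error"; error: E };

const ok = <A>(value: A): SimpleResult<A, never> => ({ tag: "Ok", value });
const err = <E>(error: E): SimpleResult<never, E> => ({ tag: "Error", error });

// Exhaustive branching: the caller has to supply a handler for both outcomes.
function match<A, E, R>(
  result: SimpleResult<A, E>,
  handlers: { Ok: (value: A) => R; Error: (error: E) => R },
): R {
  return result.tag === "Ok"
    ? handlers.Ok(result.value)
    : handlers.Error(result.error);
}

// Hypothetical child workflow output.
type ChildOutput = { transactionId: string };

function describeOutcome(result: SimpleResult<ChildOutput, Error>): string {
  return match(result, {
    Ok: (output) => `Payment processed: ${output.transactionId}`,
    Error: (error) => `Payment failed: ${error.message}`,
  });
}
```

The point of the pattern is that a failed child workflow arrives as a value to inspect rather than as an exception, so the parent workflow decides explicitly what a failure means.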
Basic Usage
import { declareWorkflow } from "@temporal-contract/worker/workflow";
import { myContract, notificationContract } from "./contracts";

export const parentWorkflow = declareWorkflow({
  workflowName: "parentWorkflow",
  contract: myContract,
  implementation: async ({ executeChildWorkflow }, input) => {
    // Execute child workflow from same contract and wait for result
    const result = await executeChildWorkflow(myContract, "processPayment", {
      workflowId: `payment-${input.orderId}`,
      args: { amount: input.totalAmount },
    });
    result.match({
      Ok: (output) => console.log("Payment processed:", output),
      Error: (error) => console.error("Payment failed:", error),
    });
    return { success: true };
  },
});
Cross-Contract Child Workflows
Invoke child workflows from different contracts and workers:
import { declareWorkflow } from "@temporal-contract/worker/workflow";
// orderContract is assumed to be exported alongside notificationContract
import { orderContract, notificationContract } from "./contracts";

export const orderWorkflow = declareWorkflow({
  workflowName: "processOrder",
  contract: orderContract,
  implementation: async ({ executeChildWorkflow }, input) => {
    // Process payment in same contract
    const paymentResult = await executeChildWorkflow(orderContract, "processPayment", {
      workflowId: `payment-${input.orderId}`,
      args: { amount: input.total },
    });
    if (paymentResult.isError()) {
      return { status: "failed", reason: "payment" };
    }
    // Send notification using another worker's contract
    const notificationResult = await executeChildWorkflow(
      notificationContract,
      "sendOrderConfirmation",
      {
        workflowId: `notify-${input.orderId}`,
        args: { orderId: input.orderId, email: input.customerEmail },
      },
    );
    return {
      status: "completed",
      transactionId: paymentResult.value.transactionId,
    };
  },
});
Start Without Waiting
Use startChildWorkflow to start a child workflow without waiting for its result:
export const orderWorkflow = declareWorkflow({
  workflowName: "processOrder",
  contract: myContract,
  implementation: async ({ startChildWorkflow }, input) => {
    // Start background notification workflow
    const handleResult = await startChildWorkflow(notificationContract, "sendEmail", {
      workflowId: `email-${input.orderId}`,
      args: { to: input.customerEmail, subject: "Order received" },
    });
    handleResult.match({
      Ok: async (handle) => {
        // Child workflow started successfully
        // Can wait for result later if needed
        const result = await handle.result();
      },
      Error: (error) => {
        console.error("Failed to start notification:", error);
      },
    });
    return { success: true };
  },
});
Error Handling
Child workflow errors are returned as ChildWorkflowError:
const result = await context.executeChildWorkflow(myContract, "processPayment", {
  workflowId: "payment-123",
  args: { amount: 100 },
});
result.match({
  Ok: (output) => {
    // Child workflow completed successfully
    console.log("Transaction ID:", output.transactionId);
  },
  Error: (error) => {
    // Handle child workflow errors
    if (error instanceof ChildWorkflowNotFoundError) {
      console.error("Workflow not found in contract");
    } else {
      console.error("Child workflow failed:", error.message);
    }
  },
});
Best Practices
1. Separate Activity Files
Organize activities by domain:
// activities/payment.ts
// Each activity returns a Future<Result<...>>, as in the Activities Handler section
export const paymentActivities = {
  processPayment: ({ customerId, amount }) => {
    /* ... */
  },
  refundPayment: ({ transactionId }) => {
    /* ... */
  },
};

// activities/email.ts
export const emailActivities = {
  sendEmail: ({ to, subject, body }) => {
    /* ... */
  },
};

// activities/index.ts
import { declareActivitiesHandler } from "@temporal-contract/worker/activity";
import { myContract } from "../contract"; // adjust the path to your contract module
import { paymentActivities } from "./payment";
import { emailActivities } from "./email";

export const activities = declareActivitiesHandler({
  contract: myContract,
  activities: {
    ...paymentActivities,
    ...emailActivities,
  },
});
2. Use Dependency Injection
Make activities testable:
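// Services are injected, so tests can pass fakes; activities keep the Future/Result shape
export const createActivities = (services: {
  emailService: EmailService;
  paymentGateway: PaymentGateway;
}) =>
  declareActivitiesHandler({
    contract: myContract,
    activities: {
      sendEmail: ({ to, subject, body }) =>
        Future.fromPromise(services.emailService.send({ to, subject, body }))
          .mapError(
            (error) =>
              new ActivityError(
                "EMAIL_FAILED",
                error instanceof Error ? error.message : "Failed to send email",
                error,
              ),
          )
          .mapOk(() => ({ sent: true })),
      processPayment: ({ customerId, amount }) =>
        Future.fromPromise(services.paymentGateway.charge(customerId, amount))
          .mapError(
            (error) =>
              new ActivityError(
                "PAYMENT_FAILED",
                error instanceof Error ? error.message : "Payment failed",
                error,
              ),
          )
          .mapOk((txId) => ({ transactionId: txId, success: true })),
    },
  });
3. Error Handling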
Activities use the Future/Result pattern for explicit error handling:
import { declareActivitiesHandler, ActivityError } from "@temporal-contract/worker/activity";
import { Future } from "@swan-io/boxed";

export const activities = declareActivitiesHandler({
  contract: myContract,
  activities: {
    processPayment: ({ customerId, amount }) => {
      return Future.fromPromise(paymentGateway.charge(customerId, amount))
        .mapError((error) => {
          // Wrap technical errors in ActivityError
          // This enables proper retry policies and error handling
          return new ActivityError(
            "PAYMENT_FAILED",
            error instanceof Error ? error.message : "Payment failed",
            error,
          );
        })
        .mapOk((txId) => ({ transactionId: txId, success: true }));
    },
  },
});
In workflows, activities return plain values. If an activity fails, it throws an error that the workflow can catch:
export const processOrder = declareWorkflow({
  workflowName: "processOrder",
  contract: myContract,
  implementation: async ({ activities }, input) => {
    try {
      // Activity returns plain value if successful
      const payment = await activities.processPayment({
        customerId: input.customerId,
        amount: 100,
      });
      return {
        status: "success",
        transactionId: payment.transactionId,
      };
    } catch (error) {
      // Activity errors are thrown and can be caught
      console.error("Payment failed:", error);
      return {
        status: "failed",
        transactionId: "",
      };
    }
  },
});
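The split described in this section, where activities produce a Result but workflow code sees either a plain value or a thrown error, can be pictured with a small conceptual sketch. It is an assumption about the general shape of such unwrapping, not temporal-contract's actual code: `SketchResult`, `SketchActivityError`, `unwrapOrThrow`, and `runPaymentStep` are hypothetical names. In a real Temporal deployment the error also crosses the activity/workflow boundary, so the class caught in the workflow may differ from the one created in the activity.

```typescript
// Conceptual sketch only; every name here is a hypothetical stand-in.
type SketchResult<A, E> =
  | { tag: "Ok"; value: A }
  | { tag: "Error"; error: E };

class SketchActivityError extends Error {
  readonly code: string;

  constructor(code: string, message: string) {
    super(message);
    this.name = "SketchActivityError";
    this.code = code;
  }
}

// Turn a Result into "plain value or throw".
function unwrapOrThrow<A, E extends Error>(result: SketchResult<A, E>): A {
  if (result.tag === "Error") {
    throw result.error;
  }
  return result.value;
}

// Workflow-side view: a plain value on success, a catchable error on failure.
function runPaymentStep(
  result: SketchResult<{ transactionId: string }, SketchActivityError>,
): { status: "success" | "failed"; transactionId: string } {
  try {
    const payment = unwrapOrThrow(result);
    return { status: "success", transactionId: payment.transactionId };
  } catch (error) {
    // Mirror the fallback used in the workflow example: report failure, no transaction ID.
    return { status: "failed", transactionId: "" };
  }
}
```

Keeping the Result on the activity side and the try/catch on the workflow side means each layer handles failure in the form that is most natural for it.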