Worker Implementation

This guide explains how to implement workers using temporal-contract.

Overview

The @temporal-contract/worker package provides two functions for implementing Temporal workers with full type safety:

  1. declareActivitiesHandler (from @temporal-contract/worker/activity) - Implements all activities (global + workflow-specific)
  2. declareWorkflow (from @temporal-contract/worker/workflow) - Implements individual workflows with typed context

Activities Handler

Create a handler for all activities using the Result/Future pattern:

typescript
import { declareActivitiesHandler, ActivityError } from "@temporal-contract/worker/activity";
import { Future } from "@swan-io/boxed";
import { myContract } from "./contract";
// emailService and paymentGateway below are your own service clients, defined elsewhere

export const activities = declareActivitiesHandler({
  contract: myContract,
  activities: {
    // Global activities - use Future/Result for explicit error handling
    sendEmail: ({ to, subject, body }) => {
      return Future.fromPromise(emailService.send({ to, subject, body }))
        .mapError(
          (error) =>
            new ActivityError(
              "EMAIL_FAILED",
              error instanceof Error ? error.message : "Failed to send email",
              error,
            ),
        )
        .mapOk(() => ({ sent: true }));
    },

    // Workflow-specific activities
    processPayment: ({ customerId, amount }) => {
      return Future.fromPromise(paymentGateway.charge(customerId, amount))
        .mapError(
          (error) =>
            new ActivityError(
              "PAYMENT_FAILED",
              error instanceof Error ? error.message : "Payment failed",
              error,
            ),
        )
        .mapOk((txId) => ({ transactionId: txId, success: true }));
    },
  },
});

Workflow Implementation

Implement workflows with typed context. Activities called from workflows return plain values (Result is unwrapped internally):

typescript
import { declareWorkflow } from "@temporal-contract/worker/workflow";
import { myContract } from "./contract";

export const processOrder = declareWorkflow({
  workflowName: "processOrder",
  contract: myContract,
  implementation: async ({ activities }, input) => {
    // activities is fully typed
    // Activities return plain values (Result is unwrapped by the framework)
    const payment = await activities.processPayment({
      customerId: input.customerId,
      amount: 100,
    });

    await activities.sendEmail({
      to: input.customerId,
      subject: "Order Confirmed",
      body: "Your order has been processed",
    });

    // Return plain object (not Result)
    return {
      status: payment.success ? "success" : "failed",
      transactionId: payment.transactionId,
    };
  },
});
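
To make "unwrapped internally" concrete, here is a minimal sketch of the idea, not the library's actual internals. The Result type, SketchActivityError, and unwrapForWorkflow are hypothetical stand-ins written for this example: an Ok result becomes a plain value, and an Error result becomes a thrown error.

```typescript
// Hypothetical stand-in for a Result type; not the @swan-io/boxed implementation.
type Result<T, E> = { tag: "Ok"; value: T } | { tag: "Error"; error: E };

// Hypothetical error class carrying a code, loosely modelled on the examples above.
class SketchActivityError extends Error {
  readonly code: string;

  constructor(code: string, message: string) {
    super(message);
    // Keeps instanceof working even when compiled to an ES5 target.
    Object.setPrototypeOf(this, new.target.prototype);
    this.name = "SketchActivityError";
    this.code = code;
  }
}

// Turn an activity's Result into what workflow code sees:
// the plain value on Ok, a thrown error on Error.
function unwrapForWorkflow<T>(result: Result<T, SketchActivityError>): T {
  if (result.tag === "Ok") {
    return result.value;
  }
  throw result.error;
}

const ok: Result<{ transactionId: string }, SketchActivityError> = {
  tag: "Ok",
  value: { transactionId: "TXN-123" },
};
const failed: Result<{ transactionId: string }, SketchActivityError> = {
  tag: "Error",
  error: new SketchActivityError("PAYMENT_FAILED", "Card declined"),
};

// Plain object, no Result wrapper.
const payment = unwrapForWorkflow(ok);

// The failure surfaces as an exception that try/catch can handle.
let caughtCode = "";
try {
  unwrapForWorkflow(failed);
} catch (error) {
  if (error instanceof SketchActivityError) {
    caughtCode = error.code;
  }
}
```

This is why workflow code can use ordinary try/catch around activity calls while activity code stays explicit about failures.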

Worker Setup

Set up the Temporal worker:

typescript
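import { Worker } from "@temporalio/worker";
import { activities } from "./activities";

// Wrapped in a function so it also works where top-level await is unavailable (CommonJS)
async function run() {
  const worker = await Worker.create({
    workflowsPath: require.resolve("./workflows"),
    activities,
    taskQueue: "my-task-queue", // or myContract.taskQueue
  });

  // Resolves only when the worker shuts down
  await worker.run();
}

run().catch((error) => {
  console.error(error);
  process.exit(1);
});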

Type Safety Features

Input Validation

All activity and workflow inputs are validated at runtime against the contract's schemas, and a mismatch throws a validation error:

typescript
// ✅ Valid - matches schema
await context.activities.processPayment({
  customerId: "CUST-123",
  amount: 100,
});

// ❌ Invalid - throws validation error
await context.activities.processPayment({
  customerId: 123, // Should be string
  amount: -10, // Should be positive
});
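
As a rough illustration of what a runtime input check does, the sketch below hand-writes the validation for the processPayment input. It is an assumption-laden stand-in: the real checks come from the schemas in your contract, and ProcessPaymentInput, validateProcessPaymentInput, and parseProcessPaymentInput are hypothetical names.

```typescript
// Hypothetical input shape for processPayment, based on the example above.
interface ProcessPaymentInput {
  customerId: string;
  amount: number;
}

// Hand-written stand-in for a schema check; returns a list of problems.
function validateProcessPaymentInput(input: unknown): string[] {
  if (typeof input !== "object" || input === null) {
    return ["input must be an object"];
  }
  const candidate = input as Record<string, unknown>;
  const customerId = candidate["customerId"];
  const amount = candidate["amount"];

  const problems: string[] = [];
  if (typeof customerId !== "string") {
    problems.push("customerId must be a string");
  }
  // !(amount > 0) also rejects NaN, which a plain "amount <= 0" test would let through.
  if (typeof amount !== "number" || !(amount > 0)) {
    problems.push("amount must be a positive number");
  }
  return problems;
}

// Parse-or-throw wrapper: returns a typed value or raises before any work starts.
function parseProcessPaymentInput(input: unknown): ProcessPaymentInput {
  const problems = validateProcessPaymentInput(input);
  if (problems.length > 0) {
    throw new Error(`Invalid processPayment input: ${problems.join("; ")}`);
  }
  return input as ProcessPaymentInput;
}

const valid = parseProcessPaymentInput({ customerId: "CUST-123", amount: 100 });

let validationMessage = "";
try {
  parseProcessPaymentInput({ customerId: 123, amount: -10 });
} catch (error) {
  validationMessage = error instanceof Error ? error.message : String(error);
}
```

Collecting every problem before throwing gives the caller one complete error message instead of one failure per attempt.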

Output Validation

Return values are validated against output schemas:

typescript
// ✅ Valid
return { transactionId: "TXN-123", success: true };

// ❌ Invalid - TypeScript error + runtime validation
return { txId: "TXN-123" }; // Wrong field name
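
The runtime half of output validation can be pictured as a wrapper around the handler. The sketch below is hypothetical (isPaymentOutput and withOutputCheck are not library functions) and uses synchronous handlers for brevity: the return value is checked with a type guard before it leaves the boundary.

```typescript
// Hypothetical output shape, taken from the valid example above.
interface PaymentOutput {
  transactionId: string;
  success: boolean;
}

// Type guard standing in for an output schema.
function isPaymentOutput(value: unknown): value is PaymentOutput {
  if (typeof value !== "object" || value === null) {
    return false;
  }
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate["transactionId"] === "string" &&
    typeof candidate["success"] === "boolean"
  );
}

// Wraps a handler so its return value is checked before callers see it.
function withOutputCheck<I>(handler: (input: I) => unknown): (input: I) => PaymentOutput {
  return (input) => {
    const output = handler(input);
    if (!isPaymentOutput(output)) {
      throw new Error("Output does not match the expected shape");
    }
    return output;
  };
}

const goodHandler = withOutputCheck((_amount: number) => ({
  transactionId: "TXN-123",
  success: true,
}));
// Wrong field name: passes here only because the handler is typed as returning unknown.
const badHandler = withOutputCheck((_amount: number) => ({ txId: "TXN-123" }));

const good = goodHandler(100);

let outputError = "";
try {
  badHandler(100);
} catch (error) {
  outputError = error instanceof Error ? error.message : String(error);
}
```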

Typed Context

The workflow context is fully typed based on your contract:

typescript
implementation: async ({ activities }, input) => {
  // TypeScript knows all available activities
  activities.processPayment; // ✅ Available
  activities.unknownActivity; // ❌ TypeScript error

  // Full autocomplete for parameters
  await activities.processPayment({
    // IDE shows: customerId: string, amount: number
  });
};
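
One way such typing can be derived is with a mapped type over the contract. The following is a sketch under assumptions: SketchContract and ActivitiesOf are hypothetical, and the real contract type is likely richer. It shows how activity names and parameter shapes can both come from a single definition.

```typescript
// Hypothetical contract shape for illustration; the real contract type may differ.
interface SketchContract {
  activities: {
    processPayment: {
      input: { customerId: string; amount: number };
      output: { transactionId: string; success: boolean };
    };
    sendEmail: {
      input: { to: string; subject: string; body: string };
      output: { sent: boolean };
    };
  };
}

// Mapped type: one async function per activity name, typed from the contract.
type ActivitiesOf<
  C extends { activities: Record<string, { input: unknown; output: unknown }> },
> = {
  [K in keyof C["activities"]]: (
    input: C["activities"][K]["input"],
  ) => Promise<C["activities"][K]["output"]>;
};

type SketchActivities = ActivitiesOf<SketchContract>;

// Parameter types are inferred from the contract; no annotations needed here.
const activities: SketchActivities = {
  processPayment: async ({ customerId, amount }) => ({
    transactionId: `TXN-${customerId}-${amount}`,
    success: true,
  }),
  sendEmail: async () => ({ sent: true }),
};

// Compiles: the name and the argument shape both match the contract.
const paymentPromise = activities.processPayment({ customerId: "CUST-123", amount: 100 });

// Would not compile if uncommented:
// activities.unknownActivity;
// activities.processPayment({ customerId: 123 });
```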

Child Workflows

Execute child workflows with a type-safe Future/Result pattern. A child workflow can come from the same contract or from a different contract (cross-worker communication).

Basic Usage

typescript
import { declareWorkflow } from "@temporal-contract/worker/workflow";
import { myContract, notificationContract } from "./contracts";

export const parentWorkflow = declareWorkflow({
  workflowName: "parentWorkflow",
  contract: myContract,
  implementation: async ({ executeChildWorkflow }, input) => {
    // Execute child workflow from same contract and wait for result
    const result = await executeChildWorkflow(myContract, "processPayment", {
      workflowId: `payment-${input.orderId}`,
      args: { amount: input.totalAmount },
    });

    result.match({
      Ok: (output) => console.log("Payment processed:", output),
      Error: (error) => console.error("Payment failed:", error),
    });

    return { success: true };
  },
});

Cross-Contract Child Workflows

Invoke child workflows from different contracts and workers:

typescript
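import { declareWorkflow } from "@temporal-contract/worker/workflow";
import { orderContract, notificationContract } from "./contracts";

export const orderWorkflow = declareWorkflow({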
  workflowName: "processOrder",
  contract: orderContract,
  implementation: async ({ executeChildWorkflow }, input) => {
    // Process payment in same contract
    const paymentResult = await executeChildWorkflow(orderContract, "processPayment", {
      workflowId: `payment-${input.orderId}`,
      args: { amount: input.total },
    });

    if (paymentResult.isError()) {
      return { status: "failed", reason: "payment" };
    }

    // Send notification using another worker's contract
    const notificationResult = await executeChildWorkflow(
      notificationContract,
      "sendOrderConfirmation",
      {
        workflowId: `notify-${input.orderId}`,
        args: { orderId: input.orderId, email: input.customerEmail },
      },
    );

    return {
      status: "completed",
      transactionId: paymentResult.value.transactionId,
    };
  },
});

Start Without Waiting

Use startChildWorkflow to start a child workflow without waiting for its result:

typescript
import { declareWorkflow } from "@temporal-contract/worker/workflow";
import { myContract, notificationContract } from "./contracts";

export const orderWorkflow = declareWorkflow({
  workflowName: "processOrder",
  contract: myContract,
  implementation: async ({ startChildWorkflow }, input) => {
    // Start background notification workflow
    const handleResult = await startChildWorkflow(notificationContract, "sendEmail", {
      workflowId: `email-${input.orderId}`,
      args: { to: input.customerEmail, subject: "Order received" },
    });

    handleResult.match({
      Ok: (handle) => {
        // Child workflow started successfully.
        // Keep the handle and await handle.result() later if the output is needed.
        // Awaiting inside this callback would not hold up the parent: match does not await it.
      },
      Error: (error) => {
        console.error("Failed to start notification:", error);
      },
    });

    return { success: true };
  },
});
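
The difference between starting and waiting can be seen with plain promises. This is a sketch outside Temporal, not workflow code: SketchHandle and startSketchChild are hypothetical, and the only member assumed on a handle is result(), as used above.

```typescript
// Hypothetical handle shape: the only member assumed here is result().
interface SketchHandle<T> {
  workflowId: string;
  result(): Promise<T>;
}

// Records the order in which things happen.
const events: string[] = [];

// Stand-in for starting a child: kicks off the work and returns a handle right away.
function startSketchChild(workflowId: string): SketchHandle<string> {
  const completion = new Promise<string>((resolve) => {
    setTimeout(() => {
      events.push("child finished");
      resolve(`done:${workflowId}`);
    }, 10);
  });
  events.push("child started");
  return { workflowId, result: () => completion };
}

async function parent(): Promise<string> {
  const handle = startSketchChild("email-42");
  // Runs before the child finishes, because starting does not wait.
  events.push("parent continues");
  // Wait only at the point where the output is actually needed.
  const output = await handle.result();
  events.push("parent got result");
  return output;
}
```

Calling parent() records "child started", "parent continues", "child finished", "parent got result" in that order, which is the behaviour to expect when work is placed between starting a child and awaiting its result.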

Error Handling

Child workflow errors are returned in the Error branch of the Result as a ChildWorkflowError, so handle them with match:

typescript
const result = await context.executeChildWorkflow(myContract, "processPayment", {
  workflowId: "payment-123",
  args: { amount: 100 },
});

result.match({
  Ok: (output) => {
    // Child workflow completed successfully
    console.log("Transaction ID:", output.transactionId);
  },
  Error: (error) => {
    // Handle child workflow errors
    if (error instanceof ChildWorkflowNotFoundError) {
      console.error("Workflow not found in contract");
    } else {
      console.error("Child workflow failed:", error.message);
    }
  },
});

Best Practices

1. Separate Activity Files

Organize activities by domain:

typescript
// activities/payment.ts
// Each handler returns a Future<Result<...>>, as shown under Activities Handler
export const paymentActivities = {
  processPayment: ({ customerId, amount }) => {
    /* ... */
  },
  refundPayment: ({ transactionId }) => {
    /* ... */
  },
};

// activities/email.ts
export const emailActivities = {
  sendEmail: ({ to, subject, body }) => {
    /* ... */
  },
};

// activities/index.ts
import { declareActivitiesHandler } from "@temporal-contract/worker/activity";
import { myContract } from "../contract"; // adjust to where your contract is defined
import { paymentActivities } from "./payment";
import { emailActivities } from "./email";

export const activities = declareActivitiesHandler({
  contract: myContract,
  activities: {
    ...paymentActivities,
    ...emailActivities,
  },
});

2. Use Dependency Injection

Make activities testable:

typescript
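import { declareActivitiesHandler, ActivityError } from "@temporal-contract/worker/activity";
import { Future } from "@swan-io/boxed";
import { myContract } from "./contract";

// Builds the mapError callback; assumes ActivityError accepts any string code
const toActivityError = (code: string, fallback: string) => (error: unknown) =>
  new ActivityError(code, error instanceof Error ? error.message : fallback, error);

// Services are passed in, so tests can supply fakes
export const createActivities = (services: {
  emailService: EmailService;
  paymentGateway: PaymentGateway;
}) =>
  declareActivitiesHandler({
    contract: myContract,
    activities: {
      sendEmail: ({ to, subject, body }) =>
        Future.fromPromise(services.emailService.send({ to, subject, body }))
          .mapError(toActivityError("EMAIL_FAILED", "Failed to send email"))
          .mapOk(() => ({ sent: true })),
      processPayment: ({ customerId, amount }) =>
        Future.fromPromise(services.paymentGateway.charge(customerId, amount))
          .mapError(toActivityError("PAYMENT_FAILED", "Payment failed"))
          .mapOk((txId) => ({ transactionId: txId, success: true })),
    },
  });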
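
The payoff of injecting services is that a test can pass in-memory fakes. The sketch below assumes hypothetical EmailService and PaymentGateway interfaces (the guide only names the types). To run without dependencies, createSketchActivities mirrors createActivities but drops the contract wrapper and returns plain promises.

```typescript
// Hypothetical service interfaces; only the type names come from the guide.
interface EmailService {
  send(message: { to: string; subject: string; body: string }): Promise<void>;
}
interface PaymentGateway {
  charge(customerId: string, amount: number): Promise<string>;
}

// Plain factory standing in for createActivities, so the sketch runs on its own.
const createSketchActivities = (services: {
  emailService: EmailService;
  paymentGateway: PaymentGateway;
}) => ({
  sendEmail: async (input: { to: string; subject: string; body: string }) => {
    await services.emailService.send(input);
    return { sent: true };
  },
  processPayment: async (input: { customerId: string; amount: number }) => {
    const txId = await services.paymentGateway.charge(input.customerId, input.amount);
    return { transactionId: txId, success: true };
  },
});

// In-memory fakes record calls instead of touching the network.
const sentTo: string[] = [];
const fakeEmail: EmailService = {
  send: async ({ to }) => {
    sentTo.push(to);
  },
};
const fakeGateway: PaymentGateway = {
  charge: async (customerId, amount) => `TXN-${customerId}-${amount}`,
};

const testActivities = createSketchActivities({
  emailService: fakeEmail,
  paymentGateway: fakeGateway,
});

// Exercises both activities against the fakes and returns what a test would assert on.
async function runSketchTest() {
  const payment = await testActivities.processPayment({ customerId: "CUST-1", amount: 50 });
  const email = await testActivities.sendEmail({
    to: "a@example.com",
    subject: "Hi",
    body: "Hello",
  });
  return { payment, email, recipients: sentTo };
}
```

Because the fakes are deterministic, assertions on the returned values and on the recorded recipients need no mocking library.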

3. Error Handling

Activities use the Future/Result pattern for explicit error handling:

typescript
import { declareActivitiesHandler, ActivityError } from "@temporal-contract/worker/activity";
import { Future } from "@swan-io/boxed";
import { myContract } from "./contract";

export const activities = declareActivitiesHandler({
  contract: myContract,
  activities: {
    processPayment: ({ customerId, amount }) => {
      return Future.fromPromise(paymentGateway.charge(customerId, amount))
        .mapError((error) => {
          // Wrap technical errors in ActivityError
          // This enables proper retry policies and error handling
          return new ActivityError(
            "PAYMENT_FAILED",
            error instanceof Error ? error.message : "Payment failed",
            error,
          );
        })
        .mapOk((txId) => ({ transactionId: txId, success: true }));
    },
  },
});

In workflows, activities return plain values. If an activity fails, it will throw an error that can be caught:

typescript
export const processOrder = declareWorkflow({
  workflowName: "processOrder",
  contract: myContract,
  implementation: async ({ activities }, input) => {
    try {
      // Activity returns plain value if successful
      const payment = await activities.processPayment({
        customerId: input.customerId,
        amount: 100,
      });

      return {
        status: "success",
        transactionId: payment.transactionId,
      };
    } catch (error) {
      // Activity errors are thrown and can be caught
      console.error("Payment failed:", error);

      return {
        status: "failed",
        transactionId: "",
      };
    }
  },
});

Released under the MIT License.