Skip to content

Worker Implementation

This guide explains how to implement workers using temporal-contract.

Overview

The @temporal-contract/worker package provides functions for implementing Temporal workers with full type safety:

  1. declareActivitiesHandler - Implements all activities (global + workflow-specific)
  2. declareWorkflow - Implements individual workflows with typed context

Workflow Execution Flow

Activities Handler

Create a handler for all activities using the Result/Future pattern:

typescript
import { declareActivitiesHandler, ActivityError } from '@temporal-contract/worker/activity';
import { Future, Result } from '@swan-io/boxed';
import { myContract } from './contract';

export const activities = declareActivitiesHandler({
  contract: myContract,
  activities: {
    // Global activities - use Future/Result for explicit error handling
    sendEmail: ({ to, subject, body }) => {
      return Future.fromPromise(emailService.send({ to, subject, body }))
        .mapError((error) =>
          new ActivityError(
            'EMAIL_FAILED',
            error instanceof Error ? error.message : 'Failed to send email',
            error
          )
        )
        .mapOk(() => ({ sent: true }));
    },

    // Workflow-specific activities
    processPayment: ({ customerId, amount }) => {
      return Future.fromPromise(paymentGateway.charge(customerId, amount))
        .mapError((error) =>
          new ActivityError(
            'PAYMENT_FAILED',
            error instanceof Error ? error.message : 'Payment failed',
            error
          )
        )
        .mapOk((txId) => ({ transactionId: txId, success: true }));
    },
  },
});

Workflow Implementation

Implement workflows with typed context. Activities called from workflows return plain values (Result is unwrapped internally):

typescript
import { declareWorkflow } from '@temporal-contract/worker/workflow';
import { myContract } from './contract';

export const processOrder = declareWorkflow({
  workflowName: 'processOrder',
  contract: myContract,
  implementation: async ({ activities }, input) => {
    // activities is fully typed
    // Activities return plain values (Result is unwrapped by the framework)
    const payment = await activities.processPayment({
      customerId: input.customerId,
      amount: 100
    });

    await activities.sendEmail({
      to: input.customerId,
      subject: 'Order Confirmed',
      body: 'Your order has been processed'
    });

    // Return plain object (not Result)
    return {
      status: payment.success ? 'success' : 'failed',
      transactionId: payment.transactionId
    };
  },
});

Worker Setup

Set up the Temporal worker:

typescript
import { Worker } from '@temporalio/worker';
import { activities } from './activities';

const worker = await Worker.create({
  workflowsPath: require.resolve('./workflows'),
  activities,
  taskQueue: 'my-task-queue', // or myContract.taskQueue
});

await worker.run();

Type Safety Features

Input Validation

All activity and workflow inputs are automatically validated:

typescript
// ✅ Valid - matches schema
await context.activities.processPayment({
  customerId: 'CUST-123',
  amount: 100
});

// ❌ Invalid - throws validation error
await context.activities.processPayment({
  customerId: 123,  // Should be string
  amount: -10       // Should be positive
});

Output Validation

Return values are validated against output schemas:

typescript
// ✅ Valid
return { transactionId: 'TXN-123', success: true };

// ❌ Invalid - TypeScript error + runtime validation
return { txId: 'TXN-123' };  // Wrong field name

Typed Context

The workflow context is fully typed based on your contract:

typescript
implementation: async ({ activities }, input) => {
  // TypeScript knows all available activities
  activities.processPayment  // ✅ Available
  activities.unknownActivity // ❌ TypeScript error

  // Full autocomplete for parameters
  await activities.processPayment({
    // IDE shows: customerId: string, amount: number
  });
}

Child Workflows

Execute child workflows with type-safe Future/Result pattern. Child workflows can be from the same contract or from a different contract (cross-worker communication).

Basic Usage

typescript
import { declareWorkflow } from '@temporal-contract/worker/workflow';
import { myContract, notificationContract } from './contracts';

export const parentWorkflow = declareWorkflow({
  workflowName: 'parentWorkflow',
  contract: myContract,
  implementation: async ({ executeChildWorkflow }, input) => {
    // Execute child workflow from same contract and wait for result
    const result = await executeChildWorkflow(myContract, 'processPayment', {
      workflowId: `payment-${input.orderId}`,
      args: { amount: input.totalAmount }
    });

    result.match({
      Ok: (output) => console.log('Payment processed:', output),
      Error: (error) => console.error('Payment failed:', error),
    });

    return { success: true };
  },
});

Cross-Contract Child Workflows

Invoke child workflows from different contracts and workers:

typescript
export const orderWorkflow = declareWorkflow({
  workflowName: 'processOrder',
  contract: orderContract,
  implementation: async ({ executeChildWorkflow }, input) => {
    // Process payment in same contract
    const paymentResult = await executeChildWorkflow(
      orderContract,
      'processPayment',
      {
        workflowId: `payment-${input.orderId}`,
        args: { amount: input.total }
      }
    );

    if (paymentResult.isError()) {
      return { status: 'failed', reason: 'payment' };
    }

    // Send notification using another worker's contract
    const notificationResult = await executeChildWorkflow(
      notificationContract,
      'sendOrderConfirmation',
      {
        workflowId: `notify-${input.orderId}`,
        args: { orderId: input.orderId, email: input.customerEmail }
      }
    );

    return {
      status: 'completed',
      transactionId: paymentResult.value.transactionId
    };
  },
});

Start Without Waiting

Use startChildWorkflow to start a child workflow without waiting for its result:

typescript
export const orderWorkflow = declareWorkflow({
  workflowName: 'processOrder',
  contract: myContract,
  implementation: async ({ startChildWorkflow }, input) => {
    // Start background notification workflow
    const handleResult = await startChildWorkflow(
      notificationContract,
      'sendEmail',
      {
        workflowId: `email-${input.orderId}`,
        args: { to: input.customerEmail, subject: 'Order received' }
      }
    );

    handleResult.match({
      Ok: async (handle) => {
        // Child workflow started successfully
        // Can wait for result later if needed
        const result = await handle.result();
      },
      Error: (error) => {
        console.error('Failed to start notification:', error);
      },
    });

    return { success: true };
  },
});

Error Handling

Child workflow errors are returned as ChildWorkflowError:

typescript
const result = await context.executeChildWorkflow(myContract, 'processPayment', {
  workflowId: 'payment-123',
  args: { amount: 100 }
});

result.match({
  Ok: (output) => {
    // Child workflow completed successfully
    console.log('Transaction ID:', output.transactionId);
  },
  Error: (error) => {
    // Handle child workflow errors
    if (error instanceof ChildWorkflowNotFoundError) {
      console.error('Workflow not found in contract');
    } else {
      console.error('Child workflow failed:', error.message);
    }
  },
});

Best Practices

1. Separate Activity Files

Organize activities by domain:

typescript
// activities/payment.ts
export const paymentActivities = {
  processPayment: async ({ customerId, amount }) => { /* ... */ },
  refundPayment: async ({ transactionId }) => { /* ... */ }
};

// activities/email.ts
export const emailActivities = {
  sendEmail: async ({ to, subject, body }) => { /* ... */ }
};

// activities/index.ts
import { declareActivitiesHandler } from '@temporal-contract/worker/activity';
import { paymentActivities } from './payment';
import { emailActivities } from './email';

export const activities = declareActivitiesHandler({
  contract: myContract,
  activities: {
    ...paymentActivities,
    ...emailActivities
  }
});

2. Use Dependency Injection

Make activities testable:

typescript
export const createActivities = (services: {
  emailService: EmailService;
  paymentGateway: PaymentGateway;
}) => declareActivitiesHandler({
  contract: myContract,
  activities: {
    sendEmail: async ({ to, subject, body }) => {
      await services.emailService.send({ to, subject, body });
      return { sent: true };
    },
    processPayment: async ({ customerId, amount }) => {
      const txId = await services.paymentGateway.charge(customerId, amount);
      return { transactionId: txId, success: true };
    }
  }
});

3. Error Handling

Activities use the Future/Result pattern for explicit error handling:

typescript
import { declareActivitiesHandler, ActivityError } from '@temporal-contract/worker/activity';
import { Future, Result } from '@swan-io/boxed';

export const activities = declareActivitiesHandler({
  contract: myContract,
  activities: {
    processPayment: ({ customerId, amount }) => {
      return Future.fromPromise(paymentGateway.charge(customerId, amount))
        .mapError((error) => {
          // Wrap technical errors in ActivityError
          // This enables proper retry policies and error handling
          return new ActivityError(
            'PAYMENT_FAILED',
            error instanceof Error ? error.message : 'Payment failed',
            error
          );
        })
        .mapOk((txId) => ({ transactionId: txId, success: true }));
    }
  }
});

In workflows, activities return plain values. If an activity fails, it will throw an error that can be caught:

typescript
export const processOrder = declareWorkflow({
  workflowName: 'processOrder',
  contract: myContract,
  implementation: async ({ activities }, input) => {
    try {
      // Activity returns plain value if successful
      const payment = await activities.processPayment({
        customerId: input.customerId,
        amount: 100
      });

      return {
        status: 'success',
        transactionId: payment.transactionId
      };
    } catch (error) {
      // Activity errors are thrown and can be caught
      console.error('Payment failed:', error);

      return {
        status: 'failed',
        transactionId: ''
      };
    }
  }
});

See Also

Released under the MIT License.