APM

>Agent Skill

@microsoft/duroxide-node-orchestrations

skilldata

Writing durable workflows in JavaScript using duroxide-node. Use when creating orchestrations, activities, writing tests, or when the user mentions generator workflows, yield patterns, or duroxide-node development.

javascriptrustapi-design
apm::install
$apm install @microsoft/duroxide-node-orchestrations
apm::skill.md
---
name: duroxide-node-orchestrations
description: Writing durable workflows in JavaScript using duroxide-node. Use when creating orchestrations, activities, writing tests, or when the user mentions generator workflows, yield patterns, or duroxide-node development.
---

# Duroxide-Node Orchestration Development

## Core Rule: Yield vs Await

| Context | Syntax | Why |
|---------|--------|-----|
| Orchestrations | `function*` + `yield` | Rust replay engine needs step-by-step control |
| Activities | `async function` + `await` | Run once, result cached — no replay constraints |
| Orchestration tracing | Direct call (no yield) | Fire-and-forget, delegates to Rust |

```javascript
// ✅ Orchestration
runtime.registerOrchestration('MyWorkflow', function* (ctx, input) {
  ctx.traceInfo('starting');                                // no yield
  const result = yield ctx.scheduleActivity('Work', input); // yield
  return result;
});

// ✅ Activity
runtime.registerActivity('Work', async (ctx, input) => {
  ctx.traceInfo(`processing ${input}`);                     // no yield
  const data = await fetch(input.url);                      // await is fine
  return data;
});
```

**Never use `async function*` for orchestrations** — async generators break the replay model.

## Orchestration Context API

### Scheduling (MUST yield)

```javascript
function* (ctx, input) {
  // Activity
  const result = yield ctx.scheduleActivity('Name', input);

  // Activity with retry
  const result = yield ctx.scheduleActivityWithRetry('Name', input, {
    maxAttempts: 3,
    backoff: 'exponential',
    timeoutMs: 5000,
    totalTimeoutMs: 30000,
  });

  // Timer (durable delay)
  yield ctx.scheduleTimer(60000); // 1 minute

  // External event
  const eventData = yield ctx.waitForEvent('approval');

  // Sub-orchestration (waits for completion)
  const childResult = yield ctx.scheduleSubOrchestration('Child', childInput);

  // Sub-orchestration with explicit ID
  const childResult = yield ctx.scheduleSubOrchestrationWithId('Child', 'child-1', childInput);

  // Fire-and-forget orchestration (returns immediately)
  yield ctx.startOrchestration('BackgroundWork', 'bg-1', bgInput);

  // Deterministic values
  const now = yield ctx.utcNow();      // timestamp in ms
  const guid = yield ctx.newGuid();    // deterministic UUID

  // Continue as new (restart with fresh history)
  yield ctx.continueAsNew(newInput);
}
```

### Composition (MUST yield)

```javascript
// Fan-out / fan-in — wait for ALL tasks (supports all task types)
const results = yield ctx.all([
  ctx.scheduleActivity('TaskA', inputA),
  ctx.scheduleActivity('TaskB', inputB),
  ctx.scheduleTimer(5000),                    // timers work too
  ctx.waitForEvent('approval'),               // waits work too
]);
// results = [resultA, resultB, { ok: null }, { ok: eventData }]

// Race — wait for FIRST of 2 tasks (supports all task types)
const winner = yield ctx.race(
  ctx.scheduleActivity('FastService', input),
  ctx.scheduleTimer(5000)
);
// winner = { index: 0|1, value: ... }
```

**`ctx.race()` supports exactly 2 tasks** (maps to Rust `select2`). Nesting `all()`/`race()` inside `all()` or `race()` is **not supported** — the runtime rejects it.

### Cooperative Activity Cancellation

```javascript
runtime.registerActivity('LongTask', async (ctx, input) => {
  for (let i = 0; i < 1000; i++) {
    if (ctx.isCancelled()) {
      ctx.traceInfo('cancelled, cleaning up');
      return { status: 'cancelled' };
    }
    await doChunk(i);
  }
  return { status: 'done' };
});
```

`ctx.isCancelled()` checks whether the orchestration no longer needs the activity result (e.g., lost a race). Detection latency is `workerLockTimeoutMs / 2` (default 30s → ~15s).

### Tracing (NO yield — fire-and-forget)

```javascript
ctx.traceInfo('message');   // suppressed during replay automatically
ctx.traceWarn('message');
ctx.traceError('message');
ctx.traceDebug('message');
```

Tracing delegates to the Rust `OrchestrationContext` which has the `is_replaying` guard. **Do not use `console.log()`** in orchestrations — it will duplicate on replay.

## Activity Context API

```javascript
runtime.registerActivity('MyActivity', async (ctx, input) => {
  // Available fields
  ctx.instanceId;
  ctx.executionId;
  ctx.orchestrationName;
  ctx.orchestrationVersion;
  ctx.activityName;
  ctx.workerId;

  // Cooperative cancellation (check if orchestration no longer needs this result)
  if (ctx.isCancelled()) {
    ctx.traceInfo('cancelled');
    return { status: 'cancelled' };
  }

  // Tracing (delegates to Rust ActivityContext — full structured fields)
  ctx.traceInfo(`processing ${input.id}`);
  ctx.traceWarn('slow response');
  ctx.traceError('connection failed');
  ctx.traceDebug('raw payload: ...');

  // Activities can do anything — I/O, HTTP, DB, etc.
  const data = await fetch(input.url);
  return data;
});
```

## Determinism Rules

Orchestrations **must be deterministic** — the replay engine re-executes from the beginning on every dispatch:

| ✅ Safe | ❌ Breaks Replay |
|---------|-----------------|
| `yield ctx.utcNow()` | `Date.now()` |
| `yield ctx.newGuid()` | `crypto.randomUUID()` |
| `ctx.traceInfo()` | `console.log()` (duplicates) |
| `yield ctx.scheduleTimer(ms)` | `setTimeout()` / `await sleep()` |
| Pure computation, conditionals | `fetch()`, file I/O, DB queries |
| `JSON.parse()`, `JSON.stringify()` | `process.env.X` (may change) |

**All I/O belongs in activities**, not orchestrations.

## Common Patterns

### Error Handling

```javascript
function* (ctx, input) {
  try {
    const result = yield ctx.scheduleActivity('RiskyOp', input);
    return result;
  } catch (error) {
    ctx.traceError(`failed: ${error.message}`);
    yield ctx.scheduleActivity('Cleanup', { error: error.message });
    return { status: 'failed' };
  }
}
```

### Eternal Orchestration (continue-as-new)

```javascript
function* (ctx, input) {
  const state = input.state || { iteration: 0 };

  // Do periodic work
  const health = yield ctx.scheduleActivity('CheckHealth', input.target);
  ctx.traceInfo(`check #${state.iteration}: ${health.status}`);

  // Wait
  yield ctx.scheduleTimer(30000);

  // Restart with updated state (prevents unbounded history)
  yield ctx.continueAsNew({
    target: input.target,
    state: { iteration: state.iteration + 1 },
  });
}
```

### Race with Timeout

```javascript
function* (ctx, input) {
  const winner = yield ctx.race(
    ctx.scheduleActivity('SlowOperation', input),
    ctx.scheduleTimer(10000)
  );

  if (winner.index === 1) {
    ctx.traceWarn('operation timed out');
    return { status: 'timeout' };
  }
  return { status: 'ok', result: winner.value };
}
```

### Fire-and-Forget + Cleanup on Failure

```javascript
function* (ctx, input) {
  try {
    yield ctx.scheduleActivity('ProvisionResource', input);
    yield ctx.scheduleActivity('ConfigureResource', input);

    // Launch background monitor (runs independently)
    yield ctx.startOrchestration('ResourceMonitor', `monitor-${input.id}`, {
      resourceId: input.id,
    });

    return { status: 'created' };
  } catch (error) {
    ctx.traceError(`provisioning failed: ${error.message}`);
    yield ctx.scheduleActivity('DeleteResource', { id: input.id });
    throw error;
  }
}
```

### Polling Loop (Activity-Level)

```javascript
runtime.registerActivity('WaitForReady', async (ctx, input) => {
  for (let i = 0; i < input.maxAttempts; i++) {
    ctx.traceInfo(`poll attempt ${i + 1}`);
    const status = await checkStatus(input.resourceId);
    if (status === 'ready') return { ready: true };
    await new Promise(r => setTimeout(r, input.intervalMs));
  }
  throw new Error(`not ready after ${input.maxAttempts} attempts`);
});
```

### Versioned Orchestrations

```javascript
// Register multiple versions — old for running instances, new for future
runtime.registerOrchestration('MyWorkflow', function* (ctx, input) {
  // v1.0.0
  ctx.traceInfo('[v1.0.0] starting');
  return yield ctx.scheduleActivity('Work', input);
});

runtime.registerOrchestrationVersioned('MyWorkflow', '1.0.1', function* (ctx, input) {
  // v1.0.1 — added validation step
  ctx.traceInfo('[v1.0.1] starting');
  yield ctx.scheduleActivity('Validate', input);
  return yield ctx.scheduleActivity('Work', input);
});
```

## Writing Tests

Tests use Node.js built-in test runner (`node:test`):

```javascript
const { describe, it } = require('node:test');
const assert = require('node:assert');
const { SqliteProvider, Client, Runtime } = require('../lib/duroxide');

describe('My Feature', () => {
  it('should do something', async () => {
    const provider = await SqliteProvider.inMemory();
    const client = new Client(provider);
    const runtime = new Runtime(provider);

    runtime.registerActivity('MyActivity', async (ctx, input) => {
      return `result-${input}`;
    });

    runtime.registerOrchestration('MyWorkflow', function* (ctx, input) {
      return yield ctx.scheduleActivity('MyActivity', input);
    });

    await runtime.start();

    await client.startOrchestration('test-1', 'MyWorkflow', 'hello');
    const result = await client.waitForOrchestration('test-1');

    assert.strictEqual(result.status, 'Completed');
    assert.strictEqual(result.output, 'result-hello');

    await runtime.shutdown(100);
  });
});
```

### Test Commands

```bash
npm test                    # PostgreSQL e2e (24 tests + 1 SQLite smoketest)
npm run test:races          # Race/join composition tests (7 tests)
npm run test:admin          # Admin API tests (14 tests)
npm run test:scenarios      # Scenario tests (6 tests)
npm run test:all            # Everything (52 tests)
```

### Test Tips

- Use `SqliteProvider.inMemory()` for fast isolated tests (SQLite smoketest only)
- All PG tests need `DATABASE_URL` in `.env` (loaded by `dotenv`)
- Each test file uses a separate PG schema for isolation
- Use short `runtime.shutdown(100)` timeout — it waits the full duration
- Set `RUST_LOG=info` to see traces in test output
- Use `workerLockTimeoutMs: 2000` in tests needing fast activity cancellation detection

## Client API

```javascript
const client = new Client(provider);

// Start orchestration
await client.startOrchestration('instance-1', 'WorkflowName', inputData);
await client.startOrchestrationVersioned('instance-1', 'WorkflowName', inputData, '1.0.1');

// Wait for completion
const result = await client.waitForOrchestration('instance-1', 30000);
// result.status: 'Completed' | 'Failed' | 'Running' | ...
// result.output: parsed JSON output

// Raise event (for waitForEvent)
await client.raiseEvent('instance-1', 'approval', { approved: true });

// Cancel
await client.cancelInstance('instance-1', 'no longer needed');

// Metrics
const metrics = await client.getSystemMetrics();
const depths = await client.getQueueDepths();
```

## Logging Control

```bash
RUST_LOG=info node app.js                           # All INFO
RUST_LOG=duroxide::orchestration=debug node app.js   # Orchestration DEBUG
RUST_LOG=duroxide::activity=info node app.js         # Activity INFO only
```

Traces include structured fields automatically:
- **Orchestration**: `instance_id`, `execution_id`, `orchestration_name`, `orchestration_version`
- **Activity**: above + `activity_name`, `activity_id`, `worker_id`