APM

>Agent Skill

@microsoft/pyo3-interop

skilldevelopment

Rust↔Python interop architecture in duroxide-python. Use when modifying the PyO3 bridge, adding ScheduledTask types, fixing GIL deadlocks, changing tracing delegation, or debugging block_in_place / with_gil behavior.

pythonrust
apm::install
$apm install @microsoft/pyo3-interop
apm::skill.md
---
name: pyo3-interop
description: Rust↔Python interop architecture in duroxide-python. Use when modifying the PyO3 bridge, adding ScheduledTask types, fixing GIL deadlocks, changing tracing delegation, or debugging block_in_place / with_gil behavior.
---

# PyO3 Interop Architecture

## Overview

duroxide-python bridges Rust's duroxide runtime to Python via PyO3/maturin. The interop has two distinct paths — orchestrations (generator-based, synchronous blocking) and activities (synchronous GIL calls). Getting this wrong causes GIL deadlocks, silent replay corruption, or dropped futures.

## File Map

| File | Role |
|------|------|
| `src/handlers.rs` | Core interop — orchestration handler loop, activity invocation, global context maps, select/race/join, activity cancellation |
| `src/types.rs` | `ScheduledTask` enum — the protocol between Python and Rust |
| `src/lib.rs` | PyO3 module entry point, `#[pyfunction]` trace functions |
| `src/runtime.rs` | `PyRuntime` — wraps `duroxide::Runtime`, global tokio runtime |
| `src/client.rs` | `PyClient` — wraps `duroxide::Client`, all methods with `py.allow_threads()` |
| `src/provider.rs` | `PySqliteProvider` |
| `src/pg_provider.rs` | `PyPostgresProvider` |
| `python/duroxide/__init__.py` | Python wrapper: SqliteProvider, PostgresProvider, Client, Runtime, decorators |
| `python/duroxide/context.py` | OrchestrationContext, ActivityContext |
| `python/duroxide/driver.py` | Generator driver: create_generator, next_step, dispose_generator |

## ⚠️ Critical: GIL Deadlock Problem

This is the most important difference between duroxide-python and duroxide-node. **Get this wrong and the process deadlocks.**

### The Problem

PyO3 holds the GIL when Python calls into Rust `#[pymethods]`. If that method calls `TOKIO_RT.block_on()`, it blocks the thread while holding the GIL. Meanwhile, orchestration handlers running on tokio threads need the GIL via `Python::with_gil()`**deadlock**.

```
Thread A (Python → Rust):
  client.wait_for_orchestration()
    → PyO3 holds GIL
    → TOKIO_RT.block_on(async { ... })   ← BLOCKS, holding GIL

Thread B (Tokio → Python):
  orchestration handler invoked
    → block_in_place + Python::with_gil()  ← BLOCKS, waiting for GIL
```

### The Fix

EVERY method that calls `block_on` must use `py.allow_threads()` to release the GIL before blocking:

```rust
fn wait_for_orchestration(&self, py: Python<'_>, id: String, timeout: u64) -> PyResult<...> {
    py.allow_threads(|| {
        TOKIO_RT.block_on(async {
            self.client.wait_for_orchestration(&id, timeout).await
                .map_err(|e| format!("{e}"))
        })
    })
    .map_err(PyRuntimeError::new_err)
}
```

This pattern is applied to ALL 20+ methods in `client.rs` and `runtime.rs`.

### Error Handling Across the Boundary

`PyErr` is not `Send`, so you can't return `PyResult` from inside `allow_threads`. Pattern:
1. Inside `allow_threads`: map errors to `String` via `.map_err(|e| format!("{e}"))`
2. Outside `allow_threads`: map `String` to `PyErr` via `.map_err(PyRuntimeError::new_err)`

### Rules for ANY New Method

1. **Add `py: Python<'_>` parameter** to the method signature
2. **Wrap `TOKIO_RT.block_on()` in `py.allow_threads(|| { ... })`**
3. **Map errors to `String` inside, to `PyErr` outside**
4. **Never hold the GIL while blocking on tokio**

## Orchestration Interop (Blocking Generator Loop)

The replay engine calls `poll_once()` on the handler future. If the future isn't ready in one poll, it's **dropped**.

**Solution: `block_in_place` + `with_gil`**

```rust
fn call_create_blocking(&self, payload: String) -> Result<GeneratorStepResult, String> {
    tokio::task::block_in_place(|| {
        Python::with_gil(|py| {
            let result = self.create_fn.call1(py, (payload,))?;
            // parse result...
        })
    })
}
```

### Orchestration Handler Sequence

```
Rust (tokio thread)                         Python (GIL)
───────────────────                         ────────────────────
1. invoke(ctx, input)
   ├─ Store ctx in ORCHESTRATION_CTXS[instance_id]
   ├─ call_create_blocking(payload) ──────► create_generator(payload)
   │   (block_in_place + with_gil)            ├─ Create OrchestrationContext
   │                                          ├─ Create generator: fn(ctx, input)
   │                                          ├─ gen.send(None) → first yield
   │                                          └─ Return {"status": "yielded", "task": ...}
   │◄────────────────────────────────────────┘
   ├─ Loop:
   │   ├─ execute_task(ctx, task)           // Real DurableFuture or replay
   │   ├─ call_next_blocking(result) ──────► next_step(result)
   │   │   (block_in_place + with_gil)        ├─ gen.send(value) or gen.throw(exc)
   │   │                                      └─ Return next task or completion
   │   │◄────────────────────────────────────┘
   │   └─ If completed/error: break
   └─ Remove ctx from ORCHESTRATION_CTXS
```

## Activity Interop (Synchronous GIL Call)

Activities in duroxide-python are **synchronous** functions (unlike duroxide-node's async activities). They run on tokio threads via `block_in_place` + `with_gil`:

```
Rust                                        Python
────                                        ──────
invoke(ctx, input)
  ├─ Generate unique token (act-0, act-1, ...)
  ├─ Store ctx in ACTIVITY_CTXS[token]
  ├─ block_in_place + with_gil ─────────────► wrapped_fn(payload)
  │                                            ├─ Parse ctx, create ActivityContext
  │                                            ├─ Call user's function (synchronous)
  │                                            └─ Return JSON result
  │◄───────────────────────────────────────────┘
  └─ Remove token from ACTIVITY_CTXS
```

Users can use `asyncio.run()` inside activities if they need async I/O.

## Cross-Thread Tracing

Python callbacks acquire the GIL on tokio threads. Rust contexts live on tokio threads. Global `HashMap`s bridge the two:

```rust
static ACTIVITY_CTXS: LazyLock<Mutex<HashMap<String, ActivityContext>>>
static ORCHESTRATION_CTXS: LazyLock<Mutex<HashMap<String, OrchestrationContext>>>
```

**Python calls PyO3 functions that look up the Rust context:**

```python
# In OrchestrationContext (fire-and-forget, no yield)
def trace_info(self, message):
    orchestration_trace_log(self.instance_id, "info", str(message))

# In ActivityContext (fire-and-forget)
def trace_info(self, message):
    activity_trace_log(self._trace_token, "info", str(message))
```

### Rules for Tracing

1. **Never expose `is_replaying` to Python** — Rust `OrchestrationContext.trace()` handles suppression
2. **Always use global maps, not thread-locals** — Python runs on a different thread
3. **Clean up map entries on ALL exit paths** — leaked entries cause stale traces
4. **Use atomic tokens for activities** (not instance_id) — multiple activities for the same instance can run concurrently

## ScheduledTask Protocol

Python yields plain dicts. Rust deserializes them via `serde_json` into `ScheduledTask` enum variants:

```rust
#[derive(Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum ScheduledTask {
    Activity { name: String, input: String },
    ActivityWithRetry { name: String, input: String, retry: RetryPolicyConfig },
    Timer { delay_ms: u64 },
    WaitEvent { name: String },
    SubOrchestration { name: String, input: String },
    SubOrchestrationWithId { name: String, instance_id: String, input: String },
    SubOrchestrationVersioned { name: String, version: String, input: String },
    Orchestration { name: String, instance_id: String, input: String },
    OrchestrationVersioned { name: String, version: String, instance_id: String, input: String },
    NewGuid,
    UtcNow,
    ContinueAsNew { input: String },
    ContinueAsNewVersioned { input: String, version: String },
    Join { tasks: Vec<ScheduledTask> },
    Select { tasks: Vec<ScheduledTask> },
}
```

### Adding a New ScheduledTask Type

1. Add variant to `ScheduledTask` in `src/types.rs` with correct `serde` attributes
2. Add execution branch in `execute_task()` in `src/handlers.rs`
3. If it should work in `select/race`, add branch in `make_select_future()`
4. If it should work in `join/all`, add branch in `make_join_future()`
5. Add Python method to `OrchestrationContext` in `python/duroxide/context.py`
6. Add test in `tests/test_e2e.py`
7. Rebuild: `maturin develop`

## Global Tokio Runtime

```rust
static TOKIO_RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("failed to build tokio runtime")
});
```

All async operations go through `TOKIO_RT.block_on()` (with GIL released via `py.allow_threads()`). No pyo3-async-runtimes needed.

## Provider Polymorphism

PyO3 doesn't support trait objects in constructors. Python wrapper detects provider type:

```python
class Runtime:
    def __init__(self, provider, options=None):
        if getattr(provider, "_type", None) == "postgres":
            self._native = PyRuntime.from_postgres(provider._native, options)
        else:
            self._native = PyRuntime.from_sqlite(provider._native, options)
```

## PyO3 Object Mutability

`#[pyclass(get_all)]` makes fields readable but NOT writable from Python. Even with `set_all`, setting a Python `dict` to an `Option<String>` field fails with a type mismatch.

**Solution:** Create pure Python wrapper objects (e.g., `OrchestrationResult`) instead of mutating PyO3 objects:

```python
class OrchestrationResult:
    def __init__(self, status, output=None, error=None):
        self.status = status
        self.output = output  # parsed from JSON
        self.error = error

def _parse_status(raw):
    output = raw.output
    if output is not None:
        output = json.loads(output)
    return OrchestrationResult(status=raw.status, output=output, error=raw.error)
```

## Common Pitfalls

| Pitfall | What Happens | Fix |
|---------|-------------|-----|
| Missing `py.allow_threads()` around `block_on` | GIL deadlock — process hangs forever | Wrap ALL `TOKIO_RT.block_on()` calls |
| Returning `PyErr` from inside `allow_threads` | Compile error — `PyErr` is not `Send` | Map to `String` inside, `PyErr` outside |
| Thread-local for cross-thread context | Lookup returns `None` — traces silently fail | Use global `HashMap` |
| Mutating PyO3 `#[pyclass]` fields from Python | `TypeError` or silently ignored | Use Python wrapper objects |
| `cargo build` instead of `maturin develop` | Python imports stale `.so` — changes don't take effect | Always use `maturin develop` |
| Missing `serde(rename_all)` on new ScheduledTask variants | Deserialization fails — task type not recognized | Match Python naming convention (camelCase in JSON) |

## Build Requirements

```bash
# MUST use maturin (not cargo build) for the Python extension
source .venv/bin/activate
maturin develop           # Debug build + install
maturin develop --release # Release build + install
maturin build --release   # Build wheel for distribution

# cargo build alone produces a .dylib/.so that Python can't find
```