APM

>Agent Skill

@vercel/iter-coroutine

skilldevelopment

Refactor sync+async modules to eliminate duplication using the iter-coroutine

pythonapi-design
apm::install
$apm install @vercel/iter-coroutine
apm::skill.md
# Iter-Coroutine + Base/Runtime Migration

Refactor sync+async modules to eliminate duplication using the iter-coroutine
pattern. This skill covers the full migration workflow: identifying candidates,
structuring internal modules, writing transport-agnostic business logic, and
wiring up sync/async public APIs.

## When to use

- A module has parallel sync and async implementations with duplicated logic.
- You're adding a new feature that needs both sync and async public APIs.
- You're refactoring an existing feature to use the shared HTTP transport layer.

## Core principles

- **Public API is stable.** Same exported names, signatures, return types, and
  behavior. Migrations are internal refactors, not API changes.
- **Internal core is async-first.** All shared business logic lives in async
  methods on a base class.
- **Sync entrypoints are thin wrappers** over the async core via
  `iter_coroutine(...)`, used only when the wrapped coroutine is non-suspending
  in sync mode.
- **HTTP goes through `vercel._internal.http`** transports. Never construct
  `httpx.Client` or `httpx.AsyncClient` directly in feature modules.

## Architecture overview

```
Public API (sync wrappers + async functions)

    │ iter_coroutine() one-shot bridge (sync path)
    │ await (async path)

Concrete Clients (SyncFooClient / AsyncFooClient)

    │ inherit from

BaseFooClient (all async methods — shared business logic)

    │ uses

RequestClient (transport-agnostic async API)


Transport (SyncTransport or AsyncTransport)


httpx.Client or httpx.AsyncClient
```

## How `iter_coroutine` works

```python
# src/vercel/_internal/iter_coroutine.py
def iter_coroutine(coro: Coroutine[None, None, _T]) -> _T:
    """Execute a non-suspending coroutine synchronously."""
    try:
        coro.send(None)
    except StopIteration as ex:
        return ex.value
    else:
        raise RuntimeError(f"coroutine {coro!r} did not stop after one iteration!")
    finally:
        coro.close()
```

It drives a coroutine forward exactly one step. If the coroutine completes
without suspending, its return value is extracted from the `StopIteration`
exception. If it tries to yield/await, it raises `RuntimeError`.

### Safety rules

| Safe                                              | Unsafe                                           |
|---------------------------------------------------|--------------------------------------------------|
| Coroutines that only call sync code via async API | Coroutines that `await` real I/O                 |
| `SyncTransport.send()` (sync under async facade)  | `AsyncTransport.send()` (real `await`)           |
| Sync sleep, sync file I/O                         | `asyncio.sleep`, `asyncio.create_task`           |
| `inspect.isawaitable()` guarded branches          | Unguarded `await` on user callbacks              |

## Step-by-step migration

### 1. Create the transport-agnostic request client

The request client wraps a `BaseTransport` and exposes an async API. The
transport implementation determines whether I/O is actually sync or async.

```python
# src/vercel/_internal/foo/core.py
from vercel._internal.http import (
    AsyncTransport, BaseTransport, SyncTransport,
    create_base_client, create_base_async_client,
)

class FooRequestClient:
    def __init__(
        self,
        *,
        transport: BaseTransport,
        sleep_fn: SleepFn = asyncio.sleep,
        await_callback: bool = True,
    ) -> None:
        self._transport = transport
        self._sleep_fn = sleep_fn
        self._await_callback = await_callback

    async def request(self, method: str, path: str, **kwargs) -> Any:
        # Retry logic, error mapping, progress callbacks — all async-shaped
        # but non-suspending when backed by SyncTransport
        resp = await self._transport.send(method, path, **kwargs)
        ...
```

Factory functions configure transport + sleep + callback behavior:

```python
def create_sync_request_client(timeout: float = 30.0) -> FooRequestClient:
    transport = SyncTransport(create_base_client(timeout=timeout))
    return FooRequestClient(
        transport=transport,
        sleep_fn=_sync_sleep,         # time.sleep, not asyncio.sleep
        await_callback=False,          # don't await user callbacks
    )

def create_async_request_client(timeout: float = 30.0) -> FooRequestClient:
    transport = AsyncTransport(create_base_async_client(timeout=timeout))
    return FooRequestClient(transport=transport)  # defaults are async-friendly
```

### 2. Write the shared base client

All business logic goes in async methods on a base class:

```python
class BaseFooClient:
    def __init__(self, *, request_client: FooRequestClient) -> None:
        self._request_client = request_client

    async def do_thing(self, path: str, *, token: str) -> Result:
        # Validation, request building, response parsing — all here
        raw = await self._request_client.request("POST", path, token=token)
        return build_result(raw)
```

### 3. Add sync and async concrete clients

```python
class SyncFooClient(BaseFooClient):
    def __init__(self) -> None:
        super().__init__(request_client=create_sync_request_client())

    def close(self) -> None:
        self._request_client.close()

    def __enter__(self) -> SyncFooClient:
        return self

    def __exit__(self, *args: object) -> None:
        self.close()


class AsyncFooClient(BaseFooClient):
    def __init__(self) -> None:
        super().__init__(request_client=create_async_request_client())

    async def aclose(self) -> None:
        await self._request_client.aclose()

    async def __aenter__(self) -> AsyncFooClient:
        return self

    async def __aexit__(self, *args: object) -> None:
        await self.aclose()
```

### 4. Wire up public API functions

```python
# src/vercel/foo/ops.py
from vercel._internal.iter_coroutine import iter_coroutine

_T = TypeVar("_T")

def _run_sync(
    operation: Callable[[SyncFooClient], Coroutine[None, None, _T]],
) -> _T:
    with SyncFooClient() as client:
        return iter_coroutine(operation(client))


# Sync public API
def do_thing(path: str, *, token: str | None = None) -> Result:
    token = ensure_token(token)
    return _run_sync(lambda c: c.do_thing(path, token=token))


# Async public API
async def do_thing_async(path: str, *, token: str | None = None) -> Result:
    token = ensure_token(token)
    async with AsyncFooClient() as client:
        return await client.do_thing(path, token=token)
```

## Base + runtime split

When sync and async paths differ materially (e.g., threading vs asyncio for
concurrent uploads), use a runtime object:

```python
# Sync runtime — ThreadPoolExecutor
class _SyncUploadRuntime:
    def upload(self, *, parts, upload_fn, ...) -> list[PartResult]:
        with ThreadPoolExecutor(max_workers=MAX_CONCURRENCY) as executor:
            # Submit work, collect results
            ...

# Async runtime — anyio task group
class _AsyncUploadRuntime:
    async def upload(self, *, parts, upload_fn, ...) -> list[PartResult]:
        semaphore = anyio.Semaphore(MAX_CONCURRENCY)
        async with anyio.create_task_group() as tg:
            # Start tasks with semaphore limiting
            ...
```

The base client calls the runtime polymorphically:

```python
class BaseFooClient:
    async def _do_upload(self, ...) -> list[PartResult]:
        parts = await _await_if_necessary(
            self._runtime.upload(
                parts=chunks,
                upload_fn=self._make_upload_fn(),
                ...
            )
        )
        return order_parts(parts)
```

Concrete clients inject the right runtime and upload function:

```python
class SyncFooClient(BaseFooClient):
    def __init__(self) -> None:
        super().__init__(runtime=create_sync_runtime())

    def _make_upload_fn(self):
        # Wrap async method with iter_coroutine for sync execution
        return lambda **kw: iter_coroutine(self._client.upload_part(**kw))


class AsyncFooClient(BaseFooClient):
    def __init__(self) -> None:
        super().__init__(runtime=create_async_runtime())

    def _make_upload_fn(self):
        # Return the async method directly
        return self._client.upload_part
```

### Shared orchestration

Keep these in the base layer, not duplicated per runtime:

- Input validation
- Chunk/part iteration helpers
- Result normalization and ordering
- Final response shaping

## Callback handling

For callbacks that may be sync or async:

```python
async def _emit_progress(
    callback: Callable[[Event], None] | Callable[[Event], Awaitable[None]] | None,
    event: Event,
    *,
    await_callback: bool,
) -> None:
    if callback is None:
        return
    result = callback(event)
    if await_callback and inspect.isawaitable(result):
        await cast(Awaitable[None], result)
```

- Sync clients set `await_callback=False` — callbacks are always sync.
- Async clients set `await_callback=True` — callbacks may be awaitable.

## Testing expectations

- Prefer integration-style tests with `respx` that verify real request flow and
  sync/async parity.
- Do not rely only on monkeypatch tests that assert internal call shape.
- Test both sync and async paths for every operation.
- Validate before commit:
  - `./scripts/lint.sh`
  - `./scripts/test.sh` (targeted or full)

## Reference implementation

The blob module is the canonical example of this pattern:

- **Transport:** `src/vercel/_internal/http/transport.py`
- **Request client:** `src/vercel/_internal/blob/core.py``BlobRequestClient`
- **Base client:** `src/vercel/_internal/blob/core.py``BaseBlobOpsClient`
- **Sync client:** `src/vercel/_internal/blob/core.py``SyncBlobOpsClient`
- **Async client:** `src/vercel/_internal/blob/core.py``AsyncBlobOpsClient`
- **Runtime split:** `src/vercel/_internal/blob/multipart.py`
- **Public API:** `src/vercel/blob/ops.py`
- **Types re-export:** `src/vercel/blob/types.py`