# Iter-Coroutine + Base/Runtime Migration
Refactor sync+async modules to eliminate duplication using the iter-coroutine
pattern. This skill covers the full migration workflow: identifying candidates,
structuring internal modules, writing transport-agnostic business logic, and
wiring up sync/async public APIs.
## When to use
- A module has parallel sync and async implementations with duplicated logic.
- You're adding a new feature that needs both sync and async public APIs.
- You're refactoring an existing feature to use the shared HTTP transport layer.
## Core principles
- **Public API is stable.** Same exported names, signatures, return types, and
behavior. Migrations are internal refactors, not API changes.
- **Internal core is async-first.** All shared business logic lives in async
methods on a base class.
- **Sync entrypoints are thin wrappers** over the async core via
`iter_coroutine(...)`, used only when the wrapped coroutine is non-suspending
in sync mode.
- **HTTP goes through `vercel._internal.http`** transports. Never construct
`httpx.Client` or `httpx.AsyncClient` directly in feature modules.
## Architecture overview
```
Public API (sync wrappers + async functions)
        │
        │  iter_coroutine() one-shot bridge (sync path)
        │  await (async path)
        ▼
Concrete Clients (SyncFooClient / AsyncFooClient)
        │
        │  inherit from
        ▼
BaseFooClient (all async methods — shared business logic)
        │
        │  uses
        ▼
RequestClient (transport-agnostic async API)
        │
        ▼
Transport (SyncTransport or AsyncTransport)
        │
        ▼
httpx.Client or httpx.AsyncClient
```
## How `iter_coroutine` works
```python
# src/vercel/_internal/iter_coroutine.py
from typing import Coroutine, TypeVar

_T = TypeVar("_T")

def iter_coroutine(coro: Coroutine[None, None, _T]) -> _T:
    """Execute a non-suspending coroutine synchronously."""
    try:
        coro.send(None)
    except StopIteration as ex:
        return ex.value
    else:
        raise RuntimeError(f"coroutine {coro!r} did not stop after one iteration!")
    finally:
        coro.close()
```
It drives the coroutine forward exactly one step. If the coroutine completes
without suspending, its return value is extracted from the `StopIteration`
exception. If the coroutine suspends instead (it yields or awaits real I/O),
`send` returns normally and the `else` branch raises `RuntimeError`.
### Safety rules
| Safe | Unsafe |
|---------------------------------------------------|--------------------------------------------------|
| Coroutines that only call sync code via async API | Coroutines that `await` real I/O |
| `SyncTransport.send()` (sync under async facade) | `AsyncTransport.send()` (real `await`) |
| Sync sleep, sync file I/O | `asyncio.sleep`, `asyncio.create_task` |
| `inspect.isawaitable()` guarded branches | Unguarded `await` on user callbacks |
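A minimal standalone sketch of the safe/unsafe distinction, re-declaring `iter_coroutine` locally so the snippet runs on its own:

```python
import asyncio
from typing import Coroutine, TypeVar

_T = TypeVar("_T")

def iter_coroutine(coro: Coroutine[None, None, _T]) -> _T:
    # Same one-shot driver as vercel._internal.iter_coroutine
    try:
        coro.send(None)
    except StopIteration as ex:
        return ex.value
    else:
        raise RuntimeError(f"coroutine {coro!r} did not stop after one iteration!")
    finally:
        coro.close()

async def safe() -> int:
    # Async-shaped but never suspends: fine under iter_coroutine.
    return 40 + 2

async def unsafe() -> int:
    await asyncio.sleep(0)  # real suspension point
    return 0

print(iter_coroutine(safe()))  # 42

try:
    iter_coroutine(unsafe())
except RuntimeError as exc:
    print("rejected:", type(exc).__name__)  # rejected: RuntimeError
```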
## Step-by-step migration
### 1. Create the transport-agnostic request client
The request client wraps a `BaseTransport` and exposes an async API. The
transport implementation determines whether I/O is actually sync or async.
```python
# src/vercel/_internal/foo/core.py
import asyncio
from typing import Any, Callable

from vercel._internal.http import (
    AsyncTransport, BaseTransport, SyncTransport,
    create_base_client, create_base_async_client,
)

# Either a sync sleep (time.sleep) or an async one (asyncio.sleep)
SleepFn = Callable[[float], Any]

class FooRequestClient:
    def __init__(
        self,
        *,
        transport: BaseTransport,
        sleep_fn: SleepFn = asyncio.sleep,
        await_callback: bool = True,
    ) -> None:
        self._transport = transport
        self._sleep_fn = sleep_fn
        self._await_callback = await_callback

    async def request(self, method: str, path: str, **kwargs: Any) -> Any:
        # Retry logic, error mapping, progress callbacks — all async-shaped
        # but non-suspending when backed by SyncTransport
        resp = await self._transport.send(method, path, **kwargs)
        ...
```
Factory functions configure transport + sleep + callback behavior:
```python
def create_sync_request_client(timeout: float = 30.0) -> FooRequestClient:
    transport = SyncTransport(create_base_client(timeout=timeout))
    return FooRequestClient(
        transport=transport,
        sleep_fn=_sync_sleep,    # time.sleep, not asyncio.sleep
        await_callback=False,    # don't await user callbacks
    )

def create_async_request_client(timeout: float = 30.0) -> FooRequestClient:
    transport = AsyncTransport(create_base_async_client(timeout=timeout))
    return FooRequestClient(transport=transport)  # defaults are async-friendly
```
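The trick that makes the sync path work is that `SyncTransport.send` is async-shaped but performs blocking I/O without ever suspending. A rough sketch of that facade, with a stand-in client instead of `httpx` (names here are hypothetical, for illustration only):

```python
from typing import Any

class FakeHttpClient:
    """Stand-in for httpx.Client: blocking request, immediate response."""
    def request(self, method: str, path: str, **kwargs: Any) -> dict[str, Any]:
        return {"method": method, "path": path, "status": 200}

class SyncTransport:
    """Async facade over a blocking client: `send` is a coroutine
    function, but its body contains no await, so it never suspends."""
    def __init__(self, client: FakeHttpClient) -> None:
        self._client = client

    async def send(self, method: str, path: str, **kwargs: Any) -> dict[str, Any]:
        # Blocking call under an async signature; completes in one step.
        return self._client.request(method, path, **kwargs)

# Driving the coroutine one step by hand shows it finishes immediately:
coro = SyncTransport(FakeHttpClient()).send("GET", "/v1/foo")
try:
    coro.send(None)
except StopIteration as done:
    print(done.value["status"])  # 200
```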
### 2. Write the shared base client
All business logic goes in async methods on a base class:
```python
class BaseFooClient:
    def __init__(self, *, request_client: FooRequestClient) -> None:
        self._request_client = request_client

    async def do_thing(self, path: str, *, token: str) -> Result:
        # Validation, request building, response parsing — all here
        raw = await self._request_client.request("POST", path, token=token)
        return build_result(raw)
```
### 3. Add sync and async concrete clients
```python
class SyncFooClient(BaseFooClient):
    def __init__(self) -> None:
        super().__init__(request_client=create_sync_request_client())

    def close(self) -> None:
        self._request_client.close()

    def __enter__(self) -> SyncFooClient:
        return self

    def __exit__(self, *args: object) -> None:
        self.close()

class AsyncFooClient(BaseFooClient):
    def __init__(self) -> None:
        super().__init__(request_client=create_async_request_client())

    async def aclose(self) -> None:
        await self._request_client.aclose()

    async def __aenter__(self) -> AsyncFooClient:
        return self

    async def __aexit__(self, *args: object) -> None:
        await self.aclose()
```
### 4. Wire up public API functions
```python
# src/vercel/foo/ops.py
from typing import Callable, Coroutine, TypeVar

from vercel._internal.iter_coroutine import iter_coroutine

_T = TypeVar("_T")

def _run_sync(
    operation: Callable[[SyncFooClient], Coroutine[None, None, _T]],
) -> _T:
    with SyncFooClient() as client:
        return iter_coroutine(operation(client))

# Sync public API
def do_thing(path: str, *, token: str | None = None) -> Result:
    token = ensure_token(token)
    return _run_sync(lambda c: c.do_thing(path, token=token))

# Async public API
async def do_thing_async(path: str, *, token: str | None = None) -> Result:
    token = ensure_token(token)
    async with AsyncFooClient() as client:
        return await client.do_thing(path, token=token)
```
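Putting steps 2–4 together, here is a self-contained toy version of the wiring: a fake request client instead of real HTTP, and `iter_coroutine` re-declared so the snippet stands alone. All names besides `iter_coroutine` are illustrative.

```python
from typing import Any, Callable, Coroutine, TypeVar

_T = TypeVar("_T")

def iter_coroutine(coro: Coroutine[None, None, _T]) -> _T:
    try:
        coro.send(None)
    except StopIteration as ex:
        return ex.value
    else:
        raise RuntimeError(f"coroutine {coro!r} did not stop after one iteration!")
    finally:
        coro.close()

class FakeRequestClient:
    """Stands in for FooRequestClient; echoes the request back."""
    async def request(self, method: str, path: str, **kwargs: Any) -> dict[str, Any]:
        return {"method": method, "path": path, **kwargs}

class BaseFooClient:
    def __init__(self) -> None:
        self._request_client = FakeRequestClient()

    async def do_thing(self, path: str, *, token: str) -> dict[str, Any]:
        # Shared business logic: async-shaped, non-suspending here.
        return await self._request_client.request("POST", path, token=token)

class SyncFooClient(BaseFooClient):
    def __enter__(self) -> "SyncFooClient":
        return self

    def __exit__(self, *args: object) -> None:
        pass  # real client would close its transport here

def _run_sync(op: Callable[[SyncFooClient], Coroutine[None, None, _T]]) -> _T:
    with SyncFooClient() as client:
        return iter_coroutine(op(client))

def do_thing(path: str, *, token: str) -> dict[str, Any]:
    return _run_sync(lambda c: c.do_thing(path, token=token))

print(do_thing("/v1/foo", token="t")["method"])  # POST
```

The async public API would await `BaseFooClient.do_thing` directly; both paths execute the exact same method body.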
## Base + runtime split
When sync and async paths differ materially (e.g., threading vs asyncio for
concurrent uploads), use a runtime object:
```python
# Sync runtime — ThreadPoolExecutor
class _SyncUploadRuntime:
    def upload(self, *, parts, upload_fn, ...) -> list[PartResult]:
        with ThreadPoolExecutor(max_workers=MAX_CONCURRENCY) as executor:
            # Submit work, collect results
            ...

# Async runtime — anyio task group
class _AsyncUploadRuntime:
    async def upload(self, *, parts, upload_fn, ...) -> list[PartResult]:
        semaphore = anyio.Semaphore(MAX_CONCURRENCY)
        async with anyio.create_task_group() as tg:
            # Start tasks with semaphore limiting
            ...
```
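The elided sync-runtime body can be filled in with `ThreadPoolExecutor.map`, which already yields results in submission order. A self-contained sketch, where the `parts` and `upload_fn` shapes are hypothetical (the real implementation lives in `src/vercel/_internal/blob/multipart.py`):

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable

MAX_CONCURRENCY = 4
PartResult = tuple[int, str]  # (part_number, etag), hypothetical shape

class _SyncUploadRuntime:
    def upload(
        self,
        *,
        parts: list[tuple[int, bytes]],
        upload_fn: Callable[[int, bytes], PartResult],
    ) -> list[PartResult]:
        with ThreadPoolExecutor(max_workers=MAX_CONCURRENCY) as executor:
            # map() yields results in submission order, even when uploads
            # finish out of order on the worker threads.
            return list(executor.map(lambda p: upload_fn(*p), parts))

# Example: the "upload" just echoes the part number and decoded payload.
runtime = _SyncUploadRuntime()
results = runtime.upload(
    parts=[(1, b"alpha"), (2, b"beta")],
    upload_fn=lambda n, data: (n, data.decode()),
)
print(results)  # [(1, 'alpha'), (2, 'beta')]
```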
The base client calls the runtime polymorphically:
```python
class BaseFooClient:
    async def _do_upload(self, ...) -> list[PartResult]:
        parts = await _await_if_necessary(
            self._runtime.upload(
                parts=chunks,
                upload_fn=self._make_upload_fn(),
                ...
            )
        )
        return order_parts(parts)
```
Concrete clients inject the right runtime and upload function:
```python
class SyncFooClient(BaseFooClient):
    def __init__(self) -> None:
        super().__init__(runtime=create_sync_runtime())

    def _make_upload_fn(self):
        # Wrap async method with iter_coroutine for sync execution
        return lambda **kw: iter_coroutine(self._client.upload_part(**kw))

class AsyncFooClient(BaseFooClient):
    def __init__(self) -> None:
        super().__init__(runtime=create_async_runtime())

    def _make_upload_fn(self):
        # Return the async method directly
        return self._client.upload_part
```
### Shared orchestration
Keep these in the base layer, not duplicated per runtime:
- Input validation
- Chunk/part iteration helpers
- Result normalization and ordering
- Final response shaping
## Callback handling
For callbacks that may be sync or async:
```python
import inspect
from typing import Awaitable, Callable, cast

async def _emit_progress(
    callback: Callable[[Event], None] | Callable[[Event], Awaitable[None]] | None,
    event: Event,
    *,
    await_callback: bool,
) -> None:
    if callback is None:
        return
    result = callback(event)
    if await_callback and inspect.isawaitable(result):
        await cast(Awaitable[None], result)
```
- Sync clients set `await_callback=False` — callbacks are always sync.
- Async clients set `await_callback=True` — callbacks may be awaitable.
## Testing expectations
- Prefer integration-style tests with `respx` that verify real request flow and
sync/async parity.
- Do not rely only on monkeypatch tests that assert internal call shape.
- Test both sync and async paths for every operation.
- Validate before commit:
- `./scripts/lint.sh`
- `./scripts/test.sh` (targeted or full)
## Reference implementation
The blob module is the canonical example of this pattern:
- **Transport:** `src/vercel/_internal/http/transport.py`
- **Request client:** `src/vercel/_internal/blob/core.py` → `BlobRequestClient`
- **Base client:** `src/vercel/_internal/blob/core.py` → `BaseBlobOpsClient`
- **Sync client:** `src/vercel/_internal/blob/core.py` → `SyncBlobOpsClient`
- **Async client:** `src/vercel/_internal/blob/core.py` → `AsyncBlobOpsClient`
- **Runtime split:** `src/vercel/_internal/blob/multipart.py`
- **Public API:** `src/vercel/blob/ops.py`
- **Types re-export:** `src/vercel/blob/types.py`