APM

>Agent Skill

@enuno/distributed-state-sync-skill

skilldata

Implements CRDT (Conflict-Free Replicated Data Types) for distributed state management with automatic conflict resolution

python
apm::install
$apm install @enuno/distributed-state-sync-skill
apm::skill.md
---
name: distributed-state-sync-skill
description: Implements CRDT (Conflict-Free Replicated Data Types) for distributed state management with automatic conflict resolution
version: 1.0.0
tags: [orchestration, crdt, state-management, distributed-systems, conflict-resolution]
---

# Distributed State Sync Skill

## Purpose

This skill provides Conflict-Free Replicated Data Types (CRDTs) for managing distributed state across multiple agents with automatic conflict resolution. It enables agents to update shared state concurrently without coordination while guaranteeing eventual consistency.

## When to Use This Skill

**Use this skill when:**
- ✅ Multiple agents need to share and update state concurrently
- ✅ Network partitions or delays are possible
- ✅ Need automatic conflict resolution without locking
- ✅ Want eventually consistent distributed data structures
- ✅ Implementing collaborative multi-agent workflows

**Don't use this skill for:**
- ❌ Single-agent workflows (use local state)
- ❌ Scenarios requiring immediate consistency (use locking)
- ❌ Simple read-only state sharing
- ❌ States that don't conflict (use simple replication)

## Core Data Structures

### 1. OR-Set (Observed-Remove Set)

**Purpose**: Distributed set where adds and removes can happen concurrently

**Properties**:
- Concurrent adds are preserved
- Removes only affect observed elements
- Eventually consistent across all replicas

**Implementation**:
```python
from dataclasses import dataclass, field
from typing import Any, Dict, Set, Tuple
import uuid

@dataclass
class ORSet:
    """
    Observed-Remove Set (OR-Set) CRDT.

    Maintains both added and removed elements with unique IDs.
    Elements can be added and removed concurrently.
    """
    added: Dict[Any, Set[str]] = field(default_factory=dict)  # element -> set of unique IDs
    removed: Set[Tuple[Any, str]] = field(default_factory=set)  # set of (element, unique_id) pairs

    def add(self, element: Any) -> str:
        """
        Add an element to the set.

        Returns:
            Unique ID for this add operation
        """
        unique_id = str(uuid.uuid4())

        if element not in self.added:
            self.added[element] = set()

        self.added[element].add(unique_id)

        return unique_id

    def remove(self, element: Any) -> None:
        """
        Remove an element from the set.

        Removes all currently observed instances of the element.
        """
        if element in self.added:
            for unique_id in self.added[element]:
                self.removed.add((element, unique_id))

    def contains(self, element: Any) -> bool:
        """Check if element is in the set."""
        if element not in self.added:
            return False

        # Element exists if it has any non-removed instances
        for unique_id in self.added[element]:
            if (element, unique_id) not in self.removed:
                return True

        return False

    def get_elements(self) -> Set[Any]:
        """Get all elements currently in the set."""
        elements = set()

        for element, unique_ids in self.added.items():
            # Check if element has any non-removed instances
            if any((element, uid) not in self.removed for uid in unique_ids):
                elements.add(element)

        return elements

    def merge(self, other: 'ORSet') -> 'ORSet':
        """
        Merge with another OR-Set replica.

        Returns:
            New OR-Set with merged state
        """
        merged = ORSet()

        # Merge added elements
        all_elements = set(self.added.keys()) | set(other.added.keys())
        for element in all_elements:
            merged.added[element] = (
                self.added.get(element, set()) |
                other.added.get(element, set())
            )

        # Merge removed elements
        merged.removed = self.removed | other.removed

        return merged
```

**Example**:
```python
# Agent A and Agent B working concurrently
agent_a_set = ORSet()
agent_b_set = ORSet()

# Agent A adds "task-1"
agent_a_set.add("task-1")

# Agent B adds "task-2" (concurrent)
agent_b_set.add("task-2")

# Merge sets
final_set = agent_a_set.merge(agent_b_set)

# Result: Both tasks present
assert final_set.contains("task-1")
assert final_set.contains("task-2")
```

### 2. G-Counter (Grow-Only Counter)

**Purpose**: Distributed counter that only increments

**Properties**:
- Each replica has its own counter
- Total = sum of all replica counters
- Merge is max operation per replica

**Implementation**:
```python
@dataclass
class GCounter:
    """
    Grow-Only Counter CRDT.

    Each replica maintains its own counter.
    Total value is the sum of all replica counters.
    """
    counters: Dict[str, int] = field(default_factory=dict)  # replica_id -> count
    replica_id: str = field(default_factory=lambda: str(uuid.uuid4()))

    def increment(self, amount: int = 1) -> None:
        """Increment this replica's counter."""
        if self.replica_id not in self.counters:
            self.counters[self.replica_id] = 0

        self.counters[self.replica_id] += amount

    def value(self) -> int:
        """Get total value across all replicas."""
        return sum(self.counters.values())

    def merge(self, other: 'GCounter') -> 'GCounter':
        """
        Merge with another G-Counter replica.

        Returns:
            New G-Counter with merged state
        """
        merged = GCounter(replica_id=self.replica_id)

        # Take max of each replica's counter
        all_replicas = set(self.counters.keys()) | set(other.counters.keys())
        for replica in all_replicas:
            merged.counters[replica] = max(
                self.counters.get(replica, 0),
                other.counters.get(replica, 0)
            )

        return merged
```

**Example**:
```python
# Three agents counting tasks completed
agent_a_counter = GCounter(replica_id="agent-a")
agent_b_counter = GCounter(replica_id="agent-b")
agent_c_counter = GCounter(replica_id="agent-c")

# Each agent increments locally
agent_a_counter.increment(5)  # Completed 5 tasks
agent_b_counter.increment(3)  # Completed 3 tasks
agent_c_counter.increment(7)  # Completed 7 tasks

# Merge all counters
merged = agent_a_counter.merge(agent_b_counter).merge(agent_c_counter)

# Total tasks completed
assert merged.value() == 15  # 5 + 3 + 7
```

### 3. LWW-Register (Last-Write-Wins Register)

**Purpose**: Distributed register where last write wins based on timestamp

**Properties**:
- Each update has a timestamp
- Latest timestamp wins on conflict
- Simple but may lose data

**Implementation**:
```python
@dataclass
class LWWRegister:
    """
    Last-Write-Wins Register CRDT.

    Stores a value with a timestamp.
    On merge, value with latest timestamp wins.
    """
    value: Any = None
    timestamp: float = 0.0
    replica_id: str = field(default_factory=lambda: str(uuid.uuid4()))

    def set(self, value: Any) -> None:
        """Set value with current timestamp."""
        import time
        self.value = value
        self.timestamp = time.time()

    def get(self) -> Any:
        """Get current value."""
        return self.value

    def merge(self, other: 'LWWRegister') -> 'LWWRegister':
        """
        Merge with another LWW-Register.

        Returns:
            New register with value from latest write
        """
        merged = LWWRegister()

        if self.timestamp > other.timestamp:
            merged.value = self.value
            merged.timestamp = self.timestamp
            merged.replica_id = self.replica_id
        elif other.timestamp > self.timestamp:
            merged.value = other.value
            merged.timestamp = other.timestamp
            merged.replica_id = other.replica_id
        else:
            # Timestamps equal - use replica_id as tiebreaker
            if self.replica_id > other.replica_id:
                merged.value = self.value
                merged.timestamp = self.timestamp
                merged.replica_id = self.replica_id
            else:
                merged.value = other.value
                merged.timestamp = other.timestamp
                merged.replica_id = other.replica_id

        return merged
```

**Example**:
```python
# Two agents updating same configuration
agent_a_config = LWWRegister(replica_id="agent-a")
agent_b_config = LWWRegister(replica_id="agent-b")

# Agent A updates at T=1000
agent_a_config.timestamp = 1000.0
agent_a_config.value = {"mode": "production"}

# Agent B updates at T=1001 (1 second later)
agent_b_config.timestamp = 1001.0
agent_b_config.value = {"mode": "staging"}

# Merge: B's value wins (latest timestamp)
merged = agent_a_config.merge(agent_b_config)
assert merged.value == {"mode": "staging"}
```

### 4. PN-Counter (Positive-Negative Counter)

**Purpose**: Distributed counter supporting both increment and decrement

**Properties**:
- Combines two G-Counters (positive and negative)
- Value = positive.value() - negative.value()
- Fully decentralized

**Implementation**:
```python
@dataclass
class PNCounter:
    """
    Positive-Negative Counter CRDT.

    Supports both increment and decrement operations.
    Internally uses two G-Counters.
    """
    positive: GCounter = field(default_factory=GCounter)
    negative: GCounter = field(default_factory=GCounter)

    def increment(self, amount: int = 1) -> None:
        """Increment counter."""
        self.positive.increment(amount)

    def decrement(self, amount: int = 1) -> None:
        """Decrement counter."""
        self.negative.increment(amount)

    def value(self) -> int:
        """Get current value (positive - negative)."""
        return self.positive.value() - self.negative.value()

    def merge(self, other: 'PNCounter') -> 'PNCounter':
        """Merge with another PN-Counter."""
        merged = PNCounter()
        merged.positive = self.positive.merge(other.positive)
        merged.negative = self.negative.merge(other.negative)
        return merged
```

**Example**:
```python
# Agent tracking resource pool (add/remove resources)
agent_a_pool = PNCounter()
agent_b_pool = PNCounter()

# Agent A adds 10 resources
agent_a_pool.increment(10)

# Agent B removes 3 resources (concurrent)
agent_b_pool.decrement(3)

# Merge
merged = agent_a_pool.merge(agent_b_pool)
assert merged.value() == 7  # 10 - 3
```

## Workflow

### Step 1: Initialize Distributed State

```python
from skills.orchestration.distributed_state_sync import ORSet, GCounter

# Initialize state manager
class DistributedStateManager:
    def __init__(self, replica_id: str):
        self.replica_id = replica_id
        self.pending_tasks = ORSet()       # Shared task list
        self.completed_count = GCounter(replica_id=replica_id)  # Task counter
        self.agent_status = {}             # Per-agent state

    def add_task(self, task_id: str) -> None:
        """Add task to pending list."""
        self.pending_tasks.add(task_id)

    def complete_task(self, task_id: str) -> None:
        """Mark task as completed."""
        self.pending_tasks.remove(task_id)
        self.completed_count.increment()

    def get_pending_tasks(self) -> Set[str]:
        """Get all pending tasks."""
        return self.pending_tasks.get_elements()

    def get_completed_count(self) -> int:
        """Get total completed tasks across all agents."""
        return self.completed_count.value()
```

### Step 2: Local Operations

```python
# Agent A performs local operations
agent_a_state = DistributedStateManager(replica_id="agent-a")

agent_a_state.add_task("task-1")
agent_a_state.add_task("task-2")
agent_a_state.complete_task("task-1")

# Agent B performs concurrent operations
agent_b_state = DistributedStateManager(replica_id="agent-b")

agent_b_state.add_task("task-3")
agent_b_state.complete_task("task-3")
```

### Step 3: Periodic Synchronization

```python
def synchronize_state(local_state: DistributedStateManager,
                      remote_state: DistributedStateManager) -> DistributedStateManager:
    """
    Synchronize state between two replicas.

    Returns:
        New state with merged updates
    """
    merged_state = DistributedStateManager(replica_id=local_state.replica_id)

    # Merge pending tasks (OR-Set)
    merged_state.pending_tasks = local_state.pending_tasks.merge(remote_state.pending_tasks)

    # Merge completed count (G-Counter)
    merged_state.completed_count = local_state.completed_count.merge(remote_state.completed_count)

    return merged_state

# Synchronize every 30 seconds
import time

while True:
    # Get state from other agents
    remote_states = fetch_remote_states()

    # Merge with local state
    for remote_state in remote_states:
        agent_a_state = synchronize_state(agent_a_state, remote_state)

    # Broadcast local state to others
    broadcast_state(agent_a_state)

    time.sleep(30)
```

### Step 4: Conflict Resolution

```python
# Conflicts are automatically resolved by CRDT semantics

# Example: Two agents add different tasks concurrently
agent_a_state.add_task("task-1")
agent_b_state.add_task("task-2")

# After merge: Both tasks present (OR-Set preserves both adds)
merged = synchronize_state(agent_a_state, agent_b_state)
assert "task-1" in merged.get_pending_tasks()
assert "task-2" in merged.get_pending_tasks()

# Example: Two agents increment counter concurrently
agent_a_state.completed_count.increment(5)
agent_b_state.completed_count.increment(3)

# After merge: Counts are added (G-Counter sums all increments)
merged = synchronize_state(agent_a_state, agent_b_state)
assert merged.get_completed_count() == 8  # 5 + 3
```

## Advanced Patterns

### 1. Multi-Value Register (MVRegister)

**Purpose**: Keep all concurrent values until resolved

```python
@dataclass
class MVRegister:
    """
    Multi-Value Register CRDT.

    Maintains all concurrent values with vector clocks.
    Application can choose resolution strategy.
    """
    values: Dict[Tuple, Any] = field(default_factory=dict)  # vector_clock -> value

    def set(self, value: Any, vector_clock: Tuple) -> None:
        """Set value with vector clock."""
        # Remove values dominated by this clock
        self.values = {
            vc: v for vc, v in self.values.items()
            if not self._dominates(vector_clock, vc)
        }

        self.values[vector_clock] = value

    def get(self) -> Set[Any]:
        """Get all concurrent values."""
        return set(self.values.values())

    def _dominates(self, vc1: Tuple, vc2: Tuple) -> bool:
        """Check if vc1 dominates vc2 (happens-before)."""
        return all(a >= b for a, b in zip(vc1, vc2)) and vc1 != vc2

    def merge(self, other: 'MVRegister') -> 'MVRegister':
        """Merge two MV-Registers."""
        merged = MVRegister()

        all_clocks = set(self.values.keys()) | set(other.values.keys())

        for vc in all_clocks:
            # Keep if not dominated by any other clock
            if not any(self._dominates(other_vc, vc) for other_vc in all_clocks if other_vc != vc):
                value = self.values.get(vc) or other.values.get(vc)
                merged.values[vc] = value

        return merged
```

### 2. LWW-Map (Last-Write-Wins Map)

**Purpose**: Distributed key-value map with LWW resolution

```python
class LWWMap:
    """
    Last-Write-Wins Map CRDT.

    Each key is an LWW-Register.
    """
    def __init__(self):
        self.map: Dict[str, LWWRegister] = {}

    def set(self, key: str, value: Any) -> None:
        """Set key-value pair."""
        if key not in self.map:
            self.map[key] = LWWRegister()

        self.map[key].set(value)

    def get(self, key: str) -> Any:
        """Get value for key."""
        if key not in self.map:
            return None

        return self.map[key].get()

    def merge(self, other: 'LWWMap') -> 'LWWMap':
        """Merge two LWW-Maps."""
        merged = LWWMap()

        all_keys = set(self.map.keys()) | set(other.map.keys())

        for key in all_keys:
            self_reg = self.map.get(key)
            other_reg = other.map.get(key)

            if self_reg and other_reg:
                merged.map[key] = self_reg.merge(other_reg)
            elif self_reg:
                merged.map[key] = self_reg
            else:
                merged.map[key] = other_reg

        return merged
```

## Integration Patterns

### With State Manager Agent

```python
# State Manager Agent uses this skill for CRDT operations
from skills.orchestration.distributed_state_sync import ORSet, GCounter, LWWMap

class StateManagerAgent:
    def __init__(self, replica_id: str):
        self.replica_id = replica_id

        # Use CRDTs for conflict-free state
        self.tasks = ORSet()           # Pending tasks
        self.metrics = GCounter(replica_id)  # Task counts
        self.config = LWWMap()         # Configuration values

    def handle_state_update(self, operation: str, **kwargs):
        """Handle state update operation."""
        if operation == "add_task":
            self.tasks.add(kwargs["task_id"])

        elif operation == "complete_task":
            self.tasks.remove(kwargs["task_id"])
            self.metrics.increment()

        elif operation == "update_config":
            self.config.set(kwargs["key"], kwargs["value"])

    def sync_with_peer(self, peer_state):
        """Synchronize state with peer agent."""
        self.tasks = self.tasks.merge(peer_state.tasks)
        self.metrics = self.metrics.merge(peer_state.metrics)
        self.config = self.config.merge(peer_state.config)
```

## Examples

### Example 1: Collaborative Task List

```python
# Three agents managing shared task list
agent_a = ORSet()
agent_b = ORSet()
agent_c = ORSet()

# Agent A adds tasks
agent_a.add("implement-auth")
agent_a.add("write-tests")

# Agent B adds task concurrently
agent_b.add("update-docs")

# Agent C removes task (also concurrent)
agent_c.add("implement-auth")  # Observed this task
agent_c.remove("implement-auth")  # Then removed it

# Merge all replicas
merged = agent_a.merge(agent_b).merge(agent_c)

# Result: OR-Set semantics preserve correct state
assert not merged.contains("implement-auth")  # Correctly removed
assert merged.contains("write-tests")         # Preserved from A
assert merged.contains("update-docs")         # Preserved from B
```

### Example 2: Distributed Metrics

```python
# Agents tracking workflow progress
orchestrator = GCounter(replica_id="orchestrator")
agent_1 = GCounter(replica_id="agent-1")
agent_2 = GCounter(replica_id="agent-2")

# Each agent completes tasks
agent_1.increment(5)  # Completed 5 tasks
agent_2.increment(7)  # Completed 7 tasks

# Orchestrator tracks overall progress
orchestrator.increment(2)  # Completed 2 coordination tasks

# Merge for total count
total = orchestrator.merge(agent_1).merge(agent_2)
assert total.value() == 14  # 2 + 5 + 7
```

## Best Practices

1. **Choose Right CRDT for Use Case**
   ```python
   # For sets: Use OR-Set (preserves concurrent adds)
   pending_tasks = ORSet()

   # For counters: Use G-Counter (increment only) or PN-Counter (inc/dec)
   task_count = GCounter()

   # For single values: Use LWW-Register (simple) or MV-Register (complex)
   current_config = LWWRegister()
   ```

2. **Synchronize Periodically**
   ```python
   # Every 30-60 seconds is usually sufficient
   sync_interval = 30  # seconds

   # More frequent for high-concurrency scenarios
   if high_concurrency:
       sync_interval = 10
   ```

3. **Handle Network Partitions**
   ```python
   try:
       remote_state = fetch_state_from_peer()
       merged = local_state.merge(remote_state)
   except NetworkError:
       # Continue with local operations
       # State will eventually sync when partition heals
       log("Network partition - continuing with local state")
   ```

4. **Monitor State Size**
   ```python
   # CRDTs can grow unbounded (especially OR-Set tombstones)
   if len(or_set.removed) > 1000:
       # Garbage collect old tombstones
       or_set.gc_tombstones(older_than=7*24*3600)  # 7 days
   ```

## Related Skills

- `state-manager-skill`: Uses CRDTs for distributed state
- `observability-tracker-skill`: Tracks CRDT synchronization metrics

## References

- [CRDT Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type)
- Document 15, Section 4: State Management Patterns
- Command: `/state-coordinator`

---

**Version**: 1.0.0
**Status**: Production Ready
**Complexity**: High (advanced distributed systems concepts)
**Token Cost**: Low (local operations, periodic sync)