---
name: safety-architecture
version: 1.0.0
description: |
  Framework for designing safeguards in autonomous systems including kill switches,
  resource limits, anomaly detection, and human-in-the-loop patterns.
author: QuantQuiver AI R&D
license: MIT
category: workflow
tags:
- safety
- autonomous-systems
- kill-switch
- monitoring
- risk-management
- circuit-breaker
- failsafe
dependencies:
  skills: []
  python: ">=3.9"
  packages:
    - pyyaml
tools:
- code_execution
- bash
triggers:
- "safety architecture"
- "kill switch"
- "autonomous safeguards"
- "circuit breaker"
- "failsafe design"
- "human in the loop"
- "anomaly detection"
---
# Safety Architecture Framework
## Purpose
A framework for designing safeguards in autonomous systems, including kill switches, resource limits, anomaly detection, and human-in-the-loop patterns. The goal is for autonomous systems to fail safely and remain under human oversight.
**Problem Space:**
- Autonomous systems can cause runaway damage if unchecked
- Default behavior during failures is often unsafe
- Human oversight mechanisms are often afterthoughts
- Resource exhaustion can cascade to other systems
**Solution Approach:**
- Defense-in-depth safety layers
- Fail-safe defaults
- Mandatory human checkpoints
- Resource isolation and limits
- Comprehensive anomaly detection
## When to Use
- Designing any autonomous system
- Systems that control resources (money, compute, data)
- AI agents with tool access
- Automated trading/scheduling systems
- Any system requiring regulatory compliance
- Production systems with business-critical operations
## When NOT to Use
- Simple scripts with no autonomous decision-making
- Read-only analysis tools
- Systems with no resource impact
- Prototype/experimental code (but consider adding before production)
---
## Core Instructions
### Safety Layer Architecture
```
┌─────────────────────────────────────────────────────────────────┐
│                    SAFETY LAYER ARCHITECTURE                    │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Layer 5: Human Oversight                                       │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  • Manual approval gates for critical actions             │  │
│  │  • Alert escalation with response requirements            │  │
│  │  • Audit logging with human review                        │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                │                                │
│  Layer 4: Business Logic Guards                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  • Domain-specific validation rules                       │  │
│  │  • Value limits and thresholds                            │  │
│  │  • Timing and rate constraints                            │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                │                                │
│  Layer 3: Anomaly Detection                                     │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  • Statistical outlier detection                          │  │
│  │  • Behavioral drift monitoring                            │  │
│  │  • Pattern deviation alerts                               │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                │                                │
│  Layer 2: Circuit Breakers                                      │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  • Error rate thresholds                                  │  │
│  │  • Consecutive failure limits                             │  │
│  │  • Automatic recovery with backoff                        │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                │                                │
│  Layer 1: Resource Limits (Hard Stops)                          │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  • Memory limits                                          │  │
│  │  • CPU/time limits                                        │  │
│  │  • Cost/budget caps                                       │  │
│  │  • Action count limits                                    │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```
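One way to read the diagram is as an ordered chain of checks in which the first denial wins. The sketch below illustrates that idea only; the guard functions, the `GUARDS` list, the limit of 10,000, and the allowed action types are hypothetical stand-ins rather than part of this skill's components.

```python
from typing import Callable, List, Optional, Tuple

# A guard inspects an action and returns (allowed, reason).
Guard = Callable[[dict], Tuple[bool, str]]


def resource_guard(action: dict) -> Tuple[bool, str]:
    # Layer 1 stand-in: hard cap on per-action value (hypothetical limit).
    if action.get("value", 0) > 10_000:
        return False, "value exceeds hard cap"
    return True, "OK"


def business_guard(action: dict) -> Tuple[bool, str]:
    # Layer 4 stand-in: domain rule (hypothetical allow-list of action types).
    if action.get("type") not in {"buy", "sell"}:
        return False, "unknown action type"
    return True, "OK"


def run_guards(
    action: dict, guards: List[Tuple[str, Guard]]
) -> Tuple[bool, Optional[str], str]:
    """Evaluate guards in order; the first denial short-circuits."""
    for layer_name, guard in guards:
        try:
            allowed, reason = guard(action)
        except Exception as exc:
            # Fail closed: a guard that crashes counts as a denial.
            return False, layer_name, f"guard error: {exc}"
        if not allowed:
            return False, layer_name, reason
    return True, None, "OK"


# Cheap hard stops run first, domain rules afterwards.
GUARDS = [("resource_limits", resource_guard), ("business_logic", business_guard)]

print(run_guards({"type": "buy", "value": 50}, GUARDS))
print(run_guards({"type": "buy", "value": 50_000}, GUARDS))
print(run_guards({"type": "delete", "value": 1}, GUARDS))
```

Treating a crashing guard as a denial is a deliberate choice here: it keeps the default behavior during failures on the safe side.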
### Standard Safety Components
#### 1. Kill Switch Patterns
```python
from datetime import datetime
from enum import Enum
import signal
import threading


class KillSwitchState(Enum):
    ACTIVE = "active"
    PAUSED = "paused"
    KILLED = "killed"


class KillSwitch:
    """
    Global kill switch for autonomous system.

    Features:
    - Manual activation (API/CLI)
    - Automatic activation on anomaly
    - Graceful shutdown with cleanup

    State persistence for recovery is not implemented here; persist
    `state` and `activation_reason` externally if it is needed.
    """

    def __init__(self, cleanup_callback=None, timeout_seconds=30):
        self.state = KillSwitchState.ACTIVE
        self.cleanup_callback = cleanup_callback
        self.timeout_seconds = timeout_seconds
        self.activation_reason = None
        self.activation_time = None
        # Re-entrant lock: a signal handler runs in the main thread and may
        # interrupt code that already holds the lock.
        self._lock = threading.RLock()

        # Register signal handlers. signal.signal() only works in the main
        # thread, so skip registration when constructed elsewhere.
        if threading.current_thread() is threading.main_thread():
            signal.signal(signal.SIGINT, self._signal_handler)
            signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Handle OS signals for graceful shutdown."""
        self.activate(f"Received signal {signum}")

    def activate(self, reason: str):
        """
        Activate kill switch - stops all autonomous operations.

        Args:
            reason: Why the kill switch was activated
        """
        with self._lock:
            if self.state == KillSwitchState.KILLED:
                return  # Already killed
            self.state = KillSwitchState.KILLED
            self.activation_reason = reason
            self.activation_time = datetime.now()

        print(f"[KILL SWITCH ACTIVATED] {reason}")

        # Run cleanup with timeout (outside the lock so check() never blocks)
        if self.cleanup_callback:
            # Daemon thread so a hung cleanup cannot block interpreter exit.
            cleanup_thread = threading.Thread(
                target=self.cleanup_callback, daemon=True
            )
            cleanup_thread.start()
            cleanup_thread.join(timeout=self.timeout_seconds)
            if cleanup_thread.is_alive():
                print("[KILL SWITCH] Cleanup timed out - not waiting further")

    def pause(self, reason: str):
        """Pause operations without full shutdown."""
        with self._lock:
            if self.state == KillSwitchState.ACTIVE:
                self.state = KillSwitchState.PAUSED
                self.activation_reason = reason
                print(f"[SYSTEM PAUSED] {reason}")

    def resume(self):
        """Resume from paused state. A killed switch cannot be resumed."""
        with self._lock:
            if self.state == KillSwitchState.PAUSED:
                self.state = KillSwitchState.ACTIVE
                self.activation_reason = None
                print("[SYSTEM RESUMED]")

    def check(self) -> bool:
        """Check if operations should continue."""
        return self.state == KillSwitchState.ACTIVE

    @property
    def is_active(self) -> bool:
        return self.state == KillSwitchState.ACTIVE

    @property
    def is_paused(self) -> bool:
        return self.state == KillSwitchState.PAUSED

    @property
    def is_killed(self) -> bool:
        return self.state == KillSwitchState.KILLED
```
#### 2. Circuit Breaker Pattern
```python
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from functools import wraps
from typing import Optional
import threading


class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing - reject calls
    HALF_OPEN = "half_open"  # Testing recovery


@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5
    success_threshold: int = 3
    timeout_seconds: int = 60
    half_open_max_calls: int = 3


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """
    Circuit breaker for protecting against cascading failures.

    States:
    - CLOSED: Normal operation, tracking failures
    - OPEN: Failing, rejecting all calls
    - HALF_OPEN: Testing if service recovered
    """

    def __init__(self, name: str, config: Optional[CircuitBreakerConfig] = None):
        self.name = name
        self.config = config or CircuitBreakerConfig()
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None
        self.half_open_calls = 0
        self._lock = threading.Lock()

    def can_execute(self) -> bool:
        """Check if call should be allowed."""
        with self._lock:
            if self.state == CircuitState.CLOSED:
                return True

            if self.state == CircuitState.OPEN:
                # Check if timeout elapsed
                if self._timeout_elapsed():
                    self._transition_to_half_open()
                    # This call is the first half-open probe, so count it.
                    self.half_open_calls = 1
                    return True
                return False

            if self.state == CircuitState.HALF_OPEN:
                if self.half_open_calls < self.config.half_open_max_calls:
                    self.half_open_calls += 1
                    return True
                return False

            return False

    def record_success(self):
        """Record successful call."""
        with self._lock:
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.config.success_threshold:
                    self._transition_to_closed()
            elif self.state == CircuitState.CLOSED:
                # Reset failure count on success
                self.failure_count = 0

    def record_failure(self):
        """Record failed call."""
        with self._lock:
            self.last_failure_time = datetime.now()
            if self.state == CircuitState.HALF_OPEN:
                # Any failure while probing reopens the circuit
                self._transition_to_open()
            elif self.state == CircuitState.CLOSED:
                self.failure_count += 1
                if self.failure_count >= self.config.failure_threshold:
                    self._transition_to_open()

    def _timeout_elapsed(self) -> bool:
        if not self.last_failure_time:
            return True
        elapsed = datetime.now() - self.last_failure_time
        return elapsed.total_seconds() >= self.config.timeout_seconds

    def _transition_to_open(self):
        self.state = CircuitState.OPEN
        print(f"[CIRCUIT BREAKER] {self.name}: OPEN (failures: {self.failure_count})")

    def _transition_to_half_open(self):
        self.state = CircuitState.HALF_OPEN
        self.half_open_calls = 0
        self.success_count = 0
        print(f"[CIRCUIT BREAKER] {self.name}: HALF_OPEN (testing recovery)")

    def _transition_to_closed(self):
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        print(f"[CIRCUIT BREAKER] {self.name}: CLOSED (recovered)")

    def __call__(self, func):
        """Decorator for wrapping functions with circuit breaker."""
        @wraps(func)  # Preserve the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            if not self.can_execute():
                raise CircuitOpenError(f"Circuit {self.name} is open")
            try:
                result = func(*args, **kwargs)
                self.record_success()
                return result
            except Exception:
                self.record_failure()
                raise
        return wrapper
```
#### 3. Resource Limiter
```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, Optional
import threading


@dataclass
class ResourceLimits:
    """Configuration for resource limits."""
    max_actions_per_hour: int = 1000
    max_value_per_action: float = 10000.0
    max_total_value_per_hour: float = 100000.0
    max_api_calls_per_minute: int = 60
    max_memory_mb: int = 1024  # Declared only; not enforced by ResourceLimiter
    max_execution_time_seconds: int = 300


class ResourceLimiter:
    """
    Tracks and enforces resource usage limits.

    Prevents runaway consumption of:
    - API calls
    - Financial value
    - Compute resources
    - Time

    Note: check_* and record_* are separate calls, so concurrent callers
    can both pass a check before either one records.
    """

    def __init__(self, limits: Optional[ResourceLimits] = None):
        self.limits = limits or ResourceLimits()
        self.action_count = 0
        self.total_value = 0.0
        self.api_calls = []  # Timestamps
        self.start_time = datetime.now()
        self.hour_start = datetime.now()
        self._lock = threading.Lock()

    def check_action(self, value: float = 0.0) -> tuple[bool, str]:
        """
        Check if action is within limits.

        Returns:
            (allowed, reason) tuple
        """
        with self._lock:
            self._cleanup_old_records()

            # Check execution time
            elapsed = (datetime.now() - self.start_time).total_seconds()
            if elapsed > self.limits.max_execution_time_seconds:
                return False, f"Max execution time exceeded ({elapsed:.0f}s)"

            # Check action count
            if self.action_count >= self.limits.max_actions_per_hour:
                return False, f"Max actions per hour exceeded ({self.action_count})"

            # Check single value
            if value > self.limits.max_value_per_action:
                return False, f"Action value ${value:.2f} exceeds max ${self.limits.max_value_per_action:.2f}"

            # Check total value
            if self.total_value + value > self.limits.max_total_value_per_hour:
                return False, f"Total value would exceed limit (${self.total_value + value:.2f})"

            return True, "OK"

    def record_action(self, value: float = 0.0):
        """Record an action and its value."""
        with self._lock:
            self.action_count += 1
            self.total_value += value

    def check_api_call(self) -> tuple[bool, str]:
        """Check if API call is within rate limit."""
        with self._lock:
            now = datetime.now()
            minute_ago = now - timedelta(minutes=1)

            # Clean old calls
            self.api_calls = [t for t in self.api_calls if t > minute_ago]

            if len(self.api_calls) >= self.limits.max_api_calls_per_minute:
                return False, f"API rate limit exceeded ({len(self.api_calls)}/min)"

            return True, "OK"

    def record_api_call(self):
        """Record an API call."""
        with self._lock:
            self.api_calls.append(datetime.now())

    def _cleanup_old_records(self):
        """Reset hourly counters if hour elapsed."""
        now = datetime.now()
        if (now - self.hour_start).total_seconds() >= 3600:
            self.action_count = 0
            self.total_value = 0.0
            self.hour_start = now

    def get_usage_report(self) -> Dict[str, Any]:
        """Get current resource usage."""
        with self._lock:
            # Count only calls from the last minute so the figure is not stale
            minute_ago = datetime.now() - timedelta(minutes=1)
            recent_calls = [t for t in self.api_calls if t > minute_ago]
            return {
                "actions_this_hour": self.action_count,
                "actions_limit": self.limits.max_actions_per_hour,
                "value_this_hour": self.total_value,
                "value_limit": self.limits.max_total_value_per_hour,
                "api_calls_last_minute": len(recent_calls),
                "api_limit": self.limits.max_api_calls_per_minute,
                "execution_seconds": (datetime.now() - self.start_time).total_seconds(),
                "execution_limit": self.limits.max_execution_time_seconds,
            }
```
#### 4. Anomaly Detector
```python
from collections import deque
from dataclasses import dataclass
from typing import Optional
import statistics


@dataclass
class AnomalyConfig:
    window_size: int = 100
    z_score_threshold: float = 3.0
    min_samples: int = 10
    consecutive_anomalies_alert: int = 3


class AnomalyDetector:
    """
    Statistical anomaly detection for continuous metrics.

    Uses Z-score method with rolling statistics.
    """

    def __init__(self, name: str, config: Optional[AnomalyConfig] = None):
        self.name = name
        self.config = config or AnomalyConfig()
        self.values = deque(maxlen=self.config.window_size)
        self.consecutive_anomalies = 0

    def check(self, value: float) -> tuple[bool, Optional[str]]:
        """
        Check if value is anomalous.

        Returns:
            (is_anomaly, reason) tuple
        """
        # Not enough history yet: collect samples without judging them
        if len(self.values) < self.config.min_samples:
            self.values.append(value)
            return False, None

        mean = statistics.mean(self.values)
        stdev = statistics.stdev(self.values)

        # A constant window has no spread, so a Z-score is undefined
        if stdev == 0:
            self.values.append(value)
            return False, None

        z_score = abs((value - mean) / stdev)
        # The value joins the window even when anomalous
        self.values.append(value)

        if z_score > self.config.z_score_threshold:
            self.consecutive_anomalies += 1
            reason = f"Z-score {z_score:.2f} > threshold {self.config.z_score_threshold}"

            if self.consecutive_anomalies >= self.config.consecutive_anomalies_alert:
                reason += f" (ALERT: {self.consecutive_anomalies} consecutive anomalies)"

            return True, reason
        else:
            self.consecutive_anomalies = 0
            return False, None

    @property
    def should_alert(self) -> bool:
        """Check if anomaly pattern warrants alert."""
        return self.consecutive_anomalies >= self.config.consecutive_anomalies_alert
```
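To make the Z-score check concrete, here is a small worked example using only the standard library. The window values and the two candidate readings are made up for illustration; the threshold of 3.0 matches the `z_score_threshold` default above.

```python
import statistics

# Hypothetical rolling window of a metric that hovers around 100.
window = [100.0, 102.0, 98.0, 101.0, 99.0, 100.0, 103.0, 97.0, 100.0, 100.0]

mean = statistics.mean(window)    # 100.0
stdev = statistics.stdev(window)  # sample standard deviation, about 1.76

THRESHOLD = 3.0


def z_score(value: float) -> float:
    """Distance from the window mean, measured in standard deviations."""
    return abs(value - mean) / stdev


def is_anomalous(value: float) -> bool:
    return z_score(value) > THRESHOLD


print(f"z(104) = {z_score(104.0):.2f}, anomalous: {is_anomalous(104.0)}")
print(f"z(110) = {z_score(110.0):.2f}, anomalous: {is_anomalous(110.0)}")
```

A reading of 104 sits about 2.27 standard deviations from the mean and passes, while 110 sits about 5.67 away and is flagged. Calibrating the threshold on historical data matters because a tight window like this one makes even modest jumps look extreme.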
#### 5. Human-in-the-Loop Gate
```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Callable, Optional
import uuid


class ApprovalStatus(Enum):
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"
    EXPIRED = "expired"


@dataclass
class ApprovalRequest:
    id: str
    action_type: str
    description: str
    value: float
    context: dict
    created_at: datetime
    expires_at: datetime
    status: ApprovalStatus = ApprovalStatus.PENDING
    reviewer: Optional[str] = None
    reviewed_at: Optional[datetime] = None
    review_notes: Optional[str] = None


class HumanApprovalGate:
    """
    Requires human approval for critical actions.

    Features:
    - Configurable approval thresholds
    - Timeout handling
    - Audit trail
    - Notification hooks
    """

    def __init__(
        self,
        approval_timeout_minutes: int = 60,
        notify_callback: Optional[Callable] = None,
        auto_approve_below: float = 0.0  # Auto-approve small values
    ):
        self.timeout_minutes = approval_timeout_minutes
        self.notify_callback = notify_callback
        self.auto_approve_below = auto_approve_below
        self.pending_approvals: dict[str, ApprovalRequest] = {}

    def request_approval(
        self,
        action_type: str,
        description: str,
        value: float,
        context: Optional[dict] = None
    ) -> ApprovalRequest:
        """
        Create approval request for critical action.

        Returns:
            ApprovalRequest object to track
        """
        # Auto-approve small values. Strictly below the threshold, so the
        # default of 0.0 auto-approves nothing; negative values never qualify.
        if 0 <= value < self.auto_approve_below:
            request = ApprovalRequest(
                id=str(uuid.uuid4()),
                action_type=action_type,
                description=description,
                value=value,
                context=context or {},
                created_at=datetime.now(),
                expires_at=datetime.now(),
                status=ApprovalStatus.APPROVED,
                reviewer="auto",
                reviewed_at=datetime.now(),
                review_notes="Auto-approved (below threshold)"
            )
            return request

        # Create pending request
        request = ApprovalRequest(
            id=str(uuid.uuid4()),
            action_type=action_type,
            description=description,
            value=value,
            context=context or {},
            created_at=datetime.now(),
            expires_at=datetime.now() + timedelta(minutes=self.timeout_minutes)
        )

        self.pending_approvals[request.id] = request

        # Send notification
        if self.notify_callback:
            self.notify_callback(request)

        print(f"[APPROVAL REQUIRED] {request.id}: {description} (${value:.2f})")
        return request

    def approve(
        self,
        request_id: str,
        reviewer: str,
        notes: Optional[str] = None
    ) -> bool:
        """Approve a pending request."""
        if request_id not in self.pending_approvals:
            return False

        request = self.pending_approvals[request_id]

        if request.status != ApprovalStatus.PENDING:
            return False

        # Expired requests fail closed: they can no longer be approved
        if datetime.now() > request.expires_at:
            request.status = ApprovalStatus.EXPIRED
            return False

        request.status = ApprovalStatus.APPROVED
        request.reviewer = reviewer
        request.reviewed_at = datetime.now()
        request.review_notes = notes

        print(f"[APPROVED] {request_id} by {reviewer}")
        return True

    def reject(
        self,
        request_id: str,
        reviewer: str,
        notes: Optional[str] = None
    ) -> bool:
        """Reject a pending request."""
        if request_id not in self.pending_approvals:
            return False

        request = self.pending_approvals[request_id]

        if request.status != ApprovalStatus.PENDING:
            return False

        request.status = ApprovalStatus.REJECTED
        request.reviewer = reviewer
        request.reviewed_at = datetime.now()
        request.review_notes = notes

        print(f"[REJECTED] {request_id} by {reviewer}: {notes}")
        return True

    def check_status(self, request_id: str) -> Optional[ApprovalStatus]:
        """Check approval status. Returns None for unknown request IDs."""
        if request_id not in self.pending_approvals:
            return None

        request = self.pending_approvals[request_id]

        # Check expiration
        if request.status == ApprovalStatus.PENDING:
            if datetime.now() > request.expires_at:
                request.status = ApprovalStatus.EXPIRED

        return request.status
```
### Decision Framework
**When to Require Human Approval:**
- Actions above value threshold
- First-time or unusual actions
- Actions affecting external systems
- Any irreversible action
- Actions during anomaly state
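The criteria above can be collapsed into a single predicate that also reports why approval is needed. This is a minimal sketch under stated assumptions: `ActionContext`, its field names, `approval_reasons`, and the default threshold of 1000 are hypothetical and would need to be mapped onto a real system's action model.

```python
from dataclasses import dataclass
from typing import FrozenSet, List


@dataclass
class ActionContext:
    """Hypothetical description of a proposed action."""
    value: float
    action_type: str
    touches_external_system: bool = False
    reversible: bool = True
    anomaly_active: bool = False


def approval_reasons(
    ctx: ActionContext,
    value_threshold: float = 1000.0,
    known_action_types: FrozenSet[str] = frozenset(),
) -> List[str]:
    """Return every reason this action needs a human; empty means none."""
    reasons = []
    if ctx.value > value_threshold:
        reasons.append("value above threshold")
    if ctx.action_type not in known_action_types:
        reasons.append("first-time or unusual action")
    if ctx.touches_external_system:
        reasons.append("affects external system")
    if not ctx.reversible:
        reasons.append("irreversible action")
    if ctx.anomaly_active:
        reasons.append("system in anomaly state")
    return reasons


known = frozenset({"rebalance"})
routine = ActionContext(value=50.0, action_type="rebalance")
risky = ActionContext(value=5000.0, action_type="wire_transfer", reversible=False)

print(approval_reasons(routine, known_action_types=known))
print(approval_reasons(risky, known_action_types=known))
```

Returning the full list of reasons, rather than a bare boolean, gives the reviewer context and leaves an audit trail of why the gate fired.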
**Circuit Breaker vs Kill Switch:**
| Aspect | Circuit Breaker | Kill Switch |
|--------|-----------------|-------------|
| Scope | Single service/endpoint | Entire system |
| Trigger | Automatic (errors) | Manual or critical anomaly |
| Recovery | Automatic with backoff | Manual restart required |
| Use Case | Service resilience | Emergency stop |
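The "automatic with backoff" recovery in the table can be illustrated with a timeout that grows each time the breaker reopens. The `CircuitBreaker` above uses a fixed `timeout_seconds`, so the function below is a hypothetical extension rather than existing behavior; the name `open_timeout_seconds` and the 60-second base and 900-second cap are assumptions chosen for the example.

```python
def open_timeout_seconds(
    consecutive_trips: int,
    base_seconds: float = 60.0,
    max_seconds: float = 900.0,
) -> float:
    """Exponential backoff: double the wait per consecutive trip, up to a cap."""
    if consecutive_trips < 1:
        return 0.0  # Breaker has not tripped, so there is nothing to wait for
    return min(base_seconds * 2 ** (consecutive_trips - 1), max_seconds)


for trips in range(1, 6):
    print(trips, open_timeout_seconds(trips))
```

With these defaults the waits are 60, 120, 240, and 480 seconds, then 900 once the cap applies. A kill switch has no equivalent schedule because recovery there is a deliberate human decision.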
---
## Templates
### Complete Safety-Wrapped System
```python
# Assumes the component classes from the sections above are in scope.


class SafeAutonomousSystem:
    """
    Template for autonomous system with full safety architecture.
    """

    def __init__(self, config: dict):
        # Layer 1: Resource limits
        self.resource_limiter = ResourceLimiter(ResourceLimits(
            max_actions_per_hour=config.get("max_actions", 1000),
            max_value_per_action=config.get("max_value", 10000),
            max_total_value_per_hour=config.get("max_total", 100000)
        ))

        # Layer 2: Circuit breakers
        self.circuit_breakers = {
            "api": CircuitBreaker("api", CircuitBreakerConfig(
                failure_threshold=5,
                timeout_seconds=60
            )),
            "database": CircuitBreaker("database", CircuitBreakerConfig(
                failure_threshold=3,
                timeout_seconds=120
            ))
        }

        # Layer 3: Anomaly detection
        self.anomaly_detectors = {
            "value": AnomalyDetector("value", AnomalyConfig(z_score_threshold=3.0)),
            "frequency": AnomalyDetector("frequency", AnomalyConfig(z_score_threshold=2.5))
        }

        # Layer 4 (business logic guards) is domain-specific and not shown here.

        # Layer 5: Human oversight (approval gate)
        self.approval_gate = HumanApprovalGate(
            approval_timeout_minutes=60,
            auto_approve_below=100.0
        )

        # Global kill switch (emergency stop, checked before every layer)
        self.kill_switch = KillSwitch(cleanup_callback=self._cleanup)

    def execute_action(self, action: dict) -> dict:
        """
        Execute action through all safety layers.
        """
        # Check kill switch
        if not self.kill_switch.check():
            return {"error": "System is stopped", "reason": self.kill_switch.activation_reason}

        value = action.get("value", 0)

        # Check resource limits
        allowed, reason = self.resource_limiter.check_action(value)
        if not allowed:
            return {"error": "Resource limit exceeded", "reason": reason}

        # Check anomaly
        is_anomaly, anomaly_reason = self.anomaly_detectors["value"].check(value)
        if is_anomaly and self.anomaly_detectors["value"].should_alert:
            self.kill_switch.pause(f"Anomaly detected: {anomaly_reason}")
            return {"error": "Anomaly detected - paused", "reason": anomaly_reason}

        # Check circuit breakers
        if not self.circuit_breakers["api"].can_execute():
            return {"error": "Circuit open", "service": "api"}

        # Human approval for high-value actions.
        # NOTE: every call creates a new request, so this template does not
        # show how an action resumes once a reviewer approves it.
        if value > 1000:
            request = self.approval_gate.request_approval(
                action_type=action.get("type", "unknown"),
                description=action.get("description", ""),
                value=value,
                context=action
            )

            if request.status != ApprovalStatus.APPROVED:
                return {"error": "Approval required", "request_id": request.id}

        # Execute the actual action
        try:
            result = self._do_action(action)
            self.resource_limiter.record_action(value)
            self.circuit_breakers["api"].record_success()
            return {"success": True, "result": result}
        except Exception as e:
            self.circuit_breakers["api"].record_failure()
            return {"error": str(e)}

    def _do_action(self, action: dict):
        """Actual action implementation."""
        pass

    def _cleanup(self):
        """Cleanup on kill switch activation."""
        print("Running cleanup...")
```
---
## Examples
### Example 1: Trading System Safety
**Input**: "Add safety controls to my automated trading system"
**Output**: Implementation with:
- Position size limits per trade
- Daily loss limits with auto-stop
- Anomaly detection on trade frequency
- Human approval for large trades
- Kill switch for market anomalies
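As one illustration of the "daily loss limits with auto-stop" item, the sketch below halts trading once realized losses for the day reach a cap. `DailyLossGuard` and its method names are hypothetical, and the choice to keep the halt in place until a human resets it, even after the date rolls over, is an assumption rather than something this skill prescribes.

```python
from datetime import date
from typing import Optional


class DailyLossGuard:
    """Halts trading when the day's realized loss reaches a limit."""

    def __init__(self, max_daily_loss: float):
        self.max_daily_loss = max_daily_loss
        self.day: Optional[date] = None
        self.realized_pnl = 0.0
        self.halted = False

    def record_trade(self, pnl: float, today: date) -> None:
        # Start a fresh tally when the trading day changes.
        if today != self.day:
            self.day = today
            self.realized_pnl = 0.0
        self.realized_pnl += pnl
        if self.realized_pnl <= -self.max_daily_loss:
            self.halted = True  # Auto-stop; stays set until manual_reset()

    def can_trade(self) -> bool:
        return not self.halted

    def manual_reset(self) -> None:
        """A human explicitly re-enables trading."""
        self.halted = False


guard = DailyLossGuard(max_daily_loss=500.0)
day1, day2 = date(2026, 1, 5), date(2026, 1, 6)

guard.record_trade(-200.0, day1)
guard.record_trade(-250.0, day1)
print(guard.can_trade())  # True: down 450, still inside the limit

guard.record_trade(-100.0, day1)
print(guard.can_trade())  # False: down 550, limit breached

guard.record_trade(50.0, day2)
print(guard.can_trade())  # False: the halt survives the date change
```

Passing `today` in explicitly, instead of calling `date.today()` inside the class, keeps the guard easy to test against historical data.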
### Example 2: AI Agent Safeguards
**Input**: "Add safeguards to my autonomous AI agent"
**Output**: Implementation with:
- API call rate limiting
- Cost accumulation limits
- Circuit breakers for external services
- Human approval for destructive actions
- Emergency stop capability
---
## Validation Checklist
Before deploying an autonomous system:
- [ ] Kill switch tested and accessible
- [ ] Circuit breakers configured for all external calls
- [ ] Resource limits set appropriately
- [ ] Anomaly detection calibrated on historical data
- [ ] Human approval thresholds defined
- [ ] Alert notifications tested
- [ ] Cleanup procedures verified
- [ ] Recovery procedures documented
- [ ] Monitoring dashboards created
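The first checklist item can be backed by an automated check that a worker actually stops within a deadline once the stop flag is set. The sketch below uses a plain `threading.Event` as a stand-in for the kill switch; `worker`, `stops_within`, and the timing values are hypothetical, and a real test would exercise the system's own kill switch and cleanup path.

```python
import threading
import time


def worker(stop: threading.Event, processed: list) -> None:
    """Stand-in autonomous loop that checks the stop flag every iteration."""
    while not stop.is_set():
        processed.append(time.monotonic())
        stop.wait(0.01)  # Sleeps, but wakes immediately once stop is set


def stops_within(deadline_seconds: float) -> bool:
    """Start a worker, trigger the stop flag, and verify it exits in time."""
    stop = threading.Event()
    processed: list = []
    thread = threading.Thread(target=worker, args=(stop, processed), daemon=True)
    thread.start()

    time.sleep(0.05)  # Let the worker do some work first
    stop.set()        # Equivalent of activating the kill switch
    thread.join(timeout=deadline_seconds)

    did_work = len(processed) > 0
    return did_work and not thread.is_alive()


print(stops_within(1.0))
```

Checking that the worker did some work before stopping guards against a test that passes only because the loop never ran.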
---
## Related Resources
- Skill: `repository-auditor` - Security review of safety code
- Skill: `cicd-pipeline-generator` - CI with safety tests
- Pattern: Circuit Breaker (Martin Fowler)
- Pattern: Bulkhead (Resilience patterns)
---
## Changelog
### 1.0.0 (January 2026)
- Initial release
- Kill switch implementation
- Circuit breaker pattern
- Resource limiter
- Anomaly detection
- Human-in-the-loop gates