APM

>Agent Skill

@datadrivenconstruction/equipment-telematics

skilldevelopment

Integrate and analyze telematics data from heavy construction equipment. Track location, utilization, fuel consumption, maintenance needs, and operator behavior.

apm::install
$apm install @datadrivenconstruction/equipment-telematics
apm::skill.md
---
name: "equipment-telematics"
description: "Integrate and analyze telematics data from heavy construction equipment. Track location, utilization, fuel consumption, maintenance needs, and operator behavior."
homepage: "https://datadrivenconstruction.io"
metadata: {"openclaw": {"emoji": "🚀", "os": ["darwin", "linux", "win32"], "homepage": "https://datadrivenconstruction.io", "requires": {"bins": ["python3"]}}}
---
# Equipment Telematics

## Overview

Integrate telematics data from heavy construction equipment (excavators, cranes, loaders, trucks) to monitor utilization, track location, analyze fuel efficiency, predict maintenance needs, and ensure safe operation.

## Telematics Data Flow

```
┌─────────────────────────────────────────────────────────────────┐
│                  EQUIPMENT TELEMATICS                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  EQUIPMENT                  TELEMATICS              ANALYTICS   │
│  ─────────                  ──────────              ─────────   │
│                                                                  │
│  🚜 Excavator  ────┐       📍 Location              📊 Utilization│
│  🏗️ Crane      ────┼──────→ 🔧 Engine Hours ────────→ ⛽ Fuel      │
│  🚛 Truck      ────┤       ⛽ Fuel Level             🔧 Maintenance│
│  🚧 Loader     ────┘       ⚡ Performance            👷 Operator   │
│                                                                  │
│  METRICS TRACKED:                                               │
│  • GPS location and geofencing                                  │
│  • Engine hours and idle time                                   │
│  • Fuel consumption rate                                        │
│  • Load cycles and productivity                                 │
│  • Fault codes and diagnostics                                  │
│  • Operator behavior and safety                                 │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘
```

## Technical Implementation

```python
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple
from datetime import datetime, timedelta
from enum import Enum
import statistics
import math

class EquipmentType(Enum):
    EXCAVATOR = "excavator"
    CRANE = "crane"
    LOADER = "loader"
    BULLDOZER = "bulldozer"
    DUMP_TRUCK = "dump_truck"
    CONCRETE_MIXER = "concrete_mixer"
    FORKLIFT = "forklift"
    COMPACTOR = "compactor"
    GRADER = "grader"
    TELEHANDLER = "telehandler"

class OperatingStatus(Enum):
    OPERATING = "operating"
    IDLE = "idle"
    OFF = "off"
    MAINTENANCE = "maintenance"
    FAULT = "fault"

class FaultSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"
    SHUTDOWN = "shutdown"

@dataclass
class GPSLocation:
    latitude: float
    longitude: float
    altitude: float = 0.0
    speed: float = 0.0
    heading: float = 0.0
    timestamp: datetime = field(default_factory=datetime.now)

@dataclass
class TelematicsReading:
    equipment_id: str
    timestamp: datetime
    location: GPSLocation
    engine_hours: float
    fuel_level: float  # Percentage
    fuel_rate: float   # L/hr
    engine_rpm: int
    hydraulic_temp: float
    coolant_temp: float
    operating_status: OperatingStatus
    load_percentage: float = 0.0
    operator_id: str = ""

@dataclass
class FaultCode:
    code: str
    description: str
    severity: FaultSeverity
    timestamp: datetime
    equipment_id: str
    resolved: bool = False

@dataclass
class Equipment:
    id: str
    name: str
    equipment_type: EquipmentType
    make: str
    model: str
    year: int
    serial_number: str
    hourly_rate: float = 0.0
    fuel_capacity: float = 0.0  # Liters
    current_hours: float = 0.0
    next_service_hours: float = 0.0
    assigned_site: str = ""
    assigned_operator: str = ""

@dataclass
class Geofence:
    id: str
    name: str
    center_lat: float
    center_lon: float
    radius_meters: float
    allowed_equipment: List[str] = field(default_factory=list)

@dataclass
class UtilizationReport:
    equipment_id: str
    period_start: datetime
    period_end: datetime
    total_hours: float
    operating_hours: float
    idle_hours: float
    off_hours: float
    utilization_pct: float
    idle_pct: float
    fuel_consumed: float
    fuel_efficiency: float  # L/operating hour
    cycles: int

class EquipmentTelematics:
    """Integrate and analyze equipment telematics data."""

    # Maintenance intervals by type (hours)
    SERVICE_INTERVALS = {
        EquipmentType.EXCAVATOR: 250,
        EquipmentType.CRANE: 200,
        EquipmentType.LOADER: 250,
        EquipmentType.BULLDOZER: 250,
        EquipmentType.DUMP_TRUCK: 300,
    }

    # Typical fuel rates (L/hr)
    TYPICAL_FUEL_RATES = {
        EquipmentType.EXCAVATOR: 15,
        EquipmentType.CRANE: 12,
        EquipmentType.LOADER: 18,
        EquipmentType.BULLDOZER: 25,
        EquipmentType.DUMP_TRUCK: 20,
    }

    def __init__(self, fleet_name: str):
        self.fleet_name = fleet_name
        self.equipment: Dict[str, Equipment] = {}
        self.readings: List[TelematicsReading] = []
        self.faults: List[FaultCode] = []
        self.geofences: Dict[str, Geofence] = {}

    def register_equipment(self, id: str, name: str, equipment_type: EquipmentType,
                          make: str, model: str, year: int, serial_number: str,
                          hourly_rate: float = 0, fuel_capacity: float = 0) -> Equipment:
        """Register equipment in fleet."""
        equipment = Equipment(
            id=id,
            name=name,
            equipment_type=equipment_type,
            make=make,
            model=model,
            year=year,
            serial_number=serial_number,
            hourly_rate=hourly_rate,
            fuel_capacity=fuel_capacity
        )
        self.equipment[id] = equipment
        return equipment

    def add_geofence(self, id: str, name: str, center_lat: float,
                    center_lon: float, radius_meters: float,
                    allowed_equipment: List[str] = None) -> Geofence:
        """Add geofence boundary."""
        geofence = Geofence(
            id=id,
            name=name,
            center_lat=center_lat,
            center_lon=center_lon,
            radius_meters=radius_meters,
            allowed_equipment=allowed_equipment or []
        )
        self.geofences[id] = geofence
        return geofence

    def ingest_reading(self, equipment_id: str, location: GPSLocation,
                      engine_hours: float, fuel_level: float, fuel_rate: float,
                      engine_rpm: int, hydraulic_temp: float, coolant_temp: float,
                      load_percentage: float = 0, operator_id: str = "") -> TelematicsReading:
        """Ingest telematics reading from equipment."""
        if equipment_id not in self.equipment:
            raise ValueError(f"Unknown equipment: {equipment_id}")

        # Determine operating status
        if engine_rpm == 0:
            status = OperatingStatus.OFF
        elif engine_rpm < 800 or load_percentage < 10:
            status = OperatingStatus.IDLE
        else:
            status = OperatingStatus.OPERATING

        reading = TelematicsReading(
            equipment_id=equipment_id,
            timestamp=location.timestamp,
            location=location,
            engine_hours=engine_hours,
            fuel_level=fuel_level,
            fuel_rate=fuel_rate,
            engine_rpm=engine_rpm,
            hydraulic_temp=hydraulic_temp,
            coolant_temp=coolant_temp,
            operating_status=status,
            load_percentage=load_percentage,
            operator_id=operator_id
        )

        self.readings.append(reading)

        # Update equipment status
        equip = self.equipment[equipment_id]
        equip.current_hours = engine_hours

        # Check for issues
        self._check_diagnostics(equipment_id, reading)
        self._check_geofence(equipment_id, location)

        return reading

    def _check_diagnostics(self, equipment_id: str, reading: TelematicsReading):
        """Check for diagnostic issues."""
        equip = self.equipment[equipment_id]

        # High temperature warning
        if reading.hydraulic_temp > 90:
            self._add_fault(equipment_id, "HYD_TEMP_HIGH",
                          "Hydraulic temperature high", FaultSeverity.WARNING)

        if reading.coolant_temp > 100:
            self._add_fault(equipment_id, "COOLANT_TEMP_HIGH",
                          "Coolant temperature critical", FaultSeverity.CRITICAL)

        # Low fuel warning
        if reading.fuel_level < 15:
            self._add_fault(equipment_id, "FUEL_LOW",
                          "Fuel level below 15%", FaultSeverity.WARNING)

        # Service due
        service_interval = self.SERVICE_INTERVALS.get(equip.equipment_type, 250)
        hours_to_service = equip.next_service_hours - reading.engine_hours

        if hours_to_service < 0:
            self._add_fault(equipment_id, "SERVICE_OVERDUE",
                          "Maintenance service overdue", FaultSeverity.WARNING)
        elif hours_to_service < 50:
            self._add_fault(equipment_id, "SERVICE_DUE",
                          f"Service due in {hours_to_service:.0f} hours", FaultSeverity.INFO)

    def _check_geofence(self, equipment_id: str, location: GPSLocation):
        """Check geofence violations."""
        for geofence in self.geofences.values():
            # Calculate distance from center
            distance = self._haversine_distance(
                location.latitude, location.longitude,
                geofence.center_lat, geofence.center_lon
            )

            if distance > geofence.radius_meters:
                if (not geofence.allowed_equipment or
                    equipment_id in geofence.allowed_equipment):
                    self._add_fault(equipment_id, "GEOFENCE_EXIT",
                                  f"Equipment left {geofence.name} boundary",
                                  FaultSeverity.WARNING)

    def _haversine_distance(self, lat1: float, lon1: float,
                           lat2: float, lon2: float) -> float:
        """Calculate distance between two coordinates in meters."""
        R = 6371000  # Earth radius in meters

        phi1 = math.radians(lat1)
        phi2 = math.radians(lat2)
        delta_phi = math.radians(lat2 - lat1)
        delta_lambda = math.radians(lon2 - lon1)

        a = (math.sin(delta_phi/2)**2 +
             math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda/2)**2)
        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))

        return R * c

    def _add_fault(self, equipment_id: str, code: str,
                  description: str, severity: FaultSeverity):
        """Add fault code."""
        # Check if same fault already active
        existing = [f for f in self.faults
                   if f.equipment_id == equipment_id
                   and f.code == code
                   and not f.resolved]
        if existing:
            return

        fault = FaultCode(
            code=code,
            description=description,
            severity=severity,
            timestamp=datetime.now(),
            equipment_id=equipment_id
        )
        self.faults.append(fault)

    def get_current_status(self, equipment_id: str) -> Dict:
        """Get current status of equipment."""
        if equipment_id not in self.equipment:
            raise ValueError(f"Unknown equipment: {equipment_id}")

        equip = self.equipment[equipment_id]

        # Get latest reading
        readings = [r for r in self.readings if r.equipment_id == equipment_id]
        if not readings:
            return {"equipment": equip, "status": "no_data"}

        latest = max(readings, key=lambda r: r.timestamp)

        # Active faults
        active_faults = [f for f in self.faults
                        if f.equipment_id == equipment_id and not f.resolved]

        return {
            "equipment_id": equip.id,
            "name": equip.name,
            "type": equip.equipment_type.value,
            "status": latest.operating_status.value,
            "location": {
                "lat": latest.location.latitude,
                "lon": latest.location.longitude,
                "speed": latest.location.speed
            },
            "engine_hours": latest.engine_hours,
            "fuel_level": latest.fuel_level,
            "fuel_rate": latest.fuel_rate,
            "temps": {
                "hydraulic": latest.hydraulic_temp,
                "coolant": latest.coolant_temp
            },
            "operator": latest.operator_id,
            "active_faults": len(active_faults),
            "last_update": latest.timestamp
        }

    def calculate_utilization(self, equipment_id: str,
                             start_date: datetime,
                             end_date: datetime) -> UtilizationReport:
        """Calculate utilization metrics for equipment."""
        readings = [r for r in self.readings
                   if r.equipment_id == equipment_id
                   and start_date <= r.timestamp <= end_date]

        if not readings:
            return None

        readings.sort(key=lambda r: r.timestamp)

        total_hours = (end_date - start_date).total_seconds() / 3600
        operating_hours = 0
        idle_hours = 0
        fuel_consumed = 0

        # Calculate from readings
        for i in range(1, len(readings)):
            prev = readings[i-1]
            curr = readings[i]

            interval_hours = (curr.timestamp - prev.timestamp).total_seconds() / 3600

            if prev.operating_status == OperatingStatus.OPERATING:
                operating_hours += interval_hours
                fuel_consumed += prev.fuel_rate * interval_hours
            elif prev.operating_status == OperatingStatus.IDLE:
                idle_hours += interval_hours
                fuel_consumed += prev.fuel_rate * interval_hours * 0.3  # Idle uses ~30% fuel

        off_hours = total_hours - operating_hours - idle_hours
        utilization_pct = (operating_hours / total_hours * 100) if total_hours > 0 else 0
        idle_pct = (idle_hours / (operating_hours + idle_hours) * 100) if (operating_hours + idle_hours) > 0 else 0
        fuel_efficiency = (fuel_consumed / operating_hours) if operating_hours > 0 else 0

        return UtilizationReport(
            equipment_id=equipment_id,
            period_start=start_date,
            period_end=end_date,
            total_hours=total_hours,
            operating_hours=operating_hours,
            idle_hours=idle_hours,
            off_hours=off_hours,
            utilization_pct=utilization_pct,
            idle_pct=idle_pct,
            fuel_consumed=fuel_consumed,
            fuel_efficiency=fuel_efficiency,
            cycles=0  # Would need load cycle detection
        )

    def get_fleet_summary(self) -> Dict:
        """Get summary of entire fleet."""
        summary = {
            "total_equipment": len(self.equipment),
            "by_status": {},
            "by_type": {},
            "active_faults": 0,
            "service_due": []
        }

        for equip in self.equipment.values():
            # Count by type
            eq_type = equip.equipment_type.value
            summary["by_type"][eq_type] = summary["by_type"].get(eq_type, 0) + 1

            # Get current status
            try:
                status = self.get_current_status(equip.id)
                op_status = status.get("status", "unknown")
                summary["by_status"][op_status] = summary["by_status"].get(op_status, 0) + 1

                # Check service due
                service_interval = self.SERVICE_INTERVALS.get(equip.equipment_type, 250)
                hours_to_service = equip.next_service_hours - equip.current_hours
                if hours_to_service < 50:
                    summary["service_due"].append({
                        "equipment": equip.name,
                        "hours_remaining": hours_to_service
                    })
            except Exception:
                summary["by_status"]["unknown"] = summary["by_status"].get("unknown", 0) + 1

        # Count active faults
        summary["active_faults"] = len([f for f in self.faults if not f.resolved])

        return summary

    def predict_maintenance(self, equipment_id: str) -> Dict:
        """Predict maintenance needs based on usage patterns."""
        if equipment_id not in self.equipment:
            raise ValueError(f"Unknown equipment: {equipment_id}")

        equip = self.equipment[equipment_id]

        # Calculate average daily hours
        week_ago = datetime.now() - timedelta(days=7)
        recent_readings = [r for r in self.readings
                         if r.equipment_id == equipment_id
                         and r.timestamp > week_ago]

        if len(recent_readings) < 2:
            return {"prediction": "insufficient_data"}

        hours_start = min(r.engine_hours for r in recent_readings)
        hours_end = max(r.engine_hours for r in recent_readings)
        days = (max(r.timestamp for r in recent_readings) -
                min(r.timestamp for r in recent_readings)).days or 1

        daily_hours = (hours_end - hours_start) / days

        # Predict service date
        service_interval = self.SERVICE_INTERVALS.get(equip.equipment_type, 250)
        hours_to_service = equip.next_service_hours - equip.current_hours

        if daily_hours > 0:
            days_to_service = hours_to_service / daily_hours
            service_date = datetime.now() + timedelta(days=days_to_service)
        else:
            service_date = None

        return {
            "equipment_id": equipment_id,
            "current_hours": equip.current_hours,
            "next_service_hours": equip.next_service_hours,
            "hours_to_service": hours_to_service,
            "avg_daily_hours": daily_hours,
            "predicted_service_date": service_date,
            "service_type": "Routine maintenance",
            "estimated_downtime_hours": 8
        }

    def generate_report(self) -> str:
        """Generate fleet telematics report."""
        summary = self.get_fleet_summary()

        lines = [
            "# Equipment Telematics Report",
            "",
            f"**Fleet:** {self.fleet_name}",
            f"**Report Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}",
            "",
            "## Fleet Summary",
            "",
            f"| Metric | Value |",
            f"|--------|-------|",
            f"| Total Equipment | {summary['total_equipment']} |",
            f"| Active Faults | {summary['active_faults']} |",
            f"| Service Due | {len(summary['service_due'])} |",
            "",
            "## Status Distribution",
            ""
        ]

        for status, count in summary["by_status"].items():
            lines.append(f"- {status}: {count}")

        # Equipment details
        lines.extend([
            "",
            "## Equipment Status",
            "",
            "| Equipment | Type | Status | Hours | Fuel | Faults |",
            "|-----------|------|--------|-------|------|--------|"
        ])

        for equip in self.equipment.values():
            try:
                status = self.get_current_status(equip.id)
                status_icon = "✅" if status['status'] == 'operating' else "⏸️" if status['status'] == 'idle' else "⏹️"
                lines.append(
                    f"| {equip.name} | {equip.equipment_type.value} | "
                    f"{status_icon} {status['status']} | {status['engine_hours']:.0f} | "
                    f"{status['fuel_level']:.0f}% | {status['active_faults']} |"
                )
            except Exception:
                lines.append(
                    f"| {equip.name} | {equip.equipment_type.value} | ⚠️ No data | - | - | - |"
                )

        # Service due
        if summary["service_due"]:
            lines.extend([
                "",
                "## Service Due Soon",
                "",
                "| Equipment | Hours Remaining |",
                "|-----------|-----------------|"
            ])
            for svc in summary["service_due"]:
                lines.append(f"| {svc['equipment']} | {svc['hours_remaining']:.0f} |")

        # Active faults
        active_faults = [f for f in self.faults if not f.resolved]
        if active_faults:
            lines.extend([
                "",
                "## Active Faults",
                "",
                "| Equipment | Code | Description | Severity |",
                "|-----------|------|-------------|----------|"
            ])
            for fault in active_faults[:10]:
                sev_icon = "🔴" if fault.severity == FaultSeverity.CRITICAL else "🟡"
                equip = self.equipment.get(fault.equipment_id)
                lines.append(
                    f"| {equip.name if equip else fault.equipment_id} | "
                    f"{fault.code} | {fault.description} | {sev_icon} {fault.severity.value} |"
                )

        return "\n".join(lines)
```

## Quick Start

```python
from datetime import datetime, timedelta

# Initialize telematics system
telematics = EquipmentTelematics("Site A Fleet")

# Register equipment
telematics.register_equipment(
    "EX-001", "Excavator #1", EquipmentType.EXCAVATOR,
    make="Caterpillar", model="320", year=2022,
    serial_number="CAT320X12345",
    hourly_rate=150, fuel_capacity=400
)

telematics.register_equipment(
    "CR-001", "Tower Crane #1", EquipmentType.CRANE,
    make="Liebherr", model="200EC-H", year=2021,
    serial_number="LH200EC54321",
    hourly_rate=200, fuel_capacity=300
)

# Add geofence for site boundary
telematics.add_geofence(
    "SITE-A", "Site A Boundary",
    center_lat=40.7128, center_lon=-74.0060,
    radius_meters=500
)

# Ingest telematics reading
location = GPSLocation(
    latitude=40.7128, longitude=-74.0059,
    speed=5.0, timestamp=datetime.now()
)

telematics.ingest_reading(
    "EX-001", location,
    engine_hours=1250.5,
    fuel_level=65.0,
    fuel_rate=18.5,
    engine_rpm=1800,
    hydraulic_temp=75.0,
    coolant_temp=85.0,
    load_percentage=75,
    operator_id="OP-101"
)

# Get current status
status = telematics.get_current_status("EX-001")
print(f"Excavator status: {status['status']}")
print(f"Location: {status['location']}")
print(f"Fuel: {status['fuel_level']}%")

# Calculate utilization
util = telematics.calculate_utilization(
    "EX-001",
    datetime.now() - timedelta(days=7),
    datetime.now()
)
if util:
    print(f"Utilization: {util.utilization_pct:.1f}%")
    print(f"Fuel efficiency: {util.fuel_efficiency:.1f} L/hr")

# Predict maintenance
maintenance = telematics.predict_maintenance("EX-001")
print(f"Days to service: {maintenance.get('predicted_service_date')}")

# Fleet summary
summary = telematics.get_fleet_summary()
print(f"Fleet: {summary['total_equipment']} units")

# Generate report
print(telematics.generate_report())
```

## Requirements

```bash
pip install (no external dependencies)
```