APM

>Agent Skill

@datadrivenconstruction/drone-site-survey

skilldevelopment

Process drone survey data for construction sites. Generate orthomosaics, DEMs, point clouds, calculate volumes, track progress, and integrate with BIM models for comparison.

apm::install
$apm install @datadrivenconstruction/drone-site-survey
apm::skill.md
---
name: "drone-site-survey"
description: "Process drone survey data for construction sites. Generate orthomosaics, DEMs, point clouds, calculate volumes, track progress, and integrate with BIM models for comparison."
homepage: "https://datadrivenconstruction.io"
metadata: {"openclaw": {"emoji": "🚀", "os": ["darwin", "linux", "win32"], "homepage": "https://datadrivenconstruction.io", "requires": {"bins": ["python3"]}}}
---
# Drone Site Survey Processing

## Overview

This skill implements drone data processing for construction site monitoring. Process aerial imagery to generate maps, measure volumes, track progress, and compare with design models.

**Capabilities:**
- Orthomosaic generation
- Digital Elevation Model (DEM) creation
- Point cloud processing
- Volume calculations
- Progress monitoring
- BIM comparison
- Stockpile measurement

## Quick Start

```python
from dataclasses import dataclass
from typing import List, Dict, Tuple, Optional
from datetime import datetime
import numpy as np

@dataclass
class DroneImage:
    filename: str
    timestamp: datetime
    latitude: float
    longitude: float
    altitude: float
    heading: float
    pitch: float
    roll: float
    camera_model: str

@dataclass
class PointCloud:
    points: np.ndarray  # Nx3 array
    colors: Optional[np.ndarray] = None  # Nx3 RGB
    normals: Optional[np.ndarray] = None  # Nx3

@dataclass
class VolumeResult:
    volume_m3: float
    area_m2: float
    method: str
    reference_plane: str
    confidence: float

def calculate_volume_simple(point_cloud: PointCloud,
                           reference_z: float = None) -> VolumeResult:
    """Simple volume calculation from point cloud"""
    points = point_cloud.points

    if reference_z is None:
        reference_z = np.min(points[:, 2])

    # Grid-based volume calculation
    x_min, x_max = np.min(points[:, 0]), np.max(points[:, 0])
    y_min, y_max = np.min(points[:, 1]), np.max(points[:, 1])

    grid_size = 0.5  # 50cm grid
    x_bins = np.arange(x_min, x_max + grid_size, grid_size)
    y_bins = np.arange(y_min, y_max + grid_size, grid_size)

    volume = 0
    cell_area = grid_size ** 2

    for i in range(len(x_bins) - 1):
        for j in range(len(y_bins) - 1):
            mask = (
                (points[:, 0] >= x_bins[i]) & (points[:, 0] < x_bins[i + 1]) &
                (points[:, 1] >= y_bins[j]) & (points[:, 1] < y_bins[j + 1])
            )
            cell_points = points[mask]
            if len(cell_points) > 0:
                max_z = np.max(cell_points[:, 2])
                height = max_z - reference_z
                if height > 0:
                    volume += height * cell_area

    area = (x_max - x_min) * (y_max - y_min)

    return VolumeResult(
        volume_m3=volume,
        area_m2=area,
        method='grid_based',
        reference_plane=f'z={reference_z:.2f}',
        confidence=0.9
    )

# Example usage
sample_points = np.random.rand(10000, 3) * [100, 100, 10]  # 100x100m, 10m height
point_cloud = PointCloud(points=sample_points)
result = calculate_volume_simple(point_cloud)
print(f"Volume: {result.volume_m3:.2f} m³, Area: {result.area_m2:.2f} m²")
```

## Comprehensive Drone Survey System

### Image Processing Pipeline

```python
from dataclasses import dataclass, field
from typing import List, Dict, Tuple, Optional
from datetime import datetime
import numpy as np
from pathlib import Path
import json

@dataclass
class CameraParameters:
    focal_length_mm: float
    sensor_width_mm: float
    sensor_height_mm: float
    image_width_px: int
    image_height_px: int

@dataclass
class GeoReference:
    crs: str  # Coordinate Reference System (e.g., "EPSG:4326")
    origin: Tuple[float, float, float]  # lat, lon, alt
    rotation: Tuple[float, float, float]  # heading, pitch, roll

@dataclass
class SurveyFlight:
    flight_id: str
    date: datetime
    site_name: str
    images: List[DroneImage]
    camera: CameraParameters
    geo_reference: GeoReference
    flight_altitude: float
    overlap_forward: float = 0.8
    overlap_side: float = 0.7
    gsd: float = 0  # Ground Sample Distance (cm/pixel)

    def __post_init__(self):
        if self.gsd == 0 and self.camera:
            # Calculate GSD
            sensor_width = self.camera.sensor_width_mm
            focal_length = self.camera.focal_length_mm
            image_width = self.camera.image_width_px
            altitude = self.flight_altitude

            self.gsd = (altitude * sensor_width) / (focal_length * image_width) * 100  # cm

@dataclass
class ProcessingResult:
    orthomosaic_path: Optional[str] = None
    dem_path: Optional[str] = None
    dsm_path: Optional[str] = None
    point_cloud_path: Optional[str] = None
    report_path: Optional[str] = None
    statistics: Dict = field(default_factory=dict)

class DroneDataProcessor:
    """Process drone survey data"""

    def __init__(self, output_dir: str):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def process_survey(self, flight: SurveyFlight,
                      generate_ortho: bool = True,
                      generate_dem: bool = True,
                      generate_pointcloud: bool = True) -> ProcessingResult:
        """Process drone survey data"""
        result = ProcessingResult()
        result.statistics['flight_id'] = flight.flight_id
        result.statistics['image_count'] = len(flight.images)
        result.statistics['gsd_cm'] = flight.gsd
        result.statistics['flight_date'] = flight.date.isoformat()

        # In production, these would call actual photogrammetry libraries
        # like OpenDroneMap, Pix4D API, or custom SfM pipeline

        if generate_ortho:
            result.orthomosaic_path = str(self.output_dir / f"{flight.flight_id}_ortho.tif")
            result.statistics['ortho_resolution'] = flight.gsd

        if generate_dem:
            result.dem_path = str(self.output_dir / f"{flight.flight_id}_dem.tif")
            result.dsm_path = str(self.output_dir / f"{flight.flight_id}_dsm.tif")

        if generate_pointcloud:
            result.point_cloud_path = str(self.output_dir / f"{flight.flight_id}_pointcloud.las")

        # Generate report
        result.report_path = str(self.output_dir / f"{flight.flight_id}_report.json")
        with open(result.report_path, 'w') as f:
            json.dump(result.statistics, f, indent=2)

        return result

    def extract_point_cloud(self, las_path: str) -> PointCloud:
        """Extract point cloud from LAS file"""
        # In production, use laspy or similar
        # Simulated point cloud for demonstration
        n_points = 100000
        points = np.random.rand(n_points, 3) * [100, 100, 20]
        colors = np.random.randint(0, 255, (n_points, 3), dtype=np.uint8)

        return PointCloud(points=points, colors=colors)

    def compare_surveys(self, survey1: ProcessingResult,
                       survey2: ProcessingResult) -> Dict:
        """Compare two surveys for change detection"""
        # Load point clouds
        pc1 = self.extract_point_cloud(survey1.point_cloud_path)
        pc2 = self.extract_point_cloud(survey2.point_cloud_path)

        # Calculate elevation differences
        # In production, use proper point cloud registration and comparison

        comparison = {
            'survey1_date': survey1.statistics.get('flight_date'),
            'survey2_date': survey2.statistics.get('flight_date'),
            'point_count_diff': len(pc2.points) - len(pc1.points),
            'changes_detected': []
        }

        return comparison
```

### Volume Calculation Engine

```python
from scipy.spatial import Delaunay
from scipy.interpolate import griddata
import numpy as np

class VolumeCalculator:
    """Advanced volume calculations from drone data"""

    def __init__(self, point_cloud: PointCloud):
        self.points = point_cloud.points
        self.colors = point_cloud.colors

    def calculate_cut_fill(self, design_surface: np.ndarray,
                          grid_size: float = 0.5) -> Dict:
        """Calculate cut and fill volumes compared to design surface"""
        # Create grid
        x_min, x_max = np.min(self.points[:, 0]), np.max(self.points[:, 0])
        y_min, y_max = np.min(self.points[:, 1]), np.max(self.points[:, 1])

        x_grid = np.arange(x_min, x_max, grid_size)
        y_grid = np.arange(y_min, y_max, grid_size)
        xx, yy = np.meshgrid(x_grid, y_grid)

        # Interpolate actual surface
        actual_z = griddata(
            self.points[:, :2],
            self.points[:, 2],
            (xx, yy),
            method='linear'
        )

        # Interpolate design surface
        design_z = griddata(
            design_surface[:, :2],
            design_surface[:, 2],
            (xx, yy),
            method='linear'
        )

        # Calculate differences
        diff = actual_z - design_z
        cell_area = grid_size ** 2

        cut_volume = np.nansum(diff[diff > 0]) * cell_area
        fill_volume = np.nansum(np.abs(diff[diff < 0])) * cell_area
        net_volume = cut_volume - fill_volume

        return {
            'cut_volume_m3': float(cut_volume),
            'fill_volume_m3': float(fill_volume),
            'net_volume_m3': float(net_volume),
            'balance': 'cut' if net_volume > 0 else 'fill',
            'grid_size_m': grid_size,
            'area_m2': float((x_max - x_min) * (y_max - y_min))
        }

    def calculate_stockpile_volume(self, base_method: str = 'lowest_perimeter') -> VolumeResult:
        """Calculate stockpile volume"""
        # Find boundary points
        from scipy.spatial import ConvexHull
        hull = ConvexHull(self.points[:, :2])
        boundary_points = self.points[hull.vertices]

        if base_method == 'lowest_perimeter':
            reference_z = np.min(boundary_points[:, 2])
        elif base_method == 'average_perimeter':
            reference_z = np.mean(boundary_points[:, 2])
        elif base_method == 'triangulated':
            # Create base triangulation from boundary
            reference_z = np.min(self.points[:, 2])
        else:
            reference_z = np.min(self.points[:, 2])

        # Calculate volume using triangulated surface
        volume = self._triangulated_volume(reference_z)

        hull_area = self._calculate_hull_area(boundary_points[:, :2])

        return VolumeResult(
            volume_m3=volume,
            area_m2=hull_area,
            method='triangulated_' + base_method,
            reference_plane=f'z={reference_z:.2f}',
            confidence=0.95
        )

    def _triangulated_volume(self, reference_z: float) -> float:
        """Calculate volume using Delaunay triangulation"""
        # Create 2D triangulation
        points_2d = self.points[:, :2]
        tri = Delaunay(points_2d)

        volume = 0
        for simplex in tri.simplices:
            # Get triangle vertices
            p1 = self.points[simplex[0]]
            p2 = self.points[simplex[1]]
            p3 = self.points[simplex[2]]

            # Calculate prism volume
            avg_height = (p1[2] + p2[2] + p3[2]) / 3 - reference_z
            if avg_height > 0:
                # Triangle area
                area = 0.5 * abs(
                    (p2[0] - p1[0]) * (p3[1] - p1[1]) -
                    (p3[0] - p1[0]) * (p2[1] - p1[1])
                )
                volume += area * avg_height

        return volume

    def _calculate_hull_area(self, points_2d: np.ndarray) -> float:
        """Calculate area of convex hull"""
        hull = ConvexHull(points_2d)
        return hull.volume  # In 2D, volume is actually area

    def generate_contours(self, interval: float = 1.0) -> List[Dict]:
        """Generate elevation contours"""
        z_min = np.min(self.points[:, 2])
        z_max = np.max(self.points[:, 2])

        contour_levels = np.arange(
            np.floor(z_min / interval) * interval,
            np.ceil(z_max / interval) * interval + interval,
            interval
        )

        contours = []
        for level in contour_levels:
            # In production, use proper contouring algorithm
            contours.append({
                'elevation': float(level),
                'type': 'major' if level % 5 == 0 else 'minor'
            })

        return contours
```

### Progress Monitoring

```python
from datetime import date
from typing import List, Dict

@dataclass
class ProgressPoint:
    date: date
    point_cloud: PointCloud
    orthomosaic_path: str
    annotations: Dict = field(default_factory=dict)

class ConstructionProgressMonitor:
    """Monitor construction progress from drone surveys"""

    def __init__(self, project_name: str):
        self.project_name = project_name
        self.surveys: List[ProgressPoint] = []
        self.volume_calculator = None

    def add_survey(self, survey_date: date, point_cloud: PointCloud,
                  orthomosaic_path: str):
        """Add survey for progress tracking"""
        self.surveys.append(ProgressPoint(
            date=survey_date,
            point_cloud=point_cloud,
            orthomosaic_path=orthomosaic_path
        ))
        self.surveys.sort(key=lambda x: x.date)

    def calculate_earthwork_progress(self, design_surface: np.ndarray) -> List[Dict]:
        """Calculate earthwork progress over time"""
        progress = []

        for i, survey in enumerate(self.surveys):
            calc = VolumeCalculator(survey.point_cloud)
            cut_fill = calc.calculate_cut_fill(design_surface)

            progress.append({
                'date': survey.date.isoformat(),
                'survey_index': i + 1,
                'cut_volume_m3': cut_fill['cut_volume_m3'],
                'fill_volume_m3': cut_fill['fill_volume_m3'],
                'net_volume_m3': cut_fill['net_volume_m3']
            })

        # Calculate progress percentages
        if len(progress) > 1:
            initial = progress[0]
            for p in progress[1:]:
                if initial['net_volume_m3'] != 0:
                    p['progress_pct'] = (
                        (initial['net_volume_m3'] - p['net_volume_m3']) /
                        abs(initial['net_volume_m3']) * 100
                    )
                else:
                    p['progress_pct'] = 0

        return progress

    def detect_changes(self, survey1_idx: int, survey2_idx: int,
                      threshold_m: float = 0.5) -> Dict:
        """Detect changes between two surveys"""
        pc1 = self.surveys[survey1_idx].point_cloud
        pc2 = self.surveys[survey2_idx].point_cloud

        # Create grids for comparison
        grid_size = 1.0  # 1m grid

        x_min = min(np.min(pc1.points[:, 0]), np.min(pc2.points[:, 0]))
        x_max = max(np.max(pc1.points[:, 0]), np.max(pc2.points[:, 0]))
        y_min = min(np.min(pc1.points[:, 1]), np.min(pc2.points[:, 1]))
        y_max = max(np.max(pc1.points[:, 1]), np.max(pc2.points[:, 1]))

        x_bins = np.arange(x_min, x_max + grid_size, grid_size)
        y_bins = np.arange(y_min, y_max + grid_size, grid_size)

        changes = {
            'increased': [],  # Areas where elevation increased
            'decreased': [],  # Areas where elevation decreased
            'unchanged': 0
        }

        for i in range(len(x_bins) - 1):
            for j in range(len(y_bins) - 1):
                # Get points in cell for both surveys
                cell_x = (x_bins[i] + x_bins[i + 1]) / 2
                cell_y = (y_bins[j] + y_bins[j + 1]) / 2

                mask1 = (
                    (pc1.points[:, 0] >= x_bins[i]) & (pc1.points[:, 0] < x_bins[i + 1]) &
                    (pc1.points[:, 1] >= y_bins[j]) & (pc1.points[:, 1] < y_bins[j + 1])
                )
                mask2 = (
                    (pc2.points[:, 0] >= x_bins[i]) & (pc2.points[:, 0] < x_bins[i + 1]) &
                    (pc2.points[:, 1] >= y_bins[j]) & (pc2.points[:, 1] < y_bins[j + 1])
                )

                if np.sum(mask1) > 0 and np.sum(mask2) > 0:
                    z1 = np.mean(pc1.points[mask1, 2])
                    z2 = np.mean(pc2.points[mask2, 2])
                    diff = z2 - z1

                    if diff > threshold_m:
                        changes['increased'].append({
                            'x': cell_x, 'y': cell_y, 'change_m': diff
                        })
                    elif diff < -threshold_m:
                        changes['decreased'].append({
                            'x': cell_x, 'y': cell_y, 'change_m': diff
                        })
                    else:
                        changes['unchanged'] += 1

        changes['summary'] = {
            'increased_cells': len(changes['increased']),
            'decreased_cells': len(changes['decreased']),
            'unchanged_cells': changes['unchanged'],
            'date_from': self.surveys[survey1_idx].date.isoformat(),
            'date_to': self.surveys[survey2_idx].date.isoformat()
        }

        return changes

    def generate_progress_report(self, output_path: str,
                                design_surface: np.ndarray = None) -> str:
        """Generate progress report"""
        import pandas as pd

        report_data = {
            'project': self.project_name,
            'surveys': len(self.surveys),
            'date_range': {
                'start': self.surveys[0].date.isoformat() if self.surveys else None,
                'end': self.surveys[-1].date.isoformat() if self.surveys else None
            }
        }

        if design_surface is not None:
            report_data['earthwork_progress'] = self.calculate_earthwork_progress(design_surface)

        with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
            # Summary
            pd.DataFrame([report_data]).to_excel(writer, sheet_name='Summary', index=False)

            # Survey list
            survey_data = [{
                'Date': s.date,
                'Orthomosaic': s.orthomosaic_path
            } for s in self.surveys]
            pd.DataFrame(survey_data).to_excel(writer, sheet_name='Surveys', index=False)

            # Progress (if available)
            if 'earthwork_progress' in report_data:
                pd.DataFrame(report_data['earthwork_progress']).to_excel(
                    writer, sheet_name='Progress', index=False
                )

        return output_path
```

### BIM Comparison

```python
class BIMDroneComparator:
    """Compare drone survey with BIM model"""

    def __init__(self, bim_surface: np.ndarray, drone_pointcloud: PointCloud):
        self.bim = bim_surface
        self.drone = drone_pointcloud

    def compare_elevations(self, grid_size: float = 1.0) -> Dict:
        """Compare drone elevations with BIM design"""
        # Create comparison grid
        x_min = max(np.min(self.bim[:, 0]), np.min(self.drone.points[:, 0]))
        x_max = min(np.max(self.bim[:, 0]), np.max(self.drone.points[:, 0]))
        y_min = max(np.min(self.bim[:, 1]), np.min(self.drone.points[:, 1]))
        y_max = min(np.max(self.bim[:, 1]), np.max(self.drone.points[:, 1]))

        x_grid = np.arange(x_min, x_max, grid_size)
        y_grid = np.arange(y_min, y_max, grid_size)
        xx, yy = np.meshgrid(x_grid, y_grid)

        # Interpolate both surfaces
        bim_z = griddata(self.bim[:, :2], self.bim[:, 2], (xx, yy), method='linear')
        drone_z = griddata(
            self.drone.points[:, :2],
            self.drone.points[:, 2],
            (xx, yy),
            method='linear'
        )

        # Calculate differences
        diff = drone_z - bim_z

        valid_mask = ~np.isnan(diff)
        valid_diff = diff[valid_mask]

        return {
            'mean_diff_m': float(np.mean(valid_diff)),
            'std_diff_m': float(np.std(valid_diff)),
            'max_diff_m': float(np.max(valid_diff)),
            'min_diff_m': float(np.min(valid_diff)),
            'rmse_m': float(np.sqrt(np.mean(valid_diff ** 2))),
            'within_5cm': float(np.sum(np.abs(valid_diff) < 0.05) / len(valid_diff) * 100),
            'within_10cm': float(np.sum(np.abs(valid_diff) < 0.10) / len(valid_diff) * 100),
            'comparison_points': int(len(valid_diff))
        }

    def find_deviations(self, tolerance_m: float = 0.1) -> List[Dict]:
        """Find areas with significant deviations"""
        deviations = []
        grid_size = 1.0

        # Grid comparison
        x_min = max(np.min(self.bim[:, 0]), np.min(self.drone.points[:, 0]))
        x_max = min(np.max(self.bim[:, 0]), np.max(self.drone.points[:, 0]))
        y_min = max(np.min(self.bim[:, 1]), np.min(self.drone.points[:, 1]))
        y_max = min(np.max(self.bim[:, 1]), np.max(self.drone.points[:, 1]))

        for x in np.arange(x_min, x_max, grid_size):
            for y in np.arange(y_min, y_max, grid_size):
                # Get average elevations
                bim_mask = (
                    (self.bim[:, 0] >= x) & (self.bim[:, 0] < x + grid_size) &
                    (self.bim[:, 1] >= y) & (self.bim[:, 1] < y + grid_size)
                )
                drone_mask = (
                    (self.drone.points[:, 0] >= x) & (self.drone.points[:, 0] < x + grid_size) &
                    (self.drone.points[:, 1] >= y) & (self.drone.points[:, 1] < y + grid_size)
                )

                if np.sum(bim_mask) > 0 and np.sum(drone_mask) > 0:
                    bim_z = np.mean(self.bim[bim_mask, 2])
                    drone_z = np.mean(self.drone.points[drone_mask, 2])
                    diff = drone_z - bim_z

                    if abs(diff) > tolerance_m:
                        deviations.append({
                            'x': x + grid_size / 2,
                            'y': y + grid_size / 2,
                            'bim_elevation': bim_z,
                            'actual_elevation': drone_z,
                            'deviation_m': diff,
                            'type': 'high' if diff > 0 else 'low'
                        })

        return deviations
```

## Quick Reference

| Measurement | Method | Accuracy |
|-------------|--------|----------|
| Stockpile Volume | Triangulated | ±2-5% |
| Cut/Fill Volume | Grid comparison | ±5% |
| Area Measurement | Orthomosaic | <1cm GSD |
| Elevation (DEM) | Photogrammetry | ±2-5cm |
| Progress Tracking | Multi-temporal | Relative |

## Resources

- **OpenDroneMap**: https://www.opendronemap.org
- **Pix4D**: https://www.pix4d.com
- **LAStools**: https://rapidlasso.com/lastools/
- **DDC Website**: https://datadrivenconstruction.io

## Next Steps

- See `progress-monitoring-cv` for image-based progress
- See `bim-validation-pipeline` for model comparison
- See `data-visualization` for 3D visualization