input-validation
skill
Validate construction data inputs before processing: cost estimates, schedules, BIM data, field reports. Catch errors early with domain-specific rules.
apm::install
apm install @datadrivenconstruction/input-validation
apm::skill.md
---
name: "input-validation"
description: "Validate construction data inputs before processing: cost estimates, schedules, BIM data, field reports. Catch errors early with domain-specific rules."
homepage: "https://datadrivenconstruction.io"
metadata: {"openclaw": {"emoji": "✔️", "os": ["darwin", "linux", "win32"], "homepage": "https://datadrivenconstruction.io", "requires": {"bins": ["python3"]}}}
---
# Input Validation for Construction Data
## Overview
Validate incoming construction data before processing to catch errors early. Domain-specific validation rules for estimates, schedules, BIM exports, and field data.
## Validation Framework
### Core Validator Class
```python
from dataclasses import dataclass, field
from typing import List, Dict, Any, Callable, Optional
from enum import Enum
import re
from datetime import datetime
class ValidationSeverity(Enum):
    """Severity level attached to every validation issue."""

    ERROR = "error"      # Must fix, blocks processing
    WARNING = "warning"  # Should review, allows processing
    INFO = "info"        # FYI, no action needed
@dataclass
class ValidationIssue:
    """A single finding produced by a validator."""

    # Path of the offending input, e.g. "line_items[0].quantity"
    field: str
    # Human-readable description of the problem
    message: str
    severity: ValidationSeverity
    # The offending value, when the validator chose to record it
    value: Any = None
    # Optional hint on how to fix the input
    suggestion: Optional[str] = None
@dataclass
class ValidationResult:
    """Accumulates the issues found while validating one input.

    ``is_valid`` keeps the value it was constructed with until the first
    ``add_error`` call sets it to ``False``; warnings and info entries never
    change it.
    """

    is_valid: bool
    issues: List[ValidationIssue] = field(default_factory=list)

    def add_error(self, field: str, message: str, value: Any = None,
                  suggestion: Optional[str] = None) -> None:
        """Record a blocking issue and mark the result as invalid."""
        self.issues.append(
            ValidationIssue(field, message, ValidationSeverity.ERROR, value, suggestion)
        )
        self.is_valid = False

    def add_warning(self, field: str, message: str, value: Any = None,
                    suggestion: Optional[str] = None) -> None:
        """Record a non-blocking issue that should be reviewed."""
        self.issues.append(
            ValidationIssue(field, message, ValidationSeverity.WARNING, value, suggestion)
        )

    def add_info(self, field: str, message: str, value: Any = None,
                 suggestion: Optional[str] = None) -> None:
        """Record an informational note.

        ``suggestion`` is optional and defaults to ``None``, mirroring
        ``add_error`` and ``add_warning``.
        """
        self.issues.append(
            ValidationIssue(field, message, ValidationSeverity.INFO, value, suggestion)
        )

    @property
    def errors(self) -> List[ValidationIssue]:
        """All issues with ERROR severity, in insertion order."""
        return [i for i in self.issues if i.severity == ValidationSeverity.ERROR]

    @property
    def warnings(self) -> List[ValidationIssue]:
        """All issues with WARNING severity, in insertion order."""
        return [i for i in self.issues if i.severity == ValidationSeverity.WARNING]

    def to_report(self) -> str:
        """Render the result as a plain-text report, one line per issue."""
        # Anything that is neither an error nor a warning gets the info icon.
        icons = {
            ValidationSeverity.ERROR: "❌",
            ValidationSeverity.WARNING: "⚠️",
        }
        lines = ["VALIDATION REPORT", "=" * 50]
        lines.append(f"Status: {'PASSED' if self.is_valid else 'FAILED'}")
        lines.append(f"Errors: {len(self.errors)}, Warnings: {len(self.warnings)}")
        lines.append("")
        for issue in self.issues:
            icon = icons.get(issue.severity, "ℹ️")
            lines.append(f"{icon} [{issue.field}] {issue.message}")
            if issue.suggestion:
                lines.append(f" Suggestion: {issue.suggestion}")
        return "\n".join(lines)
```
### Cost Estimate Validation
```python
class CostEstimateValidator:
    """Validate cost estimate inputs.

    Checks required fields, per-line-item sanity (numeric and non-negative
    quantity / unit cost, description, CSI code format, zero amounts),
    agreement between the declared total and the sum of the line items, and,
    when ``gross_area`` is supplied, each line item's cost per square foot
    against ``TYPICAL_RANGES``.

    Malformed values (``None``, strings where numbers are expected, non-dict
    line items) are reported as issues instead of raising, so the validator
    does not crash on the data it is meant to screen.
    """

    # Typical cost ranges per CSI division ($/SF)
    TYPICAL_RANGES = {
        '03': (15, 45),  # Concrete
        '04': (8, 25),   # Masonry
        '05': (12, 35),  # Metals
        '06': (5, 20),   # Wood/Plastics
        '07': (8, 30),   # Thermal/Moisture
        '08': (15, 50),  # Openings
        '09': (10, 40),  # Finishes
        '22': (8, 25),   # Plumbing
        '23': (12, 40),  # HVAC
        '26': (10, 35),  # Electrical
    }

    # "XX XX XX" with the separating whitespace optional; used with fullmatch
    # so a trailing newline is not silently accepted.
    _CSI_CODE_PATTERN = re.compile(r'\d{2}\s?\d{2}\s?\d{2}')

    @staticmethod
    def _is_number(value: Any) -> bool:
        """Return True for int/float values; bool is deliberately excluded."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    @classmethod
    def _line_amount(cls, item: Any) -> float:
        """Return quantity * unit_cost, or 0 when either value is unusable."""
        if not isinstance(item, dict):
            return 0
        quantity = item.get('quantity', 0)
        unit_cost = item.get('unit_cost', 0)
        if cls._is_number(quantity) and cls._is_number(unit_cost):
            return quantity * unit_cost
        return 0

    def validate(self, estimate_data: Dict[str, Any]) -> ValidationResult:
        """Run every estimate check and return the collected issues."""
        result = ValidationResult(is_valid=True)
        if not isinstance(estimate_data, dict):
            result.add_error("estimate", "Estimate data must be a dictionary", estimate_data)
            return result

        # Required fields
        self._validate_required_fields(estimate_data, result)

        # Line item validation (a None value is already reported as missing above)
        line_items = estimate_data.get('line_items')
        if line_items is not None:
            if isinstance(line_items, (list, tuple)):
                self._validate_line_items(line_items, result)
            else:
                result.add_error("line_items", "Line items must be a list", line_items)

        # Total validation
        self._validate_totals(estimate_data, result)

        # Cost range validation
        if 'gross_area' in estimate_data:
            self._validate_cost_ranges(estimate_data, result)

        return result

    def _validate_required_fields(self, data: dict, result: ValidationResult):
        """Report every required key that is absent or set to None."""
        required = ['project_name', 'estimate_date', 'line_items', 'total']
        for field in required:
            if field not in data or data[field] is None:
                result.add_error(field, f"Required field '{field}' is missing")

    def _validate_line_items(self, items: list, result: ValidationResult):
        """Check each line item's numbers, description and CSI code."""
        numeric_fields = (('quantity', 'Quantity'), ('unit_cost', 'Unit cost'))
        for i, item in enumerate(items):
            if not isinstance(item, dict):
                result.add_error(f"line_items[{i}]", "Line item must be a mapping of fields", item)
                continue

            # Check for non-numeric and negative values
            numbers_ok = True
            for key, label in numeric_fields:
                value = item.get(key, 0)
                if not self._is_number(value):
                    result.add_error(f"line_items[{i}].{key}", f"{label} must be a number", value)
                    numbers_ok = False
                elif value < 0:
                    result.add_error(f"line_items[{i}].{key}", f"{label} cannot be negative", value)

            # Check for missing descriptions
            if not item.get('description'):
                result.add_warning(f"line_items[{i}].description", "Line item missing description")

            # Check for valid CSI code
            csi_code = item.get('csi_code')
            if csi_code:
                well_formed = (
                    isinstance(csi_code, str)
                    and self._CSI_CODE_PATTERN.fullmatch(csi_code) is not None
                )
                if not well_formed:
                    result.add_warning(
                        f"line_items[{i}].csi_code",
                        f"Invalid CSI code format: {csi_code}",
                        suggestion="Use format: XX XX XX",
                    )

            # Check for zero amounts (skipped when the numbers were already rejected)
            if numbers_ok and self._line_amount(item) == 0:
                result.add_warning(f"line_items[{i}]", "Line item has zero amount")

    def _validate_totals(self, data: dict, result: ValidationResult):
        """Compare the declared total with the sum of the line items."""
        items = data.get('line_items')
        declared = data.get('total')
        # Absent or None inputs are handled by the required-field check;
        # a non-list 'line_items' is reported by validate().
        if not isinstance(items, (list, tuple)) or declared is None:
            return
        if not self._is_number(declared):
            result.add_error("total", "Total must be a number", declared)
            return

        calculated = sum(self._line_amount(item) for item in items)
        variance = abs(calculated - declared)
        if variance > 0.01:
            result.add_error(
                "total",
                f"Total mismatch: calculated {calculated:.2f}, declared {declared:.2f}",
                variance,
            )

    def _validate_cost_ranges(self, data: dict, result: ValidationResult):
        """Warn when a line item's $/SF is far outside its division's typical range."""
        gross_area = data.get('gross_area')
        if not self._is_number(gross_area) or gross_area <= 0:
            # Without a usable area every $/SF figure would be meaningless.
            result.add_warning(
                "gross_area",
                "Gross area must be a positive number; cost range check skipped",
                gross_area,
            )
            return

        items = data.get('line_items')
        if not isinstance(items, (list, tuple)):
            return

        for item in items:
            if not isinstance(item, dict):
                continue
            csi_code = item.get('csi_code')
            if not isinstance(csi_code, str):
                continue
            csi_div = csi_code[:2]
            if csi_div not in self.TYPICAL_RANGES:
                continue

            cost_per_sf = self._line_amount(item) / gross_area
            low, high = self.TYPICAL_RANGES[csi_div]
            # Tolerate half the low bound up to double the high bound before warning
            if cost_per_sf < low * 0.5 or cost_per_sf > high * 2:
                result.add_warning(
                    f"line_items[{item.get('description', 'Unknown')}]",
                    f"Cost ${cost_per_sf:.2f}/SF outside typical range ${low}-${high}/SF for Division {csi_div}",
                    cost_per_sf,
                    "Review unit costs and quantities"
                )
```
### Schedule Validation
```python
class ScheduleValidator:
    """Validate schedule/planning inputs.

    Checks required fields, task identity and dates, predecessor references
    and sequencing, and resource over-allocation across overlapping tasks.
    Unparseable or malformed values are reported as issues instead of raising.
    """

    @staticmethod
    def _to_datetime(value: Any) -> datetime:
        """Coerce an ISO string, ``datetime`` or date-like object to ``datetime``.

        Raises:
            ValueError: if the value cannot be interpreted as a date.
        """
        if isinstance(value, datetime):
            return value
        if isinstance(value, str):
            return datetime.fromisoformat(value)
        try:
            # Date-like objects (year/month/day) are promoted to midnight so
            # they can be compared with full datetimes.
            return datetime(value.year, value.month, value.day)
        except (AttributeError, TypeError) as exc:
            raise ValueError(f"unsupported date value {value!r}") from exc

    @staticmethod
    def _peak_allocation(assignments: list) -> float:
        """Return the highest summed allocation over any overlapping period.

        ``assignments`` is a list of ``(start, end, allocation)`` tuples.
        Intervals that merely touch (one ends exactly when the next starts)
        are not treated as overlapping, matching the predecessor check which
        allows a task to start when its predecessor ends.
        """
        events = []
        for start, end, allocation in assignments:
            if end <= start:
                continue  # zero or negative span occupies no time
            events.append((start, 1, allocation))
            events.append((end, 0, -allocation))
        # At equal timestamps releases (0) sort before acquisitions (1).
        events.sort(key=lambda event: (event[0], event[1]))

        current = peak = 0
        for _, _, delta in events:
            current += delta
            peak = max(peak, current)
        return peak

    def validate(self, schedule_data: Dict[str, Any]) -> ValidationResult:
        """Run every schedule check and return the collected issues."""
        result = ValidationResult(is_valid=True)
        if not isinstance(schedule_data, dict):
            result.add_error("schedule", "Schedule data must be a dictionary", schedule_data)
            return result

        # Required fields
        self._validate_required_fields(schedule_data, result)

        # Task validation
        if 'tasks' in schedule_data:
            tasks = schedule_data['tasks']
            if isinstance(tasks, (list, tuple)):
                self._validate_tasks(tasks, result)
                self._validate_dependencies(tasks, result)
                self._validate_resources(tasks, result)
            else:
                result.add_error("tasks", "Tasks must be a list", tasks)

        return result

    def _validate_required_fields(self, data: dict, result: ValidationResult):
        """Report every required key that is absent."""
        required = ['project_name', 'start_date', 'tasks']
        for field in required:
            if field not in data:
                result.add_error(field, f"Required field '{field}' is missing")

    def _validate_tasks(self, tasks: list, result: ValidationResult):
        """Check task IDs, date order, durations and presence of timing data."""
        seen_ids = set()
        for i, task in enumerate(tasks):
            if not isinstance(task, dict):
                result.add_error(f"tasks[{i}]", "Task must be a mapping of fields", task)
                continue

            # Check for missing / duplicate IDs. A missing ID is reported as
            # such rather than as a duplicate of None.
            task_id = task.get('id')
            if task_id is None:
                result.add_warning(
                    f"tasks[{i}].id",
                    "Task has no ID",
                    suggestion="Assign a unique ID so the task can be referenced as a predecessor",
                )
            elif task_id in seen_ids:
                result.add_error(f"tasks[{i}].id", f"Duplicate task ID: {task_id}")
            else:
                seen_ids.add(task_id)

            # Check dates
            start = task.get('start_date')
            end = task.get('end_date')
            if start and end:
                try:
                    start_dt = self._to_datetime(start)
                    end_dt = self._to_datetime(end)
                    if end_dt < start_dt:
                        result.add_error(f"tasks[{i}]", "End date before start date", f"{start} -> {end}")

                    # Check for unrealistic durations
                    duration = (end_dt - start_dt).days
                    if duration > 365:
                        result.add_warning(f"tasks[{i}]", f"Task duration exceeds 1 year ({duration} days)")
                    if duration == 0 and task.get('type') != 'milestone':
                        result.add_warning(f"tasks[{i}]", "Task has zero duration but is not marked as milestone")
                except (ValueError, TypeError) as e:
                    # TypeError covers naive vs. timezone-aware comparisons.
                    result.add_error(f"tasks[{i}]", f"Invalid date format: {e}")

            # Check for missing duration; an explicit 0 counts as provided
            if task.get('duration') is None and not (start and end):
                result.add_error(f"tasks[{i}]", "Task missing duration or start/end dates")

    def _validate_dependencies(self, tasks: list, result: ValidationResult):
        """Check that predecessors exist and finish before the task starts."""
        valid_tasks = [t for t in tasks if isinstance(t, dict)]
        task_dict = {t.get('id'): t for t in valid_tasks}

        for task in valid_tasks:
            predecessors = task.get('predecessors') or []
            if isinstance(predecessors, str):
                # A bare string is one predecessor, not a sequence of characters
                predecessors = [predecessors]

            for pred_id in predecessors:
                # Check predecessor exists
                if pred_id not in task_dict:
                    result.add_error(
                        f"tasks[{task.get('id')}].predecessors",
                        f"Predecessor '{pred_id}' does not exist",
                    )
                    continue

                # Check for logical sequence (if dates available)
                pred = task_dict[pred_id]
                if not (pred.get('end_date') and task.get('start_date')):
                    continue
                try:
                    pred_end = self._to_datetime(pred['end_date'])
                    task_start = self._to_datetime(task['start_date'])
                    starts_early = task_start < pred_end
                except (ValueError, TypeError):
                    # Dates that cannot be parsed or compared give nothing to check here
                    continue
                if starts_early:
                    result.add_error(
                        f"tasks[{task.get('id')}]",
                        f"Task starts before predecessor '{pred_id}' ends",
                        f"Pred ends: {pred_end}, Task starts: {task_start}"
                    )

    def _validate_resources(self, tasks: list, result: ValidationResult):
        """Warn when a resource's concurrent assignments exceed 100%."""
        # resource id -> list of (start, end, allocation)
        resource_usage = {}

        for task in tasks:
            if not isinstance(task, dict):
                continue
            resources = task.get('resources')
            start = task.get('start_date')
            end = task.get('end_date')
            if not (resources and start and end):
                continue
            try:
                start_dt = self._to_datetime(start)
                end_dt = self._to_datetime(end)
            except ValueError:
                continue  # no usable dates to test overlap with

            if isinstance(resources, (str, dict)):
                resources = [resources]  # a single resource given without a list

            for resource in resources:
                if isinstance(resource, dict):
                    res_id = resource.get('id') or resource.get('name')
                    allocation = resource.get('allocation', 100)
                else:
                    # Plain identifiers are treated as fully allocated
                    res_id, allocation = resource, 100
                if not isinstance(allocation, (int, float)) or isinstance(allocation, bool):
                    result.add_warning(
                        f"resource[{res_id}]",
                        "Resource allocation is not a number; assignment ignored",
                        allocation,
                    )
                    continue
                resource_usage.setdefault(res_id, []).append((start_dt, end_dt, allocation))

        # Check allocations
        for res_id, assignments in resource_usage.items():
            if len(assignments) < 2:
                continue
            try:
                peak = self._peak_allocation(assignments)
            except TypeError:
                # Naive and timezone-aware timestamps cannot be ordered; fall
                # back to the conservative total across all assignments.
                peak = sum(allocation for _, _, allocation in assignments)
            if peak > 100:
                result.add_warning(
                    f"resource[{res_id}]",
                    f"Resource may be over-allocated ({peak}%)",
                    suggestion="Check for overlapping assignments"
                )
```
### BIM Data Validation
```python
class BIMDataValidator:
    """Validate BIM export data (IFC, COBie, etc.).

    Checks elements (GUIDs, type, name, geometry, quantities), property sets
    and the spatial structure. Each section is optional; malformed sections
    and entries are reported as issues instead of raising.
    """

    def validate(self, bim_data: Dict[str, Any]) -> ValidationResult:
        """Run every BIM check on the sections present and return the issues."""
        result = ValidationResult(is_valid=True)
        if not isinstance(bim_data, dict):
            result.add_error("bim_data", "BIM data must be a dictionary", bim_data)
            return result

        # Element data and property sets are both expected to be lists
        list_sections = (
            ('elements', self._validate_elements),
            ('property_sets', self._validate_properties),
        )
        for key, check in list_sections:
            section = bim_data.get(key)
            if section is None:
                continue
            if isinstance(section, (list, tuple)):
                check(section, result)
            else:
                result.add_error(key, f"'{key}' must be a list", type(section).__name__)

        # Check spatial structure
        spatial = bim_data.get('spatial_structure')
        if spatial is not None:
            if isinstance(spatial, dict):
                self._validate_spatial(spatial, result)
            else:
                result.add_error(
                    "spatial_structure",
                    "'spatial_structure' must be a dictionary",
                    type(spatial).__name__,
                )

        return result

    def _validate_elements(self, elements: list, result: ValidationResult):
        """Check GUID uniqueness, descriptive fields, geometry and quantities."""
        seen_guids = set()
        for i, elem in enumerate(elements):
            if not isinstance(elem, dict):
                result.add_error(f"elements[{i}]", "Element must be a mapping of fields", elem)
                continue

            # Check for unique GUIDs; a missing GUID is reported as missing,
            # not as a duplicate of None.
            guid = elem.get('guid')
            if guid is None or guid == '':
                result.add_warning(f"elements[{i}].guid", "Element missing GUID")
            elif guid in seen_guids:
                result.add_error(f"elements[{i}].guid", f"Duplicate GUID: {guid}")
            else:
                seen_guids.add(guid)

            # Check for required properties
            if not elem.get('ifc_type'):
                result.add_warning(f"elements[{i}]", "Element missing IFC type")
            if not elem.get('name'):
                result.add_warning(f"elements[{i}]", "Element missing name")

            # Check geometry
            if not elem.get('geometry') and not elem.get('location'):
                result.add_warning(f"elements[{i}]", "Element has no geometry or location")

            # Check for valid quantities
            for qty_name in ('area', 'volume', 'length'):
                qty = elem.get(qty_name)
                if qty is None:
                    continue
                if not isinstance(qty, (int, float)) or isinstance(qty, bool):
                    result.add_warning(f"elements[{i}].{qty_name}", f"Non-numeric {qty_name} value", qty)
                elif qty < 0:
                    result.add_error(f"elements[{i}].{qty_name}", f"Negative {qty_name} value", qty)

    def _validate_properties(self, property_sets: list, result: ValidationResult):
        """Flag empty property sets and properties whose value is None."""
        for i, pset in enumerate(property_sets):
            if not isinstance(pset, dict):
                result.add_error(f"property_sets[{i}]", "Property set must be a mapping of fields", pset)
                continue

            pset_name = pset.get('name', 'Unknown')
            properties = pset.get('properties')

            # Check for empty property sets
            if not properties:
                result.add_warning(f"property_set[{pset_name}]", "Property set has no properties")
                continue

            # Check property values (entries that are not mappings are skipped)
            for prop in properties:
                if isinstance(prop, dict) and prop.get('value') is None:
                    result.add_info(
                        f"property_set[{pset_name}].{prop.get('name')}",
                        "Property has null value",
                    )

    def _validate_spatial(self, spatial: dict, result: ValidationResult):
        """Warn about missing site, building or levels in the spatial hierarchy."""
        if not spatial.get('site'):
            result.add_warning("spatial_structure", "No site defined")
        if not spatial.get('building'):
            result.add_warning("spatial_structure", "No building defined")
        # A missing key, None and an empty list are all falsy
        if not spatial.get('levels'):
            result.add_warning("spatial_structure", "No levels/floors defined")
```
### Field Data Validation
```python
class FieldDataValidator:
    """Validate field/site data inputs.

    Dispatches on ``field_data['type']`` (``daily_report``, ``inspection`` or
    ``progress``). A missing or unrecognized type produces a warning so the
    caller can see that no type-specific checks were run.
    """

    @staticmethod
    def _is_number(value: Any) -> bool:
        """Return True for int/float values; bool is deliberately excluded."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    def validate(self, field_data: Dict[str, Any]) -> ValidationResult:
        """Run the checks matching the record's type and return the issues."""
        result = ValidationResult(is_valid=True)
        if not isinstance(field_data, dict):
            result.add_error("field_data", "Field data must be a dictionary", field_data)
            return result

        handlers = {
            'daily_report': self._validate_daily_report,  # Daily report validation
            'inspection': self._validate_inspection,      # Inspection data
            'progress': self._validate_progress,          # Progress data
        }
        record_type = field_data.get('type')
        handler = handlers.get(record_type) if isinstance(record_type, str) else None
        if handler is None:
            result.add_warning(
                "type",
                f"Unrecognized field data type: {record_type!r}",
                record_type,
                "Use one of: daily_report, inspection, progress",
            )
        else:
            handler(field_data, result)

        return result

    def _validate_daily_report(self, data: dict, result: ValidationResult):
        """Check required fields, workforce totals and the report date."""
        required = ['date', 'weather', 'workforce']
        for field in required:
            if field not in data:
                result.add_error(field, f"Daily report missing '{field}'")

        # Validate workforce
        if 'workforce' in data:
            workforce = data['workforce']
            if isinstance(workforce, (list, tuple)):
                total = 0
                for idx, entry in enumerate(workforce):
                    count = entry.get('count', 0) if isinstance(entry, dict) else None
                    if not self._is_number(count):
                        result.add_error(
                            f"workforce[{idx}].count",
                            "Worker count must be a number",
                            count,
                        )
                        continue
                    total += count
                if total == 0:
                    result.add_warning("workforce", "No workers reported on-site")
                if total > 500:
                    result.add_warning("workforce", f"Unusually high workforce count: {total}")
            else:
                result.add_error("workforce", "Workforce must be a list of entries", workforce)

        # Validate date
        if 'date' in data:
            try:
                report_date = data['date']
                if isinstance(report_date, str):
                    report_date = datetime.fromisoformat(report_date)
                elif not isinstance(report_date, datetime):
                    # Date-like objects are promoted to midnight
                    report_date = datetime(report_date.year, report_date.month, report_date.day)
                # Take "now" in the report's own timezone (naive stays naive)
                # so aware timestamps do not raise on comparison.
                if report_date > datetime.now(tz=report_date.tzinfo):
                    result.add_error("date", "Report date is in the future")
            except (ValueError, TypeError, AttributeError):
                result.add_error("date", "Invalid date format")

    def _validate_inspection(self, data: dict, result: ValidationResult):
        """Check required fields and that the result is a standard value."""
        required = ['inspection_type', 'date', 'inspector', 'result']
        for field in required:
            if field not in data:
                result.add_error(field, f"Inspection missing '{field}'")

        # Check result value
        valid_results = ['pass', 'fail', 'conditional', 'not_applicable']
        outcome = data.get('result')
        if outcome:
            is_standard = isinstance(outcome, str) and outcome.lower() in valid_results
            if not is_standard:
                result.add_warning("result", f"Non-standard inspection result: {outcome}")

    def _validate_progress(self, data: dict, result: ValidationResult):
        """Check that percent complete is within 0-100 and has not regressed."""
        # Check percentage values
        if 'percent_complete' in data:
            pct = data['percent_complete']
            if not self._is_number(pct):
                result.add_error(
                    "percent_complete",
                    f"Invalid percentage: {pct}",
                    suggestion="Must be a number between 0 and 100",
                )
            elif pct < 0 or pct > 100:
                result.add_error("percent_complete", f"Invalid percentage: {pct}", suggestion="Must be 0-100")

        # Check for regression (if previous value available)
        if 'previous_percent' in data and 'percent_complete' in data:
            previous = data['previous_percent']
            current = data['percent_complete']
            comparable = self._is_number(previous) and self._is_number(current)
            if comparable and current < previous:
                result.add_warning(
                    "percent_complete",
                    "Progress decreased from previous report",
                    f"{data['previous_percent']}% -> {data['percent_complete']}%",
                )
```
## Usage Examples
```python
# Validate a cost estimate
estimate = {
    'project_name': 'Office Building',
    'estimate_date': '2026-01-15',
    'gross_area': 50000,
    'line_items': [
        {'description': 'Concrete', 'csi_code': '03 30 00', 'quantity': 5000, 'unit_cost': 150},
        {'description': 'Steel', 'csi_code': '05 12 00', 'quantity': 200, 'unit_cost': 2500},
    ],
    # 5000 * 150 + 200 * 2500 = 1,250,000, so the total check passes
    'total': 1250000,
}

validator = CostEstimateValidator()
result = validator.validate(estimate)
print(result.to_report())

# Validate before processing
if result.is_valid:
    # NOTE: process_estimate is not defined in this document; it stands for
    # the caller's own downstream processing step.
    process_estimate(estimate)
else:
    print("Fix errors before processing")
    for error in result.errors:
        print(f" - {error.field}: {error.message}")
```
## Integration with DDC Pipeline
```python
# Validate all inputs before pipeline execution
def validate_pipeline_inputs(inputs: dict) -> bool:
    """Validate every recognized pipeline input and report the failures.

    Args:
        inputs: Mapping of input type (``'estimate'``, ``'schedule'``,
            ``'bim_data'``, ``'field_data'``) to the data to validate.
            Entries with any other key are skipped.

    Returns:
        ``True`` when no validated input produced an error, else ``False``.
        A report is printed for each input that fails.
    """
    validators = {
        'estimate': CostEstimateValidator(),
        'schedule': ScheduleValidator(),
        'bim_data': BIMDataValidator(),
        'field_data': FieldDataValidator(),
    }

    all_valid = True
    for input_type, data in inputs.items():
        validator = validators.get(input_type)
        if validator is None:
            continue  # no validator registered for this input type

        result = validator.validate(data)
        if result.is_valid:
            continue

        print(f"\n{input_type.upper()} VALIDATION FAILED:")
        print(result.to_report())
        all_valid = False

    return all_valid
```
## Resources
- **Data Quality Best Practices**: Validate early, validate often
- **Construction Data Standards**: CSI, IFC, COBie specifications
- **Error Handling**: Always provide actionable suggestions