airflow-dag
skill: Create Apache Airflow DAGs for construction data pipelines. Orchestrate ETL, validation, and reporting workflows.
apm::install
apm install @datadrivenconstruction/airflow-dag
apm::skill.md
---
name: "airflow-dag"
description: "Create Apache Airflow DAGs for construction data pipelines. Orchestrate ETL, validation, and reporting workflows."
homepage: "https://datadrivenconstruction.io"
metadata: {"openclaw": {"emoji": "⚙️", "os": ["win32"], "homepage": "https://datadrivenconstruction.io", "requires": {"bins": ["python3"]}}}
---
# Apache Airflow DAG for Construction
## Overview
Apache Airflow orchestrates complex data pipelines. This skill creates DAGs for construction ETL processes - from BIM extraction to cost reports.
## Python Implementation
```python
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass
from enum import Enum
import json
class TaskStatus(Enum):
    """Task execution status.

    Each member's value is the lowercase spelling of its name, so a status
    can be looked up from its string form with ``TaskStatus("failed")``.
    """

    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"
@dataclass
class DAGTask:
    """Single task in DAG.

    Plain record describing one operator call; ConstructionDAGBuilder stores
    these and renders them into DAG source code.
    """

    # Unique task identifier; the builder also uses it as the variable name
    # of the operator in the generated code.
    task_id: str
    # Operator class name, e.g. 'BashOperator' or 'PythonOperator'.
    operator: str
    # Operator-specific settings (e.g. 'bash_command', 'python_callable').
    params: Dict[str, Any]
    # IDs of the tasks this task is declared to run after.
    upstream: List[str]
    # IDs of the tasks that listed this task as their upstream.
    downstream: List[str]
@dataclass
class DAGConfig:
    """DAG configuration.

    Groups the DAG-level settings in one record.
    NOTE(review): nothing else in this snippet reads DAGConfig; the builder
    keeps its own dag_id/schedule/tags attributes -- confirm intended use.
    """

    # Identifier of the DAG.
    dag_id: str
    # Schedule expression; the builder examples use presets such as
    # '@daily' and cron strings such as '0 2 * * *'.
    schedule: str
    # Start date for the DAG.
    start_date: datetime
    # Catchup flag for the DAG.
    catchup: bool
    # Default arguments for the DAG's tasks.
    default_args: Dict[str, Any]
    # Tags attached to the DAG.
    tags: List[str]
class ConstructionDAGBuilder:
    """Build Airflow DAGs for construction pipelines.

    Tasks are registered through the ``add_*_task`` methods and kept in
    insertion order. ``generate_dag_code`` renders them as the Python source
    of an Airflow DAG file and ``save_dag`` writes that source to disk.

    Task IDs double as variable names in the generated file, so they must be
    valid Python identifiers. ``python_callable`` values are emitted as bare
    names; the generated file neither defines nor imports them.
    """

    # Default DAG arguments.
    # NOTE: generate_dag_code() writes its own literal ``default_args`` block
    # and does not read this mapping.
    DEFAULT_ARGS = {
        'owner': 'ddc',
        'depends_on_past': False,
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        'execution_timeout': timedelta(hours=2)
    }

    def __init__(self, dag_id: str,
                 schedule: str = '@daily',
                 tags: Optional[List[str]] = None):
        """Create an empty builder.

        Args:
            dag_id: Identifier written into the generated ``DAG(...)`` call.
            schedule: Schedule expression written as ``schedule_interval``.
            tags: DAG tags; ``['construction', 'ddc']`` when omitted or empty.
        """
        self.dag_id = dag_id
        self.schedule = schedule
        self.tags = tags or ['construction', 'ddc']
        self.tasks: Dict[str, "DAGTask"] = {}

    def add_bash_task(self, task_id: str,
                      command: str,
                      upstream: Optional[List[str]] = None) -> str:
        """Add bash command task.

        Args:
            task_id: ID of the new task (replaces any task with the same ID).
            command: Shell command stored as ``bash_command``.
            upstream: IDs of tasks that must run before this one.

        Returns:
            The ``task_id`` that was registered.
        """
        self.tasks[task_id] = DAGTask(
            task_id=task_id,
            operator='BashOperator',
            params={'bash_command': command},
            upstream=upstream or [],
            downstream=[]
        )
        self._update_downstream(task_id, upstream)
        return task_id

    def add_python_task(self, task_id: str,
                        python_callable: str,
                        op_kwargs: Optional[Dict] = None,
                        upstream: Optional[List[str]] = None) -> str:
        """Add Python callable task.

        Args:
            task_id: ID of the new task (replaces any task with the same ID).
            python_callable: Name of the function, emitted verbatim as an
                identifier in the generated code.
            op_kwargs: Keyword arguments for the callable; ``{}`` if omitted.
            upstream: IDs of tasks that must run before this one.

        Returns:
            The ``task_id`` that was registered.
        """
        self.tasks[task_id] = DAGTask(
            task_id=task_id,
            operator='PythonOperator',
            params={
                'python_callable': python_callable,
                'op_kwargs': op_kwargs or {}
            },
            upstream=upstream or [],
            downstream=[]
        )
        self._update_downstream(task_id, upstream)
        return task_id

    def add_sensor_task(self, task_id: str,
                        filepath: str,
                        upstream: Optional[List[str]] = None) -> str:
        """Add file sensor task.

        The sensor is registered with ``poke_interval=300`` and
        ``timeout=3600``.

        Args:
            task_id: ID of the new task (replaces any task with the same ID).
            filepath: Path the sensor watches.
            upstream: IDs of tasks that must run before this one.

        Returns:
            The ``task_id`` that was registered.
        """
        self.tasks[task_id] = DAGTask(
            task_id=task_id,
            operator='FileSensor',
            params={
                'filepath': filepath,
                'poke_interval': 300,
                'timeout': 3600
            },
            upstream=upstream or [],
            downstream=[]
        )
        self._update_downstream(task_id, upstream)
        return task_id

    def add_branch_task(self, task_id: str,
                        python_callable: str,
                        upstream: Optional[List[str]] = None) -> str:
        """Add branching task.

        Args:
            task_id: ID of the new task (replaces any task with the same ID).
            python_callable: Name of the branching function, emitted verbatim
                as an identifier in the generated code.
            upstream: IDs of tasks that must run before this one.

        Returns:
            The ``task_id`` that was registered.
        """
        self.tasks[task_id] = DAGTask(
            task_id=task_id,
            operator='BranchPythonOperator',
            params={'python_callable': python_callable},
            upstream=upstream or [],
            downstream=[]
        )
        self._update_downstream(task_id, upstream)
        return task_id

    def _update_downstream(self, task_id: str, upstream: Optional[List[str]]):
        """Update downstream references.

        Appends ``task_id`` to the ``downstream`` list of every upstream task
        that is already registered; upstream IDs not yet known are skipped.
        """
        if upstream:
            for up_task in upstream:
                if up_task in self.tasks:
                    self.tasks[up_task].downstream.append(task_id)

    def generate_dag_code(self) -> str:
        """Generate Airflow DAG Python code.

        Returns:
            Source text containing the Airflow imports, a ``default_args``
            dict, a ``with DAG(...)`` block holding one operator per task,
            and one ``upstream >> task`` line per declared dependency.
        """
        lines = [
            '',
            'from airflow import DAG',
            'from airflow.operators.bash import BashOperator',
            'from airflow.operators.python import PythonOperator, BranchPythonOperator',
            'from airflow.sensors.filesystem import FileSensor',
            'from datetime import datetime, timedelta',
            '',
            'default_args = {',
            "    'owner': 'ddc',",
            "    'depends_on_past': False,",
            "    'email_on_failure': True,",
            "    'retries': 2,",
            "    'retry_delay': timedelta(minutes=5),",
            '}',
            '',
            'with DAG(',
            # repr() keeps the literal valid even if the value holds quotes.
            f'    dag_id={self.dag_id!r},',
            '    default_args=default_args,',
            f'    schedule_interval={self.schedule!r},',
            '    start_date=datetime(2024, 1, 1),',
            '    catchup=False,',
            f'    tags={self.tags!r},',
            ') as dag:',
        ]

        if not self.tasks:
            # A ``with`` block needs at least one statement to be valid.
            lines.append('    pass')

        # Generate task definitions
        for task in self.tasks.values():
            lines.append(self._generate_task_code(task))

        # Generate dependencies
        lines.append('')
        lines.append('    # Task dependencies')
        for task_id, task in self.tasks.items():
            for upstream in task.upstream:
                lines.append(f'    {upstream} >> {task_id}')

        return '\n'.join(lines) + '\n'

    def _generate_task_code(self, task: "DAGTask") -> str:
        """Generate code for single task.

        String parameters are written with ``repr()`` so quotes, backslashes
        and ``True``/``False``/``None`` inside ``op_kwargs`` produce valid
        Python literals. Callables are written as bare identifiers.

        Returns:
            The operator assignment indented for the ``with DAG`` block, or
            an empty string for an operator name this builder does not know.
        """
        params = task.params
        if task.operator == 'BashOperator':
            args = [f"bash_command={params['bash_command']!r}"]
        elif task.operator == 'PythonOperator':
            op_kwargs = params.get('op_kwargs', {})
            args = [
                f"python_callable={params['python_callable']}",
                f"op_kwargs={op_kwargs!r}",
            ]
        elif task.operator == 'FileSensor':
            args = [
                f"filepath={params['filepath']!r}",
                f"poke_interval={params['poke_interval']}",
                f"timeout={params['timeout']}",
            ]
        elif task.operator == 'BranchPythonOperator':
            args = [f"python_callable={params['python_callable']}"]
        else:
            return ''

        call_args = [f"task_id={task.task_id!r}"] + args
        rendered = ''.join(f"        {arg},\n" for arg in call_args)
        return f"    {task.task_id} = {task.operator}(\n{rendered}    )"

    def save_dag(self, output_path: str) -> str:
        """Save DAG to file.

        Args:
            output_path: Destination file; overwritten if it exists.

        Returns:
            ``output_path`` unchanged.
        """
        code = self.generate_dag_code()
        # Explicit encoding so the output does not depend on the OS locale.
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(code)
        return output_path
class ConstructionPipelineTemplates:
    """Pre-built construction pipeline templates.

    Every template is a static method that returns a populated
    ConstructionDAGBuilder; call ``generate_dag_code()`` or ``save_dag()``
    on the result to obtain the DAG file.
    """

    @staticmethod
    def bim_validation_pipeline(dag_id: str = 'bim_validation') -> ConstructionDAGBuilder:
        """Create BIM validation pipeline.

        Daily DAG: wait for an IFC file, convert it, validate the data, then
        branch to either a report task or an alert task.

        Args:
            dag_id: ID for the generated DAG.

        Returns:
            Builder holding the six validation tasks.
        """
        builder = ConstructionDAGBuilder(dag_id, schedule='@daily',
                                         tags=['bim', 'validation'])

        # Wait for file
        builder.add_sensor_task('wait_for_model', '/data/input/*.ifc')

        # Convert to Excel
        builder.add_bash_task(
            'convert_ifc',
            'IfcExporter.exe /data/input/*.ifc bbox',
            upstream=['wait_for_model']
        )

        # Validate data
        builder.add_python_task(
            'validate_data',
            'validate_bim_data',
            {'rules_file': '/config/validation_rules.xlsx'},
            upstream=['convert_ifc']
        )

        # Branch based on validation
        builder.add_branch_task(
            'check_validation',
            'check_validation_result',
            upstream=['validate_data']
        )

        # Success path
        builder.add_python_task(
            'generate_report',
            'generate_validation_report',
            upstream=['check_validation']
        )

        # Failure path
        builder.add_python_task(
            'send_alert',
            'send_validation_alert',
            upstream=['check_validation']
        )

        return builder

    @staticmethod
    def cost_estimation_pipeline(dag_id: str = 'cost_estimation') -> ConstructionDAGBuilder:
        """Create cost estimation pipeline.

        Weekly DAG forming a single chain: extract BIM data, generate the
        quantity take-off, match costs, calculate the estimate, build the
        report.

        Args:
            dag_id: ID for the generated DAG.

        Returns:
            Builder holding the five estimation tasks.
        """
        builder = ConstructionDAGBuilder(dag_id, schedule='@weekly',
                                         tags=['cost', 'estimation'])

        # Extract BIM data
        builder.add_bash_task('extract_bim', 'RvtExporter.exe /data/model.rvt complete bbox')

        # Generate QTO
        builder.add_python_task(
            'generate_qto',
            'generate_quantity_takeoff',
            upstream=['extract_bim']
        )

        # Match with cost database
        builder.add_python_task(
            'match_costs',
            'match_cwicr_costs',
            upstream=['generate_qto']
        )

        # Calculate estimate
        builder.add_python_task(
            'calculate_estimate',
            'calculate_project_estimate',
            upstream=['match_costs']
        )

        # Generate report
        builder.add_python_task(
            'create_report',
            'create_cost_report',
            upstream=['calculate_estimate']
        )

        return builder

    @staticmethod
    def batch_conversion_pipeline(dag_id: str = 'batch_convert') -> ConstructionDAGBuilder:
        """Create batch CAD conversion pipeline.

        Scheduled at 02:00 daily: scan the input folder, run the RVT, IFC and
        DWG conversions (all three depend only on the scan), consolidate the
        results, then archive the inputs.

        The conversion commands are bash ``for`` loops, because they are
        registered as BashOperator tasks.

        Args:
            dag_id: ID for the generated DAG.

        Returns:
            Builder holding the six conversion tasks.
        """
        builder = ConstructionDAGBuilder(dag_id, schedule='0 2 * * *',  # 2 AM daily
                                         tags=['conversion', 'batch'])

        # Scan for new files
        builder.add_python_task('scan_files', 'scan_input_folder')

        # Convert Revit files
        builder.add_bash_task(
            'convert_rvt',
            'for f in /data/input/*.rvt; do RvtExporter.exe "$f" standard; done',
            upstream=['scan_files']
        )

        # Convert IFC files
        builder.add_bash_task(
            'convert_ifc',
            'for f in /data/input/*.ifc; do IfcExporter.exe "$f"; done',
            upstream=['scan_files']
        )

        # Convert DWG files
        builder.add_bash_task(
            'convert_dwg',
            'for f in /data/input/*.dwg; do DwgExporter.exe "$f"; done',
            upstream=['scan_files']
        )

        # Consolidate results
        builder.add_python_task(
            'consolidate',
            'consolidate_conversion_results',
            upstream=['convert_rvt', 'convert_ifc', 'convert_dwg']
        )

        # Archive input files
        builder.add_python_task(
            'archive',
            'archive_processed_files',
            upstream=['consolidate']
        )

        return builder
```
## Quick Start
```python
# Start an empty daily pipeline
pipeline = ConstructionDAGBuilder(dag_id='my_pipeline', schedule='@daily')
# Register the steps in order: convert -> analyze -> report
pipeline.add_bash_task(task_id='convert', command='RvtExporter.exe model.rvt')
pipeline.add_python_task(task_id='analyze', python_callable='analyze_data',
                         upstream=['convert'])
pipeline.add_python_task(task_id='report', python_callable='create_report',
                         upstream=['analyze'])
# Render the DAG source and show it
dag_source = pipeline.generate_dag_code()
print(dag_source)
# Write the same source into the Airflow DAGs folder
pipeline.save_dag(output_path='/airflow/dags/my_pipeline.py')
```
## Pipeline Templates
### 1. BIM Validation
```python
# The templates are static methods, reached here through an instance.
templates = ConstructionPipelineTemplates()
# Build the template and write its DAG file in one chained call.
templates.bim_validation_pipeline().save_dag('/airflow/dags/bim_validation.py')
```
### 2. Cost Estimation
```python
# Templates are static methods, so call them on the class directly; this
# snippet then runs on its own without a `templates` variable.
cost_dag = ConstructionPipelineTemplates.cost_estimation_pipeline()
cost_dag.save_dag('/airflow/dags/cost_estimation.py')
```
### 3. Batch Conversion
```python
# Templates are static methods, so call them on the class directly; this
# snippet then runs on its own without a `templates` variable.
batch_dag = ConstructionPipelineTemplates.batch_conversion_pipeline()
batch_dag.save_dag('/airflow/dags/batch_convert.py')
```
## Resources
- **DDC Book**: Chapter 4.2 - Apache Airflow Orchestration
- **Airflow Docs**: https://airflow.apache.org/docs/