erp-data-extractor
skillExtract and analyze data from construction ERP systems. Pull project data for analytics, reporting, and integration.
apm::install
apm install @datadrivenconstruction/erp-data-extractorapm::skill.md
---
name: "erp-data-extractor"
description: "Extract and analyze data from construction ERP systems. Pull project data for analytics, reporting, and integration."
homepage: "https://datadrivenconstruction.io"
metadata: {"openclaw": {"emoji": "🔄", "os": ["darwin", "linux", "win32"], "homepage": "https://datadrivenconstruction.io", "requires": {"bins": ["python3"]}}}
---
# ERP Data Extractor
## Business Case
### Problem Statement
ERP data extraction challenges:
- Complex database structures
- Multiple interconnected modules
- Data transformation needs
- Integration with analytics
### Solution
Structured extraction and transformation of construction ERP data for analytics, reporting, and cross-system integration.
## Technical Implementation
```python
import pandas as pd
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
from datetime import date, datetime
from enum import Enum
import json
class ERPModule(Enum):
PROJECT = "project"
COST = "cost"
PROCUREMENT = "procurement"
INVENTORY = "inventory"
HR = "hr"
EQUIPMENT = "equipment"
SUBCONTRACT = "subcontract"
BILLING = "billing"
@dataclass
class DataSource:
name: str
module: ERPModule
table_name: str
columns: List[str]
filters: Dict[str, Any] = field(default_factory=dict)
@dataclass
class ExtractedData:
source: str
module: ERPModule
data: pd.DataFrame
extracted_at: datetime
record_count: int
class ERPDataExtractor:
"""Extract and transform data from construction ERP systems."""
def __init__(self, erp_name: str = "Generic"):
self.erp_name = erp_name
self.data_sources: List[DataSource] = []
self.extracted_data: Dict[str, ExtractedData] = {}
self._connection = None
def add_data_source(self, source: DataSource):
"""Add data source for extraction."""
self.data_sources.append(source)
def define_project_extraction(self):
"""Define standard project data extraction."""
self.add_data_source(DataSource(
name="projects",
module=ERPModule.PROJECT,
table_name="projects",
columns=["id", "code", "name", "status", "start_date", "end_date", "budget", "client_id"]
))
self.add_data_source(DataSource(
name="project_phases",
module=ERPModule.PROJECT,
table_name="project_phases",
columns=["id", "project_id", "phase_name", "start_date", "end_date", "status"]
))
def define_cost_extraction(self):
"""Define standard cost data extraction."""
self.add_data_source(DataSource(
name="cost_items",
module=ERPModule.COST,
table_name="cost_items",
columns=["id", "project_id", "wbs_code", "description", "budgeted", "actual", "committed"]
))
self.add_data_source(DataSource(
name="cost_transactions",
module=ERPModule.COST,
table_name="cost_transactions",
columns=["id", "project_id", "cost_item_id", "amount", "transaction_date", "type"]
))
def define_procurement_extraction(self):
"""Define procurement data extraction."""
self.add_data_source(DataSource(
name="purchase_orders",
module=ERPModule.PROCUREMENT,
table_name="purchase_orders",
columns=["id", "project_id", "vendor_id", "amount", "status", "order_date", "delivery_date"]
))
self.add_data_source(DataSource(
name="vendors",
module=ERPModule.PROCUREMENT,
table_name="vendors",
columns=["id", "name", "category", "rating", "status"]
))
def extract_from_dataframe(self, source_name: str, df: pd.DataFrame):
"""Extract data from DataFrame (simulating ERP extraction)."""
source = next((s for s in self.data_sources if s.name == source_name), None)
if not source:
return None
# Apply column selection
available_cols = [c for c in source.columns if c in df.columns]
extracted = df[available_cols].copy()
# Apply filters
for col, value in source.filters.items():
if col in extracted.columns:
extracted = extracted[extracted[col] == value]
self.extracted_data[source_name] = ExtractedData(
source=source_name,
module=source.module,
data=extracted,
extracted_at=datetime.now(),
record_count=len(extracted)
)
return self.extracted_data[source_name]
def transform_data(self, source_name: str,
transformations: List[Dict[str, Any]]) -> pd.DataFrame:
"""Apply transformations to extracted data."""
if source_name not in self.extracted_data:
return pd.DataFrame()
df = self.extracted_data[source_name].data.copy()
for transform in transformations:
action = transform.get('action')
if action == 'rename':
df = df.rename(columns=transform.get('mapping', {}))
elif action == 'filter':
col = transform.get('column')
op = transform.get('operator', '==')
val = transform.get('value')
if op == '==':
df = df[df[col] == val]
elif op == '>':
df = df[df[col] > val]
elif op == '<':
df = df[df[col] < val]
elif action == 'calculate':
new_col = transform.get('new_column')
formula = transform.get('formula')
if formula == 'variance':
df[new_col] = df[transform['col1']] - df[transform['col2']]
elif action == 'date_parse':
col = transform.get('column')
df[col] = pd.to_datetime(df[col])
return df
def join_data(self, left_source: str, right_source: str,
left_key: str, right_key: str,
join_type: str = "left") -> pd.DataFrame:
"""Join two extracted data sources."""
if left_source not in self.extracted_data or right_source not in self.extracted_data:
return pd.DataFrame()
left_df = self.extracted_data[left_source].data
right_df = self.extracted_data[right_source].data
return pd.merge(left_df, right_df, left_on=left_key, right_on=right_key, how=join_type)
def aggregate_data(self, source_name: str,
group_by: List[str],
aggregations: Dict[str, str]) -> pd.DataFrame:
"""Aggregate extracted data."""
if source_name not in self.extracted_data:
return pd.DataFrame()
df = self.extracted_data[source_name].data
return df.groupby(group_by).agg(aggregations).reset_index()
def get_extraction_summary(self) -> Dict[str, Any]:
"""Get summary of all extractions."""
summary = {
'erp_system': self.erp_name,
'sources_defined': len(self.data_sources),
'sources_extracted': len(self.extracted_data),
'total_records': sum(e.record_count for e in self.extracted_data.values()),
'by_module': {}
}
for ext in self.extracted_data.values():
module = ext.module.value
if module not in summary['by_module']:
summary['by_module'][module] = {'sources': 0, 'records': 0}
summary['by_module'][module]['sources'] += 1
summary['by_module'][module]['records'] += ext.record_count
return summary
def export_to_excel(self, output_path: str) -> str:
"""Export all extracted data to Excel."""
with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
# Summary
summary = self.get_extraction_summary()
summary_df = pd.DataFrame([{
'ERP System': summary['erp_system'],
'Sources Defined': summary['sources_defined'],
'Sources Extracted': summary['sources_extracted'],
'Total Records': summary['total_records']
}])
summary_df.to_excel(writer, sheet_name='Summary', index=False)
# Each extracted source
for name, extracted in self.extracted_data.items():
sheet_name = name[:31] # Excel sheet name limit
extracted.data.to_excel(writer, sheet_name=sheet_name, index=False)
return output_path
def export_to_json(self, output_path: str) -> str:
"""Export extracted data to JSON."""
output = {
'summary': self.get_extraction_summary(),
'data': {}
}
for name, extracted in self.extracted_data.items():
output['data'][name] = {
'module': extracted.module.value,
'extracted_at': extracted.extracted_at.isoformat(),
'record_count': extracted.record_count,
'records': extracted.data.to_dict(orient='records')
}
with open(output_path, 'w') as f:
json.dump(output, f, indent=2, default=str)
return output_path
def generate_sql_query(self, source: DataSource) -> str:
"""Generate SQL query for data source."""
columns = ", ".join(source.columns)
query = f"SELECT {columns}\nFROM {source.table_name}"
if source.filters:
conditions = []
for col, value in source.filters.items():
if isinstance(value, str):
conditions.append(f"{col} = '{value}'")
else:
conditions.append(f"{col} = {value}")
query += "\nWHERE " + " AND ".join(conditions)
return query + ";"
```
## Quick Start
```python
# Initialize extractor
extractor = ERPDataExtractor("Procore")
# Define standard extractions
extractor.define_project_extraction()
extractor.define_cost_extraction()
# Simulate extraction from DataFrames
projects_df = pd.DataFrame([
{"id": 1, "code": "PRJ-001", "name": "Office Building", "status": "Active", "budget": 5000000},
{"id": 2, "code": "PRJ-002", "name": "Warehouse", "status": "Planning", "budget": 2000000}
])
extractor.extract_from_dataframe("projects", projects_df)
# Get summary
summary = extractor.get_extraction_summary()
print(f"Total records: {summary['total_records']}")
```
## Common Use Cases
### 1. Transform Data
```python
transformed = extractor.transform_data("cost_items", [
{"action": "rename", "mapping": {"budgeted": "budget", "actual": "spent"}},
{"action": "calculate", "new_column": "variance", "formula": "variance", "col1": "budget", "col2": "spent"}
])
```
### 2. Join Sources
```python
joined = extractor.join_data("cost_items", "projects", "project_id", "id")
```
### 3. Aggregate
```python
by_project = extractor.aggregate_data("cost_items", ["project_id"], {"budgeted": "sum", "actual": "sum"})
```
## Resources
- **DDC Book**: Chapter 3.4 - Construction ERP Systems
- **Website**: https://datadrivenconstruction.io