Drone Site Survey Processing
Overview
This skill implements drone data processing for construction site monitoring. Process aerial imagery to generate maps, measure volumes, track progress, and compare with design models.
Capabilities:
-
Orthomosaic generation
-
Digital Elevation Model (DEM) creation
-
Point cloud processing
-
Volume calculations
-
Progress monitoring
-
BIM comparison
-
Stockpile measurement
Quick Start
from dataclasses import dataclass from typing import List, Dict, Tuple, Optional from datetime import datetime import numpy as np
@dataclass class DroneImage: filename: str timestamp: datetime latitude: float longitude: float altitude: float heading: float pitch: float roll: float camera_model: str
@dataclass class PointCloud: points: np.ndarray # Nx3 array colors: Optional[np.ndarray] = None # Nx3 RGB normals: Optional[np.ndarray] = None # Nx3
@dataclass class VolumeResult: volume_m3: float area_m2: float method: str reference_plane: str confidence: float
def calculate_volume_simple(point_cloud: PointCloud, reference_z: float = None) -> VolumeResult: """Simple volume calculation from point cloud""" points = point_cloud.points
if reference_z is None:
reference_z = np.min(points[:, 2])
# Grid-based volume calculation
x_min, x_max = np.min(points[:, 0]), np.max(points[:, 0])
y_min, y_max = np.min(points[:, 1]), np.max(points[:, 1])
grid_size = 0.5 # 50cm grid
x_bins = np.arange(x_min, x_max + grid_size, grid_size)
y_bins = np.arange(y_min, y_max + grid_size, grid_size)
volume = 0
cell_area = grid_size ** 2
for i in range(len(x_bins) - 1):
for j in range(len(y_bins) - 1):
mask = (
(points[:, 0] >= x_bins[i]) & (points[:, 0] < x_bins[i + 1]) &
(points[:, 1] >= y_bins[j]) & (points[:, 1] < y_bins[j + 1])
)
cell_points = points[mask]
if len(cell_points) > 0:
max_z = np.max(cell_points[:, 2])
height = max_z - reference_z
if height > 0:
volume += height * cell_area
area = (x_max - x_min) * (y_max - y_min)
return VolumeResult(
volume_m3=volume,
area_m2=area,
method='grid_based',
reference_plane=f'z={reference_z:.2f}',
confidence=0.9
)
Example usage
sample_points = np.random.rand(10000, 3) * [100, 100, 10] # 100x100m, 10m height point_cloud = PointCloud(points=sample_points) result = calculate_volume_simple(point_cloud) print(f"Volume: {result.volume_m3:.2f} m³, Area: {result.area_m2:.2f} m²")
Comprehensive Drone Survey System
Image Processing Pipeline
from dataclasses import dataclass, field from typing import List, Dict, Tuple, Optional from datetime import datetime import numpy as np from pathlib import Path import json
@dataclass class CameraParameters: focal_length_mm: float sensor_width_mm: float sensor_height_mm: float image_width_px: int image_height_px: int
@dataclass class GeoReference: crs: str # Coordinate Reference System (e.g., "EPSG:4326") origin: Tuple[float, float, float] # lat, lon, alt rotation: Tuple[float, float, float] # heading, pitch, roll
@dataclass class SurveyFlight: flight_id: str date: datetime site_name: str images: List[DroneImage] camera: CameraParameters geo_reference: GeoReference flight_altitude: float overlap_forward: float = 0.8 overlap_side: float = 0.7 gsd: float = 0 # Ground Sample Distance (cm/pixel)
def __post_init__(self):
if self.gsd == 0 and self.camera:
# Calculate GSD
sensor_width = self.camera.sensor_width_mm
focal_length = self.camera.focal_length_mm
image_width = self.camera.image_width_px
altitude = self.flight_altitude
self.gsd = (altitude * sensor_width) / (focal_length * image_width) * 100 # cm
@dataclass class ProcessingResult: orthomosaic_path: Optional[str] = None dem_path: Optional[str] = None dsm_path: Optional[str] = None point_cloud_path: Optional[str] = None report_path: Optional[str] = None statistics: Dict = field(default_factory=dict)
class DroneDataProcessor: """Process drone survey data"""
def __init__(self, output_dir: str):
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
def process_survey(self, flight: SurveyFlight,
generate_ortho: bool = True,
generate_dem: bool = True,
generate_pointcloud: bool = True) -> ProcessingResult:
"""Process drone survey data"""
result = ProcessingResult()
result.statistics['flight_id'] = flight.flight_id
result.statistics['image_count'] = len(flight.images)
result.statistics['gsd_cm'] = flight.gsd
result.statistics['flight_date'] = flight.date.isoformat()
# In production, these would call actual photogrammetry libraries
# like OpenDroneMap, Pix4D API, or custom SfM pipeline
if generate_ortho:
result.orthomosaic_path = str(self.output_dir / f"{flight.flight_id}_ortho.tif")
result.statistics['ortho_resolution'] = flight.gsd
if generate_dem:
result.dem_path = str(self.output_dir / f"{flight.flight_id}_dem.tif")
result.dsm_path = str(self.output_dir / f"{flight.flight_id}_dsm.tif")
if generate_pointcloud:
result.point_cloud_path = str(self.output_dir / f"{flight.flight_id}_pointcloud.las")
# Generate report
result.report_path = str(self.output_dir / f"{flight.flight_id}_report.json")
with open(result.report_path, 'w') as f:
json.dump(result.statistics, f, indent=2)
return result
def extract_point_cloud(self, las_path: str) -> PointCloud:
"""Extract point cloud from LAS file"""
# In production, use laspy or similar
# Simulated point cloud for demonstration
n_points = 100000
points = np.random.rand(n_points, 3) * [100, 100, 20]
colors = np.random.randint(0, 255, (n_points, 3), dtype=np.uint8)
return PointCloud(points=points, colors=colors)
def compare_surveys(self, survey1: ProcessingResult,
survey2: ProcessingResult) -> Dict:
"""Compare two surveys for change detection"""
# Load point clouds
pc1 = self.extract_point_cloud(survey1.point_cloud_path)
pc2 = self.extract_point_cloud(survey2.point_cloud_path)
# Calculate elevation differences
# In production, use proper point cloud registration and comparison
comparison = {
'survey1_date': survey1.statistics.get('flight_date'),
'survey2_date': survey2.statistics.get('flight_date'),
'point_count_diff': len(pc2.points) - len(pc1.points),
'changes_detected': []
}
return comparison
Volume Calculation Engine
from scipy.spatial import Delaunay from scipy.interpolate import griddata import numpy as np
class VolumeCalculator: """Advanced volume calculations from drone data"""
def __init__(self, point_cloud: PointCloud):
self.points = point_cloud.points
self.colors = point_cloud.colors
def calculate_cut_fill(self, design_surface: np.ndarray,
grid_size: float = 0.5) -> Dict:
"""Calculate cut and fill volumes compared to design surface"""
# Create grid
x_min, x_max = np.min(self.points[:, 0]), np.max(self.points[:, 0])
y_min, y_max = np.min(self.points[:, 1]), np.max(self.points[:, 1])
x_grid = np.arange(x_min, x_max, grid_size)
y_grid = np.arange(y_min, y_max, grid_size)
xx, yy = np.meshgrid(x_grid, y_grid)
# Interpolate actual surface
actual_z = griddata(
self.points[:, :2],
self.points[:, 2],
(xx, yy),
method='linear'
)
# Interpolate design surface
design_z = griddata(
design_surface[:, :2],
design_surface[:, 2],
(xx, yy),
method='linear'
)
# Calculate differences
diff = actual_z - design_z
cell_area = grid_size ** 2
cut_volume = np.nansum(diff[diff > 0]) * cell_area
fill_volume = np.nansum(np.abs(diff[diff < 0])) * cell_area
net_volume = cut_volume - fill_volume
return {
'cut_volume_m3': float(cut_volume),
'fill_volume_m3': float(fill_volume),
'net_volume_m3': float(net_volume),
'balance': 'cut' if net_volume > 0 else 'fill',
'grid_size_m': grid_size,
'area_m2': float((x_max - x_min) * (y_max - y_min))
}
def calculate_stockpile_volume(self, base_method: str = 'lowest_perimeter') -> VolumeResult:
"""Calculate stockpile volume"""
# Find boundary points
from scipy.spatial import ConvexHull
hull = ConvexHull(self.points[:, :2])
boundary_points = self.points[hull.vertices]
if base_method == 'lowest_perimeter':
reference_z = np.min(boundary_points[:, 2])
elif base_method == 'average_perimeter':
reference_z = np.mean(boundary_points[:, 2])
elif base_method == 'triangulated':
# Create base triangulation from boundary
reference_z = np.min(self.points[:, 2])
else:
reference_z = np.min(self.points[:, 2])
# Calculate volume using triangulated surface
volume = self._triangulated_volume(reference_z)
hull_area = self._calculate_hull_area(boundary_points[:, :2])
return VolumeResult(
volume_m3=volume,
area_m2=hull_area,
method='triangulated_' + base_method,
reference_plane=f'z={reference_z:.2f}',
confidence=0.95
)
def _triangulated_volume(self, reference_z: float) -> float:
"""Calculate volume using Delaunay triangulation"""
# Create 2D triangulation
points_2d = self.points[:, :2]
tri = Delaunay(points_2d)
volume = 0
for simplex in tri.simplices:
# Get triangle vertices
p1 = self.points[simplex[0]]
p2 = self.points[simplex[1]]
p3 = self.points[simplex[2]]
# Calculate prism volume
avg_height = (p1[2] + p2[2] + p3[2]) / 3 - reference_z
if avg_height > 0:
# Triangle area
area = 0.5 * abs(
(p2[0] - p1[0]) * (p3[1] - p1[1]) -
(p3[0] - p1[0]) * (p2[1] - p1[1])
)
volume += area * avg_height
return volume
def _calculate_hull_area(self, points_2d: np.ndarray) -> float:
"""Calculate area of convex hull"""
hull = ConvexHull(points_2d)
return hull.volume # In 2D, volume is actually area
def generate_contours(self, interval: float = 1.0) -> List[Dict]:
"""Generate elevation contours"""
z_min = np.min(self.points[:, 2])
z_max = np.max(self.points[:, 2])
contour_levels = np.arange(
np.floor(z_min / interval) * interval,
np.ceil(z_max / interval) * interval + interval,
interval
)
contours = []
for level in contour_levels:
# In production, use proper contouring algorithm
contours.append({
'elevation': float(level),
'type': 'major' if level % 5 == 0 else 'minor'
})
return contours
Progress Monitoring
from datetime import date from typing import List, Dict
@dataclass class ProgressPoint: date: date point_cloud: PointCloud orthomosaic_path: str annotations: Dict = field(default_factory=dict)
class ConstructionProgressMonitor: """Monitor construction progress from drone surveys"""
def __init__(self, project_name: str):
self.project_name = project_name
self.surveys: List[ProgressPoint] = []
self.volume_calculator = None
def add_survey(self, survey_date: date, point_cloud: PointCloud,
orthomosaic_path: str):
"""Add survey for progress tracking"""
self.surveys.append(ProgressPoint(
date=survey_date,
point_cloud=point_cloud,
orthomosaic_path=orthomosaic_path
))
self.surveys.sort(key=lambda x: x.date)
def calculate_earthwork_progress(self, design_surface: np.ndarray) -> List[Dict]:
"""Calculate earthwork progress over time"""
progress = []
for i, survey in enumerate(self.surveys):
calc = VolumeCalculator(survey.point_cloud)
cut_fill = calc.calculate_cut_fill(design_surface)
progress.append({
'date': survey.date.isoformat(),
'survey_index': i + 1,
'cut_volume_m3': cut_fill['cut_volume_m3'],
'fill_volume_m3': cut_fill['fill_volume_m3'],
'net_volume_m3': cut_fill['net_volume_m3']
})
# Calculate progress percentages
if len(progress) > 1:
initial = progress[0]
for p in progress[1:]:
if initial['net_volume_m3'] != 0:
p['progress_pct'] = (
(initial['net_volume_m3'] - p['net_volume_m3']) /
abs(initial['net_volume_m3']) * 100
)
else:
p['progress_pct'] = 0
return progress
def detect_changes(self, survey1_idx: int, survey2_idx: int,
threshold_m: float = 0.5) -> Dict:
"""Detect changes between two surveys"""
pc1 = self.surveys[survey1_idx].point_cloud
pc2 = self.surveys[survey2_idx].point_cloud
# Create grids for comparison
grid_size = 1.0 # 1m grid
x_min = min(np.min(pc1.points[:, 0]), np.min(pc2.points[:, 0]))
x_max = max(np.max(pc1.points[:, 0]), np.max(pc2.points[:, 0]))
y_min = min(np.min(pc1.points[:, 1]), np.min(pc2.points[:, 1]))
y_max = max(np.max(pc1.points[:, 1]), np.max(pc2.points[:, 1]))
x_bins = np.arange(x_min, x_max + grid_size, grid_size)
y_bins = np.arange(y_min, y_max + grid_size, grid_size)
changes = {
'increased': [], # Areas where elevation increased
'decreased': [], # Areas where elevation decreased
'unchanged': 0
}
for i in range(len(x_bins) - 1):
for j in range(len(y_bins) - 1):
# Get points in cell for both surveys
cell_x = (x_bins[i] + x_bins[i + 1]) / 2
cell_y = (y_bins[j] + y_bins[j + 1]) / 2
mask1 = (
(pc1.points[:, 0] >= x_bins[i]) & (pc1.points[:, 0] < x_bins[i + 1]) &
(pc1.points[:, 1] >= y_bins[j]) & (pc1.points[:, 1] < y_bins[j + 1])
)
mask2 = (
(pc2.points[:, 0] >= x_bins[i]) & (pc2.points[:, 0] < x_bins[i + 1]) &
(pc2.points[:, 1] >= y_bins[j]) & (pc2.points[:, 1] < y_bins[j + 1])
)
if np.sum(mask1) > 0 and np.sum(mask2) > 0:
z1 = np.mean(pc1.points[mask1, 2])
z2 = np.mean(pc2.points[mask2, 2])
diff = z2 - z1
if diff > threshold_m:
changes['increased'].append({
'x': cell_x, 'y': cell_y, 'change_m': diff
})
elif diff < -threshold_m:
changes['decreased'].append({
'x': cell_x, 'y': cell_y, 'change_m': diff
})
else:
changes['unchanged'] += 1
changes['summary'] = {
'increased_cells': len(changes['increased']),
'decreased_cells': len(changes['decreased']),
'unchanged_cells': changes['unchanged'],
'date_from': self.surveys[survey1_idx].date.isoformat(),
'date_to': self.surveys[survey2_idx].date.isoformat()
}
return changes
def generate_progress_report(self, output_path: str,
design_surface: np.ndarray = None) -> str:
"""Generate progress report"""
import pandas as pd
report_data = {
'project': self.project_name,
'surveys': len(self.surveys),
'date_range': {
'start': self.surveys[0].date.isoformat() if self.surveys else None,
'end': self.surveys[-1].date.isoformat() if self.surveys else None
}
}
if design_surface is not None:
report_data['earthwork_progress'] = self.calculate_earthwork_progress(design_surface)
with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
# Summary
pd.DataFrame([report_data]).to_excel(writer, sheet_name='Summary', index=False)
# Survey list
survey_data = [{
'Date': s.date,
'Orthomosaic': s.orthomosaic_path
} for s in self.surveys]
pd.DataFrame(survey_data).to_excel(writer, sheet_name='Surveys', index=False)
# Progress (if available)
if 'earthwork_progress' in report_data:
pd.DataFrame(report_data['earthwork_progress']).to_excel(
writer, sheet_name='Progress', index=False
)
return output_path
BIM Comparison
class BIMDroneComparator: """Compare drone survey with BIM model"""
def __init__(self, bim_surface: np.ndarray, drone_pointcloud: PointCloud):
self.bim = bim_surface
self.drone = drone_pointcloud
def compare_elevations(self, grid_size: float = 1.0) -> Dict:
"""Compare drone elevations with BIM design"""
# Create comparison grid
x_min = max(np.min(self.bim[:, 0]), np.min(self.drone.points[:, 0]))
x_max = min(np.max(self.bim[:, 0]), np.max(self.drone.points[:, 0]))
y_min = max(np.min(self.bim[:, 1]), np.min(self.drone.points[:, 1]))
y_max = min(np.max(self.bim[:, 1]), np.max(self.drone.points[:, 1]))
x_grid = np.arange(x_min, x_max, grid_size)
y_grid = np.arange(y_min, y_max, grid_size)
xx, yy = np.meshgrid(x_grid, y_grid)
# Interpolate both surfaces
bim_z = griddata(self.bim[:, :2], self.bim[:, 2], (xx, yy), method='linear')
drone_z = griddata(
self.drone.points[:, :2],
self.drone.points[:, 2],
(xx, yy),
method='linear'
)
# Calculate differences
diff = drone_z - bim_z
valid_mask = ~np.isnan(diff)
valid_diff = diff[valid_mask]
return {
'mean_diff_m': float(np.mean(valid_diff)),
'std_diff_m': float(np.std(valid_diff)),
'max_diff_m': float(np.max(valid_diff)),
'min_diff_m': float(np.min(valid_diff)),
'rmse_m': float(np.sqrt(np.mean(valid_diff ** 2))),
'within_5cm': float(np.sum(np.abs(valid_diff) < 0.05) / len(valid_diff) * 100),
'within_10cm': float(np.sum(np.abs(valid_diff) < 0.10) / len(valid_diff) * 100),
'comparison_points': int(len(valid_diff))
}
def find_deviations(self, tolerance_m: float = 0.1) -> List[Dict]:
"""Find areas with significant deviations"""
deviations = []
grid_size = 1.0
# Grid comparison
x_min = max(np.min(self.bim[:, 0]), np.min(self.drone.points[:, 0]))
x_max = min(np.max(self.bim[:, 0]), np.max(self.drone.points[:, 0]))
y_min = max(np.min(self.bim[:, 1]), np.min(self.drone.points[:, 1]))
y_max = min(np.max(self.bim[:, 1]), np.max(self.drone.points[:, 1]))
for x in np.arange(x_min, x_max, grid_size):
for y in np.arange(y_min, y_max, grid_size):
# Get average elevations
bim_mask = (
(self.bim[:, 0] >= x) & (self.bim[:, 0] < x + grid_size) &
(self.bim[:, 1] >= y) & (self.bim[:, 1] < y + grid_size)
)
drone_mask = (
(self.drone.points[:, 0] >= x) & (self.drone.points[:, 0] < x + grid_size) &
(self.drone.points[:, 1] >= y) & (self.drone.points[:, 1] < y + grid_size)
)
if np.sum(bim_mask) > 0 and np.sum(drone_mask) > 0:
bim_z = np.mean(self.bim[bim_mask, 2])
drone_z = np.mean(self.drone.points[drone_mask, 2])
diff = drone_z - bim_z
if abs(diff) > tolerance_m:
deviations.append({
'x': x + grid_size / 2,
'y': y + grid_size / 2,
'bim_elevation': bim_z,
'actual_elevation': drone_z,
'deviation_m': diff,
'type': 'high' if diff > 0 else 'low'
})
return deviations
Quick Reference
Measurement Method Accuracy
Stockpile Volume Triangulated ±2-5%
Cut/Fill Volume Grid comparison ±5%
Area Measurement Orthomosaic <1cm GSD
Elevation (DEM) Photogrammetry ±2-5cm
Progress Tracking Multi-temporal Relative
Resources
-
OpenDroneMap: https://www.opendronemap.org
-
Pix4D: https://www.pix4d.com
-
LAStools: https://rapidlasso.com/lastools/
-
DDC Website: https://datadrivenconstruction.io
Next Steps
-
See progress-monitoring-cv for image-based progress
-
See bim-validation-pipeline for model comparison
-
See data-visualization for 3D visualization