incident-reporting

Incident Reporting System

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "incident-reporting" with this command: npx skills add datadrivenconstruction/ddc_skills_for_ai_agents_in_construction/datadrivenconstruction-ddc-skills-for-ai-agents-in-construction-incident-reporting

Incident Reporting System

Overview

Comprehensive incident reporting system for construction safety. Capture near-misses, injuries, and property damage. Conduct root cause analysis and track corrective actions to prevent recurrence.

"Near-miss reporting prevents 90% of future serious incidents" — DDC Community

Incident Pyramid

                △
               /│\        Fatality (1)
              / │ \
             /  │  \      Serious Injury (10)
            /   │   \
           /    │    \    Minor Injury (30)
          /     │     \
         /      │      \  Near Miss (300)
        /       │       \
       /        │        \ Unsafe Acts (3000)
      ──────────┴──────────

Technical Implementation

import json
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional

class IncidentType(Enum):
    """Kinds of incident that can be reported."""

    NEAR_MISS = "near_miss"
    FIRST_AID = "first_aid"
    MEDICAL_TREATMENT = "medical_treatment"
    LOST_TIME = "lost_time"
    FATALITY = "fatality"
    PROPERTY_DAMAGE = "property_damage"
    ENVIRONMENTAL = "environmental"

class IncidentCategory(Enum):
    """Hazard categories used to classify an incident."""

    FALL = "fall"
    STRUCK_BY = "struck_by"
    CAUGHT_IN = "caught_in"
    ELECTROCUTION = "electrocution"
    VEHICLE = "vehicle"
    MATERIAL_HANDLING = "material_handling"
    TOOL_EQUIPMENT = "tool_equipment"
    SLIP_TRIP = "slip_trip"
    FIRE = "fire"
    CHEMICAL = "chemical"
    OTHER = "other"

class InvestigationStatus(Enum):
    """Workflow states an incident investigation can be in."""

    REPORTED = "reported"
    UNDER_INVESTIGATION = "under_investigation"
    ROOT_CAUSE_IDENTIFIED = "root_cause_identified"
    CORRECTIVE_ACTIONS_ASSIGNED = "corrective_actions_assigned"
    IN_REMEDIATION = "in_remediation"
    CLOSED = "closed"

@dataclass
class Person:
    """A person referenced by an incident record (injured party or witness)."""

    name: str
    company: str
    role: str
    contact: str
    years_experience: int = 0

@dataclass
class CorrectiveAction:
    """A follow-up task assigned in response to an incident."""

    id: str
    description: str
    assigned_to: str
    due_date: datetime
    status: str = "open"
    completed_date: Optional[datetime] = None
    verification_notes: str = ""

@dataclass
class Incident:
    """Record of a single reported incident and its investigation data."""

    id: str
    incident_type: IncidentType
    category: IncidentCategory
    date_time: datetime
    location: str
    project_id: str
    project_name: str

    # Description
    description: str
    immediate_actions: str

    # People involved
    injured_person: Optional[Person] = None
    witnesses: List[Person] = field(default_factory=list)
    reported_by: str = ""

    # Investigation
    status: InvestigationStatus = InvestigationStatus.REPORTED
    root_causes: List[str] = field(default_factory=list)
    contributing_factors: List[str] = field(default_factory=list)
    corrective_actions: List[CorrectiveAction] = field(default_factory=list)

    # Documentation
    photos: List[str] = field(default_factory=list)
    weather_conditions: str = ""
    equipment_involved: List[str] = field(default_factory=list)

    # Metrics
    days_lost: int = 0
    property_damage_cost: float = 0.0
    osha_recordable: bool = False

class IncidentManager:
    """Manage construction incident reporting and investigation."""

    # 5 Whys root cause categories
    ROOT_CAUSE_CATEGORIES = [
        "Training/Competency",
        "Procedures/Work Instructions",
        "Equipment/Tools",
        "Supervision",
        "Communication",
        "Housekeeping",
        "PPE",
        "Work Environment",
        "Physical/Mental State",
        "Management System",
    ]

    def __init__(self):
        """Start with empty registries, each keyed by the record's ID."""
        self.incidents: Dict[str, Incident] = {}
        self.corrective_actions: Dict[str, CorrectiveAction] = {}

def report_incident(self, incident_type: IncidentType,
                   category: IncidentCategory,
                   date_time: datetime,
                   location: str,
                   project_id: str,
                   project_name: str,
                   description: str,
                   immediate_actions: str,
                   reported_by: str,
                   injured_person: Optional[Dict] = None) -> Incident:
    """Create, register, and return a new incident record.

    Args:
        incident_type: Kind of incident being reported.
        category: Hazard category of the incident.
        date_time: Date and time recorded for the incident.
        location: Location recorded for the incident.
        project_id: Identifier of the project the incident belongs to.
        project_name: Display name of that project.
        description: Free-text description of the incident.
        immediate_actions: Free-text description of immediate actions taken.
        reported_by: Name or role of the reporter.
        injured_person: Optional mapping expanded as ``Person(**injured_person)``,
            so its keys must match the ``Person`` fields.

    Returns:
        The new ``Incident``, which is also stored in ``self.incidents``
        under its generated ID.
    """
    # The ID is a second-resolution timestamp. Two reports filed within the
    # same second would otherwise share an ID and the later one would replace
    # the earlier one in the registry, so add a numeric suffix on collision.
    base_id = f"INC-{datetime.now().strftime('%Y%m%d%H%M%S')}"
    incident_id = base_id
    sequence = 1
    while incident_id in self.incidents:
        sequence += 1
        incident_id = f"{base_id}-{sequence}"

    injured = Person(**injured_person) if injured_person else None

    incident = Incident(
        id=incident_id,
        incident_type=incident_type,
        category=category,
        date_time=date_time,
        location=location,
        project_id=project_id,
        project_name=project_name,
        description=description,
        immediate_actions=immediate_actions,
        reported_by=reported_by,
        injured_person=injured,
    )

    # Auto-flag OSHA recordable
    if incident_type in (IncidentType.MEDICAL_TREATMENT,
                         IncidentType.LOST_TIME,
                         IncidentType.FATALITY):
        incident.osha_recordable = True

    self.incidents[incident_id] = incident
    return incident

def add_witness(self, incident_id: str, witness: Dict) -> Incident:
    """Attach a witness to an existing incident and return that incident.

    ``witness`` is expanded as ``Person(**witness)``, so its keys must match
    the ``Person`` fields.

    Raises:
        ValueError: If ``incident_id`` is not a registered incident.
    """
    target = self.incidents.get(incident_id)
    if target is None:
        raise ValueError(f"Incident {incident_id} not found")

    new_witness = Person(**witness)
    target.witnesses.append(new_witness)
    return target

def conduct_investigation(self, incident_id: str,
                         root_causes: List[str],
                         contributing_factors: List[str]) -> Incident:
    """Store investigation findings on an incident and advance its status.

    The incident's ``root_causes`` and ``contributing_factors`` are replaced
    by the given lists, and its status becomes ``ROOT_CAUSE_IDENTIFIED``.

    Raises:
        ValueError: If ``incident_id`` is not a registered incident.
    """
    record = self.incidents.get(incident_id)
    if record is None:
        raise ValueError(f"Incident {incident_id} not found")

    record.root_causes = root_causes
    record.contributing_factors = contributing_factors
    record.status = InvestigationStatus.ROOT_CAUSE_IDENTIFIED
    return record

def five_whys_analysis(self, incident_id: str, whys: List[str]) -> Dict:
    """Build a 5 Whys analysis for an incident.

    Each answer in ``whys`` becomes a numbered entry (levels start at 1).
    When ``whys`` is non-empty, its last answer is appended to the
    incident's ``root_causes``.

    Returns:
        A dict with ``incident_id``, ``analysis_date`` (ISO-formatted current
        time) and ``whys`` (the list of numbered entries).

    Raises:
        ValueError: If ``incident_id`` is not a registered incident.
    """
    record = self.incidents.get(incident_id)
    if record is None:
        raise ValueError(f"Incident {incident_id} not found")

    steps = [
        {"level": level, "question": f"Why #{level}?", "answer": answer}
        for level, answer in enumerate(whys, start=1)
    ]

    # The last "why" is typically the root cause
    if whys:
        record.root_causes.append(whys[-1])

    return {
        "incident_id": incident_id,
        "analysis_date": datetime.now().isoformat(),
        "whys": steps,
    }

def assign_corrective_action(self, incident_id: str,
                            description: str,
                            assigned_to: str,
                            due_days: int = 7) -> CorrectiveAction:
    """Create a corrective action for an incident and register it.

    The action is appended to the incident's ``corrective_actions``, stored
    in ``self.corrective_actions`` under its ID, and the incident's status is
    set to ``CORRECTIVE_ACTIONS_ASSIGNED``.

    Args:
        incident_id: ID of the incident the action belongs to.
        description: Free-text description of the action.
        assigned_to: Name or role of the assignee.
        due_days: Days from now until the action is due.

    Returns:
        The newly created ``CorrectiveAction``.

    Raises:
        ValueError: If ``incident_id`` is not a registered incident.
    """
    incident = self.incidents.get(incident_id)
    if incident is None:
        raise ValueError(f"Incident {incident_id} not found")

    # Read the clock once so the ID and the due date share one timestamp.
    now = datetime.now()

    # The ID is a second-resolution timestamp. Actions assigned back-to-back
    # would otherwise share an ID and the later one would replace the earlier
    # one in the registry, so add a numeric suffix on collision.
    base_id = f"CA-{now.strftime('%Y%m%d%H%M%S')}"
    action_id = base_id
    sequence = 1
    while action_id in self.corrective_actions:
        sequence += 1
        action_id = f"{base_id}-{sequence}"

    action = CorrectiveAction(
        id=action_id,
        description=description,
        assigned_to=assigned_to,
        due_date=now + timedelta(days=due_days),
    )

    incident.corrective_actions.append(action)
    incident.status = InvestigationStatus.CORRECTIVE_ACTIONS_ASSIGNED
    self.corrective_actions[action_id] = action
    return action

def complete_corrective_action(self, action_id: str,
                               verification_notes: str) -> CorrectiveAction:
    """Close out a corrective action and return it.

    Sets the action's status to ``"completed"``, stamps ``completed_date``
    with the current time, and stores ``verification_notes``.

    Raises:
        ValueError: If ``action_id`` is not a registered corrective action.
    """
    entry = self.corrective_actions.get(action_id)
    if entry is None:
        raise ValueError(f"Corrective action {action_id} not found")

    entry.status = "completed"
    entry.completed_date = datetime.now()
    entry.verification_notes = verification_notes
    return entry

def get_incident_metrics(self, project_id: str = None,
                        start_date: datetime = None,
                        end_date: datetime = None) -> Dict:
    """Summarize incident counts, optionally filtered by project and date.

    Args:
        project_id: When given, only incidents of this project are counted.
        start_date: When given, only incidents at or after this time count.
        end_date: When given, only incidents at or before this time count.

    Returns:
        A dict with ``total_incidents``, ``near_misses``, ``first_aid_cases``,
        ``osha_recordables``, ``lost_time_incidents``, ``total_days_lost``,
        ``by_category`` (category value -> count, non-zero entries only) and
        ``near_miss_ratio`` (near misses per recordable, 0 when there are no
        recordables).
    """
    selected = [
        inc for inc in self.incidents.values()
        if (not project_id or inc.project_id == project_id)
        and (not start_date or inc.date_time >= start_date)
        and (not end_date or inc.date_time <= end_date)
    ]

    def count_of_type(kind):
        # Number of selected incidents of the given incident type.
        return sum(1 for inc in selected if inc.incident_type == kind)

    # Calculate metrics
    near_miss_count = count_of_type(IncidentType.NEAR_MISS)
    first_aid_count = count_of_type(IncidentType.FIRST_AID)
    recordable_count = sum(1 for inc in selected if inc.osha_recordable)
    lost_time_count = count_of_type(IncidentType.LOST_TIME)
    days_lost_sum = sum(inc.days_lost for inc in selected)

    # Category breakdown, in enum declaration order, skipping empty categories
    tallies = (
        (cat.value, sum(1 for inc in selected if inc.category == cat))
        for cat in IncidentCategory
    )
    category_counts = {label: n for label, n in tallies if n > 0}

    if recordable_count:
        ratio = near_miss_count / recordable_count
    else:
        ratio = 0

    return {
        "total_incidents": len(selected),
        "near_misses": near_miss_count,
        "first_aid_cases": first_aid_count,
        "osha_recordables": recordable_count,
        "lost_time_incidents": lost_time_count,
        "total_days_lost": days_lost_sum,
        "by_category": category_counts,
        "near_miss_ratio": ratio,
    }

def calculate_trir(self, hours_worked: int, project_id: str = None) -> float:
    """Return the Total Recordable Incident Rate.

    Counts incidents flagged ``osha_recordable`` (restricted to
    ``project_id`` when given) and normalizes per 200,000 hours worked.
    Returns 0 when ``hours_worked`` is 0.
    """
    if project_id:
        pool = [inc for inc in self.incidents.values()
                if inc.project_id == project_id]
    else:
        pool = list(self.incidents.values())

    recordable_count = sum(1 for inc in pool if inc.osha_recordable)

    # Guard against division by zero
    if hours_worked == 0:
        return 0

    # TRIR = (Recordables × 200,000) / Hours Worked
    return (recordable_count * 200000) / hours_worked

def calculate_dart(self, hours_worked: int, project_id: str = None) -> float:
    """Return the Days Away, Restricted, or Transferred rate.

    Only incidents whose type is ``LOST_TIME`` are counted as DART cases
    (restricted to ``project_id`` when given); the count is normalized per
    200,000 hours worked. Returns 0 when ``hours_worked`` is 0.
    """
    if project_id:
        pool = [inc for inc in self.incidents.values()
                if inc.project_id == project_id]
    else:
        pool = list(self.incidents.values())

    case_count = sum(
        1 for inc in pool
        if inc.incident_type in (IncidentType.LOST_TIME,)
    )

    # Guard against division by zero
    if hours_worked == 0:
        return 0

    return (case_count * 200000) / hours_worked

def get_trend_analysis(self, months: int = 6) -> List[Dict]:
    """Summarize incidents per calendar month, ending with the current month.

    Args:
        months: Number of calendar months to cover, counting back from the
            current month. Zero or negative values give an empty list.

    Returns:
        One dict per month, oldest first, with ``month`` (``YYYY-MM``),
        ``total``, ``near_misses`` and ``recordables`` (incidents flagged
        ``osha_recordable``).
    """
    now = datetime.now()
    # Count months on one linear scale so stepping back across any number of
    # year boundaries is plain integer arithmetic.
    current_index = now.year * 12 + (now.month - 1)

    def first_of_month(index: int) -> datetime:
        # Midnight on the first day of the month at this linear index.
        year, month_zero_based = divmod(index, 12)
        return datetime(year, month_zero_based + 1, 1)

    trends = []
    for offset in range(months - 1, -1, -1):  # oldest month first
        month_start = first_of_month(current_index - offset)
        next_month_start = first_of_month(current_index - offset + 1)

        # Half-open window: keeps everything up to the last instant of the
        # month, including incidents that happen during its final day.
        month_incidents = [
            inc for inc in self.incidents.values()
            if month_start <= inc.date_time < next_month_start
        ]

        trends.append({
            "month": month_start.strftime("%Y-%m"),
            "total": len(month_incidents),
            "near_misses": sum(
                1 for inc in month_incidents
                if inc.incident_type == IncidentType.NEAR_MISS
            ),
            "recordables": sum(
                1 for inc in month_incidents if inc.osha_recordable
            ),
        })

    return trends

def generate_incident_report(self, incident_id: str) -> str:
    """Render an incident as a Markdown report.

    The report always has the header block, description and immediate
    actions. The injured-person, root-cause and corrective-action sections
    are added only when the incident has that data.

    Returns:
        The Markdown text, or the string ``"Incident not found"`` when
        ``incident_id`` is not a registered incident.
    """
    if incident_id not in self.incidents:
        return "Incident not found"

    inc = self.incidents[incident_id]
    recordable_label = "Yes" if inc.osha_recordable else "No"
    occurred = inc.date_time.strftime('%Y-%m-%d %H:%M')

    report = [
        "# Incident Report",
        "",
        f"**Incident ID:** {inc.id}",
        f"**Type:** {inc.incident_type.value}",
        f"**Category:** {inc.category.value}",
        f"**Date/Time:** {occurred}",
        f"**Location:** {inc.location}",
        f"**Project:** {inc.project_name}",
        f"**Status:** {inc.status.value}",
        f"**OSHA Recordable:** {recordable_label}",
        "",
        "## Description",
        f"{inc.description}",
        "",
        "## Immediate Actions Taken",
        f"{inc.immediate_actions}",
        "",
    ]

    person = inc.injured_person
    if person:
        report.append("## Injured Person")
        report.append(f"- Name: {person.name}")
        report.append(f"- Company: {person.company}")
        report.append(f"- Role: {person.role}")
        report.append(f"- Experience: {person.years_experience} years")
        report.append("")

    if inc.root_causes:
        report.append("## Root Causes")
        for cause in inc.root_causes:
            report.append(f"- {cause}")
        report.append("")

    if inc.corrective_actions:
        report.append("## Corrective Actions")
        report.append("| Action | Assigned To | Due | Status |")
        report.append("|--------|-------------|-----|--------|")
        report.extend(
            f"| {ca.description} | {ca.assigned_to} | {ca.due_date.strftime('%Y-%m-%d')} | {ca.status} |"
            for ca in inc.corrective_actions
        )

    return "\n".join(report)

Quick Start

# Initialize manager
manager = IncidentManager()

# Report near-miss
incident = manager.report_incident(
    incident_type=IncidentType.NEAR_MISS,
    category=IncidentCategory.STRUCK_BY,
    date_time=datetime.now(),
    location="Level 5, Grid C-4",
    project_id="PRJ-001",
    project_name="Office Tower",
    description="Unsecured tool fell from scaffold, landed 2 feet from worker",
    immediate_actions="Area cordoned off, toolbox talk conducted",
    reported_by="Site Foreman",
)

# 5 Whys analysis
analysis = manager.five_whys_analysis(incident.id, [
    "Tool fell from scaffold",
    "Tool was not secured",
    "No tool lanyard was used",
    "Worker not trained on tool tethering",
    "Tool tethering training not included in orientation",
])

# Assign corrective actions
manager.assign_corrective_action(
    incident.id,
    "Update orientation to include tool tethering training",
    "Safety Manager",
    due_days=14,
)

manager.assign_corrective_action(
    incident.id,
    "Provide tool lanyards to all workers at height",
    "Procurement",
    due_days=7,
)

# Get metrics
metrics = manager.get_incident_metrics()
print(f"Total Incidents: {metrics['total_incidents']}")
print(f"Near Miss Ratio: {metrics['near_miss_ratio']:.1f}")

# Generate report
print(manager.generate_incident_report(incident.id))

Requirements

No installation required — the implementation uses only the Python standard library.

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Automation

cad-to-data

No summary provided by upstream source.

Repository Source · Needs Review
Automation

drawing-analyzer

No summary provided by upstream source.

Repository Source · Needs Review
Automation

cost-estimation-resource

No summary provided by upstream source.

Repository Source · Needs Review
Automation

daily-progress-report

No summary provided by upstream source.

Repository Source · Needs Review