sql-injection-prevention

SQL Injection Prevention Expert

Install skill "sql-injection-prevention" with this command: npx skills add dengineproblem/agents-monorepo/dengineproblem-agents-monorepo-sql-injection-prevention

Expert in identifying, preventing, and mitigating SQL injection vulnerabilities across programming languages and database management systems.

Core Principles

Primary Defense Mechanisms

defense_layers:
  - name: "Parameterized Queries"
    priority: 1
    description: "The gold standard for preventing SQL injection"

  - name: "Input Validation"
    priority: 2
    description: "Whitelist validation with strict data type enforcement"

  - name: "Stored Procedures"
    priority: 3
    description: "When implemented correctly with parameterized inputs"

  - name: "Escaping"
    priority: 4
    description: "Last resort, database-specific escaping functions"

  - name: "Least Privilege"
    priority: 5
    description: "Database users with minimal required permissions"

Defense in Depth Strategy

  • Never rely on a single defense mechanism

  • Combine several layers: input validation, parameterized queries, a WAF, and monitoring

  • Deploy both preventive and detective controls

  • Run security tests and code reviews regularly

Parameterized Queries Implementation

Java (JDBC)

// VULNERABLE - String concatenation
String query = "SELECT * FROM users WHERE username='" + username + "' AND password='" + password + "'";
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(query);

// SECURE - Prepared statements
String query = "SELECT * FROM users WHERE username=? AND password=?";
PreparedStatement pstmt = connection.prepareStatement(query);
pstmt.setString(1, username);
pstmt.setString(2, password);
ResultSet rs = pstmt.executeQuery();

Python (SQLAlchemy)

# VULNERABLE - String formatting
query = f"SELECT * FROM users WHERE email = '{email}' AND status = '{status}'"
result = db.execute(query)

# SECURE - Parameterized query
from sqlalchemy import text
query = text("SELECT * FROM users WHERE email = :email AND status = :status")
result = db.execute(query, {"email": email, "status": status})

# SECURE - ORM approach (preferred)
result = db.session.query(User).filter(
    User.email == email,
    User.status == status
).all()

PHP (PDO)

// VULNERABLE - Direct concatenation
$query = "SELECT * FROM products WHERE category = '$category' AND price < $maxPrice";
$result = $pdo->query($query);

// SECURE - Prepared statements
$query = "SELECT * FROM products WHERE category = :category AND price < :maxPrice";
$stmt = $pdo->prepare($query);
$stmt->bindParam(':category', $category, PDO::PARAM_STR);
$stmt->bindParam(':maxPrice', $maxPrice, PDO::PARAM_INT);
$stmt->execute();

Node.js (MySQL2)

// VULNERABLE - Template literals
const query = `SELECT * FROM orders WHERE user_id = ${userId} AND status = '${status}'`;
connection.query(query, (error, results) => { /* ... */ });

// SECURE - Parameterized queries
const query = 'SELECT * FROM orders WHERE user_id = ? AND status = ?';
connection.execute(query, [userId, status], (error, results) => {
  // Handle results
});

Go (database/sql)

// VULNERABLE
query := fmt.Sprintf("SELECT * FROM users WHERE id = %s", userID)
rows, err := db.Query(query)

// SECURE - Parameterized query
// ($1 is the PostgreSQL placeholder style; MySQL drivers use ? instead)
query := "SELECT * FROM users WHERE id = $1"
rows, err := db.Query(query, userID)
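
The same contrast can be reproduced end to end with Python's standard-library sqlite3 module. This is only an illustrative sketch: the in-memory database, the users table, and the helper names find_user_vulnerable and find_user_safe are assumptions made for the demo, not part of any application described here.

```python
import sqlite3

def make_demo_db() -> sqlite3.Connection:
    """Create a throwaway in-memory database with two users (demo data)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
    conn.executemany(
        "INSERT INTO users (username) VALUES (?)",
        [("alice",), ("bob",)],
    )
    return conn

def find_user_vulnerable(conn: sqlite3.Connection, username: str) -> list:
    # VULNERABLE: the input becomes part of the SQL text itself.
    query = f"SELECT username FROM users WHERE username = '{username}'"
    return conn.execute(query).fetchall()

def find_user_safe(conn: sqlite3.Connection, username: str) -> list:
    # SECURE: the input is bound as a value and never parsed as SQL.
    query = "SELECT username FROM users WHERE username = ?"
    return conn.execute(query, (username,)).fetchall()

conn = make_demo_db()
payload = "' OR '1'='1"

leaked = find_user_vulnerable(conn, payload)  # WHERE clause becomes always true
matched = find_user_safe(conn, payload)       # no user has this literal name
```

With the concatenated query the payload rewrites the WHERE clause and every row comes back; with the bound parameter the payload is compared as an ordinary string and nothing matches.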

Input Validation and Sanitization

Robust Input Validation

import re
from typing import Optional, List

def validate_user_input(user_id: str, email: str, role: str) -> dict:
    """Validate user input with strict rules"""
    errors: List[str] = []

    # Validate user ID (ASCII digits only, positive).
    # str.isdigit() also accepts characters such as '²', which int() rejects.
    if not re.fullmatch(r'[0-9]+', user_id) or int(user_id) <= 0:
        errors.append("Invalid user ID format")

    # Validate email format (fullmatch must cover the entire string)
    email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    if not re.fullmatch(email_pattern, email):
        errors.append("Invalid email format")

    # Validate role against whitelist
    allowed_roles = ['user', 'admin', 'moderator']
    if role not in allowed_roles:
        errors.append("Invalid role specified")

    return {'valid': len(errors) == 0, 'errors': errors}

def sanitize_identifier(identifier: str) -> Optional[str]:
    """Sanitize SQL identifiers (table/column names)"""
    # Only allow alphanumeric and underscore
    if re.fullmatch(r'[a-zA-Z_][a-zA-Z0-9_]*', identifier):
        return identifier
    return None
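
Regex-based validation is only as strict as its anchoring. In Python, `$` also matches just before a trailing newline, so a pattern written as `^...$` and checked with `re.match` accepts a value that ends in "\n", while `re.fullmatch` does not. A minimal sketch of the difference follows; the function names are hypothetical.

```python
import re

IDENTIFIER = r"[a-zA-Z_][a-zA-Z0-9_]*"

def is_identifier_loose(value: str) -> bool:
    # '^...$' with re.match: '$' also matches just before a final "\n".
    return re.match(rf"^{IDENTIFIER}$", value) is not None

def is_identifier_strict(value: str) -> bool:
    # re.fullmatch requires the pattern to cover the entire string.
    return re.fullmatch(IDENTIFIER, value) is not None

loose_result = is_identifier_loose("users\n")    # accepted despite the newline
strict_result = is_identifier_strict("users\n")  # rejected
```

The strict form is the safer default for identifiers that are later interpolated into SQL text, since those cannot be passed as bound parameters.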

TypeScript Input Validation

interface ValidationResult {
  valid: boolean;
  errors: string[];
  sanitized?: Record<string, unknown>;
}

function validateSearchParams(params: {
  query?: string;
  limit?: number;
  sortBy?: string;
}): ValidationResult {
  const errors: string[] = [];
  const sanitized: Record<string, unknown> = {};

  // Validate query (alphanumeric, spaces, basic punctuation)
  if (params.query) {
    // The hyphen sits last in the class so it is always read literally
    const cleanQuery = params.query.replace(/[^a-zA-Z0-9\s_.-]/g, '');
    if (cleanQuery.length > 100) {
      errors.push('Query too long');
    } else {
      sanitized.query = cleanQuery;
    }
  }

  // Validate limit (positive integer, max 100)
  if (params.limit !== undefined) {
    const limit = Number(params.limit);
    if (!Number.isInteger(limit) || limit < 1 || limit > 100) {
      errors.push('Invalid limit');
    } else {
      sanitized.limit = limit;
    }
  }

  // Validate sortBy against whitelist
  const allowedSortFields = ['name', 'created_at', 'updated_at', 'price'];
  if (params.sortBy && !allowedSortFields.includes(params.sortBy)) {
    errors.push('Invalid sort field');
  } else if (params.sortBy) {
    sanitized.sortBy = params.sortBy;
  }

  return {
    valid: errors.length === 0,
    errors,
    sanitized: errors.length === 0 ? sanitized : undefined
  };
}

Advanced Prevention Techniques

Stored Procedures with Parameters

-- SQL Server stored procedure
CREATE PROCEDURE GetUserOrders
    @UserID INT,
    @Status NVARCHAR(20),
    @StartDate DATE
AS
BEGIN
    SET NOCOUNT ON;

    SELECT OrderID, OrderDate, TotalAmount
    FROM Orders
    WHERE UserID = @UserID
        AND Status = @Status
        AND OrderDate >= @StartDate
    ORDER BY OrderDate DESC;
END
GO

Dynamic Query Building (Secure Approach)

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Set;

public class SecureQueryBuilder {
    private static final Set<String> ALLOWED_SORT_COLUMNS =
        Set.of("name", "email", "created_date", "status");

    private static final Set<String> ALLOWED_SORT_ORDERS =
        Set.of("ASC", "DESC");

    public PreparedStatement buildUserQuery(
            Connection conn,
            String sortColumn,
            String sortOrder,
            String statusFilter) throws SQLException {

        // Validate sort column against whitelist
        if (!ALLOWED_SORT_COLUMNS.contains(sortColumn)) {
            throw new IllegalArgumentException("Invalid sort column: " + sortColumn);
        }

        // Validate sort order
        String normalizedOrder = sortOrder.toUpperCase();
        if (!ALLOWED_SORT_ORDERS.contains(normalizedOrder)) {
            throw new IllegalArgumentException("Invalid sort order: " + sortOrder);
        }

        // Build query with validated column names and parameterized values
        String query = "SELECT user_id, name, email FROM users " +
                      "WHERE status = ? " +
                      "ORDER BY " + sortColumn + " " + normalizedOrder;

        PreparedStatement stmt = conn.prepareStatement(query);
        stmt.setString(1, statusFilter);
        return stmt;
    }
}

Query Builder Pattern (Node.js)

import { Pool } from 'pg';

interface QueryOptions {
  table: string;
  filters: Record<string, unknown>;
  sortBy?: string;
  sortOrder?: 'ASC' | 'DESC';
  limit?: number;
  offset?: number;
}

class SecureQueryBuilder {
  private static readonly ALLOWED_TABLES = new Set([
    'users', 'orders', 'products', 'categories'
  ]);

  private static readonly ALLOWED_COLUMNS: Record<string, Set<string>> = {
    users: new Set(['id', 'name', 'email', 'status', 'created_at']),
    orders: new Set(['id', 'user_id', 'total', 'status', 'created_at']),
    products: new Set(['id', 'name', 'price', 'category_id', 'created_at']),
    categories: new Set(['id', 'name', 'parent_id'])
  };

  buildSelectQuery(options: QueryOptions): { text: string; values: unknown[] } {
    // Validate table
    if (!SecureQueryBuilder.ALLOWED_TABLES.has(options.table)) {
      throw new Error(`Invalid table: ${options.table}`);
    }

    const allowedColumns = SecureQueryBuilder.ALLOWED_COLUMNS[options.table];
    const values: unknown[] = [];
    const conditions: string[] = [];
    let paramIndex = 1;

    // Build WHERE clause with parameterized values
    for (const [column, value] of Object.entries(options.filters)) {
      if (!allowedColumns.has(column)) {
        throw new Error(`Invalid column: ${column}`);
      }
      conditions.push(`${column} = $${paramIndex}`);
      values.push(value);
      paramIndex++;
    }

    let query = `SELECT * FROM ${options.table}`;

    if (conditions.length > 0) {
      query += ` WHERE ${conditions.join(' AND ')}`;
    }

    // Validate and add ORDER BY
    if (options.sortBy) {
      if (!allowedColumns.has(options.sortBy)) {
        throw new Error(`Invalid sort column: ${options.sortBy}`);
      }
      const order = options.sortOrder === 'DESC' ? 'DESC' : 'ASC';
      query += ` ORDER BY ${options.sortBy} ${order}`;
    }

    // Add LIMIT and OFFSET with parameterized values
    if (options.limit !== undefined) {
      query += ` LIMIT $${paramIndex}`;
      values.push(Math.min(Math.max(1, options.limit), 100));
      paramIndex++;
    }

    if (options.offset !== undefined) {
      query += ` OFFSET $${paramIndex}`;
      values.push(Math.max(0, options.offset));
    }

    return { text: query, values };
  }
}
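
A related case that a fixed placeholder list does not cover is a variable-length IN (...) filter. One common approach, sketched here with Python's sqlite3 module and hypothetical names (build_in_clause, the orders table), is to generate one placeholder per value, so that only the number of placeholders ends up in the SQL text and the values themselves stay bound.

```python
import sqlite3
from typing import FrozenSet, List, Sequence, Tuple

def build_in_clause(column: str, values: Sequence[object],
                    allowed_columns: FrozenSet[str]) -> Tuple[str, List[object]]:
    """Return a 'col IN (?, ?, ...)' fragment plus its bound values."""
    if column not in allowed_columns:
        raise ValueError(f"Invalid column: {column}")
    if not values:
        # An empty IN () is rejected by many databases; match nothing instead.
        return "1 = 0", []
    placeholders = ", ".join("?" for _ in values)
    return f"{column} IN ({placeholders})", list(values)

# Demo data in a throwaway in-memory database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT)")
conn.executemany("INSERT INTO orders (status) VALUES (?)",
                 [("new",), ("paid",), ("shipped",)])

ALLOWED = frozenset({"id", "status"})
clause, params = build_in_clause("status", ["new", "paid' OR '1'='1"], ALLOWED)
rows = conn.execute(
    f"SELECT status FROM orders WHERE {clause} ORDER BY id", params
).fetchall()
```

The second value contains an injection attempt, but because it is bound rather than interpolated it simply fails to match any status.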

Database Security Configuration

MySQL Security Settings

-- Create limited privilege user for web application
CREATE USER 'webapp'@'localhost' IDENTIFIED BY 'strong_random_password_here';

-- Grant minimal required permissions
GRANT SELECT, INSERT, UPDATE ON myapp.users TO 'webapp'@'localhost';
GRANT SELECT, INSERT, UPDATE, DELETE ON myapp.orders TO 'webapp'@'localhost';
GRANT SELECT ON myapp.products TO 'webapp'@'localhost';

-- Explicitly deny dangerous permissions
-- (These are denied by default, but good to be explicit)
REVOKE FILE, PROCESS, SUPER ON *.* FROM 'webapp'@'localhost';

-- Disable dangerous global settings
SET GLOBAL log_bin_trust_function_creators = 0;
SET GLOBAL local_infile = 0;

-- Flush privileges
FLUSH PRIVILEGES;

PostgreSQL Row Level Security

-- Enable RLS on sensitive table
ALTER TABLE user_data ENABLE ROW LEVEL SECURITY;

-- Create policy to restrict access by current user
CREATE POLICY user_data_isolation_policy ON user_data
    FOR ALL
    TO webapp_user
    USING (user_id = current_setting('app.current_user_id')::int);

-- Create policy for admin access
CREATE POLICY user_data_admin_policy ON user_data
    FOR ALL
    TO admin_user
    USING (true);

-- Force RLS even for table owners
ALTER TABLE user_data FORCE ROW LEVEL SECURITY;
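
The isolation policy reads app.current_user_id through current_setting, so the application has to set that value before it queries the table. The sketch below shows one possible way to do this, assuming a DB-API cursor that uses %s placeholders (as psycopg does). The function set_request_user and the RecordingCursor stand-in are hypothetical and exist only so the example runs without a database. It uses PostgreSQL's set_config(name, value, is_local), which accepts the value as an ordinary query parameter.

```python
from typing import Any, List, Tuple

def set_request_user(cursor: Any, user_id: int) -> None:
    """Bind the authenticated user's id to the current transaction."""
    if not isinstance(user_id, int) or isinstance(user_id, bool) or user_id <= 0:
        raise ValueError("user_id must be a positive integer")
    # is_local = true scopes the setting to the current transaction, so it
    # does not carry over to the next request on a pooled connection.
    cursor.execute(
        "SELECT set_config('app.current_user_id', %s, true)",
        (str(user_id),),
    )

class RecordingCursor:
    """Stand-in for a real cursor; records what would be executed."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, tuple]] = []

    def execute(self, sql: str, params: tuple = ()) -> None:
        self.calls.append((sql, params))

cursor = RecordingCursor()
set_request_user(cursor, 42)
```

In a real application the call would run at the start of each transaction, before any statement that touches user_data.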

Detection and Monitoring

SQL Injection Detection Patterns

import re
import logging
from typing import List, Optional, Tuple
from datetime import datetime, timezone

class SQLInjectionDetector:
    """Detect potential SQL injection patterns in user input"""

    SUSPICIOUS_PATTERNS: List[Tuple[str, str]] = [
        (r"('|(\-\-)|(;)|(\||\|)|(\*|\*))", "SQL metacharacters"),
        (r"((union\s*(all\s*)?select))", "UNION attack"),
        (r"((select\s+.*\s+from)|(insert\s+into)|(update\s+.*\s+set)|(delete\s+from))", "SQL keywords"),
        (r"(exec(ute)?\s*(sp_|xp_))", "Stored procedure execution"),
        (r"(waitfor\s+delay|benchmark\s*\(|sleep\s*\()", "Time-based attack"),
        (r"(0x[0-9a-fA-F]+)", "Hex encoding"),
        (r"(char\s*\(|concat\s*\(|substr\s*\()", "String functions"),
        (r"(information_schema|sys\.)", "Schema enumeration"),
        (r"(or\s+1\s*=\s*1|and\s+1\s*=\s*1|or\s+'[^']*'\s*=\s*'[^']*')", "Boolean injection"),
    ]

    def __init__(self, logger: Optional[logging.Logger] = None):
        self.logger = logger or logging.getLogger(__name__)
        # Compile once; matching is case-insensitive
        self.compiled_patterns = [
            (re.compile(pattern, re.IGNORECASE), name)
            for pattern, name in self.SUSPICIOUS_PATTERNS
        ]

    def detect(self, user_input: str, context: Optional[dict] = None) -> dict:
        """Detect potential SQL injection in user input"""
        detections = []

        for pattern, attack_type in self.compiled_patterns:
            match = pattern.search(user_input)
            if match:
                detections.append({
                    'type': attack_type,
                    'matched': match.group()[:50],  # Truncate for logging
                    'position': match.start()
                })

        if detections:
            self.logger.warning(
                f"Potential SQL injection detected: {detections}",
                extra={
                    'input_preview': user_input[:100],
                    'context': context,
                    # datetime.utcnow() is deprecated since Python 3.12
                    'timestamp': datetime.now(timezone.utc).isoformat()
                }
            )

        # Each matched pattern adds 25 points, capped at 100
        return {
            'is_suspicious': len(detections) > 0,
            'detections': detections,
            'risk_score': min(len(detections) * 25, 100)
        }
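
Pattern matching of this kind flags legitimate input as well as attacks, which is why it fits better as a logging signal than as the primary control. A reduced sketch in the style of the patterns above (the function name matched_types is hypothetical) shows an ordinary surname triggering the metacharacter rule:

```python
import re
from typing import List

# Two simplified patterns: a quote/comment/semicolon check
# and a UNION SELECT check.
PATTERNS = [
    (re.compile(r"('|(\-\-)|(;))", re.IGNORECASE), "SQL metacharacters"),
    (re.compile(r"(union\s*(all\s*)?select)", re.IGNORECASE), "UNION attack"),
]

def matched_types(user_input: str) -> List[str]:
    """Return the names of all patterns found in the input."""
    return [name for pattern, name in PATTERNS if pattern.search(user_input)]

surname_hits = matched_types("O'Brien")                       # false positive
attack_hits = matched_types("' UNION SELECT * FROM users--")  # true positive
clean_hits = matched_types("John Doe")                        # no match
```

Blocking on a single metacharacter hit would reject the surname, so a detector like this is usually combined with a score threshold or used purely for alerting, with parameterized queries handling the quote safely either way.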

Monitoring Query Patterns

-- PostgreSQL: Enable query logging for analysis
ALTER SYSTEM SET log_statement = 'all';
ALTER SYSTEM SET log_min_duration_statement = 0;
-- ALTER SYSTEM only writes the setting; reload the configuration to apply it
SELECT pg_reload_conf();

-- Create table for storing suspicious queries
CREATE TABLE security_audit_log (
    id SERIAL PRIMARY KEY,
    timestamp TIMESTAMPTZ DEFAULT NOW(),
    query_text TEXT,
    user_name VARCHAR(100),
    client_ip INET,
    risk_indicators JSONB,
    blocked BOOLEAN DEFAULT FALSE
);

-- Function to log suspicious queries
CREATE OR REPLACE FUNCTION log_suspicious_query(
    p_query TEXT,
    p_user VARCHAR,
    p_ip INET,
    p_indicators JSONB
) RETURNS VOID AS $$
BEGIN
    INSERT INTO security_audit_log (query_text, user_name, client_ip, risk_indicators)
    VALUES (p_query, p_user, p_ip, p_indicators);
END;
$$ LANGUAGE plpgsql;

Web Application Firewall Rules

ModSecurity Rules for SQL Injection

# ModSecurity Core Rule Set - SQL Injection Detection

# Detect SQL injection in request parameters
SecRule ARGS "@detectSQLi" \
    "id:942100,\
    phase:2,\
    block,\
    msg:'SQL Injection Attack Detected',\
    logdata:'Matched Data: %{MATCHED_VAR} found within %{MATCHED_VAR_NAME}',\
    t:none,t:urlDecodeUni,t:htmlEntityDecode,\
    ctl:auditLogParts=+E,\
    ver:'OWASP_CRS/3.3.0',\
    severity:'CRITICAL',\
    setvar:'tx.sql_injection_score=+%{tx.critical_anomaly_score}',\
    setvar:'tx.anomaly_score_pl1=+%{tx.critical_anomaly_score}'"

# Block common SQL injection patterns
SecRule ARGS|ARGS_NAMES|REQUEST_COOKIES|REQUEST_COOKIES_NAMES \
    "(?i:(\b(select|union|insert|update|delete|drop|alter|create|truncate)\b))" \
    "id:942110,\
    phase:2,\
    block,\
    msg:'SQL Keyword Detected in User Input',\
    severity:'WARNING'"

# Block SQL comment sequences
SecRule ARGS "--" \
    "id:942120,\
    phase:2,\
    block,\
    msg:'SQL Comment Sequence Detected',\
    severity:'WARNING'"

Testing and Validation

Automated Security Testing Script

#!/bin/bash
# SQLMap testing script for your own applications (authorized testing only)

TARGET_URL="http://localhost:8080"
COOKIE="JSESSIONID=your_session_cookie"
OUTPUT_DIR="./sqlmap_results"

mkdir -p "$OUTPUT_DIR"

# Test login form
echo "Testing login form..."
sqlmap -u "$TARGET_URL/login" \
    --data="username=admin&password=pass" \
    --cookie="$COOKIE" \
    --level=3 \
    --risk=2 \
    --batch \
    --output-dir="$OUTPUT_DIR/login" \
    --forms

# Test search endpoint
echo "Testing search endpoint..."
sqlmap -u "$TARGET_URL/api/search?q=test&category=1" \
    --cookie="$COOKIE" \
    --level=3 \
    --risk=2 \
    --batch \
    --output-dir="$OUTPUT_DIR/search"

# Test with different techniques
echo "Testing with all techniques..."
sqlmap -u "$TARGET_URL/api/users?id=1" \
    --cookie="$COOKIE" \
    --technique=BEUSTQ \
    --level=5 \
    --risk=3 \
    --batch \
    --output-dir="$OUTPUT_DIR/users"

echo "Testing complete. Results in $OUTPUT_DIR"

Unit Tests for Input Validation

import pytest
from your_app.security import validate_user_input, SQLInjectionDetector

class TestSQLInjectionPrevention:

    @pytest.fixture
    def detector(self):
        return SQLInjectionDetector()

    def test_clean_input_passes(self, detector):
        result = detector.detect("John Doe")
        assert not result['is_suspicious']

    def test_union_attack_detected(self, detector):
        result = detector.detect("' UNION SELECT * FROM users--")
        assert result['is_suspicious']
        assert any(d['type'] == 'UNION attack' for d in result['detections'])

    def test_comment_attack_detected(self, detector):
        result = detector.detect("admin'--")
        assert result['is_suspicious']

    def test_boolean_injection_detected(self, detector):
        result = detector.detect("' OR '1'='1")
        assert result['is_suspicious']

    def test_valid_email_passes_validation(self):
        result = validate_user_input("123", "user@example.com", "user")
        assert result['valid']

    def test_sql_in_email_fails_validation(self):
        result = validate_user_input("123", "'; DROP TABLE users;--", "user")
        assert not result['valid']

Emergency Response Procedures

Incident Response Checklist

sql_injection_incident_response:
  immediate_actions:
    - action: "Block malicious IP addresses"
      command: "iptables -A INPUT -s <attacker_ip> -j DROP"
      priority: 1

    - action: "Disable affected endpoints"
      description: "Temporarily disable vulnerable API endpoints"
      priority: 2

    - action: "Enable enhanced logging"
      description: "Capture all queries for forensic analysis"
      priority: 3

  short_term:
    - action: "Patch vulnerable code"
      description: "Replace vulnerable queries with parameterized versions"

    - action: "Deploy fixes to production"
      description: "Emergency release with security patches"

    - action: "Reset compromised credentials"
      description: "Rotate database passwords, API keys"

  investigation:
    - action: "Analyze access logs"
      description: "Identify attack timeline and scope"

    - action: "Check for data exfiltration"
      description: "Review what data was accessed/modified"

    - action: "Assess lateral movement"
      description: "Check if attacker accessed other systems"

  long_term:
    - action: "Implement WAF rules"
      description: "Deploy ModSecurity or cloud WAF"

    - action: "Security code review"
      description: "Review all database interactions"

    - action: "Penetration testing"
      description: "Hire external security firm"

    - action: "Developer training"
      description: "Secure coding practices workshop"

  compliance:
    - action: "Data breach notification"
      condition: "If PII was exposed"
      deadline: "72 hours (GDPR)"

    - action: "Regulatory reporting"
      condition: "If required by industry regulations"
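
For the "Analyze access logs" step, a first pass can be automated. The sketch below assumes access logs in Common Log Format and a deliberately small pattern list. The function suspicious_requests and the sample lines are hypothetical, and real triage would need broader patterns and the application's actual log format.

```python
import re
from collections import Counter
from typing import Iterable
from urllib.parse import unquote_plus

# Common Log Format: ip ident user [time] "METHOD path PROTOCOL" status size
LOG_LINE = re.compile(r'^(\S+) \S+ \S+ \[[^\]]+\] "\S+ (\S+) [^"]*"')

# A small, illustrative set of SQL injection indicators.
SQLI_HINTS = re.compile(
    r"(union\s+(all\s+)?select|or\s+'?1'?\s*=\s*'?1|sleep\s*\(|information_schema|--)",
    re.IGNORECASE,
)

def suspicious_requests(lines: Iterable[str]) -> Counter:
    """Count requests per client IP whose decoded URL matches a SQLi hint."""
    hits: Counter = Counter()
    for line in lines:
        match = LOG_LINE.match(line)
        if not match:
            continue  # skip lines that are not in the expected format
        ip, raw_path = match.groups()
        # Decode %xx escapes and '+' so encoded payloads become visible.
        if SQLI_HINTS.search(unquote_plus(raw_path)):
            hits[ip] += 1
    return hits

sample = [
    '203.0.113.7 - - [10/Oct/2024:13:55:36 +0000] "GET /api/users?id=1%27+OR+%271%27%3D%271 HTTP/1.1" 200 512',
    '203.0.113.7 - - [10/Oct/2024:13:55:40 +0000] "GET /api/search?q=x%27+UNION+SELECT+1-- HTTP/1.1" 500 0',
    '198.51.100.2 - - [10/Oct/2024:13:56:01 +0000] "GET /api/search?q=shoes HTTP/1.1" 200 2048',
]
report = suspicious_requests(sample)
```

The per-IP counts give a starting point for the attack timeline; payloads sent in POST bodies do not appear in access logs and have to be recovered from application or WAF logs instead.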

Best Practices

  • Always use parameterized queries; no other single measure is as reliable

  • Validate input on the server side; client-side validation is not enough

  • Apply the principle of least privilege to database users

  • Use an ORM where possible; ORMs parameterize queries automatically as long as you avoid building raw SQL strings

  • Test regularly with SQLMap and other tools (in dev/staging only!)

  • Monitor logs for suspicious patterns

  • Deploy a WAF as an additional layer of defense

  • Train developers in secure coding practices
