我用MCP开发了一个AI目录扫描分析工具

本工具结合了当下的MCP协议中sse方案,给传统的目录扫描工具dirsearch对接上了AI大模型,并对扫描结果进行数据筛选及深度分析并输出漏洞报告。

一、工具介绍

本工具结合了当下的MCP协议中的sse方案(为何不用stdio方案下文有讲),给传统的目录扫描工具dirsearch对接上了AI大模型,并对扫描结果进行下一步分析,以及输出漏洞报告,未来还会进行拓展以及微调,实现漏洞扫描流程自动化,目前效果如下

图片.png
自动分析非200页面网页内容,可用于发现服务器版本泄露,框架类型版本泄露,网站绝对路径泄露等漏洞,并在分析完所有扫描得到的路径之后,会产出md格式漏洞报告(依赖于Cline)

为节省LLM的api tokens,对扫描结果进行了数据筛选工作,以 状态码+返回包大小 作为判断标识,同样的组合仅保留分析一组

图片.png
项目地址:

ai_dirscan

二、MCP服务端编写

2.1 环境初始化

为了使我们的mcp服务不和主环境冲突,这里先设置下uv虚拟环境

  1. uv init ai_dirscan
  2. cd ai_dirscan
  3. uv venv

图片.png
然后我们将 dirsearch 工具的目录复制到当前工作目录中,dirsearch源码推荐GitHub下载

dirsearch下载链接

此时我们的项目目录结构应该如下:

图片.png
然后安装MCP和dirsearch所需依赖,命令如下:

  1. source .venv/bin/activate #win环境下为 .venv\Scripts\activate.bat
  2. uv add "mcp[cli]"
  3. uv add requests
  4. deactivate
  5. cd dirsearch
  6. pip install -r requirements.txt
  7. pip install setuptools

我们测试下dirsearch在当前环境中是否可用,命令如下

  1. python3 dirsearch.py -u https://src.sjtu.edu.cn/

出现如图所示界面及代表虚拟环境依赖配置成功

图片.png

2.2 模块导入及服务注册

我们用vs code打开刚才创建的虚拟环境目录ai_dirscan

可以看到有默认生成的main.py

图片.png
首先导入一些必要的库,其中FastMCP模块即提供本地的mcp服务,代码如下:

  1. import json
  2. import subprocess
  3. from datetime import datetime
  4. from pathlib import Path
  5. from mcp.server.fastmcp import FastMCP
  6. # 初始化 MCP 服务
  7. mcp = FastMCP("ai_dirscan",port=8000) //注册mcp服务名为ai_dirscan,sse服务端口设置为 默认8000

2.3 扫描流程处理

2.3.1 总体交互

然后当前工具与ai的数据交互方式如下,以下均为作者本人的想法,如果有更好的解决方案,欢迎提出

数据交互(file):

由于大模型有上下文token字数限制,且过多无用的扫描数据也存在浪费api的token生成过慢的危害,所以我先将原始dirsearch扫描结果中输出到文件,然后经过筛选去重返回给LLM处理

为什么要输出到文件中,再中转给LLM呢?

因为LLM常常是不可信的,我们可以通过准确的结果数据与LLM输出的结果进行对比,从而进行微调训练,逐渐矫准

数据交互部分对应代码如下:

  1. # 配置存储路径
  2. SCAN_RESULT_DIR = Path("scan_results")
  3. SCAN_RESULT_DIR.mkdir(exist_ok=True)
  4. ……
  5. # 生成结果文件路径
  6. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  7. output_file = SCAN_RESULT_DIR / f"scan_{timestamp}.json"
  8. ……
  9. # 加载原始扫描结果
  10. with open(output_file, "r", encoding="utf-8") as f:
  11. scan_data = json.load(f)
  12. #……处理逻辑

传输交互(sse):

因为我们知道常规目录扫描作业很多都是超过30s的,经过作者查阅资料发现MCP的stdio协议中,工具超时调用时间是硬编码在协议中的,默认为30s且无法更改,而由于sse协议涉及到与服务器的http网络交互,其中的超时时间是可以更改的,但目前支持更改该时间的客户端只有新版的Cline插件,除此之外唯有手搓一个客户端才能解决

传输交互部分对应代码如下:

  1. if __name__ == "__main__":
  2. mcp.run(transport='sse')

2.3.2 工具注解词

MCP客户端在调用tool时,会查阅函数下以“”” “””注解的语句,从而进行判断调用,该语句尽量写的简单明确且格式化,注明返回格式

我们注册一个@mcp.tool(),以将函数暴露ai识别调用,代码如下:

  1. @mcp.tool()
  2. def scan_dir(url: str) -> str:
  3. """
  4. 执行网站目录扫描,返回结构化扫描结果
  5. Args:
  6. url (str): 目标网站URL,需包含协议头(如http/https)
  7. Returns:
  8. str: JSON格式响应,包含:
  9. - status_200: 200状态的有效路径列表
  10. - non_200_results: 非200状态的有效结果列表(包含状态码)
  11. - report_path: 结果文件路径
  12. - stats: 各类状态码统计
  13. """

2.3.3 工具调用及数据筛选

我们通过subprocess子进程模块对项目中的 dirsearch工具 进行调用,由于dirsearch工具本身就具有定向输出文件(-o),所以我们可以直接在dirsearch中实现结果输出

对于不带此类命令的工具,我们可能需要麻烦一点,例如获取subprocess的执行结果,然后针对此工具的结果返回格式,编写出对应的函数解析过滤然后写出文件

该部分功能代码如下:

  1. # 构建扫描命令
  2. base_cmd = [
  3. "python3", "./dirsearch/dirsearch.py",
  4. "-u", url,
  5. "-o", str(output_file),
  6. "--format=json",
  7. "-q",
  8. "--no-color",
  9. ]
  10. try:
  11. # 执行扫描命令[8](@ref)
  12. subprocess.run(
  13. base_cmd,
  14. check=True,
  15. capture_output=True,
  16. timeout=300,
  17. text=True
  18. )

为节省tokens,我们对结果目录进行了数据筛选工作,以 状态码+返回包大小 作为判断标识,同样的组合仅保留分析一组,代码如下

  1. # 结果分类处理
  2. for entry in scan_data.get("results", []):
  3. status = entry["status"]
  4. url_path = entry["url"]
  5. content_length = entry["content-length"]
  6. # 状态码统计
  7. status_counter[status] += 1
  8. # 生成唯一标识符防止重复
  9. entry_key = f"{status}|{content_length}"
  10. if entry_key in unique_tracker:
  11. continue
  12. unique_tracker.add(entry_key)
  13. # 分类存储结果
  14. if status == 200:
  15. status_200.append(url_path)
  16. elif content_length != 0:
  17. non_200_results.append({
  18. "url": url_path,
  19. "status": status,
  20. "content_length": content_length
  21. })

2.3.4 结果返回及报错处理

接下来便是结果返回,因为LLM更善于解析格式明确的语句,这里我们以固定的json格式返回

并且为了方便LLM进行下一步处理,我们将扫描得到的目录,分为了 200响应码非200响应码,为了节省mcp调用时间,以及节约tokens,目前工具只会对非200响应码进行深度分析,得到由网站报错导致的服务器版本泄露绝对路径泄露等漏洞。

该工具将会返回网站目录扫描结果保存目录统计信息

代码如下:

  1. # 生成最终响应
  2. response = {
  3. "status": 200,
  4. "data": {
  5. "status_200": status_200,
  6. "non_200_results": non_200_results,
  7. "report_path": str(output_file),
  8. "stats": stats
  9. },
  10. "message": "扫描完成,结果已分类"
  11. }

为了更好的获取数据,调试程序和LLM,我加入了报错处理功能,可以让ai返回具体的报错信息,以供我们进行修复,代码如下:

  1. except json.JSONDecodeError as e:
  2. response = {"status": 500, "message": f"结果解析失败: {str(e)}"}
  3. except subprocess.TimeoutExpired:
  4. response = {"status": 408, "message": "扫描超时"}
  5. except Exception as e:
  6. response = {"status": 500, "message": f"扫描失败: {str(e)}"}

最后便是运行mcp服务的代码,目前大部分mcp客户端支持的对接方式有两种:stdio(本地)sse(服务器),因为我们项目是在本地开发的并没有上传到服务器,故选择 stdio,代码如下:

  1. if __name__ == "__main__":
  2. mcp.run(transport='stdio')

2.4 深度分析处理

为了对非200响应码进行深度分析,得到由网站报错导致的服务器版本泄露绝对路径泄露等漏洞,我们需编写MCP工具get_content,利用requests模块获取网页内容,从而进行分析,代码如下:

  1. @mcp.tool()
  2. def get_content(url: str) -> str:
  3. """
  4. 获取非200响应界面的网页内容
  5. :param url: 需要检测的网页地址
  6. :return: 返回页面的完整内容(若目标页面返回非200状态码)
  7. """
  8. try:
  9. response = requests.get(
  10. url,
  11. headers={"User-Agent": "Mozilla/5.0"},
  12. timeout=5
  13. )
  14. if response.status_code != 200:
  15. return response.text
  16. return f"200响应页面,无需进行深度分析"
  17. except requests.exceptions.RequestException as e:
  18. return f"请求异常:{str(e)}"

2.5 完整代码

服务端完整代码如下

  1. import json
  2. import subprocess
  3. from collections import defaultdict
  4. import requests
  5. from datetime import datetime
  6. from pathlib import Path
  7. from mcp.server.fastmcp import FastMCP
  8. # 初始化 MCP 服务
  9. mcp = FastMCP("ai_dirscan",port=8000)
  10. # 配置存储路径
  11. SCAN_RESULT_DIR = Path("scan_results")
  12. SCAN_RESULT_DIR.mkdir(exist_ok=True)
  13. @mcp.tool()
  14. def scan_dir(url: str) -> str:
  15. """
  16. 执行网站目录扫描,返回结构化扫描结果
  17. Args:
  18. url (str): 目标网站URL,需包含协议头(如http/https)
  19. Returns:
  20. str: JSON格式响应,包含:
  21. - status_200: 200状态的有效路径列表
  22. - non_200_results: 非200状态的有效结果列表(包含状态码)
  23. - report_path: 结果文件路径
  24. - stats: 各类状态码统计
  25. """
  26. # 生成结果文件路径
  27. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  28. output_file = SCAN_RESULT_DIR / f"scan_{timestamp}.json"
  29. # 构建扫描命令
  30. base_cmd = [
  31. "python3", "./dirsearch/dirsearch.py",
  32. "-u", url,
  33. "-o", str(output_file),
  34. "--format=json",
  35. "-q",
  36. "--no-color",
  37. ]
  38. response = {"status": 500, "message": "初始化失败"}
  39. try:
  40. # 执行目录扫描
  41. subprocess.run(
  42. base_cmd,
  43. check=True,
  44. capture_output=True,
  45. timeout=300,
  46. text=True
  47. )
  48. # 加载原始扫描结果
  49. with open(output_file, "r", encoding="utf-8") as f:
  50. scan_data = json.load(f)
  51. # 初始化数据结构
  52. status_200 = []
  53. non_200_results = []
  54. status_counter = defaultdict(int)
  55. unique_tracker = set()
  56. # 结果分类处理
  57. for entry in scan_data.get("results", []):
  58. status = entry["status"]
  59. url_path = entry["url"]
  60. content_length = entry["content-length"]
  61. # 状态码统计
  62. status_counter[status] += 1
  63. # 生成唯一标识符防止重复
  64. entry_key = f"{status}|{content_length}"
  65. if entry_key in unique_tracker:
  66. continue
  67. unique_tracker.add(entry_key)
  68. # 分类存储结果
  69. if status == 200:
  70. status_200.append(url_path)
  71. elif content_length != 0:
  72. non_200_results.append({
  73. "url": url_path,
  74. "status": status,
  75. "content_length": content_length
  76. })
  77. # 构建统计信息
  78. stats = {
  79. "total_200": len(status_200),
  80. "total_non_200": len(non_200_results),
  81. "status_distribution": dict(status_counter)
  82. }
  83. # 生成最终响应
  84. response = {
  85. "status": 200,
  86. "data": {
  87. "status_200": status_200,
  88. "non_200_results": non_200_results,
  89. "report_path": str(output_file),
  90. "stats": stats
  91. },
  92. "message": "扫描完成,结果已分类"
  93. }
  94. except json.JSONDecodeError as e:
  95. response = {"status": 500, "message": f"结果解析失败: {str(e)}"}
  96. except subprocess.TimeoutExpired:
  97. response = {"status": 408, "message": "扫描超时"}
  98. except Exception as e:
  99. response = {"status": 500, "message": f"扫描失败: {str(e)}"}
  100. return json.dumps(response, ensure_ascii=False, indent=2)
  101. @mcp.tool()
  102. def get_content(url: str) -> str:
  103. """
  104. 获取非200响应界面的网页内容
  105. :param url: 需要检测的网页地址
  106. :return: 返回页面的完整内容(若目标页面返回非200状态码)
  107. """
  108. try:
  109. response = requests.get(
  110. url,
  111. headers={"User-Agent": "Mozilla/5.0"},
  112. timeout=5
  113. )
  114. if response.status_code != 200:
  115. return response.text
  116. return f"非404页面,当前状态码:{response.status_code}"
  117. except requests.exceptions.RequestException as e:
  118. return f"请求异常:{str(e)}"
  119. def error_response(exception, code, message, filepath):
  120. """构建错误响应模板"""
  121. return {
  122. "status": code,
  123. "error": {
  124. "type": exception.__class__.__name__ if exception else "UnknownError",
  125. "details": str(exception) if exception else ""
  126. },
  127. "message": message,
  128. "failed_path": str(filepath) if filepath else None
  129. }
  130. if __name__ == "__main__":
  131. mcp.run(transport='sse')

三、MCP客户端对接

3.1 对接Cline

目前常用的mcp客户端有 CursorCherry StudioCline(插件),但只有Cline支持更改时间延迟,故以Cline为例

首先进入工具目录,允许以下命令,开启sse服务

  1. uv run main.py

如图便是开启成功

图片.png
然后Vscode下载最新版Cline插件,进入远程MCP服务添加页面,名称自定义,URL填写 http://0.0.0.0:8000/sse,并保存

图片.png
在已导入的MCP服务中,将超时时长设置为10min(视扫描时长而定),如果保存后无法连接mcp服务器,建议点击 Configure MCP Servers 选项,然后对弹出的配置json 进行ctrl+s 保存

我们在该界面便能看到对tool的介绍

图片.png
建议在对话中开启MCP自动调用,这样更加方便

图片.png

3.2 提示词优化

经测试,使用以下提示词效果更好

  1. 请帮我使用已有的mcp工具扫描网站https://xxx.xxx.top/ ,非200响应页面都要调用get_content函数获取内容,判断是否存在版本目录泄露等漏洞,并输出得到的目录,状态码,危害,利用方法,修复方法,以表格的形式统一给我写在md文件中

1 条评论

Elite
Elite

Ctfer

2 篇文章

站长统计