返回

AutoGen 文件操作安全指南:限制工作目录及防范路径遍历

Ai

怎样把 AutoGen 工具的作用范围限制在工作目录内?

使用 AutoGen 工具时, 为了节约资源, 我增加了读写文本文件的工具。

我的 agent 有一个工作目录:

executor = autogen.UserProxyAgent(
    name="executor",
    system_message="Executor. Execute the code written by the Engineer and report the result.",
    human_input_mode="NEVER",
    code_execution_config={
        "last_n_messages": 3,
        "work_dir": WORKING_DIR,
        "use_docker": False,
    },  # Please set use_docker=True if docker is available to run the generated code. Using docker is safer than running the generated code directly.
)

还有这些工具:

def read_file(file_name: Annotated[str, "File name has to be json, txt or html"]) -> int:
    if not file_name.endswith(".json") and not file_name.endswith(".txt") and not file_name.endswith(".html"):
        return f"I can read only .json, .txt or .html files you asked for {file_name}. Use python to read other files."
    if not os.path.exists(os.path.join(WORKING_DIR, file_name)):
        return f"File {file_name} does not exist."
    with open(os.path.join(WORKING_DIR, file_name), "r") as f:
        return f.read()


def write_file(file_name: Annotated[str, "File name"], content: Annotated[str, "text or json content"]) -> int:
    # verify that nested folders exists
    if not os.path.exists(f"{WORKING_DIR}/{os.path.dirname(file_name)}"):
        os.makedirs(f"{WORKING_DIR}/{os.path.dirname(file_name)}")

    with open(f"{WORKING_DIR}/{file_name}", "w") as f:
        return f.write(content)

怎样才能做得更好?让我不用再操心代码里的工作目录。

问题原因分析

目前的代码, read_filewrite_file 函数内部都直接使用了全局变量 WORKING_DIR。 这导致了几个问题:

  1. 代码耦合度高: 函数的实现细节和外部环境(WORKING_DIR)紧密绑定, 不够灵活。
  2. 潜在的安全风险: 如果 file_name 参数没有正确校验, 恶意用户可能利用它来访问工作目录之外的文件, 造成信息泄露或系统破坏。
  3. 测试困难: 函数依赖于全局变量, 单元测试时需要特别处理。
  4. 可读性差 每次都要重复写 os.path.join(WORKING_DIR, file_name) 影响可读性。

解决方案

下面提供几种改进方法,逐步提升安全性和代码质量:

方案一:使用装饰器 (Decorator)

用一个装饰器来处理文件路径, 把 WORKING_DIR 相关的逻辑封装起来。

原理: 装饰器是一个接受函数作为输入, 并返回一个新函数的函数。我们利用装饰器,在不修改原函数代码的情况下,给它添加处理文件路径的功能。

import functools
import os
from typing import Callable

def with_working_dir(func: Callable) -> Callable:
    """装饰器,自动将文件名拼接到工作目录。"""
    @functools.wraps(func)
    def wrapper(file_name: str, *args, **kwargs):
        full_path = os.path.join(WORKING_DIR, file_name)
        return func(full_path, *args, **kwargs)
    return wrapper

@with_working_dir
def read_file(file_path: str) -> str:
    if not file_path.endswith((".json", ".txt", ".html")):
        return f"只支持 .json, .txt 或 .html 文件, 你要求的是 {file_path}。"
    if not os.path.exists(file_path):
        return f"文件 {file_path} 不存在。"
    with open(file_path, "r") as f:
        return f.read()

@with_working_dir
def write_file(file_path: str, content: str) -> int:
    # 确保上级目录存在
    dir_name = os.path.dirname(file_path)
    if not os.path.exists(dir_name):
        os.makedirs(dir_name)

    with open(file_path, "w") as f:
        return f.write(content)

#使用示例, 不需要再加 WORKING_DIR
# content = read_file("my_file.txt")
# write_file("subdir/my_output.json", '{"key": "value"}')

代码解释:

  • with_working_dir 是装饰器函数, 它接收一个函数 func (比如 read_file) 作为参数。
  • wrapper 是内部函数, 它接收 file_name 和其他参数。在 wrapper 里, 把 file_nameWORKING_DIR 拼接成完整路径 full_path。然后调用原始函数 func, 把 full_path 作为参数传进去。
  • @functools.wraps(func) 这行很重要, 它保留了原始函数 func 的元数据 (比如函数名和文档字符串)。
  • 使用方法很简单,只要在read_file, write_file 前面加@with_working_dir 即可。

优点:

  • 代码更简洁, read_filewrite_file 内部不再需要处理 WORKING_DIR
  • 提高了代码的复用性, 如果有其他操作文件的函数, 也可以用这个装饰器。

方案二: 创建文件操作类

把文件读写操作封装到一个类里,将工作目录作为类的属性。

原理: 面向对象编程的思想。把数据(工作目录)和操作数据的方法(读、写)封装到一起。

import os

class FileManager:
    def __init__(self, working_dir: str):
        self.working_dir = working_dir

    def _get_full_path(self, file_name: str) -> str:
        """私有方法, 拼接完整路径。"""
        return os.path.join(self.working_dir, file_name)

    def read_file(self, file_name: str) -> str:
        full_path = self._get_full_path(file_name)
        if not full_path.endswith((".json", ".txt", ".html")):
            return f"只支持 .json, .txt 或 .html 文件, 你要求的是 {file_name}。"
        if not os.path.exists(full_path):
            return f"文件 {file_name} 不存在。"
        with open(full_path, "r") as f:
            return f.read()

    def write_file(self, file_name: str, content: str) -> int:
        full_path = self._get_full_path(file_name)
        dir_name = os.path.dirname(full_path)
        if not os.path.exists(dir_name):
            os.makedirs(dir_name)
        with open(full_path, "w") as f:
            return f.write(content)

# 使用示例:
# file_manager = FileManager(WORKING_DIR)
# content = file_manager.read_file("data.txt")
# file_manager.write_file("results/summary.txt", "结果...")

代码解释:

  • FileManager 类有一个 working_dir 属性, 在初始化时设置。
  • _get_full_path 是一个私有方法 (名字以下划线开头), 用于拼接完整路径。
  • read_filewrite_file 方法内部都通过 _get_full_path 来获取文件路径。

优点:

  • 更清晰地组织了代码, 将文件操作相关的逻辑都放在 FileManager 类中。
  • 以后如果要添加更多文件操作, 直接在类里增加方法即可, 很方便扩展。

方案三:增强安全性 - 路径规范化和校验

不管使用哪种方法, 都应该对用户提供的文件名进行更严格的校验, 防止路径遍历攻击 (Path Traversal)。

原理: 路径遍历攻击是指, 攻击者通过构造特殊的文件名 (比如包含 ../ ), 来访问到工作目录之外的文件。为了防止这种情况, 我们需要对文件名进行规范化和检查。

import os
import functools
from typing import Callable

def safe_with_working_dir(func: Callable) -> Callable:
    """更安全的装饰器,规范化并校验文件路径。"""
    @functools.wraps(func)
    def wrapper(file_name: str, *args, **kwargs):
        if not isinstance(file_name, str):
            raise TypeError("文件名必须是字符串。")

        # 规范化路径
        safe_path = os.path.normpath(os.path.join(WORKING_DIR, file_name))

        # 检查路径是否仍然在工作目录下
        if not safe_path.startswith(os.path.abspath(WORKING_DIR) + os.sep):
             raise ValueError(f"非法的文件访问:{file_name}, 只能访问工作目录内的文件!")

        return func(safe_path, *args, **kwargs)
    return wrapper

@safe_with_working_dir
def read_file(file_path: str) -> str:
      if not file_path.endswith((".json", ".txt", ".html")):
        return f"只支持 .json, .txt 或 .html 文件, 你要求的是 {file_path}。"
      if not os.path.exists(file_path):
        return f"文件 {file_path} 不存在。"
      with open(file_path, 'r') as f:
          return f.read()

@safe_with_working_dir
def write_file(file_path: str, content: str)-> int:
    dir_name = os.path.dirname(file_path)
    if not os.path.exists(dir_name):
      os.makedirs(dir_name)
    with open(file_path, 'w') as f:
          return f.write(content)

# 示例 (安全版本, 会抛出异常):
# read_file("../secret.txt") # 抛出 ValueError
# write_file("../../etc/passwd", "bad content") # 抛出 ValueError

# 示例(正常):
# read_file("my/dir/file.txt")

代码解释:

  • 新的装饰器名叫 safe_with_working_dir
  • os.path.normpath 将路径规范化,移除多余的 /.,并解析 ..
  • os.path.abspath 返回绝对路径. 绝对路径确保我们对比的都是不带任何 ../ 这类的相对标识的完整路径.
  • safe_path.startswith(os.path.abspath(WORKING_DIR) + os.sep) 检查规范化后的路径是否仍然以工作目录的绝对路径开头。 如果不是,说明用户试图访问工作目录之外的文件,抛出异常。
  • read_file, write_file 的代码保持不变。 只是使用了 @safe_with_working_dir 修饰

优点:

  • 极大地增强了安全性, 几乎可以杜绝路径遍历攻击。
  • 兼容之前的装饰器用法。

方案四:进阶使用技巧 – 文件类型检查加强版

如果对于读取文件的类型有要求, 比如只允许读取 json, txt 或 html 文件。可以在安全检查装饰器中, 添加类型校验.

原理 : 通过获取文件的扩展名,判断是否属于允许范围.

代码示例 (继续在安全版本上修改):

import os
import functools
from typing import Callable

def safe_with_working_dir(allowed_extensions: tuple = (".json", ".txt", ".html")) -> Callable:
    """更安全的装饰器, 支持文件类型检查。"""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(file_name: str, *args, **kwargs):
            if not isinstance(file_name, str):
                raise TypeError("文件名必须是字符串。")

            # 规范化路径
            safe_path = os.path.normpath(os.path.join(WORKING_DIR, file_name))

            # 检查路径是否仍然在工作目录下
            if not safe_path.startswith(os.path.abspath(WORKING_DIR) + os.sep):
                raise ValueError(f"非法文件访问:{file_name}。")
            
            # 检查文件扩展名
            if not safe_path.lower().endswith(allowed_extensions) and func.__name__ == "read_file":
                 raise ValueError(f"不被允许的文件类型: {file_name},只允许: {allowed_extensions}。")

            return func(safe_path, *args, **kwargs)
        return wrapper
    return decorator
    
@safe_with_working_dir()
def read_file(file_path: str) -> str:
    if not os.path.exists(file_path):
        return f"文件 {file_path} 不存在。"
    with open(file_path, 'r') as f:
        return f.read()
        
@safe_with_working_dir()
def write_file(file_path: str, content: str)-> int:
    dir_name = os.path.dirname(file_path)
    if not os.path.exists(dir_name):
        os.makedirs(dir_name, exist_ok=True)
    with open(file_path, 'w') as f:
          return f.write(content)

#使用
# content = read_file("file.txt")
# content = read_file("file.py") #ValueError: 不被允许的文件类型: file.py,只允许: ('.json', '.txt', '.html')。

代码解释:

  • safe_with_working_dir 现在接受一个可选参数allowed_extensions,允许自定义白名单。 默认值还是 (".json", ".txt", ".html").
  • 使用 safe_path.lower().endswith(allowed_extensions)检查扩展名,且只针对read_file. lower()用于忽略大小写。

优点:

  • 可以在装饰器层统一控制文件类型, 使read_file 自身更专注于读取内容。
  • 使用了白名单机制, 安全性好.
  • 调用时候, 灵活方便.

总结

综合来看, 方案四(增强安全 + 文件类型检查的装饰器)是最好的选择,兼顾了安全性、代码简洁性、可维护性和可扩展性。