问答
发起
提问
文章
攻防
活动
Toggle navigation
首页
(current)
问答
商城
实战攻防技术
漏洞分析与复现
NEW
活动
摸鱼办
搜索
登录
注册
从对抗到出洞:某金融APP 实战渗透与 Frida 反检测绕过(Rpc + Flask + AutoDecoder)
渗透测试
移动安全
当抓包只剩密文、对抗又被检测,渗透是否就此停滞不前?本文跟着笔者在一次授权的金融类渗透项目中,如何使用rpc+frida+flask+autodecoder,一步步剥离算法,还原明文,最终产出漏洞。
注意:本研究仅限在**授权范围内**进行。本文所有技术演示均在受控测试环境下完成。未经明确许可,对他人系统进行探测、篡改或信息窃取可能触犯当地法律和公司政策。若在渗透测试中发现严重漏洞,请及时通过合规渠道通知责任方并跟进修复。 frida检测 ------- `frida`可以正常注入程序,但是一使用`Java.use`就会闪退  这里编译`frida-bridge`或者使用`ZygiskFrida`都可以绕过`Java.use`检测 编译`frida-bridge`可以参考这个视频 [frida模块化开发,frida-compile,解决frida的java的api检测相关问题,frida-java-bridge调试\_哔哩哔哩\_bilibili](https://www.bilibili.com/video/BV1RVScYoEzo/?spm_id_from=333.337.search-card.all.click&vd_source=997a434dc9f9daccb7fabe7bfcd4b027) 加载编译好的`_agent.js`即可绕过检测   `ZygiskFrida`这里使用的是这个项目 <https://github.com/sucsand/sucsand>  算法分析 ---- 首先抓到的数据包是这样的,请求体和响应体都是`json`格式,并且以`=`作为键。  先上算法助手hook一遍,然后搜索相关的密文字符串,但是并没有找到相关联的hook结果,那大概率加解密都是放在`native`层,然后看到请求头中有验签的字段`X-Emp-Signature`,索性直接`jadx`大法,全局搜索`X-Emp-Signature`。   经过分析定位到`initHttpRequest`方法,这里签名字段是通过HmacSha1实现的  这里`encryptHMAC`通过配置决定使用国密 SM3 算法还是传入的算法配置对输入数据和密钥进行 HMAC 运算。  签名的算法搞定后,再去看请求体是如何加密的,一般验证签名都是在数据加密后再去做的,可以通过`jadx`向上找引用,这里省的看代码直接用frida打印堆栈去看  往上追踪定位到`sendResquest`方法,`sendResquest`中通过`handleRequestBody`函数来处理请求体  继续跟到`handleRequestBody`,这里直接贴出GPT给出的解释    `handleRequestBody`处理完请求体之后,会调用`sendResquest`发送请求,这里再看下`sendResquest`的逻辑  这里会再对上面`handleRequestBody`的结果再进行一次Base64编码,然后再拼接成json发送数据包  这样逻辑就比较清晰了 ```php resultByts = HMAC(序列号 || AES(RNC + 请求体)) || 序列号 || AES(RNC + 请求体) result = Base64(resultByts) 请求体 = Base64(result) ``` 所以只要能拿到`AESCipher.clientKey`\_和`AESCipher.ClientIv_`以及`ClientHello.ServerHmacKey`即可对加密后的数据进行还原并且重放,这里继续往下跟踪`AESCipher`和`ClientHello`代码,发现只是定义了静态变量没有进行赋值。   为了找到`clientKey_`和`clientIv_`,继续追踪`AESAdapter.encrypt`方法,发现每次重新打开APP,对应的`clientKey_`和`clientIv_`都会改变,一开始想的是是不是动态向服务器去请求的`clientKey_`和`clientIv_`,但是抓包并没有发现类似的请求,事情开始变得有趣了起来。   `clientKey_`和`clientIv_`既不是通过HTTP请求传输的,又不是硬编码在代码中的,**那服务端到底是如何解密的呢?** 为了搞清楚`clientKey_`和`clientIv_`的生成逻辑,继续往上找`AESCipher`这个类的相关引用,最后发现一个可疑的方法,  这里的关键其实是第一行代码,其余的代码都是在做密钥的分割,然后赋值。 ```php byte[] allSecret = PRFCipher.PRF(ms2, HMac.TLS_MD_CLIENT_SERVER_KEYIVMAC_CONST(), ms2RncRnsSeed, R2.attr.arrowHeadLength); ``` 所以还是得继续往上找引用,看看`ms2`是如何生成的,定位到`handleServerKeyExchange`方法,熟悉TLS流程的朋友这里就能看出来,这里的TLS握手流程中的**服务器密钥交换**过程非常相似。  继续往上找,定位到`handlerServerKeyExchange`函数  继续追`handleFacilityServerHelloResponse`  继续往上追,发现了一个可疑的请求,生成的密钥结果都是从这个请求中提取的。  抓包也同样看到了这个请求  继续向上看定位到`ClientHello`的构造方法  这里直接贴出GPT给的代码解释  但是由于测试时间有限,按理说这部分代码也是可以通过`Python`代码去解析响应然后提取`clientIv`、`clientKey`、`serverKey`、`serverIv`的,但转念一想这几个参数都是静态参数,可以直接用`frida`去获取。 解密思路 ---- 首先用`frida+rpc+flask`获取`clientIv`、`clientKey`、`serverKey`、`serverIv`,脚本如下 在上文编译好的`_agent.js`中添加需要发送到`Python`端的数据。 ```php Java.perform(function () { var ClientHello = Java.use("com.rytong.emp.net.ClientHello"); var mClientHmacKey = ClientHello.mClientHmacKey.value; var AESCipher = Java.use("com.rytong.emp.security.AESCipher"); var clientKey = AESCipher.clientKey_.value; var clientIv = AESCipher.clientIv_.value; var serverKey = AESCipher.serverKey_.value; var serverIv = AESCipher.serverIv_.value; var result = { clientKey: bytesToHex(Java.array('byte', clientKey)), clientIv: bytesToHex(Java.array('byte', clientIv)), serverKey: bytesToHex(Java.array('byte', serverKey)), serverIv: bytesToHex(Java.array('byte', serverIv)), clientHmacKey: bytesToHex(Java.array('byte', mClientHmacKey)) }; send(JSON.stringify(result)); }); ``` 在`Python`端接收数据,并且提供对外的接口。 ```php import frida import sys import json from flask import Flask, jsonify import os TARGET_APP = "xxx" # 你需要改成实际包名 aes_data = { "clientKey": None, "clientIv": None, "serverKey": None, "serverIv": None, "clientHmacKey": None, } def load_agent_script(file_path): if not os.path.exists(file_path): print(f"[!] 找不到脚本文件: {file_path}") sys.exit(1) with open(file_path, 'r', encoding='utf-8') as f: return f.read() def on_message(message, data): global aes_data if message["type"] == "send": print("[*] 获取到AES信息: ", message["payload"]) aes_data.update(json.loads(message["payload"])) elif message["type"] == "error": print("[!] 脚本错误: ", message["stack"]) def init_frida(agent_file="_agent.js"): device = frida.get_usb_device() try: session = device.attach(TARGET_APP) print(f"[*] 已连接到正在运行的应用: {TARGET_APP}") except frida.ProcessNotFoundError: print(f"[!] 未找到运行中的进程: {TARGET_APP}") print("[!] 请确保应用已经启动") return None except Exception as e: print(f"[!] 连接失败: {e}") return None agent_code = load_agent_script(agent_file) script = session.create_script(agent_code) script.on("message", on_message) script.load() print("[*] 已启动 Frida hook,等待 AES 数据...") return session app = Flask(__name__) @app.route("/get_aes_key", methods=["GET"]) def get_aes_key(): if aes_data["clientKey"]: return jsonify({"status": "ok", "data": aes_data}) else: return jsonify({"status": "error", "msg": "AES key not yet captured"}) if __name__ == "__main__": init_frida("_agent.js") app.run(host="0.0.0.0", port=5001, debug=False) ``` 运行成功后提示如下  然后就可以通过自定义`autodecoder`接口加解密脚本,来实现自动化解密。 **按步骤来解密思路如下** 1. **Base64解码:** 首先,将接收到的字符串进行两次 Base64 解码,还原成原始的二进制数据包。 2. **数据切割:** 按照 `HMAC签名 (20字节)` + `序列号 (9字节)` + `AES加密体` 的固定结构,将二进制数据包精确地切割成三部分。 3. **AES解密:** 使用从Frida服务动态获取的 `clientKey` 和 `clientIv`,对 `AES加密体` 部分进行 AES-CBC 模式的解密。 4. **去除填充与头部:** 对解密后的数据,先执行 PKCS7 Unpadding(去除填充),然后再从数据头部去掉一个 `RNC随机数 (32字节)`。 5. **提取明文:** 经过以上步骤后,剩余的数据就是最终可读的JSON明文。同时,脚本会**保存**本次解密得到的`序列号`和`RNC随机数`,供后续加密时使用。 **加密思路同样如此** 1. **准备明文:** 将要发送的JSON明文前,先拼接上一个(复用的或新的)`RNC随机数`**。** 2. **填充与加密:** 对拼接后的数据进行 PKCS7 Padding,然后使用 `clientKey` 和 `clientIv` 进行 AES-CBC 加密,得到 `AES加密体`。 3. **计算HMAC签名:** 将一个(复用的或新的)`序列号`与 `AES加密体` 拼接,然后使用 `clientHmacKey` 计算其 HMAC-SHA1 签名。 4. **数据封装:**按照 `HMAC签名` + `序列号` + `AES加密体` 的顺序组装成最终的数据包,再对此数据包进行两次 Base64 编码,得到可以发送的字符串。 这里附上autodecoder脚本,思路仅供参考学习 ```php # -*- coding:utf-8 -*- from flask import Flask, request, jsonify from Crypto.Cipher import AES import requests import base64 import json import binascii import chardet import traceback import hmac import hashlib import sys import os import re HMAC_LEN = 20 SERIAL_LEN = 9 RNC_LEN = 32 DEFAULT_ENCODING = "utf-8" # 保存最近一次“请求包”解密得到的数据,供后续加密复用 LAST_DECODE_STATE = { "serial_bytes": None, # 9字节序列号 "rnc_bytes": None, # RNC 前缀 "encoding": DEFAULT_ENCODING, } FRIDA_RPC_URL = "http://127.0.0.1:5001/get_aes_key" app = Flask(__name__) # 仅替换 JSON 字符串字面量内部的等号为 \u003d,避免替换到非字符串位置 def escape_equals_in_json_strings(s: str) -> str: # 匹配 JSON 字符串(含转义) pattern = r'"(?:\\.|[^"\\])*"' def repl(m): token = m.group(0) # 包含首尾引号 inner = token[1:-1] inner = inner.replace('=', '\\u003d') return '"' + inner + '"' return re.sub(pattern, repl, s) def pkcs7_unpad(data: bytes) -> bytes: pad_len = data[-1] if pad_len < 1 or pad_len > AES.block_size: raise ValueError("Invalid padding length") if data[-pad_len:] != bytes([pad_len]) * pad_len: raise ValueError("Invalid padding bytes") return data[:-pad_len] def pkcs7_pad(data: bytes) -> bytes: pad_len = AES.block_size - (len(data) % AES.block_size) return data + bytes([pad_len]) * pad_len def get_aes_keys() -> dict: try: resp = requests.get(FRIDA_RPC_URL, timeout=3) resp.raise_for_status() j = resp.json() if j.get("status") != "ok": raise Exception(j.get("msg", "unknown error")) return j["data"] except Exception as e: raise Exception(f"获取AES key失败: {e}") def verify_hmac(data_bytes: bytes, hmac_key: bytes, expect_hmac: bytes) -> bool: # 与 Java 一致:直接对原始字节做 HMAC-SHA1 calc_hmac = hmac.new(hmac_key, data_bytes, hashlib.sha1).digest() return calc_hmac == expect_hmac def calc_hmac_java_style(data_bytes: bytes, hmac_key: bytes) -> bytes: """ 与 Java 一致:直接对原始字节做 HMAC-SHA1 """ return hmac.new(hmac_key, data_bytes, hashlib.sha1).digest() def request_decode_tls13(cipher_str: str, client_key_hex: str, client_iv_hex: str, hmac_key_hex: str, hmac_len=HMAC_LEN, ser_len=SERIAL_LEN, rnc_len=RNC_LEN) -> str: first_decode = base64.b64decode(cipher_str) second_decode = base64.b64decode(first_decode) data = bytearray(second_decode) # 拆包 hmac_bytes = data[:hmac_len] serial_bytes = data[hmac_len:hmac_len + ser_len] body_cr_bytes = data[hmac_len + ser_len:] # AES 解密 key_bytes = binascii.unhexlify(client_key_hex) iv_bytes = binascii.unhexlify(client_iv_hex) cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes) decrypted = cipher.decrypt(body_cr_bytes) decrypted = pkcs7_unpad(decrypted) if len(decrypted) < rnc_len: raise ValueError("解密数据长度异常") rnc_bytes = decrypted[:rnc_len] plaintext_bytes = decrypted[rnc_len:] encoding = chardet.detect(plaintext_bytes).get("encoding") or "utf-8" try: LAST_DECODE_STATE["serial_bytes"] = bytes(serial_bytes) LAST_DECODE_STATE["rnc_bytes"] = bytes(rnc_bytes) LAST_DECODE_STATE["encoding"] = encoding except Exception: pass return plaintext_bytes.decode(encoding, errors="replace") def response_decode(cipher_str: str, server_key_hex: str, server_iv_hex: str) -> str: first_decode = base64.b64decode(cipher_str) second_decode = base64.b64decode(first_decode) body_bytes = bytearray(second_decode) aes_cipher_data = body_bytes[HMAC_LEN:] key_bytes = binascii.unhexlify(server_key_hex) iv_bytes = binascii.unhexlify(server_iv_hex) cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes) decrypted = cipher.decrypt(aes_cipher_data) decrypted = pkcs7_unpad(decrypted) encoding = chardet.detect(decrypted).get("encoding") or "utf-8" return decrypted.decode(encoding, errors="replace") def request_encode_tls13(plaintext_str: str, client_key_hex: str, client_iv_hex: str, hmac_key_hex: str, encoding='utf-8', serial_bytes: bytes = None, rnc_bytes: bytes = None) -> str: try: import json parsed = json.loads(plaintext_str) plaintext_str = json.dumps(parsed, ensure_ascii=False, separators=(',', ':')) # 紧凑格式 # 将 JSON 字符串字面量内部的等号替换为 \u003d plaintext_str = escape_equals_in_json_strings(plaintext_str) print(f"使用JSON序列化后的字符串长度: {len(plaintext_str)}") except: print("不是有效的JSON格式,保持原样") print(f"原始字符串长度: {len(plaintext_str)}") plaintext_bytes = plaintext_str.encode(encoding) print(plaintext_str) print(f"编码为字节后长度: {len(plaintext_bytes)}") rnc = rnc_bytes if (rnc_bytes is not None and len(rnc_bytes) == RNC_LEN) else os.urandom(RNC_LEN) padded = pkcs7_pad(rnc + plaintext_bytes) key_bytes = binascii.unhexlify(client_key_hex) iv_bytes = binascii.unhexlify(client_iv_hex) cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes) body_cr_bytes = cipher.encrypt(padded) # 复用最近一次解密得到的 serial,否则随机 serial = serial_bytes if (serial_bytes is not None and len(serial_bytes) == SERIAL_LEN) else os.urandom(SERIAL_LEN) # HMAC-SHA1( serial || body_cr ) hmac_key = binascii.unhexlify(hmac_key_hex) hmac_full = calc_hmac_java_style(serial + body_cr_bytes, hmac_key) hmac_bytes = hmac_full[:HMAC_LEN] packet = hmac_bytes + serial + body_cr_bytes out = base64.b64encode(base64.b64encode(packet)).decode('ascii') return out def calculate_signature(body: str, hmac_key_hex: str) -> str: import base64, binascii, hmac, hashlib try: decoded_body = base64.b64decode(body) hmac_key = binascii.unhexlify(hmac_key_hex) signature = hmac.new(hmac_key, decoded_body, hashlib.sha1).digest() signature_b64 = base64.b64encode(signature).decode('ascii') return signature_b64 except Exception as e: print(f"计算签名失败: {e}") return "" def update_signature_header(headers: str, signature: str) -> str: if not signature: return headers if 'X-Emp-Signature:' in headers: # 替换现有的签名 headers = re.sub(r'X-Emp-Signature:\s*[^\r\n]*', f'X-Emp-Signature: {signature}', headers) else: # 添加新的签名头 lines = headers.split('\n') if len(lines) > 0: lines.insert(1, f'X-Emp-Signature: {signature}') headers = '\n'.join(lines) return headers @app.route("/decode", methods=["POST"]) def decode(): try: # 解析 body 和 dataHeaders(dataHeaders 是前端转发的原始请求头) headers = request.form.get("dataHeaders") body = request.form.get("dataBody") reqresp = request.form.get('requestorresponse') # 获取 requestorresponse 参数 可选 获取是请求还是响应包,需要勾选<请求响应不同加解密>按钮 keys = get_aes_keys() if headers != None: # 开启了请求头加密 if reqresp == 'request': # 请求包 text = request_decode_tls13(body, keys["clientKey"], keys["clientIv"], keys["clientHmacKey"]) parsed_json = json.loads(text) body = json.dumps(parsed_json, ensure_ascii=False, indent=2) return headers.strip() + "\r\n\r\n\r\n\r\n" + body # 返回值为固定格式,不可更改 必需必需必需,共四个\r\n if reqresp == 'response': # 响应包 text = response_decode(body, keys["serverKey"], keys["serverIv"]) parsed_json = json.loads(text) body = json.dumps(parsed_json, ensure_ascii=False, indent=2) return headers.strip() + "\r\n\r\n\r\n\r\n" + body # 返回值为固定格式,不可更改 必需必需必需,共四个\r\n return body # 返回值为固定格式,不可更改 必需必需必需 except Exception as e: traceback.print_exc() return jsonify({"status": "error", "msg": str(e)}), 500 @app.route("/encode", methods=["POST"]) def encode(): try: headers = request.form.get("dataHeaders") body = request.form.get("dataBody") reqresp = request.form.get('requestorresponse') keys = get_aes_keys() serial_bytes = LAST_DECODE_STATE.get("serial_bytes") rnc_bytes = LAST_DECODE_STATE.get("rnc_bytes") encoding = LAST_DECODE_STATE.get("encoding", DEFAULT_ENCODING) print("serial_bytes:", serial_bytes.hex()) print("rnc_bytes:", rnc_bytes.hex()) cipher_str = request_encode_tls13( body, keys["clientKey"], keys["clientIv"], keys["clientHmacKey"], encoding=encoding, serial_bytes=serial_bytes, rnc_bytes=rnc_bytes ) # 将cipher_str封装成JSON格式的字符串 cipher_json_str = json.dumps({"=": cipher_str}, ensure_ascii=False) if headers is not None: # 计算并更新签名头 signature = calculate_signature(cipher_str, keys["clientHmacKey"]) headers = update_signature_header(headers, signature) result = headers.strip() + "\r\n\r\n\r\n\r\n" + cipher_json_str else: result = cipher_json_str return result except Exception as e: traceback.print_exc() return jsonify({"status": "error", "msg": str(e)}), 500 if __name__ == '__main__': app.config['JSON_AS_ASCII'] = False app.run(host="0.0.0.0", port=5002, debug=True) ``` 最终效果 ---- `Burpsuite`的各项功能均正常使用,在`Proxy`模块可通过`autoDecoder`板块查看明文数据包   `Repeater`等模块均可正常使用  即可开始愉快的测试~
发表于 2025-10-15 09:00:00
阅读 ( 424 )
分类:
渗透测试
3 推荐
收藏
0 条评论
请先
登录
后评论
sola
1 篇文章
×
发送私信
请先
登录
后发送私信
×
举报此文章
垃圾广告信息:
广告、推广、测试等内容
违规内容:
色情、暴力、血腥、敏感信息等内容
不友善内容:
人身攻击、挑衅辱骂、恶意行为
其他原因:
请补充说明
举报原因:
×
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!