TL;DR
Fl1pper Zer0: secp256r1(ECDSA) 기반 서명 서비스에 AES-GCM으로 서명키를 래핑한 구조인데, nonce 고정, 세션에 동일한 AES키로 암호화문제
Fl1pper Zer1: secp256r1(ECDSA) 기반 서비스 -> 서버가 Shamir(3차) 공유 4개를 AES-GCM으로 암호화해 배포하지만 nonce 고정, 동일 키 재사용으로 공유들 간 CTR 스트림이 재사용되어 $d$ 복구 가능.
Fl1pper Zer0
from Crypto.Util.number import long_to_bytes, bytes_to_long, inverse
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from fastecdsa.curve import P256 as EC
from fastecdsa.point import Point
import os, random, hashlib, json
from secret import FLAG
class SignService:
def __init__(self):
self.G = Point(EC.gx, EC.gy, curve=EC)
self.order = EC.q
self.p = EC.p
self.a = EC.a
self.b = EC.b
self.privkey = random.randrange(1, self.order - 1)
self.pubkey = (self.privkey * self.G)
self.key = os.urandom(16)
self.iv = os.urandom(16)
def generate_key(self):
self.privkey = random.randrange(1, self.order - 1)
self.pubkey = (self.privkey * self.G)
def ecdsa_sign(self, message, privkey):
z = int(hashlib.sha256(message).hexdigest(), 16)
k = random.randrange(1, self.order - 1)
r = (k*self.G).x % self.order
s = (inverse(k, self.order) * (z + r*privkey)) % self.order
return (r, s)
def ecdsa_verify(self, message, r, s, pubkey):
r %= self.order
s %= self.order
if s == 0 or r == 0:
return False
z = int(hashlib.sha256(message).hexdigest(), 16)
s_inv = inverse(s, self.order)
u1 = (z*s_inv) % self.order
u2 = (r*s_inv) % self.order
W = u1*self.G + u2*pubkey
return W.x == r
def aes_encrypt(self, plaintext):
cipher = AES.new(self.key, AES.MODE_GCM, nonce=self.iv)
ct, tag = cipher.encrypt_and_digest(plaintext)
return tag + ct
def aes_decrypt(self, ciphertext):
tag, ct = ciphertext[:16], ciphertext[16:]
cipher = AES.new(self.key, AES.MODE_GCM, nonce=self.iv)
plaintext = cipher.decrypt_and_verify(ct, tag)
return plaintext
def get_flag(self):
key = hashlib.sha256(long_to_bytes(self.privkey)).digest()[:16]
cipher = AES.new(key, AES.MODE_ECB)
encrypted_flag = cipher.encrypt(pad(FLAG.encode(), 16))
return encrypted_flag
if __name__ == '__main__':
print("Welcome to Fl1pper Zer0 – Signing Service!\n")
S = SignService()
signkey = S.aes_encrypt(long_to_bytes(S.privkey))
print(f"Here is your encrypted signing key, use it to sign a message : {json.dumps({'pubkey': {'x': hex(S.pubkey.x), 'y': hex(S.pubkey.y)}, 'signkey': signkey.hex()})}")
while True:
print("\nOptions:\n \
1) sign <message> <signkey> : Sign a message\n \
2) verify <message> <signature> <pubkey> : Verify the signed message\n \
3) generate_key : Generate a new signing key\n \
4) get_flag : Get the flag\n \
5) quit : Quit\n")
try:
inp = json.loads(input('> '))
if 'option' not in inp:
print(json.dumps({'error': 'You must send an option'}))
elif inp['option'] == 'sign':
msg = bytes.fromhex(inp['msg'])
signkey = bytes.fromhex(inp['signkey'])
sk = bytes_to_long(S.aes_decrypt(signkey))
r, s = S.ecdsa_sign(msg, sk)
print(json.dumps({'r': hex(r), 's': hex(s)}))
elif inp['option'] == 'verify':
msg = bytes.fromhex(inp['msg'])
r = int(inp['r'], 16)
s = int(inp['s'], 16)
px = int(inp['px'], 16)
py = int(inp['py'], 16)
pub = Point(px, py, curve=EC)
verified = S.ecdsa_verify(msg, r, s, pub)
if verified:
print(json.dumps({'result': 'Success'}))
else:
print(json.dumps({'result': 'Invalid signature'}))
elif inp['option'] == 'generate_key':
S.generate_key()
signkey = S.aes_encrypt(long_to_bytes(S.privkey))
print("Here is your *NEW* encrypted signing key :")
print(json.dumps({'pubkey': {'x': hex(S.pubkey.x), 'y': hex(S.pubkey.y)}, 'signkey': signkey.hex()}))
elif inp['option'] == 'get_flag':
encrypted_flag = S.get_flag()
print(json.dumps({'flag': encrypted_flag.hex()}))
elif inp['option'] == 'quit':
print("Adios :)")
break
else:
print(json.dumps({'error': 'Invalid option'}))
except Exception:
print(json.dumps({'error': 'Oops! Something went wrong'}))
break
이중에서 focus해야하는건
def aes_encrypt(self, plaintext):
cipher = AES.new(self.key, AES.MODE_GCM, nonce=self.iv)
ct, tag = cipher.encrypt_and_digest(plaintext)
return tag + ct
이 부분
AES-GCM은 같은 (키, nonce) 조합으로 두 개 이상의 메시지를 암호화하면 GHASH 키가 노출된다.
서비스는 generate_key를 통해 새로운 서명키를 발급하지만 nonce는 고정되어 있다.
태그 관계식을 활용해 $GHASH$ 키 $H$와 $E_K (J_0)$를 복구 가능
$$ \text{tag}_i = E_K(J_0) \oplus \mathrm{GHASH}_H(C_i), \quad C_i = \text{ciphertext block}_i $$
$\mathrm{GHASH}_H$는 $GF(2^128)$ 곱셈 반복
$$ Y_{j+1} = (Y_j \oplus C_j) \cdot H, \quad Y_0 = 0 $$
두 쌍 이상 $(C_i, \text{tag}_i)$ 확보하면 선형 방정식 $H$를 풀 수 있고, 이후 모든 ciphertext에 대해 위조 태그를 만들 수 있다.
즉, 서명 오라클에 임의의 평문 서명키를 주입할 수 있는 상태가 된다.
Analysis
1. GHASH 키 & $E_K (J_0)$ 복구
1. generate_key를 최소 3회 호출해서 서로 다른 태그 획득 $(tag_i, C_i)$ 획득.
2. $GF(2^128)$에서 다음 시스템을 풀어 $H$를 획득.
$$ D_i = \text{tag}_i \oplus \text{tag}_0,\; A_i = C_{i,1} \oplus C_{0,1},\; B_i = C_{i,2} \oplus C_{0,2} $$
$$ D_i = H^2(A_i \cdot H \oplus B_i) $$
3. 얻은 $H$로 $E_K(J_0) = \text{tag}_0 \oplus \mathrm{GHASH}_H(C_0)$ 계산.
2. 태그 위조 및 서명키 조작
1. 원하는 256비트 서명키 평문 $K^*$에 대해 $C^* = \text{Enc}_{AES-GCM}(K^*)$에서 암호문 부분만 사용하고, 복구한 $H$, $E_K(J_0)$를 이용해 새 태그를 생성한다.
2. sign 옵션으로 $tag^*‖C^*$를 제출하면 서비스는 $K^*$로 서명을 수행한다.
3. ECDSA 공개키 복구
서명 $(r, s)$과 메시지 해시 $z$가 주어지면 공개키 후보를 복원한다.
1. 후보 $x$값으로 $r$과 $r + n \rightarrow$ ($n$: 곡선 차수). $x \ge p$ 이면 버림.
2. 곡선 방정식으로부터 $y = \sqrt{x^3 + ax + b \bmod p}$ 계산 ($(p+1)/4$ 제곱 활용).
3. 후보 점 $R = (x, y)$로부터 $ Q = r^{-1}(sR - zG) $를 계산하고, 검증식 $r \equiv (u_1 G + u_2 Q)_x \pmod{n}$을 만족하는 후보를 채택
4. 비트 플립을 이용한 개인키 재구성
1. $ct_base$와 일치하는 공개키를 $Q_0$라 하고, $ct_base \oplus 2^i$를 암호문으로 사용했을 때 대응하는 공개키를 $Q_i$
2. 두 점의 차이 $\Delta_i = Q_i - Q_0$를 계산 $ \Delta_i \stackrel{?}{=} \pm (2^i \bmod n) G $ 여부를 확인합니다.
3. +면 해당 비트는 0, - 이면 1로 판정. 이 과정을 256비트 수행하면 원래의 개인키 스칼라 $d$를 얻는다.
4. 최종 검증은 $dG = Q_0$로 확인.
Exploit
from pwn import *
from Crypto.Cipher import AES
from Crypto.Util.number import inverse
from fastecdsa.curve import P256
from fastecdsa.point import Point
import hashlib
import json
import re
context.log_level = 'debug'
MSG = b"andsopwn"
MASK = (1 << 128) - 1
def bit_reverse_128(x: int) -> int:
x = ((x & 0x55555555555555555555555555555555) << 1) | ((x >> 1) & 0x55555555555555555555555555555555)
x = ((x & 0x33333333333333333333333333333333) << 2) | ((x >> 2) & 0x33333333333333333333333333333333)
x = ((x & 0x0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f) << 4) | ((x >> 4) & 0x0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f)
x = ((x & 0x00ff00ff00ff00ff00ff00ff00ff00ff) << 8) | ((x >> 8) & 0x00ff00ff00ff00ff00ff00ff00ff00ff)
x = ((x & 0x0000ffff0000ffff0000ffff0000ffff) << 16) | ((x >> 16) & 0x0000ffff0000ffff0000ffff0000ffff)
x = ((x & 0x00000000ffffffff00000000ffffffff) << 32) | ((x >> 32) & 0x00000000ffffffff00000000ffffffff)
x = ((x & 0x0000000000000000ffffffffffffffff) << 64) | ((x >> 64) & 0x0000000000000000ffffffffffffffff)
return x
def to_elem(block: bytes) -> int:
return bit_reverse_128(int.from_bytes(block, "big"))
def from_elem(value: int) -> bytes:
return bit_reverse_128(value).to_bytes(16, "big")
def gf_mul(x: int, y: int) -> int:
prod = 0
for i in range(128):
if (y >> i) & 1:
prod ^= x << i
for i in range(255, 127, -1):
if (prod >> i) & 1:
shift = i - 128
prod ^= 1 << i
prod ^= 1 << (shift + 7)
prod ^= 1 << (shift + 2)
prod ^= 1 << (shift + 1)
prod ^= 1 << shift
return prod
def gf_pow(x: int, n: int) -> int:
result = 1
base = x
while n:
if n & 1:
result = gf_mul(result, base)
base = gf_mul(base, base)
n >>= 1
return result
def gf_inv(x: int) -> int:
if x == 0:
raise ZeroDivisionError
return gf_pow(x, (1 << 128) - 2)
def ghash(H: int, blocks):
Y = 0
for block in blocks:
Y = gf_mul(Y ^ block, H)
return Y
def recv_prompt(io) -> str:
return io.recvuntil(b"\r\n\r\n> ").decode()
def collect_records(io, count=4):
records = []
data = recv_prompt(io)
for line in data.splitlines():
if '{"pubkey"' in line:
records.append(json.loads(line[line.index('{"pubkey"'):]))
break
for _ in range(count - 1):
io.sendline(json.dumps({"option": "generate_key"}).encode())
data = recv_prompt(io)
for line in data.splitlines():
if '{"pubkey"' in line:
records.append(json.loads(line[line.index('{"pubkey"'):]))
break
return records
def parse_record(record):
raw = bytes.fromhex(record["signkey"])
return {
"tag": to_elem(raw[:16]),
"c1": to_elem(raw[16:32]),
"c2": to_elem(raw[32:]),
"ct": raw[16:],
}
def recover_ghash_params(records):
base = records[0]
len_block = to_elem((0).to_bytes(8, "big") + (32 * 8).to_bytes(8, "big"))
triples = []
for rec in records[1:4]:
triples.append((rec["c1"] ^ base["c1"], rec["c2"] ^ base["c2"], rec["tag"] ^ base["tag"]))
(A1, B1, D1), (A2, B2, D2), (A3, B3, D3) = triples
R = gf_mul(D1, gf_inv(D2))
E = gf_mul(R, A2) ^ A1
F = gf_mul(R, B2) ^ B1
H = gf_mul(gf_inv(E), F)
ghash_base = ghash(H, [base["c1"], base["c2"], len_block])
EKJ0 = base["tag"] ^ ghash_base
return H, EKJ0
def forge_tag(ciphertext: bytes, H: int, EKJ0: int) -> bytes:
len_block = to_elem((0).to_bytes(8, "big") + (32 * 8).to_bytes(8, "big"))
c1 = to_elem(ciphertext[:16])
c2 = to_elem(ciphertext[16:])
tag_elem = EKJ0 ^ ghash(H, [c1, c2, len_block])
return from_elem(tag_elem)
def send_sign(io, ciphertext: bytes, H: int, EKJ0: int):
forged_tag = forge_tag(ciphertext, H, EKJ0)
payload = {
"option": "sign",
"msg": MSG.hex(),
"signkey": (forged_tag + ciphertext).hex(),
}
io.sendline(json.dumps(payload).encode())
data = recv_prompt(io)
m = re.search(r'"r": "0x([0-9a-f]+)", "s": "0x([0-9a-f]+)"', data)
if not m:
raise RuntimeError("signature parse failure")
return int(m.group(1), 16), int(m.group(2), 16)
def recover_candidates(r: int, s: int, z: int):
candidates = []
inv_r = inverse(r, P256.q)
for k in range(2):
x = r + k * P256.q
if x >= P256.p:
continue
alpha = (pow(x, 3, P256.p) + P256.a * x + P256.b) % P256.p
beta = pow(alpha, (P256.p + 1) // 4, P256.p)
for y in [beta, (-beta) % P256.p]:
R = Point(x, y, curve=P256)
Q = inv_r * (s * R - z * Point(P256.gx, P256.gy, curve=P256))
w = inverse(s, P256.q)
u1 = (z * w) % P256.q
u2 = (r * w) % P256.q
check = u1 * Point(P256.gx, P256.gy, curve=P256) + u2 * Q
if check.x % P256.q == r % P256.q:
candidates.append(Q)
return candidates
def main():
io = remote('flipper.p2.securinets.tn', 6000)
records_raw = collect_records(io)
parsed = [parse_record(rec) for rec in records_raw]
H, EKJ0 = recover_ghash_params(parsed)
ct_base = parsed[-1]["ct"]
server_pub = Point(int(records_raw[-1]["pubkey"]["x"], 16), int(records_raw[-1]["pubkey"]["y"], 16), curve=P256)
z = int(hashlib.sha256(MSG).hexdigest(), 16) % P256.q
r0, s0 = send_sign(io, ct_base, H, EKJ0)
base_candidates = recover_candidates(r0, s0, z)
for Q in base_candidates:
if Q == server_pub:
Q_base = Q
break
else:
Q_base = base_candidates[0]
priv = 0
ct_base_int = int.from_bytes(ct_base, "big")
G = Point(P256.gx, P256.gy, curve=P256)
for bit in range(256):
delta = 1 << bit
ct_mut = (ct_base_int ^ delta).to_bytes(32, "big")
r, s = send_sign(io, ct_mut, H, EKJ0)
candidates = recover_candidates(r, s, z)
expected = (delta % P256.q) * G
matched = False
for Q in candidates:
diff = Q - Q_base
if diff == expected:
matched = True
break
if diff == (-expected):
priv |= delta
matched = True
break
if not matched:
io.close()
raise RuntimeError(f"bit {bit} unresolved")
assert (priv % P256.q) * G == Q_base
io.sendline(json.dumps({"option": "get_flag"}).encode())
resp = recv_prompt(io)
io.close()
flag_hex = re.search(r'"flag": "([0-9a-f]+)"', resp).group(1)
key = hashlib.sha256(priv.to_bytes(32, "big")).digest()[:16]
flag = AES.new(key, AES.MODE_ECB).decrypt(bytes.fromhex(flag_hex))
print(flag.decode().strip())
if __name__ == "__main__":
main()
b' 5) quit : Quit\r\n'
b'\r\n'
b'> '
[DEBUG] Sent 0x17 bytes:
b'{"option": "get_flag"}\n'
[DEBUG] Received 0x1bd bytes:
b'{"option": "get_flag"}\r\n'
b'{"flag": "5fdfe234b8d4904a44749d8cb7311d4cd43acfbefce5bc23aac18fab374b49427f0525540b35a7c3160f4dfd7edc0220800874318e935f2d3e69431a77382bb6358d3dc917f80b4e0e9306eb5400adf5"}\r\n'
b'\r\n'
b'Options:\r\n'
b' 1) sign <message> <signkey> : Sign a message\r\n'
b' 2) verify <message> <signature> <pubkey> : Verify the signed message\r\n'
b' 3) generate_key : Generate a new signing key\r\n'
b' 4) get_flag : Get the flag\r\n'
b' 5) quit : Quit\r\n'
b'\r\n'
b'> '
[*] Closed connection to flipper.p2.securinets.tn port 6000
Securinets{bea0c8b66714035aaa7e7035868dd58ac229399449b663da96cf637f2ced3d84}\x04\x04\x04\x04
복구된 개인키: 0x27299e9c80a96457ac09654d99c03ca13f397cf3ada37dc036c9e7a8e4d38ec4
검증: priv * G 서비스에서 공개키 일치한거 확인
Fl1pper Zer1
from sage.all import *
from Crypto.Util.number import long_to_bytes, bytes_to_long, inverse
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from fastecdsa.curve import P256 as EC
from fastecdsa.point import Point
import os, random, hashlib, json
load('secret.sage')
class SecureSignService:
def __init__(self):
self.G = Point(EC.gx, EC.gy, curve=EC)
self.order = EC.q
self.p = EC.p
self.a = EC.a
self.b = EC.b
self.privkey = random.randrange(1, self.order - 1)
self.pubkey = (self.privkey * self.G)
self.key = os.urandom(16)
self.iv = os.urandom(16)
def split_privkey(self, privkey):
shares = []
coeffs = [privkey]
for _ in range(3):
coeffs.append(random.randrange(1, self.order))
P.<x> = PolynomialRing(GF(self.order))
poly = sum(c*x^i for i, c in enumerate(coeffs))
for x in range(1, 5):
y = poly(x=x)
shares.append((x, y))
return shares
def reconstruct_privkey(self, shares):
P.<x> = PolynomialRing(GF(self.order))
reconst_poly = P.lagrange_polynomial(shares)
return int(reconst_poly(0))
def shares_encrypt(self, shares):
return [self.aes_encrypt(long_to_bytes(int(s[1]))).hex() for s in shares]
def shares_decrypt(self, shares):
return [(x+1, bytes_to_long(self.aes_decrypt(bytes.fromhex(y)))) for x, y in enumerate(shares)]
def generate_key(self):
self.privkey = random.randrange(1, self.order - 1)
self.pubkey = (self.privkey * self.G)
def ecdsa_sign(self, message, privkey):
z = int(hashlib.sha256(message).hexdigest(), 16)
k = random.randrange(1, self.order - 1)
r = (k*self.G).x % self.order
s = (inverse(k, self.order) * (z + r*privkey)) % self.order
return (r, s)
def ecdsa_verify(self, message, r, s, pubkey):
r %= self.order
s %= self.order
if s == 0 or r == 0:
return False
z = int(hashlib.sha256(message).hexdigest(), 16)
s_inv = inverse(s, self.order)
u1 = (z*s_inv) % self.order
u2 = (r*s_inv) % self.order
W = u1*self.G + u2*pubkey
return W.x == r
def aes_encrypt(self, plaintext):
cipher = AES.new(self.key, AES.MODE_GCM, nonce=self.iv)
ciphertext, tag = cipher.encrypt_and_digest(plaintext)
return tag + ciphertext
def aes_decrypt(self, ciphertext):
tag, ct = ciphertext[:16], ciphertext[16:]
cipher = AES.new(self.key, AES.MODE_GCM, nonce=self.iv)
plaintext = cipher.decrypt_and_verify(ct, tag)
return plaintext
def get_flag(self):
key = hashlib.sha256(long_to_bytes(self.privkey)).digest()[:16]
cipher = AES.new(key, AES.MODE_ECB)
encrypted_flag = cipher.encrypt(pad(FLAG.encode(), 16))
return encrypted_flag
if __name__ == '__main__':
print("Welcome to Fl0pper Zer1 – Secure Signing Service!\n")
S = SecureSignService()
signkey = S.shares_encrypt(S.split_privkey(S.privkey))
print(f"Here are your encrypted signing key shares, use them to sign a message : {json.dumps({'pubkey': {'x': hex(S.pubkey.x), 'y': hex(S.pubkey.y)}, 'signkey': signkey})}")
while True:
print("\nOptions:\n \
1) sign <message> <signkey> : Sign a message\n \
2) verify <message> <signature> <pubkey> : Verify the signed message\n \
3) generate_key : Generate new signing key shares\n \
4) get_flag : Get the flag\n \
5) quit : Quit\n")
try:
inp = json.loads(input('> '))
if 'option' not in inp:
print(json.dumps({'error': 'You must send an option'}))
elif inp['option'] == 'sign':
msg = bytes.fromhex(inp['msg'])
signkey = inp['signkey']
sk = S.reconstruct_privkey(S.shares_decrypt(signkey))
r, s = S.ecdsa_sign(msg, sk)
print(json.dumps({'r': hex(r), 's': hex(s)}))
elif inp['option'] == 'verify':
msg = bytes.fromhex(inp['msg'])
r = int(inp['r'], 16)
s = int(inp['s'], 16)
px = int(inp['px'], 16)
py = int(inp['py'], 16)
pub = Point(px, py, curve=EC)
verified = S.ecdsa_verify(msg, r, s, pub)
if verified:
print(json.dumps({'result': 'Success'}))
else:
print(json.dumps({'result': 'Invalid signature'}))
elif inp['option'] == 'generate_key':
S.generate_key()
signkey = S.shares_encrypt(S.split_privkey(S.privkey))
print("Here are your *NEW* encrypted signing key shares :")
print(json.dumps({'pubkey': {'x': hex(S.pubkey.x), 'y': hex(S.pubkey.y)}, 'signkey': signkey}))
elif inp['option'] == 'get_flag':
encrypted_flag = S.get_flag()
print(json.dumps({'flag': encrypted_flag.hex()}))
elif inp['option'] == 'quit':
print("Adios :)")
break
else:
print(json.dumps({'error': 'Invalid option'}))
except Exception as e:
print(json.dumps({'error': 'Oops! Something went wrong'}))
break
self.privkey = random.randrange(1, self.order - 1)
...
P.<x> = PolynomialRing(GF(self.order))
poly = sum(c*x^i for i, c in enumerate(coeffs))
for x in range(1, 5):
shares.append((x, y))
$x = 1..4$에서의 평가값을 사용한 표준 Shamir 공유.
def aes_encrypt(self, plaintext):
cipher = AES.new(self.key, AES.MODE_GCM, nonce=self.iv)
ciphertext, tag = cipher.encrypt_and_digest(plaintext)
return tag + ciphertext
AES-GCM nonce self.iv는 서비스 인스턴스 당 한 번만 생성. 초기 공유 4개뿐 아니라 generate_key 이후 공유도 모두 동일한 (key, nonce) 조합으로 암호화되며, Zer0 문제랑 같은 맥락
msg = bytes.fromhex(inp['msg'])
signkey = inp['signkey']
sk = S.reconstruct_privkey(S.shares_decrypt(signkey))
r, s = S.ecdsa_sign(msg, sk)
shares_decrypt 역시 고정된 (key, nonce)로 입력된 모든 공유를 복호화. 암호문을 그대로 신뢰하며 원래의 Shamir 공유인지 검증 절차가 없음
Analysis
1 GHASH 서브키 복구
고정 nonce 하의 AES-GCM에서는 모든 메시지가 같은 $GHASH$ 키 $H = \mathrm{AES}_K(0^{128})$를 사용.
태그 $T$는 $T^{(k)} = S_0 \oplus \mathrm{GHASH}_H(C^{(k)})$ where $S_0 = \mathrm{AES}_K(\mathrm{nonce} || 1)$
여기서 각 암호문은 128비트 블록 두 개로 구성됩니다(Shamir 공유 길이 때문).
두 암호문 $(C^{(i)}, T^{(i)})$, $(C^{(j)}, T^{(j)})$에 대해 다음과 같이 정의
$\Delta C_1 = C^{(i)}_1 \oplus C^{(j)}_1, \quad \Delta C_2 = C^{(i)}_2 \oplus C^{(j)}_2, \quad \Delta T = T^{(i)} \oplus T^{(j)}.$ $GHASH$ 재귀식을 전개하면 $\Delta T = H^3 \cdot \Delta C_1 \oplus H^2 \cdot \Delta C_2.$ 이를 $GF(2^{128})$에서 다루고 $\Delta C_1$로 나누면 $H$에 대한 3차 방정식을 얻는다.
$H^3 \oplus (\Delta C_2 / \Delta C_1) H^2 \oplus (\Delta T / \Delta C_1) = 0. $ 에서 유일한 근 $H$를 추출한다.
2. Forging a Zero Share
$H$를 얻었다면 각 공유에 대해 $S_0 = T^{(k)} \oplus \mathrm{GHASH}_H(C^{(k)}).$ $S_0$는 모든 공유에 대해 동일하다.
따라서 같은 $(key, nonce)$로 0을 암호화하면 $tag_0 = S_0 \oplus \mathrm{GHASH}_H(0^{256}) = S_0.$
즉 $S_0$ 자체가 0 공유의 암호문이 된다.
이를 네 번 보내면 복호 결과가 모두 0이 되고, 라그랑주 보간으로 복원된 개인키는 $sk = 0$이 된다.
3. Extracting Nonces and RNG State
$sk = 0$일 때 ECDSA 서명식은 $ s \equiv k^{-1} z \pmod{n} \Rightarrow k \equiv z s^{-1} \pmod{n}. $ 여기서 $z = \mathrm{SHA256}(m)$, $n = ECq$
파이썬 random.randrange는 내부적으로 MT19937에서 32비트씩 뽑아 붙이는 구조이므로, $k$에서 32비트 워드 8개를 추출해 총 624개를 모으면 MT 내부 상태를 완전히 복원할 수 있다.
Exploit
from sage.all import GF, PolynomialRing
import socket
import json
import re
import binascii
import hashlib
from Crypto.Cipher import AES
from Crypto.Util.number import long_to_bytes
ORDER = 0xFFFFFFFF00000000FFFFFFFFFFFFFFFFBCE6FAADA7179E84F3B9CAC2FC632551
poly = 0xE1000000000000000000000000000000
def gf_mul(x, y):
z = 0
for i in range(128):
if (y >> (127 - i)) & 1:
z ^= x
if x & 1:
x = (x >> 1) ^ poly
else:
x >>= 1
return z
def ghash(H, ciphertext):
blocks = [int.from_bytes(ciphertext[i:i+16], 'big') for i in range(0, len(ciphertext), 16)]
y = 0
for block in blocks:
y = gf_mul(y ^ block, H)
len_block = len(ciphertext) * 8
y = gf_mul(y ^ len_block, H)
return y
PR = PolynomialRing(GF(2), 'x')
x = PR.gen()
mod_poly = x**128 + x**7 + x**2 + x + 1
F = GF(2**128, modulus=mod_poly, names=('z',), repr='int')
Y = PolynomialRing(F, 'Y').gen()
def int_to_field(val):
bits = format(val, '0128b')[::-1]
rev = int(bits, 2)
return F.from_integer(rev)
def field_to_int(elem):
rev_bits = format(int(elem._integer_representation()), '0128b')[::-1]
return int(rev_bits, 2)
def solve_for_h(shares):
n = len(shares)
for i in range(n):
for j in range(i+1, n):
tag1, ct1 = shares[i]
tag2, ct2 = shares[j]
blocks1 = [int.from_bytes(ct1[k:k+16], 'big') for k in range(0, len(ct1), 16)]
blocks2 = [int.from_bytes(ct2[k:k+16], 'big') for k in range(0, len(ct2), 16)]
if len(blocks1) != 2 or len(blocks2) != 2:
continue
delta_c1 = blocks1[0] ^ blocks2[0]
delta_c2 = blocks1[1] ^ blocks2[1]
if delta_c1 == 0:
continue
delta_t = int.from_bytes(tag1, 'big') ^ int.from_bytes(tag2, 'big')
A = int_to_field(delta_c1)
B = int_to_field(delta_c2)
D = int_to_field(delta_t)
poly_eq = Y**3 + (A.inverse() * B) * Y**2 + (A.inverse() * D)
roots = poly_eq.roots()
if roots:
# assume unique root
return field_to_int(roots[0][0])
raise ValueError('Failed to recover H')
def compute_s0(H, share):
tag, ct = share
g = ghash(H, ct)
return int.from_bytes(tag, 'big') ^ g
def recv_until(sock, marker):
data = b''
while marker not in data:
chunk = sock.recv(4096)
if not chunk:
raise ConnectionError('Connection closed')
data += chunk
return data
def parse_json_from_text(text):
for line in reversed(text.splitlines()):
if '{' in line and '}' in line:
candidate = line[line.find('{'):line.rfind('}')+1]
try:
return json.loads(candidate)
except json.JSONDecodeError:
continue
raise ValueError('JSON not found')
def un_bitshift_right_xor(y, shift):
x = 0
for i in range(32):
idx = 31 - i
bit = (y >> idx) & 1
if idx + shift <= 31:
bit ^= (x >> (idx + shift)) & 1
x |= bit << idx
return x
def un_bitshift_left_xor_and(y, shift, mask):
x = 0
for idx in range(32):
bit = (y >> idx) & 1
if idx - shift >= 0 and (mask >> idx) & 1:
bit ^= (x >> (idx - shift)) & 1
x |= bit << idx
return x
def untemper(y):
y = un_bitshift_right_xor(y, 18)
y = un_bitshift_left_xor_and(y, 15, 0xEFC60000)
y = un_bitshift_left_xor_and(y, 7, 0x9D2C5680)
y = un_bitshift_right_xor(y, 11)
return y
def randbelow_clone(rng, n):
k = n.bit_length()
while True:
r = rng.getrandbits(k)
if r < n:
return r
def main():
sock = socket.create_connection(('flopper.p2.securinets.tn', 6002))
welcome = recv_until(sock, b'> ')
initial = parse_json_from_text(welcome.decode())
signkey_hex = initial['signkey']
shares = []
for hexstr in signkey_hex:
data = binascii.unhexlify(hexstr)
tag = data[:16]
ct = data[16:]
shares.append((tag, ct))
H = solve_for_h(shares)
S0 = compute_s0(H, shares[0])
# sanity check
for tag, ct in shares:
g = ghash(H, ct)
if int.from_bytes(tag, 'big') != (S0 ^ g):
raise ValueError('H verification failed')
zero_share_hex = f'{S0:032x}'
k_values = []
messages = []
for i in range(80):
msg = i.to_bytes(1, 'big')
messages.append(msg)
payload = {
'option': 'sign',
'msg': msg.hex(),
'signkey': [zero_share_hex]*4
}
sock.sendall((json.dumps(payload) + '\n').encode())
response = recv_until(sock, b'> ')
res = parse_json_from_text(response.decode())
if 'r' not in res:
print('Unexpected response:', res)
raise SystemExit
r = int(res['r'], 16)
s = int(res['s'], 16)
z = int(hashlib.sha256(msg).hexdigest(), 16)
k = (z * pow(s, -1, ORDER)) % ORDER
r_value = (k - 1) % ORDER
k_values.append(r_value)
words = []
for k in k_values:
temp = k
for _ in range(8):
words.append(temp & 0xFFFFFFFF)
temp >>= 32
words = words[:624] + words[624:]
state_values = [untemper(w) for w in words[:624]]
import random
clone = random.Random()
clone.setstate((3, tuple(state_values + [624]), None))
extra_words = words[624:]
for idx, w in enumerate(extra_words):
produced = clone.getrandbits(32)
if produced != w:
raise RuntimeError(
f"MT validation failed at position {idx}: got {produced:#010x}, expected {w:#010x}"
)
sock.sendall((json.dumps({'option': 'generate_key'}) + '\n').encode())
priv_offset = randbelow_clone(clone, ORDER - 2) + 1
randbelow_clone(clone, ORDER - 1)
randbelow_clone(clone, ORDER - 1)
randbelow_clone(clone, ORDER - 1)
response = recv_until(sock, b'> ')
sock.sendall((json.dumps({'option': 'get_flag'}) + '\n').encode())
flag_resp = recv_until(sock, b'> ')
flag_json = parse_json_from_text(flag_resp.decode())
flag_ct = bytes.fromhex(flag_json['flag'])
key = hashlib.sha256(long_to_bytes(priv_offset)).digest()[:16]
cipher = AES.new(key, AES.MODE_ECB)
flag = cipher.decrypt(flag_ct)
try:
from Crypto.Util.Padding import unpad
flag = unpad(flag, 16)
except ValueError:
flag = flag.rstrip(b'\x00')
print(flag.decode())
sock.sendall((json.dumps({'option': 'quit'}) + '\n').encode())
sock.close()
if __name__ == '__main__':
main()
Securients{cda8a3f400fdb1a73446cbc7494fbf3420bff9b9d41d4dea97088477cb3285d2}
'CTF' 카테고리의 다른 글
| [Forensic] SunshineCTF 2025 Intergalactic Copyright Infringement write-up (1) | 2025.09.30 |
|---|---|
| [Crypto] snakeCTF 2025 Qual free-start write-up (1) | 2025.08.31 |
| [PWN] scriptCTF 2025 Vault 3 write-up (0) | 2025.08.21 |
| [Rev] scriptCTF 2025 - plastic-shield 1, 2 write-up (3) | 2025.08.21 |
| [Crypto] scriptCTF 2025 - Secure-Server-2 write-up (1) | 2025.08.21 |