client.py
더보기
더보기
더보기
더보기
import ast
import hashlib
import json
import queue
import secrets
import socket
import threading
import time
from ecdsa.curves import NIST256p
from ecdsa.ellipticcurve import Point, PointJacobi
# NIST P-256 parameters used by the hashing and blinding code below.
curve = NIST256p
gen = curve.generator  # base point of the curve
p = gen.order()  # order of the base point; all scalars are reduced modulo this
def point2bytes(P):
    """Return the byte encoding of point *P* as produced by ``P.to_bytes()``."""
    encoded = P.to_bytes()
    return encoded
def hash_func(Rp, m):
    """Hash a point and a message into a scalar modulo ``p``.

    The SHA-256 digest of ``point2bytes(Rp) + m`` is read as a big-endian
    integer and reduced modulo ``p``. A ``str`` message is encoded with
    ``str.encode()`` first; any other value is concatenated as given.
    """
    payload = m.encode() if isinstance(m, str) else m
    digest = hashlib.sha256(point2bytes(Rp) + payload).digest()
    return int.from_bytes(digest, byteorder="big") % p
class SocketReader(threading.Thread):
    """Daemon thread that turns the socket's byte stream into parsed JSON replies.

    Received bytes are accumulated in a buffer and split on newlines, so
    several replies arriving in one ``recv`` chunk, or one reply split across
    chunks, are each delivered exactly once. A trailing fragment without a
    newline is delivered as soon as it parses as complete JSON, which keeps
    peers that do not newline-terminate their messages working.

    Parsed replies are placed on ``response_queue`` in arrival order.
    """

    def __init__(self, sock):
        super().__init__(daemon=True)
        self.sock = sock
        self.response_queue = queue.Queue()
        self.running = True
        self._buffer = b""  # bytes received but not yet parsed into a reply

    def run(self):
        """Poll the socket until ``stop()`` is called or the connection ends."""
        try:
            # Non-blocking mode is set once; recv() then raises
            # BlockingIOError whenever no data is available yet.
            self.sock.setblocking(False)
        except OSError as e:
            print(f"[!] Socket reader error: {e}")
            self.running = False
            return

        while self.running:
            try:
                data = self.sock.recv(65536)
            except (BlockingIOError, InterruptedError):
                time.sleep(0.1)
                continue
            except OSError as e:
                # Any other socket error is fatal for this connection
                # (reset, or closed locally); stay silent on a requested stop.
                if self.running:
                    print(f"[!] Socket reader error: {e}")
                self.running = False
                break

            if not data:
                # An empty read means the peer closed the connection;
                # polling further would never yield data.
                print("[!] Connection closed by server")
                self.running = False
                break

            self._buffer += data
            self._drain_buffer()

    def _drain_buffer(self):
        """Deliver every complete message currently held in the buffer."""
        *lines, tail = self._buffer.split(b"\n")
        self._buffer = tail
        for raw in lines:
            self._deliver(raw)
        # Unterminated remainder: hand it over only if it is already a
        # complete JSON document, otherwise wait for more bytes.
        if tail.strip() and self._deliver(tail, partial=True):
            self._buffer = b""

    def _deliver(self, raw, partial=False):
        """Parse *raw* as JSON and enqueue it; return True on success."""
        raw = raw.strip()
        if not raw:
            return False
        try:
            message = raw.decode()
            response = json.loads(message)
        except (UnicodeDecodeError, json.JSONDecodeError):
            if not partial:
                print(f"[!] Ignoring non-JSON data from server: {raw!r}")
            return False
        print(f"[*] Received from server: {message}")
        self.response_queue.put(response)
        return True

    def stop(self):
        """Ask the reader loop to exit at its next iteration."""
        self.running = False

    def get_response(self, timeout=10):
        """Return the next parsed reply, or None if none arrives within *timeout* seconds."""
        try:
            return self.response_queue.get(timeout=timeout)
        except queue.Empty:
            return None
class BlindClient:
    """Client side of the blind Schnorr-style signing protocol.

    Requests are JSON objects with a ``cmd`` field (GETKEY, RESET, REQUEST,
    CHALLENGE, VERIFY) written to the socket; replies are read through a
    ``SocketReader`` running in the background.
    """

    def __init__(self, host="localhost", port=1337):
        self.host = host
        self.port = port
        self.sock = None
        self.Q = None  # Server's public key
        self.reader = None

    def _teardown(self):
        """Stop the reader, close the socket and forget the session state."""
        if self.reader is not None:
            self.reader.stop()
            self.reader = None
        if self.sock is not None:
            try:
                self.sock.close()
            except OSError:
                pass  # the socket is being discarded either way
            self.sock = None
        self.Q = None

    def _request(self, payload):
        """Send *payload* as JSON, log it, and return the next reply (or None)."""
        self.sock.sendall(json.dumps(payload).encode())
        print(f"[*] Sent: {payload}")
        return self.reader.get_response()

    def _fetch_public_key(self, label=""):
        """Issue GETKEY and store the returned point in ``self.Q``.

        *label* is spliced into the log messages ("" on connect, "new " after
        a reset). Returns True when a key was received and parsed.
        """
        response = self._request({"cmd": "GETKEY"})
        if not response or "Q" not in response:
            print(
                f"[-] Unexpected or no response from server when getting {label}key: {response}"
            )
            return False
        try:
            self.Q = PointJacobi.from_affine(
                Point(curve.curve, response["Q"][0], response["Q"][1])
            )
        except Exception as e:
            print(f"[-] Error processing server's {label}public key: {e}")
            return False
        print(f"[+] Received server's {label}public key")
        return True

    def connect(self):
        """Open a connection and fetch the server's public key.

        Returns True on success, False otherwise.
        """
        # A repeated 'connect' must not leak the previous socket or leave
        # its reader thread polling forever.
        if self.sock is not None or self.reader is not None:
            self._teardown()
        try:
            self.sock = socket.socket()
            self.sock.connect((self.host, self.port))
            print(f"[+] Connected to server at {self.host}:{self.port}")
            self.reader = SocketReader(self.sock)
            self.reader.start()
            return self._fetch_public_key()
        except Exception as e:
            print(f"[-] Connection error: {e}")
            # The socket is unusable after a failed connect/send.
            self._teardown()
            return False

    def reset_server(self):
        """Send RESET and, if acknowledged, fetch the server's new public key."""
        if self.sock is None or self.reader is None:
            print("[-] Not connected to server. Use 'connect' command first.")
            return False
        try:
            print("[*] Resetting server state...")
            response = self._request({"cmd": "RESET"})
            if not response or "status" not in response:
                print(
                    f"[-] Unexpected or no response from server when resetting: {response}"
                )
                return False
            if response["status"] != "ok":
                print(f"[-] Server reset failed: {response}")
                return False
            print("[+] Server reset successful")
            return self._fetch_public_key("new ")
        except Exception as e:
            print(f"[-] Error during server reset: {e}")
            return False

    def sign_message(self, message):
        """Run one blind-signing session for *message*.

        Returns ``([x, y], s_prime)``, the affine coordinates of the blinded
        commitment and the unblinded response, or None on any failure.
        """
        if self.sock is None or self.Q is None or self.reader is None:
            print("[-] Not connected to server. Use 'connect' command first.")
            return None
        try:
            print("[*] Starting signing session...")
            reply = self._request({"cmd": "REQUEST"})
            if not reply or "R" not in reply:
                print(
                    f"[-] Unexpected or no response from server when requesting session: {reply}"
                )
                return None
            try:
                R = PointJacobi.from_affine(
                    Point(curve.curve, reply["R"][0], reply["R"][1])
                )
            except Exception as e:
                print(f"[-] Error processing server's R point: {e}")
                return None

            # Non-zero blinding factors in [1, p - 1].
            alpha = secrets.randbelow(p - 1) + 1
            beta = secrets.randbelow(p - 1) + 1

            # Blind the commitment, hash it with the message, and shift the
            # challenge by beta before sending it to the server.
            Rblind = R + gen * alpha + self.Q * beta
            c_prime = hash_func(Rblind, message)
            c = (c_prime + beta) % p

            print("[*] Sending challenge to server...")
            response = self._request({"cmd": "CHALLENGE", "c": c})
            if not response or "s" not in response:
                print(
                    f"[-] Unexpected or no response from server when sending challenge: {response}"
                )
                return None
            s = response["s"]
            if not isinstance(s, int):
                # The server replies with a null "s" when it has no session.
                print(
                    f"[-] Unexpected or no response from server when sending challenge: {response}"
                )
                return None

            s_prime = (s + alpha) % p
            blinded = Rblind.to_affine()
            signature = ([blinded.x(), blinded.y()], s_prime)
            print("[+] Signature created successfully")
            return signature
        except Exception as e:
            print(f"[-] Error during signing: {e}")
            return None

    def verify_signature(self, message, signature):
        """Submit *message* and *signature* to VERIFY; return True if accepted."""
        if self.sock is None or self.reader is None:
            print("[-] Not connected to server. Use 'connect' command first.")
            return False
        try:
            print("[*] Sending verification request to server...")
            verify = self._request(
                {"cmd": "VERIFY", "msg": message, "sig": signature}
            )
            if not verify or "status" not in verify:
                print(
                    f"[-] Unexpected or no response from server when verifying: {verify}"
                )
                return False
            if verify["status"] == "ok":
                print("[+] Signature is valid!")
                print(
                    f"[*] Server stats - Signatures: {verify.get('sign_cnt', 'N/A')}, Verifications: {verify.get('verify_cnt', 'N/A')}"
                )
                return True
            print("[-] Signature is invalid.")
            if "detail" in verify:
                print(f"[*] Details: {verify['detail']}")
            return False
        except Exception as e:
            print(f"[-] Error during verification: {e}")
            return False

    def close(self):
        """Close the connection, if any, and clear the reader and cached key."""
        was_connected = self.sock is not None
        self._teardown()
        if was_connected:
            print("[+] Connection closed")
def parse_signature(signature_str):
    """Parse a signature typed as a Python literal such as ``([x, y], s)``.

    The text must evaluate to a 2-element tuple/list whose first element is
    a 2-element tuple/list of integers and whose second element is an
    integer. Booleans are rejected even though ``bool`` subclasses ``int``.

    Returns ``[R_prime, s_prime]`` with ``R_prime`` as it was written.
    Raises ``ValueError`` ("Failed to parse signature: ...") on bad input.
    """

    def _is_int(value):
        return isinstance(value, int) and not isinstance(value, bool)

    try:
        sig_data = ast.literal_eval(signature_str)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError) as e:
        raise ValueError(f"Failed to parse signature: {e}") from e

    if isinstance(sig_data, (tuple, list)) and len(sig_data) == 2:
        R_prime, s_prime = sig_data
        if (
            isinstance(R_prime, (tuple, list))
            and len(R_prime) == 2
            and all(_is_int(coord) for coord in R_prime)
            and _is_int(s_prime)
        ):
            return [R_prime, s_prime]
    raise ValueError("Failed to parse signature: Invalid signature format")
def print_help():
    """Print the list of interactive commands, followed by a blank line."""
    help_lines = (
        "\nBlind Signature Client - Available Commands:",
        " connect - Connect to the server",
        " reset - Reset the server state",
        " sign <message> - Sign a message",
        " verify - Verify your message and signature",
        " help - Show this help message",
        " exit - Exit the program",
    )
    for line in help_lines:
        print(line)
    print()
def main():
    """Run the interactive command loop until 'exit', Ctrl-C or end of input."""
    client = BlindClient()
    print("=== Blind Signature Client ===")
    print("Type 'help' for available commands")
    while True:
        try:
            cmd_line = input("\n> ").strip()
            if not cmd_line:
                continue
            parts = cmd_line.split(maxsplit=1)
            cmd = parts[0].lower()
            if cmd == "exit":
                client.close()
                print("[+] Exiting...")
                break
            elif cmd == "help":
                print_help()
            elif cmd == "connect":
                client.connect()
            elif cmd == "reset":
                client.reset_server()
            elif cmd == "sign":
                if len(parts) < 2:
                    message = input("Enter message to sign: ")
                else:
                    message = parts[1]
                print(f"[*] Signing message: '{message}'")
                signature = client.sign_message(message)
                if signature:
                    print("[+] Signature:")
                    print(f" R': {signature[0]}")
                    print(f" s': {signature[1]}")
            elif cmd == "verify":
                message = input("Enter message to verify: ")
                print("Enter signature in format ([x, y], s):")
                print("Example: ([12345, 67890], 54321)")
                signature_str = input("Signature: ")
                try:
                    signature = parse_signature(signature_str)
                    print(f"[*] Verifying custom signature for message: '{message}'")
                    client.verify_signature(message, signature)
                except ValueError as e:
                    print(f"[-] {e}")
            else:
                print(f"[-] Unknown command: {cmd}")
                print(" Type 'help' for available commands")
        except (KeyboardInterrupt, EOFError):
            # EOFError: stdin was closed (Ctrl-D or exhausted pipe). Every
            # later input() would raise again, so treating it like any other
            # error would make this loop spin forever.
            print("\n[*] Interrupted")
            client.close()
            break
        except Exception as e:
            # Keep the prompt alive after an unexpected per-command failure.
            print(f"[-] Error: {e}")
            print("[*] Continuing...")


if __name__ == "__main__":
    main()
server.py
더보기
더보기
더보기
더보기
import hashlib
import json
import os
import secrets
import socket
import threading
from typing import Any
from ecdsa.curves import NIST256p
from ecdsa.ellipticcurve import Point, PointJacobi
from pydantic import BaseModel, ValidationError
FLAG = os.getenv("FLAG")  # prize text; None when the variable is not set
# NIST P-256 parameters shared by key generation, signing and verification.
curve = NIST256p
gen = curve.generator  # base point of the curve
p = gen.order()  # order of the base point; all scalars are reduced modulo this
def KeyGen():
    """Return ``(d, Q)``: a non-zero secret scalar below ``p`` and the point ``gen * d``."""
    while True:
        secret = secrets.randbelow(p)
        if secret != 0:
            break
    public = gen * secret
    return secret, public
def point2bytes(P):
    """Serialize point *P* by delegating to its ``to_bytes()`` method."""
    raw = P.to_bytes()
    return raw
def hash_func(Rp, m):
    """Return SHA-256(``point2bytes(Rp) + m``) as a big-endian integer modulo ``p``.

    A ``str`` message is encoded with ``str.encode()`` before hashing.
    """
    if isinstance(m, str):
        m = m.encode()
    hasher = hashlib.sha256(point2bytes(Rp) + m)
    value = int.from_bytes(hasher.digest(), byteorder="big")
    return value % p
def Verify(Q, m, sig):
    """Check ``gen * s' == R' + Q * H(R', m)`` for the signature ``sig = (R', s')``."""
    commitment, response = sig
    challenge = hash_func(commitment, m)
    left = gen * response
    right = commitment + Q * challenge
    return left == right
class ServerState:
    """Key pair, counters and per-connection signing sessions of the server."""

    def __init__(self):
        self.d, self.Q = KeyGen()
        self.counter_sign = 0
        self.verified_messages = set()
        self.pending_sessions = {}  # conn -> (k, R)
        self.mutex = threading.Lock()

    def reset(self):
        """Generate a new key pair and clear the sign counter and verified set.

        ``pending_sessions`` is left untouched.
        """
        with self.mutex:
            self.d, self.Q = KeyGen()
            self.counter_sign = 0
            self.verified_messages.clear()

    def new_session(self, addr):
        """Draw a non-zero nonce ``k`` and store ``(k, gen * k)`` under *addr*."""
        nonce = secrets.randbelow(p)
        while nonce == 0:
            nonce = secrets.randbelow(p)
        commitment = gen * nonce
        with self.mutex:
            self.pending_sessions[addr] = (nonce, commitment)

    def process_challenge(self, addr, c):
        """Return ``(k + c * d) mod p`` for *addr*'s session, or None if it has none.

        NOTE: the session entry is only read here, never removed or replaced,
        so every challenge on the same connection is answered with the same
        nonce ``k``.
        """
        with self.mutex:
            session = self.pending_sessions.get(addr)
            if session is None:
                return None
            nonce, _commitment = session
            answer = (nonce + c * self.d) % p
        with self.mutex:
            self.counter_sign += 1
        return answer

    def verify_sig(self, msg, sig):
        """Verify *sig* on *msg* under the current public key.

        A message that verifies is added to ``verified_messages``. Any
        exception while decoding or checking the signature yields ``0``.
        """
        try:
            decoded = [
                PointJacobi.from_affine(Point(curve.curve, sig[0][0], sig[0][1])),
                sig[1],
            ]
            valid = Verify(self.Q, msg, decoded)
            if valid:
                with self.mutex:
                    self.verified_messages.add(msg)
            return valid
        except Exception:
            return 0
def convert_point_to_dict(point):
    """Return ``[x, y]`` for *point*, converting a ``PointJacobi`` to affine first.

    Despite the name, the result is a two-element list, not a dict.
    """
    affine = point.to_affine() if isinstance(point, PointJacobi) else point
    return [affine.x(), affine.y()]
def process_json_value(value):
    """Recursively replace every ``PointJacobi`` inside *value* with ``[x, y]``.

    Dicts keep their keys, lists and tuples both become lists, and any other
    value is returned unchanged.
    """
    if isinstance(value, PointJacobi):
        return convert_point_to_dict(value)
    if isinstance(value, dict):
        converted = {}
        for key, item in value.items():
            converted[key] = process_json_value(item)
        return converted
    if isinstance(value, (list, tuple)):
        return list(map(process_json_value, value))
    return value
def send_socket_message(conn, msg: dict[Any, Any]):
    """Send *msg* on *conn* as one JSON document followed by a newline.

    Curve points inside *msg* are converted by ``process_json_value`` first.
    """
    serializable = process_json_value(msg)
    line = json.dumps(serializable).encode() + b"\n"
    conn.sendall(line)
class ChallengeRequest(BaseModel):
    """Payload of a CHALLENGE command."""

    # Challenge value supplied by the client; used as-is in k + c * d.
    c: int
class VerifyRequest(BaseModel):
    """Payload of a VERIFY command."""

    # ((x, y), s): affine coordinates of R' followed by the scalar s'.
    sig: tuple[tuple[int, int], int]
    # Message the signature is claimed to cover.
    msg: str
# One module-level state object used by every client handler thread.
srv = ServerState()
def handle_client(conn, addr):
    """Serve one client connection until it closes.

    A signing session is created for *conn* up front. Each received chunk is
    parsed as one JSON object and dispatched on its ``cmd`` field (RESET,
    GETKEY, REQUEST, CHALLENGE, VERIFY). On exit the session is removed and
    the socket closed. *addr* is accepted for the thread target but unused.
    """
    srv.new_session(conn)
    try:
        while True:
            raw = conn.recv(65536)
            if not raw:
                break
            try:
                data = json.loads(raw)
            except (ValueError, RecursionError):
                send_socket_message(conn, {"status": "err", "detail": "bad json"})
                continue
            if not isinstance(data, dict):
                # Valid JSON that is not an object (e.g. `[1]` or `5`) has no
                # .get(); reject it instead of crashing the handler thread.
                send_socket_message(conn, {"status": "err", "detail": "bad json"})
                continue
            cmd = data.get("cmd")
            try:
                if cmd == "RESET":
                    srv.reset()
                    send_socket_message(conn, {"status": "ok", "msg": "reset done"})
                elif cmd == "GETKEY":
                    send_socket_message(conn, {"Q": srv.Q})
                elif cmd == "REQUEST":
                    _, R = srv.pending_sessions[conn]
                    send_socket_message(conn, {"R": R, "Q": srv.Q})
                elif cmd == "CHALLENGE":
                    challenge = ChallengeRequest.model_validate(data)
                    s = srv.process_challenge(conn, challenge.c)
                    send_socket_message(conn, {"s": s})
                elif cmd == "VERIFY":
                    verify_req = VerifyRequest.model_validate(data)
                    result = srv.verify_sig(verify_req.msg, verify_req.sig)
                    send_socket_message(
                        conn,
                        {
                            "status": "ok" if result else "bad",
                            "sign_cnt": srv.counter_sign,
                            "verify_cnt": len(srv.verified_messages),
                        },
                    )
                    # More verified messages than issued signatures means at
                    # least one was forged; the prize is a separate message.
                    if len(srv.verified_messages) > srv.counter_sign:
                        send_socket_message(
                            conn,
                            {
                                "msg": f"Wow, you can verify unsigned messages, here is your prize: {FLAG}",
                            },
                        )
                else:
                    send_socket_message(
                        conn, {"status": "err", "detail": "unknown cmd"}
                    )
            except ValidationError as e:
                send_socket_message(conn, {"status": "err", "detail": e.errors()})
    except OSError:
        # The peer vanished mid-conversation; fall through to cleanup.
        pass
    finally:
        # Remove the session and close the socket independently, so that a
        # failure in one step can never skip the other.
        with srv.mutex:
            srv.pending_sessions.pop(conn, None)
        try:
            conn.close()
        except OSError:
            pass
def start_server(port=1337):
    """Listen on all interfaces at *port*, serving each client in a daemon thread.

    The server state is reset (new key pair) before listening. The accept
    loop runs until KeyboardInterrupt; the listening socket is closed on the
    way out, including when the loop ends with an exception.
    """
    srv.reset()
    with socket.socket() as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(("0.0.0.0", port))
        s.listen(100)
        print(f"Server started on {port}")
        while True:
            try:
                conn, addr = s.accept()
                threading.Thread(
                    target=handle_client, args=(conn, addr), daemon=True
                ).start()
            except KeyboardInterrupt:
                print("Shutting down")
                break


if __name__ == "__main__":
    start_server(1337)
Analysis
서버에서는 Schnorr-like blind signature 프로토콜을 제공
서버는 연결(conn)마다 nonce k를 한 번만 생성해 고정하므로, REQUEST 이후 같은 k로 CHALLENGE를 여러 번 받을 수 있다.
s_1 = k + c_1 * d
s_2 = k + c_2 * d
두번의 서명을 빼면
d = (s_1 - s_2) * (c_1 - c_2)^-1 (mod p)
동일한 k 덕에 두 패킷으로 개인 키가 노출된다.
서버는 sign_cnt와 verify_cnt를 비교하여
verify_cnt > sign_cnt일 때 플래그를 전송한다.
1. GETKEY로 공개키 Q 조회
2. REQUEST로 고정 R 확보
3. 서로 다른 c1, c2로 두 번 CHALLENGE하고 s1, s2 획득
4. 개인 키 계산
5. 복구한 개인 키로 서로 다른 메시지 여러 개에 대해 위조 서명 생성
6. VERIFY로 제출
* 같은 nonce k를 두 번 이상 사용해서는 안 되며, 서명마다 새로 생성해야 한다.
Exploitation
from pwn import *
import json, secrets, hashlib
from ecdsa.curves import NIST256p
from ecdsa.ellipticcurve import Point, PointJacobi
# NIST P-256 parameters used for the key recovery and forgeries below.
curve = NIST256p
G = curve.generator  # base point of the curve
p = G.order()  # order of the base point; all scalars are reduced modulo this
def point2bytes(P):
    """Return ``P.to_bytes()``, the byte form of point *P* that is fed to the hash."""
    serialized = P.to_bytes()
    return serialized
def H(R, m):
    """Return SHA-256(``point2bytes(R) + m``) as a big-endian integer modulo ``p``.

    A ``str`` message is encoded with ``str.encode()`` before hashing.
    """
    data = m.encode() if isinstance(m, str) else m
    digest = hashlib.sha256(point2bytes(R) + data).digest()
    return int.from_bytes(digest, "big") % p
def recv_json(r):
    """Read one line from tube *r* and return it parsed as JSON."""
    line = r.recvline()
    text = line.decode()
    return json.loads(text)
io = remote("tcp.sasc.tf", 11334)

# 1. Fetch the public key (not needed for the algebra, kept for reference).
io.sendline(json.dumps({"cmd": "GETKEY"}).encode())
Qx, Qy = recv_json(io)["Q"]
Q = PointJacobi.from_affine(Point(curve.curve, Qx, Qy))

# 2. Ask for this connection's commitment R = k*G.
io.sendline(json.dumps({"cmd": "REQUEST"}).encode())
Rx, Ry = recv_json(io)["R"]
R = PointJacobi.from_affine(Point(curve.curve, Rx, Ry))

# 3. Two different challenges are answered with the same nonce k:
#    s1 = k + c1*d and s2 = k + c2*d (mod p).
c1 = secrets.randbelow(p - 1) + 1
io.sendline(json.dumps({"cmd": "CHALLENGE", "c": c1}).encode())
s1 = recv_json(io)["s"]
c2 = secrets.randbelow(p - 1) + 1
while c2 == c1:
    c2 = secrets.randbelow(p - 1) + 1
io.sendline(json.dumps({"cmd": "CHALLENGE", "c": c2}).encode())
s2 = recv_json(io)["s"]

# 4. Subtracting the two responses cancels k and leaves the private key.
d = ((s1 - s2) * pow((c1 - c2) % p, -1, p)) % p
log.success(f"recovered d = {hex(d)}")

# 5. Forge signatures on fresh messages until verify_cnt exceeds sign_cnt.
#    The server keeps both counters in one shared state object, so the number
#    of forgeries needed is not fixed at 3; the cap only guards against an
#    endless loop.
MAX_FORGERIES = 64
for i in range(MAX_FORGERIES):
    msg = f"FORGED-{i}"
    k = secrets.randbelow(p - 1) + 1
    Rf = (G * k).to_affine()
    c = H(Rf, msg)
    s = (k + c * d) % p
    sig = [[Rf.x(), Rf.y()], s]
    io.sendline(json.dumps({"cmd": "VERIFY", "msg": msg, "sig": sig}).encode())
    res = recv_json(io)
    log.info(res)
    if res.get("status") != "ok":
        # The forgery was rejected or the request was malformed; more
        # attempts with the same recovered d will not help.
        log.info("forged signature rejected, stopping")
        break
    # The prize arrives as its own JSON line right after the first status
    # reply that reports verify_cnt > sign_cnt; the status reply itself
    # never contains "msg".
    if res.get("verify_cnt", 0) > res.get("sign_cnt", 0):
        prize = recv_json(io)
        print(prize["msg"])
        break
io.interactive()
두번의 CHALLENGE로 개인키가 노출되어 서명을 계속 위조할 수 있다.
pwned@CT100:~/CTFs-2025$ /usr/bin/env python3 "/home/pwned/CTFs-2025/sasctf/crpyt/blindspot/ex.py"
[+] Opening connection to tcp.sasc.tf on port 11334: Done
[+] recovered d = 0xc3aae1aea6900f15090425648662a531c8f729c8f7feb6a0b7a295f6905630e9
[*] {'status': 'ok', 'sign_cnt': 2, 'verify_cnt': 1}
[*] {'status': 'ok', 'sign_cnt': 2, 'verify_cnt': 2}
[*] {'status': 'ok', 'sign_cnt': 2, 'verify_cnt': 3}
[*] Switching to interactive mode
{"msg": "Wow, you can verify unsigned messages, here is your prize: SAS{r05_4t7ack_s3e5_7hr0u6h_7h3_bl1nd5p0t}"}
$
[*] Interrupted
[*] Closed connection to tcp.sasc.tf port 11334
$
'CTF' 카테고리의 다른 글
[Rev] CCE 2025 Qual - Directcalc write-up (4) | 2025.08.16 |
---|---|
[Forensic] CCE 2025 Qual - something from a friend write-up (2) | 2025.08.16 |
[Crypto] Linear congruential generator(LCG) Recovery (3) | 2025.08.09 |
[Crypto] SASCTF 2025 - bigbabycode write-up (0) | 2025.05.26 |
[Network Forensic] MISC 풀어보기 (0) | 2024.05.20 |