[Crypto] SASCTF 2025 - blindspot write-up

client.py

더보기
더보기
더보기
더보기
import ast
import hashlib
import json
import queue
import secrets
import socket
import threading
import time

from ecdsa.curves import NIST256p
from ecdsa.ellipticcurve import Point, PointJacobi

curve = NIST256p
gen = curve.generator
p = gen.order()


def point2bytes(P):
    return P.to_bytes()


def hash_func(Rp, m):
    if isinstance(m, str):
        m = m.encode()
    return (
        int.from_bytes(hashlib.sha256(point2bytes(Rp) + m).digest(), byteorder="big")
        % p
    )


class SocketReader(threading.Thread):
    def __init__(self, sock):
        super().__init__(daemon=True)
        self.sock = sock
        self.response_queue = queue.Queue()
        self.running = True

    def run(self):
        while self.running:
            try:
                self.sock.setblocking(0)
                try:
                    data = self.sock.recv(65536)
                    if not data:
                        time.sleep(0.1)
                        continue
                except socket.error:
                    time.sleep(0.1)
                    continue

                message = data.decode()
                print(f"[*] Received from server: {message}")

                try:
                    response = json.loads(message)
                    self.response_queue.put(response)
                except json.JSONDecodeError:
                    pass

            except Exception as e:
                print(f"[!] Socket reader error: {e}")
                if not self.running:
                    break
                time.sleep(0.1)

    def stop(self):
        self.running = False

    def get_response(self, timeout=10):
        try:
            return self.response_queue.get(timeout=timeout)
        except queue.Empty:
            return None


class BlindClient:
    def __init__(self, host="localhost", port=1337):
        self.host = host
        self.port = port
        self.sock = None
        self.Q = None  # Server's public key
        self.reader = None

    def connect(self):
        try:
            self.sock = socket.socket()
            self.sock.connect((self.host, self.port))
            print(f"[+] Connected to server at {self.host}:{self.port}")

            self.reader = SocketReader(self.sock)
            self.reader.start()

            self.sock.sendall(json.dumps({"cmd": "GETKEY"}).encode())
            print("[*] Sent: {'cmd': 'GETKEY'}")

            response = self.reader.get_response()
            if not response or "Q" not in response:
                print(
                    f"[-] Unexpected or no response from server when getting key: {response}"
                )
                return False

            try:
                self.Q = PointJacobi.from_affine(
                    Point(curve.curve, response["Q"][0], response["Q"][1])
                )
                print("[+] Received server's public key")
                return True
            except Exception as e:
                print(f"[-] Error processing server's public key: {e}")
                return False
        except Exception as e:
            print(f"[-] Connection error: {e}")
            return False

    def reset_server(self):
        if not self.sock or not self.reader:
            print("[-] Not connected to server. Use 'connect' command first.")
            return False

        try:
            print("[*] Resetting server state...")
            self.sock.sendall(json.dumps({"cmd": "RESET"}).encode())
            print("[*] Sent: {'cmd': 'RESET'}")

            response = self.reader.get_response()
            if not response or "status" not in response:
                print(
                    f"[-] Unexpected or no response from server when resetting: {response}"
                )
                return False

            if response["status"] == "ok":
                print("[+] Server reset successful")
                self.sock.sendall(json.dumps({"cmd": "GETKEY"}).encode())
                print("[*] Sent: {'cmd': 'GETKEY'}")

                response = self.reader.get_response()
                if not response or "Q" not in response:
                    print(
                        f"[-] Unexpected or no response from server when getting new key: {response}"
                    )
                    return False

                try:
                    self.Q = PointJacobi.from_affine(
                        Point(curve.curve, response["Q"][0], response["Q"][1])
                    )
                    print("[+] Received server's new public key")
                    return True
                except Exception as e:
                    print(f"[-] Error processing server's new public key: {e}")
                    return False
            else:
                print(f"[-] Server reset failed: {response}")
                return False
        except Exception as e:
            print(f"[-] Error during server reset: {e}")
            return False

    def sign_message(self, message):
        if not self.sock or not self.Q or not self.reader:
            print("[-] Not connected to server. Use 'connect' command first.")
            return None

        try:
            print("[*] Starting signing session...")
            self.sock.sendall(json.dumps({"cmd": "REQUEST"}).encode())
            print("[*] Sent: {'cmd': 'REQUEST'}")

            reply = self.reader.get_response()
            if not reply or "R" not in reply:
                print(
                    f"[-] Unexpected or no response from server when requesting session: {reply}"
                )
                return None

            try:
                R = PointJacobi.from_affine(
                    Point(curve.curve, reply["R"][0], reply["R"][1])
                )
            except Exception as e:
                print(f"[-] Error processing server's R point: {e}")
                return None

            alpha = secrets.randbelow(p)
            while alpha == 0:
                alpha = secrets.randbelow(p)
            beta = secrets.randbelow(p)
            while beta == 0:
                beta = secrets.randbelow(p)

            Rblind = R + gen * alpha + self.Q * beta
            c_prime = hash_func(Rblind, message)
            c = (c_prime + beta) % p

            print("[*] Sending challenge to server...")
            challenge_data = {"cmd": "CHALLENGE", "c": c}
            self.sock.sendall(json.dumps(challenge_data).encode())
            print(f"[*] Sent: {challenge_data}")

            response = self.reader.get_response()
            if not response or "s" not in response:
                print(
                    f"[-] Unexpected or no response from server when sending challenge: {response}"
                )
                return None

            s = response["s"]

            s_prime = (s + alpha) % p
            R = Rblind.to_affine()
            signature = ([R.x(), R.y()], s_prime)

            print("[+] Signature created successfully")
            return signature
        except Exception as e:
            print(f"[-] Error during signing: {e}")
            return None

    def verify_signature(self, message, signature):
        if not self.sock or not self.reader:
            print("[-] Not connected to server. Use 'connect' command first.")
            return False

        try:
            print("[*] Sending verification request to server...")
            verify_data = {"cmd": "VERIFY", "msg": message, "sig": signature}
            self.sock.sendall(json.dumps(verify_data).encode())
            print(f"[*] Sent: {verify_data}")

            verify = self.reader.get_response()
            if not verify or "status" not in verify:
                print(
                    f"[-] Unexpected or no response from server when verifying: {verify}"
                )
                return False

            if verify["status"] == "ok":
                print("[+] Signature is valid!")
                print(
                    f"[*] Server stats - Signatures: {verify.get('sign_cnt', 'N/A')}, Verifications: {verify.get('verify_cnt', 'N/A')}"
                )
                return True
            else:
                print("[-] Signature is invalid.")
                if "detail" in verify:
                    print(f"[*] Details: {verify['detail']}")
                return False
        except Exception as e:
            print(f"[-] Error during verification: {e}")
            return False

    def close(self):
        if self.reader:
            self.reader.stop()

        if self.sock:
            self.sock.close()
            self.sock = None
            print("[+] Connection closed")


def parse_signature(signature_str):
    try:
        sig_data = ast.literal_eval(signature_str)

        if (isinstance(sig_data, tuple) or isinstance(sig_data, list)) and len(
            sig_data
        ) == 2:
            R_prime, s_prime = sig_data

            if (isinstance(R_prime, tuple) or isinstance(R_prime, list)) and len(
                R_prime
            ) == 2:
                if all(isinstance(coord, int) for coord in R_prime):
                    if isinstance(s_prime, int):
                        return [R_prime, s_prime]

        raise ValueError("Invalid signature format")
    except Exception as e:
        raise ValueError(f"Failed to parse signature: {e}")


def print_help():
    print("\nBlind Signature Client - Available Commands:")
    print("  connect                     - Connect to the server")
    print("  reset                       - Reset the server state")
    print("  sign <message>              - Sign a message")
    print("  verify                      - Verify your message and signature")
    print("  help                        - Show this help message")
    print("  exit                        - Exit the program")
    print()


def main():
    client = BlindClient()

    print("=== Blind Signature Client ===")
    print("Type 'help' for available commands")

    while True:
        try:
            cmd_line = input("\n> ").strip()
            if not cmd_line:
                continue

            parts = cmd_line.split(maxsplit=1)
            cmd = parts[0].lower()

            if cmd == "exit":
                client.close()
                print("[+] Exiting...")
                break
            elif cmd == "help":
                print_help()
            elif cmd == "connect":
                client.connect()
            elif cmd == "reset":
                client.reset_server()
            elif cmd == "sign":
                if len(parts) < 2:
                    message = input("Enter message to sign: ")
                else:
                    message = parts[1]

                print(f"[*] Signing message: '{message}'")
                signature = client.sign_message(message)

                if signature:
                    print("[+] Signature:")
                    print(f"  R': {signature[0]}")
                    print(f"  s': {signature[1]}")
            elif cmd == "verify":
                message = input("Enter message to verify: ")

                print("Enter signature in format ([x, y], s):")
                print("Example: ([12345, 67890], 54321)")
                signature_str = input("Signature: ")

                try:
                    signature = parse_signature(signature_str)
                    print(f"[*] Verifying custom signature for message: '{message}'")
                    client.verify_signature(message, signature)
                except ValueError as e:
                    print(f"[-] {e}")
            else:
                print(f"[-] Unknown command: {cmd}")
                print("    Type 'help' for available commands")

        except KeyboardInterrupt:
            print("\n[*] Interrupted")
            client.close()
            break
        except Exception as e:
            print(f"[-] Error: {e}")
            print("[*] Continuing...")


if __name__ == "__main__":
    main()

 

server.py

더보기
더보기
더보기
더보기
import hashlib
import json
import os
import secrets
import socket
import threading
from typing import Any

from ecdsa.curves import NIST256p
from ecdsa.ellipticcurve import Point, PointJacobi
from pydantic import BaseModel, ValidationError

FLAG = os.getenv("FLAG")
curve = NIST256p
gen = curve.generator
p = gen.order()


def KeyGen():
    d = secrets.randbelow(p)
    while d == 0:
        d = secrets.randbelow(p)
    Q = gen * d
    return d, Q


def point2bytes(P):
    return P.to_bytes()


def hash_func(Rp, m):
    if isinstance(m, str):
        m = m.encode()
    return (
        int.from_bytes(hashlib.sha256(point2bytes(Rp) + m).digest(), byteorder="big")
        % p
    )


def Verify(Q, m, sig):
    R_prime, s_prime = sig
    c_prime = hash_func(R_prime, m)
    return gen * s_prime == R_prime + Q * c_prime


class ServerState:
    def __init__(self):
        self.d, self.Q = KeyGen()
        self.counter_sign = 0
        self.verified_messages = set()
        self.pending_sessions = {}  # conn -> (k, R)
        self.mutex = threading.Lock()

    def reset(self):
        with self.mutex:
            self.d, self.Q = KeyGen()
            self.counter_sign = 0
            self.verified_messages.clear()

    def new_session(self, addr):
        k = secrets.randbelow(p)
        while k == 0:
            k = secrets.randbelow(p)
        R = gen * k
        with self.mutex:
            self.pending_sessions[addr] = (k, R)

    def process_challenge(self, addr, c):
        with self.mutex:
            if addr not in self.pending_sessions:
                return None
            k, R = self.pending_sessions[addr]
        s = (k + c * self.d) % p
        with self.mutex:
            self.counter_sign += 1
        return s

    def verify_sig(self, msg, sig):
        try:
            sig = [
                PointJacobi.from_affine(Point(curve.curve, sig[0][0], sig[0][1])),
                sig[1],
            ]
            res = Verify(self.Q, msg, sig)
            if res:
                with self.mutex:
                    self.verified_messages.add(msg)
            return res
        except Exception:
            return 0


def convert_point_to_dict(point):
    if isinstance(point, PointJacobi):
        point = point.to_affine()
    return [point.x(), point.y()]


def process_json_value(value):
    if isinstance(value, PointJacobi):
        return convert_point_to_dict(value)
    elif isinstance(value, dict):
        return {k: process_json_value(v) for k, v in value.items()}
    elif isinstance(value, (list, tuple)):
        return [process_json_value(item) for item in value]
    return value


def send_socket_message(conn, msg: dict[Any, Any]):
    processed_data = process_json_value(msg)
    conn.sendall(json.dumps(processed_data).encode() + b"\n")


class ChallengeRequest(BaseModel):
    c: int


class VerifyRequest(BaseModel):
    sig: tuple[tuple[int, int], int]
    msg: str


srv = ServerState()


def handle_client(conn, addr):
    srv.new_session(conn)
    try:
        while True:
            req = conn.recv(65536)
            if not req:
                break
            try:
                data = json.loads(req)
            except Exception:
                send_socket_message(conn, {"status": "err", "detail": "bad json"})
                continue
            cmd = data.get("cmd")
            try:
                if cmd == "RESET":
                    srv.reset()
                    send_socket_message(conn, {"status": "ok", "msg": "reset done"})
                elif cmd == "GETKEY":
                    send_socket_message(conn, {"Q": srv.Q})
                elif cmd == "REQUEST":
                    _, R = srv.pending_sessions[conn]
                    send_socket_message(conn, {"R": R, "Q": srv.Q})
                elif cmd == "CHALLENGE":
                    req = ChallengeRequest.model_validate(data)
                    s = srv.process_challenge(conn, req.c)
                    send_socket_message(conn, {"s": s})
                elif cmd == "VERIFY":
                    req = VerifyRequest.model_validate(data)
                    result = srv.verify_sig(req.msg, req.sig)
                    send_socket_message(
                        conn,
                        {
                            "status": "ok" if result else "bad",
                            "sign_cnt": srv.counter_sign,
                            "verify_cnt": len(srv.verified_messages),
                        },
                    )

                    if len(srv.verified_messages) > srv.counter_sign:
                        send_socket_message(
                            conn,
                            {
                                "msg": f"Wow, you can verify unsigned messages, here is your prize: {FLAG}",
                            },
                        )
                else:
                    send_socket_message(
                        conn, {"status": "err", "detail": "unknown cmd"}
                    )

            except ValidationError as e:
                send_socket_message(conn, {"status": "err", "detail": e.errors()})
    finally:
        try:
            srv.pending_sessions.pop(conn)
            conn.close()
        except Exception:
            pass


def start_server(port=1337):
    srv.reset()
    s = socket.socket()
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(("0.0.0.0", port))
    s.listen(100)
    print(f"Server started on {port}")
    while True:
        try:
            conn, addr = s.accept()
            threading.Thread(
                target=handle_client, args=(conn, addr), daemon=True
            ).start()
        except KeyboardInterrupt:
            print("Shutting down")
            break


if __name__ == "__main__":
    start_server(1337)

 

 


Analysis

서버에서는 Schnorr-like blind signature 프로토콜을 제공

 

conn에 대해 k가 고정되어 있어서 REQUEST 후 여러번 CHALLENGE를 받는다.

s_1 = k + c_1 * d
s_2 = k + c_2 * d

두번의 서명을 빼면

 

d = (s_1 - s_2) * (c_1 - c_2)^-1 (mod p)

 

동일한 k 덕에 두 패킷으로 개인 키가 노출된다.

 

서버는 sign_cnt - verify_cnt를 비교하여 

verify_cnt > sign_cnt일 때 플래그를 전송한다.

 

 

1. GETKEY로 공개키 Q 조회

2. REQUEST로 고정 R 확보

3. 서로 다른 c1, c2로 두 번 CHALLENGE하고 s1, s2 획득

4. 개인 키 계산

5. 메시지에 여러개 위조 서명

6. VERIFY로 제출

 

* 동일 nonce 사용은 재사용 금지해야한다.


Exploitation

from pwn import *
import json, secrets, hashlib
from ecdsa.curves import NIST256p
from ecdsa.ellipticcurve import Point, PointJacobi

curve = NIST256p
G = curve.generator
p = G.order()

def point2bytes(P):
    return P.to_bytes()

def H(R, m):
    if isinstance(m, str):
        m = m.encode()
    return int.from_bytes(
        hashlib.sha256(point2bytes(R) + m).digest(), "big"
    ) % p

def recv_json(r):
    return json.loads(r.recvline().decode())

io = remote("tcp.sasc.tf", 11334)

io.sendline(json.dumps({"cmd": "GETKEY"}).encode())
Qx, Qy = recv_json(io)["Q"]
Q = PointJacobi.from_affine(Point(curve.curve, Qx, Qy))

io.sendline(json.dumps({"cmd": "REQUEST"}).encode())
Rx, Ry = recv_json(io)["R"]
R = PointJacobi.from_affine(Point(curve.curve, Rx, Ry))

c1 = secrets.randbelow(p - 1) + 1
io.sendline(json.dumps({"cmd": "CHALLENGE", "c": c1}).encode())
s1 = recv_json(io)["s"]

c2 = secrets.randbelow(p - 1) + 1
while c2 == c1:
    c2 = secrets.randbelow(p - 1) + 1
io.sendline(json.dumps({"cmd": "CHALLENGE", "c": c2}).encode())
s2 = recv_json(io)["s"]

d = ((s1 - s2) * pow((c1 - c2) % p, -1, p)) % p
log.success(f"recovered d = {hex(d)}")

for i in range(3):
    msg = f"FORGED-{i}"
    k = secrets.randbelow(p - 1) + 1
    Rf = (G * k).to_affine()
    c  = H(Rf, msg)
    s  = (k + c * d) % p
    sig = [[Rf.x(), Rf.y()], s]

    io.sendline(json.dumps({"cmd": "VERIFY", "msg": msg, "sig": sig}).encode())
    res = recv_json(io)
    log.info(res)

    if "msg" in res and "prize" in res["msg"]:
        print(res["msg"])
        break

io.interactive()

 

 

두번의 CHALLENGE로 개인키가 노출되어 서명을 계속 위조할 수 있다.

 

 

pwned@CT100:~/CTFs-2025$ /usr/bin/env python3 "/home/pwned/CTFs-2025/sasctf/crpyt/blindspot/ex.py"
[+] Opening connection to tcp.sasc.tf on port 11334: Done
[+] recovered d = 0xc3aae1aea6900f15090425648662a531c8f729c8f7feb6a0b7a295f6905630e9
[*] {'status': 'ok', 'sign_cnt': 2, 'verify_cnt': 1}
[*] {'status': 'ok', 'sign_cnt': 2, 'verify_cnt': 2}
[*] {'status': 'ok', 'sign_cnt': 2, 'verify_cnt': 3}
[*] Switching to interactive mode
{"msg": "Wow, you can verify unsigned messages, here is your prize: SAS{r05_4t7ack_s3e5_7hr0u6h_7h3_bl1nd5p0t}"}
$ 
[*] Interrupted
[*] Closed connection to tcp.sasc.tf port 11334
$