cryptoGRAPHy (1, 2, 3)
23 minutos de lectura
Esta serie de retos está contextualizada en la teoría de grafos combinada con la criptografía. El autor implementó una librería de Python usando networkx
para manejar grafos y funciones criptográficas como el cifrado AES, HMAC o el hash SHA256.
Estos retos fueron algo polémicos debido a que los jugadores necesitaban leer, analizar y comprender la librería implementada para el esquema de cifrado de grafos (Graph Encryption Scheme) y luego buscar una solución para el reto correspondiente. Afortunadamente, la librería no cambiaba entre niveles. Sin embargo, algunos equipos se quejaron porque la solución no implicaba mucha criptografía, sino mucha teoría de grafos y habilidades de programación.
Además, los dos primeros retos fueron un poco introductorios para manejarse con la librería. El último en realidad se inspiró en un ataque realista a un esquema de cifrado de grafos, por lo que es mucho más interesante.
De todos modos, disfruté mucho de los retos porque me hicieron desempolvar mis conocimientos sobre teoría de grafos y matemática discreta, ¡así que comencemos!
Graph Encryption Scheme
En la librería tenemos tres archivos (afortunadamente, la mayoría de funciones y clases tienen docstrings):
GES.py
: Implementación del esquema de cifrado de grafosDES.py
: Implementación del esquema de cifrado de diccionariosutils.py
: Funciones de utilidad
Entendiendo el cifrado
Echando un vistazo a server.py
de cryptoGRAPHy 1, podemos ver que el servidor siempre comenzará definiendo un grafo (con networkx
). Luego generará una clave con GESClass.keyGen
(que llama a DESClass.keyGen
por detrás):
class GESClass:
# ...
def keyGen(self, security_parameter: int) -> bytes:
'''
Input: Security parameter
Output: Secret key key_SKE||key_DES
'''
key_SKE = get_random_bytes(security_parameter)
key_DES = DES.keyGen(security_parameter)
return key_SKE + key_DES
# ...
class DESClass:
# ...
def keyGen(self, security_parameter: int) -> bytes:
'''
Input: Security parameter
Output: Secret key
'''
return get_random_bytes(security_parameter)
# ...
Una vez que se establece la clave, el grafo se cifra usando GESClass.encryptGraph
:
class GESClass:
# ...
def encryptGraph(self, key: bytes, G: nx.Graph) -> dict[bytes, bytes]:
'''
Input: Secret key and a graph G
Output: Encrypted graph encrypted_db
'''
SPDX = computeSPDX(key, G, self.cores)
key_DES = key[16:]
EDB = DES.encryptDict(key_DES, SPDX, self.cores)
del(SPDX)
gc.collect()
return EDB
# ...
Como puede verse, la función llama a computeSPDX
:
def computeSPDX(key: bytes, G: nx.Graph, cores: int) -> dict[bytes, bytes]:
SPDX = {}
chunk = round(len(G.nodes())/cores)
key_SKE = key[:16]
key_DES = key[16:]
with Pool(cores) as pool:
iterable = product([G], G)
for S in pool.istarmap(computeSDSP, iterable, chunksize=chunk):
for pair in S:
label, value = pair[0], pair[1]
label_bytes = utils.pair_to_bytes(label)
value_bytes = utils.pair_to_bytes(value)
if label_bytes not in SPDX:
token = DES.tokenGen(key_DES, value_bytes)
ct = utils.SymmetricEncrypt(key_SKE,value_bytes)
ct_value = token + ct
SPDX[label_bytes] = ct_value
return SPDX
Esta función puede ser un poco difícil de entender (además, no hay docstring), porque llama a otra función llamada computeSDSP
, que se ejecuta dentro de pool.istarmap
para acelerar el proceso. El resumen es que computeSDSP
se llama con argumentos G
y un nodo del grafo, para cada nodo del grafo. Esta es la función computeSDSP
:
def computeSDSP(G: nx.Graph, root):
'''
Input: Graph G and a root
Output: Tuples of the form ((start, root), (next_vertex, root))
'''
paths = nx.single_source_shortest_path(G, root)
S = set()
for _, path in paths.items():
path.reverse()
if len(path) > 1:
for i in range(len(path)-1):
label = (path[i], root)
value = (path[i+1],root)
S.add((label, value))
return S
Nuevamente, esta función puede resultar difícil de entender. La esencia es que la función toma root
como destino y calcula todas las rutas más cortas a root
desde cualquier otro nodo fuente del grafo (paths
). Para cada ruta, los diferentes nodos de la ruta se insertan en un conjunto usando el formato ((node, root), (next, root))
. Yo interpreto ((node, root), (next, root))
como:
Si estoy en “node” y quiero llegar al nodo “root”, el siguiente nodo es “next”.
Al final, el conjunto contendrá el camino corto más largo hacia root
del grafo. Además, dado que se almacena como (label, value)
(donde label = (node, root)
y value = (next, root)
), podemos encontrar fácilmente el camino más corto para root
desde cualquier nodo del grafo. De hecho, esto se conoce como camino más corto de un solo destino. (single-destination shortest path, SDSP).
Repasemos la función computeSDPX
de nuevo:
def computeSPDX(key: bytes, G: nx.Graph, cores: int) -> dict[bytes, bytes]:
SPDX = {}
chunk = round(len(G.nodes())/cores)
key_SKE = key[:16]
key_DES = key[16:]
with Pool(cores) as pool:
iterable = product([G], G)
for S in pool.istarmap(computeSDSP, iterable, chunksize=chunk):
for pair in S:
label, value = pair[0], pair[1]
label_bytes = utils.pair_to_bytes(label)
value_bytes = utils.pair_to_bytes(value)
if label_bytes not in SPDX:
token = DES.tokenGen(key_DES, value_bytes)
ct = utils.SymmetricEncrypt(key_SKE,value_bytes)
ct_value = token + ct
SPDX[label_bytes] = ct_value
return SPDX
Ahora, para cada SDSP del grafo, las variables label
y value
se traducen a bytes (una traducción simple):
def int_to_bytes(x: int) -> bytes:
return str(x).encode()
def pair_to_bytes(pair: Tuple[int, int]) -> bytes:
return int_to_bytes(pair[0]) + b',' + int_to_bytes(pair[1])
Luego, utiliza la etiqueta (label_bytes
) como clave de un diccionario (SPDX
). El valor del diccionario correspondiente está cifrado.
Para cifrar el valor del diccionario, primero se genera un token
sobre value_bytes
usando key_DES
(key[16:]
de la primera key
generada):
class DESClass:
# ...
def tokenGen(self, key: bytes, label: bytes) -> bytes:
'''
Input: A key and a label
Output: A token on label
'''
K1 = utils.HashMAC(key, b'1'+label)[:16]
K2 = utils.HashMAC(key, b'2'+label)[:16]
return K1 + K2
# ...
Donde utils.HashMAC
implementa HMAC con SHA256:
def HashMAC(key: bytes, plaintext: bytes) -> bytes:
'''
Input: Key and plaintext
Output: A token on plaintext with the key using HMAC
'''
token = HMAC.new(key, digestmod=SHA256)
token.update(bytes(plaintext))
return token.digest()
Entonces, podemos suponer que token
está exclusivamente relacionado con value_bytes
(lo que significa que para cada combinación de (next, node)
, hay un solo token
asociado). Obsérvese que token
tiene 32 bytes.
Continuando con el cifrado, value_bytes
se cifra con AES usando key_SKE
(key[:16]
) como clave. Véanse utils.SymmetricEncrypt
/ utils.SymmetricDecrypt
:
def SymmetricEncrypt(key: bytes, plaintext: bytes) -> bytes:
'''
Encrypt the plaintext using AES-CBC mode with provided key.
Input: 16-byte key and plaintext
Output: Ciphertext
'''
if len(key) != 16:
raise ValueError
cipher = AES.new(key, AES.MODE_CBC)
ct = cipher.encrypt(pad(plaintext, AES.block_size))
iv = cipher.iv
return ct + iv
def SymmetricDecrypt(key: bytes, ciphertext: bytes) -> bytes:
'''
Decrypt the ciphertex using AES-CBC mode with provided key.
Input: 16-byte key and ciphertext
Output: Plaintext
'''
if len(key) != 16:
raise ValueError
ct, iv = ciphertext[:-16], ciphertext[-16:]
cipher = AES.new(key, AES.MODE_CBC, iv)
pt = unpad(cipher.decrypt(ct), AES.block_size)
return pt
Obsérvese que el IV del cifrado AES CBC se agrega al texto cifrado al final después de la operación y se toma del texto cifrado para su descifrado.
Finalmente, el valor token + ct
se agrega a SPDX[label_bytes]
.
Ok, pero aún no hemos terminado. Recordemos GESClass.encryptGraph
, solo hemos calculado SPDX
:
class GESClass:
# ...
def encryptGraph(self, key: bytes, G: nx.Graph) -> dict[bytes, bytes]:
'''
Input: Secret key and a graph G
Output: Encrypted graph encrypted_db
'''
SPDX = computeSPDX(key, G, self.cores)
key_DES = key[16:]
EDB = DES.encryptDict(key_DES, SPDX, self.cores)
del(SPDX)
gc.collect()
return EDB
# ...
El siguiente paso es coger key_DES
(key[16:]
) y llamar a DESClass.encryptDict
para cifrar el diccionario SPDX
completo, que es el valor a devolver:
class DESClass:
# ...
def encryptDict(self, key: bytes, plaintext_dx: dict[bytes, bytes], cores: int) -> dict[bytes, bytes]:
'''
Input: A key and a plaintext dictionary
Output: An encrypted dictionary EDX
'''
encrypted_db = {}
chunk = int(len(plaintext_dx)/cores)
iterable = product([key], plaintext_dx.items())
with Pool(cores) as pool:
for ct_label, ct_value in pool.istarmap(encryptDictHelper, iterable, chunksize=chunk):
encrypted_db[ct_label] = ct_value
return encrypted_db
# ...
Nuevamente, es un poco extraño de leer, pero es muy similar a lo anterior. Esta llamando a encryptDictHelper
con (key, label_bytes, token + ct)
como argumentos para cada par label_bytes
- token + ct
en el diccionario.
Y encryptDictHelper
realiza las siguientes operaciones:
def encryptDictHelper(key, dict_item):
label = dict_item[0]
value = dict_item[1]
K1 = utils.HashMAC(key, b'1'+label)[:16]
K2 = utils.HashMAC(key, b'2'+label)[:16]
ct_label = utils.Hash(K1)
ct_value = utils.SymmetricEncrypt(K2, value)
return ct_label, ct_value
Nótese que K1
es token[:16]
y K2
es token[16:]
. Por lo tanto, simplemente genera el hash SHA256 de K1
y cifra token + ct
con AES usando K2
como clave. Esos serán los pares clave-valor del diccionario cifrado EDB
.
Comprendiendo las consultas
Todavía hay dos métodos de los que aún no hemos hablado: GESClass.search
y DESClass.search
. El primero llama al segundo por detrás y se usa para encontrar el camino más corto entre dos nodos del grafo:
class GESClass:
# ...
def search(self, token: bytes, encrypted_db: dict[bytes, bytes]) -> Tuple(bytes, bytes):
'''
Input: Search token
Output: (tokens, cts)
'''
resp, tok = b"", b""
curr = token
while True:
value = DES.search(curr, encrypted_db)
if value == b'':
break
curr = value[:32]
resp += value[32:]
tok += curr
return tuple([tok, resp])
# ...
class DESClass:
# ...
def search(self, search_token: bytes, encrypted_db: dict[bytes, bytes]) -> bytes:
'''
Input: Search token and EDX
Output: The corresponding encrypted value.
'''
K1 = search_token[:16]
K2 = search_token[16:]
hash_val = utils.Hash(K1)
if hash_val in encrypted_db:
ct_value = encrypted_db[hash_val]
return utils.SymmetricDecrypt(K2, ct_value)
else:
return b''
# ...
Básicamente, para buscar el camino más corto desde A
hasta B
, se genera un token
(que identifica la etiqueta (A, B)
) y se busca en EDB
(también llamado encrypted_db
). Recordemos que token = K1 + K2
, por lo que K1
es la clave del diccionario y K2
es la clave AES para descifrar el valor del diccionario. Sólo se devuelve el valor de DESClass.search
.
Imaginemos que el siguiente nodo desde A
en el cambino más corto es C
, entonces la salida de DESClass.search
en este ejemplo será token + ct
. Este token
se correspondería con (C, B)
y ct
estaría cifrado con AES y key_SKE
, que se portaría precisamente la etiqueta (C, B)
.
Para cada resultado de DESClass.search
, el resultado se divide en token
y ct
y ambas se agregan a tok
y resp
, respectivamente. Finalmente, la tupla (tok, resp)
se devuelve desde GESClass.search
.
Resumen
GESClass.keyGen
: Generakey
comokey_SKE
ykey_DES
. Luego,key[:16]
se emplea para el cifrado AES ykey[16:]
para HMAC- Single-destination shortest path (SDSP): Representa todos los caminos más cortos desde todos los nodos hasta un único destino. Cada entrada se representa como
((node, root), (next, root))
, para que puedan encadenarse para construir un camino más corto desde unnode
hastaroot
SPDX
: Es un diccionario que mapea(node, root)
contoken + ct
(relacionado con(next, root)
)DESClass.tokenGen
(yGESClass.tokenGen
): Define un token único para una etiqueta(node, root)
. Es un valor de 32 bytes compuesto por valores de 16 bytesK1
yK2
EDB
(encrypted_db
): Es un diccionario que asigna el hash SHA256 deK1
y el valor cifrado con AES detoken + ct
usandoK2
como claveGESClass.search
: Busca la ruta más corta desde un origen a un destino en el SDSP y devuelve la concatenación de todos lostoken
(asociados a(node, root)
) de la ruta y todos los textos cifrados (en realidad,token + ct
)
cryptoGRAPHy 1
Se nos proporciona el código fuente del servidor, que utiliza algunas funciones de la librería de Graph Encryption Scheme:
from lib import GES, utils
import networkx as nx
import random
from SECRET import flag, decrypt
NODE_COUNT = 130
EDGE_COUNT = 260
SECURITY_PARAMETER = 16
def gen_random_graph(node_count: int, edge_count: int) -> nx.Graph:
nodes = [i for i in range(1, node_count + 1)]
edges = []
while len(edges) < edge_count:
u, v = random.choices(nodes, k=2)
if u != v and (u, v) not in edges and (v, u) not in edges:
edges.append([u, v])
return utils.generate_graph(edges)
if __name__ == '__main__':
try:
print("[+] Generating random graph...")
G = gen_random_graph(NODE_COUNT, EDGE_COUNT)
myGES = GES.GESClass(cores=4, encrypted_db={})
key = myGES.keyGen(SECURITY_PARAMETER)
print(f"[*] Key: {key.hex()}")
print("[+] Encrypting graph...")
enc_db = myGES.encryptGraph(key, G)
print("[!] Answer 50 queries to get the flag. In each query, input the shortest path \
decrypted from response. It will be a string of space-separated nodes from \
source to destination, e.g. '1 2 3 4'.")
for q in range(50):
while True:
u, v = random.choices(list(G.nodes), k=2)
if nx.has_path(G, u, v):
break
print(f"[+] Query {q+1}/50: {u} {v}")
token = myGES.tokenGen(key, (u, v))
_, resp = myGES.search(token, enc_db)
print(f"[*] Response: {resp.hex()}")
ans = input("> Original query: ").strip()
if ans != decrypt(u, v, resp, key):
print("[!] Wrong answer!")
exit()
print(f"[+] Flag: {flag}")
except:
exit()
Obsérvese que hay una función llamada decrypt
que está oculta para nosotros.
Análisis del código fuente
Primero, el servidor genera un grafo aleatorio, genera una clave (¡que se muestra!) y cifra el grafo:
print("[+] Generating random graph...")
G = gen_random_graph(NODE_COUNT, EDGE_COUNT)
myGES = GES.GESClass(cores=4, encrypted_db={})
key = myGES.keyGen(SECURITY_PARAMETER)
print(f"[*] Key: {key.hex()}")
print("[+] Encrypting graph...")
enc_db = myGES.encryptGraph(key, G)
Después de eso, se nos pide que respondamos correctamente a 50 consultas para obtener la flag. Para cada consulta, el servidor tomará dos nodos aleatorios (u
y v
) del grafo que estén conectados y encontrará el camino más corto entre ellos. Se nos dará el token
para (u, v)
y la parte resp
de GESClass.search
(los textos cifrados):
for q in range(50):
while True:
u, v = random.choices(list(G.nodes), k=2)
if nx.has_path(G, u, v):
break
print(f"[+] Query {q+1}/50: {u} {v}")
token = myGES.tokenGen(key, (u, v))
_, resp = myGES.search(token, enc_db)
print(f"[*] Response: {resp.hex()}")
ans = input("> Original query: ").strip()
if ans != decrypt(u, v, resp, key):
print("[!] Wrong answer!")
exit()
print(f"[+] Flag: {flag}")
Solución
Recordemos del análisis la librería de Graph Encryption Scheme que resp
está cifrado con AES y la clave key_SKE
, que es key[:16]
(la sabemos desde el principio). Entonces, podemos encontrar todos los nodos intermedios desde u
hasta v
(incluido v
pero no u
) descifrando cada fragmento de 32 bytes (16 para texto cifrado y 16 para IV). El valor de u
lo imprime el servidor, por lo que no es un problema aquí.
Podemos implementar esto fácilmente con Python. Este es el código relevante:
io = get_process()
io.recvuntil(b'[*] Key: ')
key = bytes.fromhex(io.recvline().decode())
SKE, DES = key[:16], key[16:]
round_prog = io.progress('Round')
for r in range(50):
round_prog.status(f'{r + 1} / 50')
io.recvuntil(b'/50: ')
start = io.recvuntil(b' ')
io.recvuntil(b'[*] Response: ')
res = bytes.fromhex(io.recvline().decode())
ct_iv = [(res[i:i+16], res[i+16:i+32]) for i in range(0, len(res), 32)]
shortest_path = []
for ct, iv in ct_iv:
cipher = AES.new(SKE, AES.MODE_CBC, iv)
pt = unpad(cipher.decrypt(ct), AES.block_size)
shortest_path.append(pt.split(b',')[0])
io.sendlineafter(b'> Original query: ', start + b' '.join(shortest_path))
round_prog.success('50 / 50')
print(io.recv().decode().strip())
Flag
Una vez que ejecutemos el script, encontraremos la flag:
$ python3 solve.py chals.sekai.team 3001
[+] Opening connection to chals.sekai.team on port 3001: Done
[+] Round: 50 / 50
[+] Flag: SEKAI{GES_15_34sy_2_br34k_kn@w1ng_th3_k3y}
[*] Closed connection to chals.sekai.team port 3001
El script completo se puede encontrar aquí: solve.py
.
cryptoGRAPHy 2
Para esta segunda parte, se nos proporciona el código fuente del servidor, que nuevamente utiliza algunas funciones de la librería de Graph Encryption Scheme:
from lib import GES, utils
import networkx as nx
import random
from SECRET import flag, get_SDSP_node_degrees
'''
get_SDSP_node_degrees(G, dest) returns the node degrees in the single-destination shortest path (SDSP) tree, sorted in ascending order.
For example, if G has 5 nodes with edges (1,2),(1,3),(2,3),(2,5),(4,5) and dest=1, returns "1 1 2 2 2".
[+] Original: [+] SDSP:
1--2--5--4 1--2--5--4
| / |
3 3
'''
# Another example for sanity check
TestGraph = utils.generate_graph([[1, 2], [1, 4], [1, 6], [6, 5], [6, 7], [4, 7], [2, 5]])
assert get_SDSP_node_degrees(TestGraph, 1) == '1 1 1 2 2 3'
NODE_COUNT = 130
EDGE_PROB = 0.031
SECURITY_PARAMETER = 32
def gen_random_graph() -> nx.Graph:
return nx.fast_gnp_random_graph(n=NODE_COUNT, p=EDGE_PROB)
if __name__ == '__main__':
try:
print("[!] Pass 10 challenges to get the flag:")
for q in range(10):
print(f"[+] Challenge {q+1}/10. Generating random graph...")
while True:
G = gen_random_graph()
if nx.is_connected(G):
break
myGES = GES.GESClass(cores=4, encrypted_db={})
key = myGES.keyGen(SECURITY_PARAMETER)
print("[+] Encrypting graph...")
enc_db = myGES.encryptGraph(key, G)
dest = random.choice(list(G.nodes()))
print(f"[*] Destination: {dest}")
attempts = NODE_COUNT
while attempts > 0:
attempts -= 1
query = input("> Query u,v: ").strip()
try:
u, v = map(int, query.split(','))
assert u in G.nodes() and v in G.nodes() and u != v
except:
print("[!] Invalid query!")
break
token = myGES.tokenGen(key, (u, v))
print(f"[*] Token: {token.hex()}")
tok, resp = myGES.search(token, enc_db)
print(f"[*] Query Response: {tok.hex() + resp.hex()}")
ans = input("> Answer: ").strip()
if ans != get_SDSP_node_degrees(G, dest):
print("[!] Wrong answer!")
exit()
print(f"[+] Flag: {flag}")
except:
exit()
SDSP y grados de los nodos
Ahora, el servidor oculta una función llamada get_SDSP_node_degrees
, pero al menos nos explica lo que hace:
from SECRET import flag, get_SDSP_node_degrees
'''
get_SDSP_node_degrees(G, dest) returns the node degrees in the single-destination shortest path (SDSP) tree, sorted in ascending order.
For example, if G has 5 nodes with edges (1,2),(1,3),(2,3),(2,5),(4,5) and dest=1, returns "1 1 2 2 2".
[+] Original: [+] SDSP:
1--2--5--4 1--2--5--4
| / |
3 3
'''
# Another example for sanity check
TestGraph = utils.generate_graph([[1, 2], [1, 4], [1, 6], [6, 5], [6, 7], [4, 7], [2, 5]])
assert get_SDSP_node_degrees(TestGraph, 1) == '1 1 1 2 2 3'
Este reto me hizo comprender exactamente el concepto de SDSP. El ejemplo dibujado puede ser un poco breve para generalizar, así que dibujemos el segundo ejemplo. Este es el grafo:
4----1----2
| | |
7----6----5
Ahora, el destino es 1
, así que busquemos las rutas más cortas desde cada nodo:
2
->1
:
4----1----2
| | |
7----6----5
4
->1
:
4----1----2
| | |
7----6----5
5
->1
:
4----1----2
| | |
7----6----5
6
->1
:
4----1----2
| | |
7----6----5
7
->1
:
4----1----2
| | |
7----6----5
Entonces, el SDSP es el grafo inicial pero eliminando los bordes no utilizados, y en realidad es un árbol:
4----1----2
| | |
7 6 5
Y la salida de get_SDSP_node_degrees
es una lista de los grados de cada nodo del SDSP (el grado de un nodo es el número de nodos adyacentes de dicho nodo), ordenados en orden ascendente. Por lo tanto, el resultado para este ejemplo es 1 1 1 2 2 3
. Podemos reemplazar los identificadores de los nodos por su grado correspondiente, para ilustrar el concepto de grado de un nodo:
2----3----2
| | |
1 1 1
Análisis del código fuente
Esta vez, el servidor no imprime la key
… Necesitaremos obtener la salida de get_SDSP_node_degrees
un total de 10 veces.
La inicialización es la misma que antes (se genera un grafo aleatorio, se generaa la clave y se cifra el grafo). Después de eso, el servidor toma un punto de destino aleatorio, que se imprime en texto claro:
print("[!] Pass 10 challenges to get the flag:")
for q in range(10):
print(f"[+] Challenge {q+1}/10. Generating random graph...")
while True:
G = gen_random_graph()
if nx.is_connected(G):
break
myGES = GES.GESClass(cores=4, encrypted_db={})
key = myGES.keyGen(SECURITY_PARAMETER)
print("[+] Encrypting graph...")
enc_db = myGES.encryptGraph(key, G)
dest = random.choice(list(G.nodes()))
print(f"[*] Destination: {dest}")
El servidor nos permite realizar hasta 130 consultas (NODE_COUNT
, el número de nodos en el grafo), y se nos dará el token
para nuestra entrada (u, v)
, así como tok
y resp
de GESClass.search
(concatenados):
attempts = NODE_COUNT
while attempts > 0:
attempts -= 1
query = input("> Query u,v: ").strip()
try:
u, v = map(int, query.split(','))
assert u in G.nodes() and v in G.nodes() and u != v
except:
print("[!] Invalid query!")
break
token = myGES.tokenGen(key, (u, v))
print(f"[*] Token: {token.hex()}")
tok, resp = myGES.search(token, enc_db)
print(f"[*] Query Response: {tok.hex() + resp.hex()}")
ans = input("> Answer: ").strip()
if ans != get_SDSP_node_degrees(G, dest):
print("[!] Wrong answer!")
exit()
print(f"[+] Flag: {flag}")
Solución
Entonces, intentaremos encontrar el SDSP para un nodo de destino determinado. Por lo tanto, todas las consultas a realizar deben ser (node, root)
, donde root
es el punto de destino dado.
Además, obsérvese que hay una comprobación de que u != v
. Podemos usar esto para romper el bucle while
enviando (root, root)
al final, cuando ya tengamos los otros 129 resultados de la consulta, y luego intentar adivinar el resultado de getSDSP_node_degrees
.
Para cada ronda, haremos lo siguiente:
data = {}
nodes = [0] * NODES_SIZE
io.recvuntil(b'[*] Destination: ')
dest = int(io.recvline().decode())
for i in range(NODES_SIZE):
if i == dest:
continue
io.sendlineafter(b'> Query u,v: ', f'{i},{dest}'.encode())
io.recvuntil(b'[*] Token: ')
token = bytes.fromhex(io.recvline().decode())
io.recvuntil(b'[*] Query Response: ')
res = bytes.fromhex(io.recvline().decode())
keys = [token] + [res[i:i+32] for i in range(0, len(res) // 2, 32)]
data[i] = list(map(lambda b: b.hex(), keys))
if nodes[dest] == 0:
nodes[dest] = keys[-1].hex()
if nodes[i] == 0:
nodes[i] = keys[0].hex()
Estamos tomando el token
, que está relacionado con (node, root)
, y luego tomamos la parte token
de la respuesta del servidor (la primera mitad son todos los tokens de 32 bytes para cada etiqueta (next, root)
en la ruta más corta). Los guardaremos en un diccionario data
para cada nodo.
Además, usaremos el token
para identificar cada nodo (guardado en la lista nodes
), y uno de los ultimos valores de token
de res
como identificador del destino ((root, root)
).
Una vez que tengamos esta información, podemos generar el grafo SDSP guardando todas las aristar posibles en un conjunto (siguiendo los caminos más cortos):
edges = set()
for d in data.values():
prev_node = nodes.index(d[0])
for i, next_node in enumerate(d[1:] + [nodes[0]]):
edge = (prev_node, nodes.index(next_node))
if (edge[1], edge[0]) not in edges and edge[0] != edge[1] and edge != (dest, 0):
edges.add(edge)
prev_node = nodes.index(next_node)
G = generate_graph(edges)
Y el último paso es tomar los grados de los nodos y ordenarlos:
degrees = sorted(map(lambda t: t[1], G.degree))
io.sendlineafter(b'> Query u,v: ', f'{dest},{dest}'.encode())
io.sendlineafter(b'> Answer: ', ' '.join(map(str, degrees)).encode())
Flag
Podemos ejecutar el script para calcular lo anterior 10 veces y obtener la flag:
$ python3 solve.py chals.sekai.team 3062
[+] Opening connection to chals.sekai.team on port 3062: Done
[+] Round: 10 / 10
[+] Flag: SEKAI{3ff1c13nt_GES_4_Shortest-Path-Queries-_-}
[*] Closed connection to chals.sekai.team port 3062
El script completo se puede encontrar aquí: solve.py
.
cryptoGRAPHy 3
Para el último reto de la serie, se nos proporciona nuevamente el código fuente del servidor:
from itertools import product, chain
from multiprocessing import Pool
from lib import GES
import networkx as nx
import random
import time
from SECRET import flag, generate_tree, decrypt
NODE_COUNT = 60
SECURITY_PARAMETER = 128
MENU = '''============ MENU ============
1. Graph Information
2. Query Responses
3. Challenge
4. Exit
=============================='''
def query_resps(cores: int, key: bytes, G: nx.Graph, myGES: GES.GESClass, enc_db):
n = len(G.nodes())
query_list = []
queries = product(set(), set())
for component in nx.connected_components(G):
queries = chain(queries, product(component, component))
iterable = product([key], queries)
chunk = n * n // cores
with Pool(cores) as pool:
for token in pool.istarmap(myGES.tokenGen, iterable, chunksize=chunk):
tok, resp = myGES.search(token, enc_db)
query_list.append((token.hex() + tok.hex(), resp.hex()))
random.shuffle(query_list)
return query_list
if __name__ == '__main__':
try:
G = generate_tree()
assert len(G.nodes()) == NODE_COUNT
myGES = GES.GESClass(cores=4, encrypted_db={})
key = myGES.keyGen(SECURITY_PARAMETER)
enc_db = myGES.encryptGraph(key, G)
t = time.time()
print("[!] Recover 10 queries in 30 seconds. It is guaranteed that each answer is unique.")
while True:
print(MENU)
option = input("> Option: ").strip()
if option == "1":
print("[!] Graph information:")
print("[*] Edges:", G.edges())
elif option == "2":
print(f"[*] Query Responses: ")
resp = query_resps(4, key, G, myGES, enc_db)
for r in resp:
print(f"{r[0]} {r[1]}")
elif option == "3":
break
else:
exit()
print("[!] In each query, input the shortest path decrypted from response. \
It will be a string of space-separated nodes from source to destination, e.g. '1 2 3 4'.")
for q in range(10):
print(f"[+] Challenge {q+1}/10.")
while True:
u, v = random.choices(list(G.nodes()), k=2)
if u != v and nx.has_path(G, u, v):
break
token = myGES.tokenGen(key, (u, v))
print(f"[*] Token: {token.hex()}")
tok, resp = myGES.search(token, enc_db)
print(f"[*] Query Response: {tok.hex() + resp.hex()}")
ans = input("> Original query: ").strip()
if ans != decrypt(u, v, resp, key):
print("[!] Wrong answer!")
exit()
if time.time() - t > 30:
print("[!] Time's up!")
exit()
print(f"[+] Flag: {flag}")
except:
exit()
Análisis del código fuente
Nuevamente, el servidor no imprime la key
… Pero en su lugar, se nos da el grafo inicial en texto claro (opción 1
):
if option == "1":
print("[!] Graph information:")
print("[*] Edges:", G.edges())
Particularmente, el grafo es un árbol que se genera con una función oculta llamada generate_tree
. Al principio, esto fue un poco extraño para mí, ya que networkx
tiene una función llamada random_tree
, por lo que no hay razón para ocultarla. Al realizar algunas pruebas en local, descubrí que si dos nodos están al mismo nivel, el reto no se podía resolver porque esos nodos no se pueden distinguir cuando están cifrados. Luego, mirando el árbol generado por el servidor, no había ningún par de nodos al mismo nivel, por lo que el reto sí tenía solución y la función generate_tree
(oculta) se encargaría de evitar esa situación.
En la opción 2
, se nos darán las respuestas ((tok, resp)
) a todas las consultas posibles en el grafo, que tiene un total de 60 nodos. Por lo tanto, recibiremos un total de $60^2 = 3600$ respuestas. El problema es que la lista de respuestas está desordenada:
def query_resps(cores: int, key: bytes, G: nx.Graph, myGES: GES.GESClass, enc_db):
n = len(G.nodes())
query_list = []
queries = product(set(), set())
for component in nx.connected_components(G):
queries = chain(queries, product(component, component))
iterable = product([key], queries)
chunk = n * n // cores
with Pool(cores) as pool:
for token in pool.istarmap(myGES.tokenGen, iterable, chunksize=chunk):
tok, resp = myGES.search(token, enc_db)
query_list.append((token.hex() + tok.hex(), resp.hex()))
random.shuffle(query_list)
return query_list
Una vez recibida toda la información, podemos usar la opción 3
para comenzar el reto real: durante 10 rondas, el servidor tomará dos nodos aleatorios u
y v
del grafo que estén conectados y nos solicitará que pongamos la consulta original (es decir, el camino más corto entre ellos). Se nos dará el token
para (u, v)
y el resultado de GESClass.search
:
print("[!] In each query, input the shortest path decrypted from response. \
It will be a string of space-separated nodes from source to destination, e.g. '1 2 3 4'.")
for q in range(10):
print(f"[+] Challenge {q+1}/10.")
while True:
u, v = random.choices(list(G.nodes()), k=2)
if u != v and nx.has_path(G, u, v):
break
token = myGES.tokenGen(key, (u, v))
print(f"[*] Token: {token.hex()}")
tok, resp = myGES.search(token, enc_db)
print(f"[*] Query Response: {tok.hex() + resp.hex()}")
ans = input("> Original query: ").strip()
if ans != decrypt(u, v, resp, key):
print("[!] Wrong answer!")
exit()
if time.time() - t > 30:
print("[!] Time's up!")
exit()
print(f"[+] Flag: {flag}")
Solución
En primer lugar, podemos tomar toda la información de las opciones 1
y 2
:
io = get_process()
io.sendlineafter(b'> Option: ', b'1')
io.recvuntil(b'[*] Edges: ')
G = generate_graph(eval(io.recvline().decode()))
io.sendlineafter(b'> Option: ', b'2')
io.recvline()
queries = []
while True:
line = io.recvline().decode()[:-1]
if 'MENU' in line:
break
tok, res = map(bytes.fromhex, line.split(' '))
queries.append({
'token': tok[:32].hex(),
'tok': [tok[i:i+32].hex() for i in range(32, len(tok), 32)],
'res': [res[i:i+32].hex() for i in range(0, len(res), 32)]
})
Observe cómo utilicé una lista queries
(que contendrá un total de 3600 elementos) con un token
y dos listas para tok
y res
.
Echando un vistazo a la lista queries
, podemos descubrir que hay exactamente 60 elementos con tok
y res
vacíos dado que son consultas de la forma (root, root)
. Podemos usar estos elementos para identificar nodos del grafo, así que vamos a separarlos en otra lista:
toks_0 = [q['token'] for q in queries if len(q['tok']) == 0]
En realidad, cada token en tok_0
aparecerá en otros elementos de las listas queries.tok
en la última posición, ya que (root, root)
es un destino. Así, podemos encontrar todos los token
de la forma (node, root)
que contienen un token
dado de tok_0
en su lista tok
.
Y lo que es más interesante, podemos unir todos los token
en un grafo. En realidad, este grafo será un árbol. Esto es muy importante, porque los árboles no contienen ningún ciclo, por lo que el camino entre dos nodos es en realidad el camino más corto entre ellos. Y lo que es aún más alucinante, podemos encontrar un mapeo entre el árbol token
y el árbol de texto claro que tenemos desde el principio.
En términos matemáticos, esto se llama isomorfismo entre árboles, que define una relación uno a uno entre los nodos de dos árboles. Afortunadamente, networkx
proporciona una función para encontrar un isomorfismo entre árboles. (tree_isomorphism
), para que podamos hacer coincidir cada token
con un nodo de texto claro:
mappings = {}
def define_tree(queries):
nodes = [q['token'] for q in queries]
edges = set()
for node in nodes:
for q in queries:
if len(q['tok']) and q['tok'][0] == node:
edges.add((node, q['token']))
GG = generate_graph(edges)
isomorphism = tree_isomorphism(GG, G)
for enc, node in isomorphism:
mappings[enc] = node
Y este proceso hay que hacerlo 60 veces, ya que hay un total de 60 destinos diferentes:
for tok in toks_0:
define_tree([q for q in queries if tok in q['tok'] or tok == q['token']])
Una vez que tengamos el mapeo entre eltoken
cifrado y el nodo de texto claro, podemos encontrar fácilmente la ruta más corta de la consulta que se nos proporciona buscando en el diccionario mapping
. Y eso es precisamente lo que estamos haciendo aquí:
io.sendlineafter(b'> Option: ', b'3')
round_prog = io.progress('Round')
for r in range(10):
round_prog.status(f'{r + 1} / 10')
io.recvuntil(b'[*] Token: ')
token = bytes.fromhex(io.recvline().decode())
io.recvuntil(b'[*] Query Response: ')
res = bytes.fromhex(io.recvline().decode())
keys = [token.hex()] + [res[i:i+32].hex() for i in range(0, len(res) // 2, 32)]
shortest_path = []
for key in keys:
shortest_path.append(mappings[key])
io.sendlineafter(b'> Original query: ', ' '.join(map(str, shortest_path)).encode())
round_prog.success('10 / 10')
print(io.recv().decode().strip())
Flag
Una vez que ejecutemos el script veremos la flag (que nos da un paper que describe este ataque de recuperación de consulta con mucho más detalle):
$ python3 solve.py chals.sekai.team 3023
[+] Opening connection to chals.sekai.team on port 3023: Done
[+] Round: 10 / 10
[+] Flag: SEKAI{Full_QR_Attack_is_not_easy_https://eprint.iacr.org/2022/838.pdf}
[*] Closed connection to chals.sekai.team port 3023
El script completo se puede encontrar aquí: solve.py
.