secure source
8 minutos de lectura
Se nos proporciona un proyecto web construido con Python y Flask. Tenemos esta página inicial:
Podemos registrarnos y luego iniciar sesión. Llegaremos a un dashboard que solo nos permite crear y ver notas:
Entonces, es buen momento para leer el código fuente.
Análisis del código fuente
Los endpoints se listan en views.py
. Hay un endpoint /dashboard
que mostrará la flag si tenemos rol admin
:
@bp.route('/dashboard', methods=['GET'])
def dashboard():
cookies = request.cookies
if not ('token' in cookies and 'pubkey' in cookies):
flash('Make sure the proper cookies are set.', 'error')
return redirect(url_for('views.home'))
token = request.cookies.get('token')
pubkey = request.cookies.get('pubkey').split(',')
if not jwt.verify_token(pubkey, token):
flash('Your token could not be verified.', 'error')
return redirect(url_for('views.home'))
if not jwt.check_admin(token):
flash('Only admins can access the dashboard.', 'error')
return redirect(url_for('views.home'))
return render_template('dashboard.html', secret=open('/flag.txt').read()), 200
Vemos que las sesiones se gestionan con JWT (crypto/jwt.py
):
from .ecdsa import ecc
import json, time
from base64 import b64encode, b64decode
class Tokenizer:
def __init__(self):
pass
def create_token(self, user):
header = b64encode(json.dumps({'alg': 'EC256', 'typ': 'JWT'}).encode())
payload = b64encode(json.dumps({
'username': user.username,
'email': user.email,
'iat': str(int(time.time()))
}).encode())
signature = ecc.sign(header + b'.' + payload)
return header + b'.' + payload + b'.' + signature
def verify_token(self, pubkey, token):
if len(pubkey) != 2:
return False
if not (pubkey[0].isnumeric() and pubkey[1].isnumeric()):
return False
header, payload, signature = token.split('.')
return ecc.verify(pubkey, signature, header + '.' + payload)
def check_admin(self, token):
_, payload, _ = token.split('.')
payload_json = json.loads(b64decode(payload).decode())
username = payload_json['username']
return username == 'HTBAdmin1337_ZUSD3uQG4I'
Como se puede ver, nuestro nombre de usuario debe ser HTBAdmin1337_ZUSD3uQG4I
. Obviamente, el endpoint /register
tiene una comprobación para bloquear cualquier intento de registrar una cuenta de admin
:
if username.startswith('HTBAdmin1337'):
return render_template('register.html', error='You are not allowed to register an admin user.'), 403
Volviendo a la implementación de JWT, está utilizando criptografía de curva elíptica como algoritmo de firma. Particularmente, está usando ECDSA (crypto/ecdsa.py
):
import sys; sys.path.append('../')
from utils import ALPHABET, l2b, b2l
from .rng import generator
from .curve import *
from hashlib import sha256
import secrets, base64
from random import choices
class ECDSA:
def __init__(self):
self.x = int(secrets.token_hex(32), 16)
print(self.x)
print(self.x % q)
self.Q = G * self.x
def sign(self, m):
k = int(generator.choices(ALPHABET, 32).encode().hex(), 16)
H = int(sha256(m).hexdigest(), 16)
r = (G * k).x
s = pow(k, -1, q) * (H + self.x * r) % q
if s == 0:
self.sign(m)
return base64.b64encode(l2b(r) + l2b(s))
def verify(self, pubkey, sig, m):
_sig = base64.b64decode(sig)
_r = b2l(_sig[:32])
_s = b2l(_sig[32:])
_Q = Point(int(pubkey[0]), int(pubkey[1]), curve=E)
if [_Q.x, _Q.y] == [O.x, O.y]:
return False
if not E.is_point_on_curve((_Q.x, _Q.y)):
return False
if not ((_Q * q).x == O.y and (_Q * q).y == O.y):
return False
if not (1 <= _r <= q - 1 and 1 <= _s <= q - 1):
return False
H = int(sha256(m.encode()).hexdigest(), 16)
sinv = pow(_s, -1, q)
u1 = H * sinv % q
u2 = _r * sinv % q
U = G * u1 + _Q * u2
if [U.x, U.y] == [O.x, O.y]:
return False
return _r == U.x
ecc = ECDSA()
ECDSA
Básicamente, las firmas de ECDSA son un par de valores
Donde
El proceso de verificación es así, donde
La firma es válida si
Encontrando la vulnerabilidad
El fallo aquí es que el nonce
k = int(generator.choices(ALPHABET, 32).encode().hex(), 16)
Este objeto generator
viene de crypto/rng.py
:
import random
class RNG:
def __init__(self):
self.pool = list(random.getstate()[1][:-1])
def next(self):
if len(self.pool) < 1:
self.reset_pool()
return self.pool.pop(0)
def reset_pool(self):
self.pool = list(random.getstate()[1][:-1])
def choices(self, s, k):
return ''.join(random.choices(s, k=k))
generator = RNG()
Se está utilizando el módulo random
de Python, que utiliza MT19937, un generador de números pseudo-aleatorios (PRNG) de tipo de Mersenne Twister .
Es bien sabido que MT19937 se puede romper con suficientes salidas del PRNG. Pero esta vez, es más fácil que eso, porque el estado inicial se guarda en el atributo pool
. Este estado está compuesto por 624 números enteros de 32 bits. Obsérvese que el método next
simplemente devuelve el siguiente valor de pool
hasta que esté vacío.
Si podemos recolectar esos 624 valores, podemos reconstruir el estado inicial y encontrar el nonce
El PRNG se usa para establecer el id
de las notas (models/note.py
):
from crypto.rng import generator
class Note:
def __init__(self, title, description):
self.id = generator.next()
self.title = title
self.description = description
def __str__(self):
return f'Note({self.id}, {self.title}, {self.description})'
def __json_serialize__(self):
return {
'id': self.id,
'title': self.title,
'description': self.description
}
Entonces, tendremos que usar /create-note
y /view-note
varias veces para obtener los 624 IDs:
@bp.route('/create-note', methods=['GET', 'POST'])
def create_note():
if not 'logged_in_user' in session:
flash('To create a note you need to be logged in first.', 'error')
return redirect(url_for('views.login'))
if request.method == 'POST':
data = request.form.to_dict()
logged_in_user = session['logged_in_user']
if not ('title' in data and 'description' in data):
flash('You must provide a note title and description.', 'error')
return redirect(url_for('views.create_note'))
title = data['title']
description = data['description']
new_note = Note(title, description)
if new_note != database.insert(new_note, 'notes'):
flash('An internal database error occured. Contact an admin!', 'error')
return redirect(url_for('views.create_note'))
flash('Your note was successfully saved.', 'message')
return redirect(url_for('views.home'))
return render_template('create_note.html'), 200
@bp.route('/view-notes', methods=['GET', 'POST'])
def view_notes():
if not 'logged_in_user' in session:
flash('To view your notes you need to be logged in first.', 'error')
return redirect(url_for('views.login'))
logged_in_user = session['logged_in_user']
notes = database.fetch_user_notes(logged_in_user)
if request.method == 'GET':
return render_template('view_notes.html', notes=notes), 200
else:
return jsonify({'notes': list(map(Note.__json_serialize__, notes))})
Estarán presentes en el texto HTML de la respuesta:
<main>
{% if notes %}
<ul>
{% for note in notes %}
<li>
<h2>{{ note.title }}</h2>
<p>ID : {{ note.id }}</p>
<p>Description : {{ note.description }}</p>
</li>
{% endfor %}
</ul>
{% else %}
<p>No notes found.</p>
{% endif %}
</main>
Solución
Entonces, comencemos a escribir nuestra solución. Primero nos registramos e iniciamos sesión con una cuenta aleatoria:
username = password = os.urandom(8).hex()
requests.post(f'{URL}/register', data={'username': username, 'password': password, 'email': 'x'})
s = requests.session()
s.post(f'{URL}/login', data={'username': username, 'password': password})
Ahora, podemos crear 624 notas y luego leer todos los IDs para reconstruir el estado del PRNG:
for _ in range(624):
s.post(f'{URL}/create-note', data={'title': 'A', 'description': 'B'})
res = s.post(f'{URL}/view-notes')
notes = res.json().get('notes', [])
state = tuple(n.get('id', 0) for n in notes)
random.setstate((3, state + (624, ), None))
Ahora extraemos la firma
token = s.cookies.get('token')
assert isinstance(token, str)
signature = b64decode(token.split('.')[-1])
r, s = int(signature[:32].hex(), 16), int(signature[32:].hex(), 16)
A continuación, tomamos nuestra información del JWT, calculamos el mismo
q, G = brainpoolP256r1.q, brainpoolP256r1.G
h = int(sha256('.'.join(token.split('.')[:-1]).encode()).hexdigest(), 16)
k = int(''.join(random.choices(printable, k=32)).encode().hex(), 16)
assert r == (G * k).x
x = (s * k - h) * pow(r, -1, q) % q
Con esto, podemos firmar un JWT con el rol admin
:
admin_data = b'.'.join(map(b64encode, [b'{"alg":"EC256","typ":"JWT"}', b'{"username":"HTBAdmin1337_ZUSD3uQG4I"}']))
h = int(sha256(admin_data).hexdigest(), 16)
k = 1337
r = (G * k).x
s = pow(k, -1, q) * (h + x * r) % q
admin_token = admin_data + b'.' + b64encode(r.to_bytes(32, 'big') + s.to_bytes(32, 'big'))
Q = G * x
Finalmente, enviamos el token junto con la clave pública para obtener la flag:
res = requests.get(f'{URL}/dashboard', cookies={'token': admin_token.decode(), 'pubkey': f'{Q.x},{Q.y}'}).text
print(res[res.index('HTB{'):res.index('}') + 1])
Flag
Si ejecutamos el script, obtendremos la flag:
$ python3 solve.py 83.136.249.46:36889
HTB{weak_randomness_is_usually_the_most_crucial_source_of_vulnerabilities}
El script completo se puede encontrar aquí: solve.py
.
Vía no intencionada
Volviendo al endpoint /dashboard
, examinemos el código más detenidamente:
@bp.route('/dashboard', methods=['GET'])
def dashboard():
cookies = request.cookies
if not ('token' in cookies and 'pubkey' in cookies):
flash('Make sure the proper cookies are set.', 'error')
return redirect(url_for('views.home'))
token = request.cookies.get('token')
pubkey = request.cookies.get('pubkey').split(',')
if not jwt.verify_token(pubkey, token):
flash('Your token could not be verified.', 'error')
return redirect(url_for('views.home'))
if not jwt.check_admin(token):
flash('Only admins can access the dashboard.', 'error')
return redirect(url_for('views.home'))
return render_template('dashboard.html', secret=open('/flag.txt').read()), 200
Este endpoint requiere cookies token
y pubkey
… Espera, pubkey
? Recordemos cómo funcionan las firmas, se firma con la clave privada y otros podrán verificar nuestra firma con nuestra clave pública. Correcto, entonces podemos firmar nuestro token de admin
con una nueva clave privada y establecer pubkey
con clave pública asociada. ¡Ni siquiera necesitamos registrarnos e iniciar sesión!
$ python3 -q
>>> import requests
>>>
>>> from base64 import b64encode
>>> from hashlib import sha256
>>>
>>> from fastecdsa.curve import brainpoolP256r1
>>>
>>> URL = 'http://83.136.249.46:36889'
>>>
>>> q, G = brainpoolP256r1.q, brainpoolP256r1.G
>>>
>>> x = 0xacdc
>>>
>>> admin_data = b'.'.join(map(b64encode, [b'{"alg":"EC256","typ":"JWT"}', b'{"username":"HTBAdmin1337_ZUSD3uQG4I"}']))
>>>
>>> h = int(sha256(admin_data).hexdigest(), 16)
>>> k = 1337
>>> r = (G * k).x
>>> s = pow(k, -1, q) * (h + x * r) % q
>>>
>>> admin_token = admin_data + b'.' + b64encode(r.to_bytes(32, 'big') + s.to_bytes(32, 'big'))
>>> Q = G * x
>>>
>>> res = requests.get(f'{URL}/dashboard', cookies={'token': admin_token.decode(), 'pubkey': f'{Q.x},{Q.y}'}).text
>>> print(res[res.index('HTB{'):res.index('}') + 1])
HTB{weak_randomness_is_usually_the_most_crucial_source_of_vulnerabilities}
O incluso más simple, reutilizando el código de la aplicación:
$ python3 -q
>>> import requests
>>>
>>> from crypto.jwt import Tokenizer, ecc
>>> from models.user import User
>>>
>>> URL = 'http://83.136.249.46:36889'
>>>
>>> jwt = Tokenizer()
>>> admin = User('HTBAdmin1337_ZUSD3uQG4I', '', '')
>>> admin_token = jwt.create_token(admin)
>>> Q = ecc.Q
>>>
>>> res = requests.get(f'{URL}/dashboard', cookies={'token': admin_token.decode(), 'pubkey': f'{Q.x},{Q.y}'}).text
>>> print(res[res.index('HTB{'):res.index('}') + 1])
HTB{weak_randomness_is_usually_the_most_crucial_source_of_vulnerabilities}