secure source
8 minutes to read
We are given a web project built with Python and Flask. We have this landing page:
We can register an account and then log in. We will arrive to a dashboard that only allows us to create and view notes:
So, it’s a good time to read the source code.
Source code analysis
The enpoints are listed in views.py
. There is a /dashboard
endpoint that will show the flag if we have admin
role:
@bp.route('/dashboard', methods=['GET'])
def dashboard():
cookies = request.cookies
if not ('token' in cookies and 'pubkey' in cookies):
flash('Make sure the proper cookies are set.', 'error')
return redirect(url_for('views.home'))
token = request.cookies.get('token')
pubkey = request.cookies.get('pubkey').split(',')
if not jwt.verify_token(pubkey, token):
flash('Your token could not be verified.', 'error')
return redirect(url_for('views.home'))
if not jwt.check_admin(token):
flash('Only admins can access the dashboard.', 'error')
return redirect(url_for('views.home'))
return render_template('dashboard.html', secret=open('/flag.txt').read()), 200
We see that sessions are managed with JWT (crypto/jwt.py
):
from .ecdsa import ecc
import json, time
from base64 import b64encode, b64decode
class Tokenizer:
def __init__(self):
pass
def create_token(self, user):
header = b64encode(json.dumps({'alg': 'EC256', 'typ': 'JWT'}).encode())
payload = b64encode(json.dumps({
'username': user.username,
'email': user.email,
'iat': str(int(time.time()))
}).encode())
signature = ecc.sign(header + b'.' + payload)
return header + b'.' + payload + b'.' + signature
def verify_token(self, pubkey, token):
if len(pubkey) != 2:
return False
if not (pubkey[0].isnumeric() and pubkey[1].isnumeric()):
return False
header, payload, signature = token.split('.')
return ecc.verify(pubkey, signature, header + '.' + payload)
def check_admin(self, token):
_, payload, _ = token.split('.')
payload_json = json.loads(b64decode(payload).decode())
username = payload_json['username']
return username == 'HTBAdmin1337_ZUSD3uQG4I'
As can be seen, our username must be HTBAdmin1337_ZUSD3uQG4I
. Obviously, the /register
endpoint has a check to block any attempt of registering an admin
account:
if username.startswith('HTBAdmin1337'):
return render_template('register.html', error='You are not allowed to register an admin user.'), 403
Going back to the JWT implementation, it is using Elliptic Curve Cryptography as a signature algorithm. Particularly, it is using ECDSA (crypto/ecdsa.py
):
import sys; sys.path.append('../')
from utils import ALPHABET, l2b, b2l
from .rng import generator
from .curve import *
from hashlib import sha256
import secrets, base64
from random import choices
class ECDSA:
def __init__(self):
self.x = int(secrets.token_hex(32), 16)
print(self.x)
print(self.x % q)
self.Q = G * self.x
def sign(self, m):
k = int(generator.choices(ALPHABET, 32).encode().hex(), 16)
H = int(sha256(m).hexdigest(), 16)
r = (G * k).x
s = pow(k, -1, q) * (H + self.x * r) % q
if s == 0:
self.sign(m)
return base64.b64encode(l2b(r) + l2b(s))
def verify(self, pubkey, sig, m):
_sig = base64.b64decode(sig)
_r = b2l(_sig[:32])
_s = b2l(_sig[32:])
_Q = Point(int(pubkey[0]), int(pubkey[1]), curve=E)
if [_Q.x, _Q.y] == [O.x, O.y]:
return False
if not E.is_point_on_curve((_Q.x, _Q.y)):
return False
if not ((_Q * q).x == O.y and (_Q * q).y == O.y):
return False
if not (1 <= _r <= q - 1 and 1 <= _s <= q - 1):
return False
H = int(sha256(m.encode()).hexdigest(), 16)
sinv = pow(_s, -1, q)
u1 = H * sinv % q
u2 = _r * sinv % q
U = G * u1 + _Q * u2
if [U.x, U.y] == [O.x, O.y]:
return False
return _r == U.x
ecc = ECDSA()
ECDSA
Basically, ECDSA signatures are a pair of values
Where
The verification process goes like this, where
The signature is valid id
Finding the vulnerability
The flaw here is that the nonce
k = int(generator.choices(ALPHABET, 32).encode().hex(), 16)
This generator
object comes from crypto/rng.py
:
import random
class RNG:
def __init__(self):
self.pool = list(random.getstate()[1][:-1])
def next(self):
if len(self.pool) < 1:
self.reset_pool()
return self.pool.pop(0)
def reset_pool(self):
self.pool = list(random.getstate()[1][:-1])
def choices(self, s, k):
return ''.join(random.choices(s, k=k))
generator = RNG()
This is using the built-in random
module in Python, which uses a MT19937, a type of Mersenne Twister pseudorandom number generator (PRNG).
It is well known that MT19937 can be cracked with enough outputs of the PRNG. But this time, it is easier than that, because the initial state is saved into the attribute pool
. This state is composed by 624 32-bit integers. Notice that the next
method simply pops each of the values of pool
until it is empty.
If we are able to collect those 624 values, we can reconstruct the initial state and find the nonce
The PRNG is used to set the id
of new notes (models/note.py
):
from crypto.rng import generator
class Note:
def __init__(self, title, description):
self.id = generator.next()
self.title = title
self.description = description
def __str__(self):
return f'Note({self.id}, {self.title}, {self.description})'
def __json_serialize__(self):
return {
'id': self.id,
'title': self.title,
'description': self.description
}
So, we will need to use /create-note
and /view-note
several times to get the 624 IDs:
@bp.route('/create-note', methods=['GET', 'POST'])
def create_note():
if not 'logged_in_user' in session:
flash('To create a note you need to be logged in first.', 'error')
return redirect(url_for('views.login'))
if request.method == 'POST':
data = request.form.to_dict()
logged_in_user = session['logged_in_user']
if not ('title' in data and 'description' in data):
flash('You must provide a note title and description.', 'error')
return redirect(url_for('views.create_note'))
title = data['title']
description = data['description']
new_note = Note(title, description)
if new_note != database.insert(new_note, 'notes'):
flash('An internal database error occured. Contact an admin!', 'error')
return redirect(url_for('views.create_note'))
flash('Your note was successfully saved.', 'message')
return redirect(url_for('views.home'))
return render_template('create_note.html'), 200
@bp.route('/view-notes', methods=['GET', 'POST'])
def view_notes():
if not 'logged_in_user' in session:
flash('To view your notes you need to be logged in first.', 'error')
return redirect(url_for('views.login'))
logged_in_user = session['logged_in_user']
notes = database.fetch_user_notes(logged_in_user)
if request.method == 'GET':
return render_template('view_notes.html', notes=notes), 200
else:
return jsonify({'notes': list(map(Note.__json_serialize__, notes))})
They will be present in the HTML response text:
<main>
{% if notes %}
<ul>
{% for note in notes %}
<li>
<h2>{{ note.title }}</h2>
<p>ID : {{ note.id }}</p>
<p>Description : {{ note.description }}</p>
</li>
{% endfor %}
</ul>
{% else %}
<p>No notes found.</p>
{% endif %}
</main>
Solution
So, let’s start writing our solution. First we register and log in with a random account:
username = password = os.urandom(8).hex()
requests.post(f'{URL}/register', data={'username': username, 'password': password, 'email': 'x'})
s = requests.session()
s.post(f'{URL}/login', data={'username': username, 'password': password})
Now, we can create 624 notes and then read all the IDs in order to reconstruct the PRNG state:
for _ in range(624):
s.post(f'{URL}/create-note', data={'title': 'A', 'description': 'B'})
res = s.post(f'{URL}/view-notes')
notes = res.json().get('notes', [])
state = tuple(n.get('id', 0) for n in notes)
random.setstate((3, state + (624, ), None))
Now we extract the signature
token = s.cookies.get('token')
assert isinstance(token, str)
signature = b64decode(token.split('.')[-1])
r, s = int(signature[:32].hex(), 16), int(signature[32:].hex(), 16)
Next, take our JWT information, compute the same
q, G = brainpoolP256r1.q, brainpoolP256r1.G
h = int(sha256('.'.join(token.split('.')[:-1]).encode()).hexdigest(), 16)
k = int(''.join(random.choices(printable, k=32)).encode().hex(), 16)
assert r == (G * k).x
x = (s * k - h) * pow(r, -1, q) % q
With this, we can sign a JWT with admin
role:
admin_data = b'.'.join(map(b64encode, [b'{"alg":"EC256","typ":"JWT"}', b'{"username":"HTBAdmin1337_ZUSD3uQG4I"}']))
h = int(sha256(admin_data).hexdigest(), 16)
k = 1337
r = (G * k).x
s = pow(k, -1, q) * (h + x * r) % q
admin_token = admin_data + b'.' + b64encode(r.to_bytes(32, 'big') + s.to_bytes(32, 'big'))
Q = G * x
Finally, we send the token along with the public key in order to get the flag:
res = requests.get(f'{URL}/dashboard', cookies={'token': admin_token.decode(), 'pubkey': f'{Q.x},{Q.y}'}).text
print(res[res.index('HTB{'):res.index('}') + 1])
Flag
If we run the script, we will get the flag:
$ python3 solve.py 83.136.249.46:36889
HTB{weak_randomness_is_usually_the_most_crucial_source_of_vulnerabilities}
The full script can be found in here: solve.py
.
Unintended way
Going back to the /dashboard
endpoint, let’s examine the code carefully:
@bp.route('/dashboard', methods=['GET'])
def dashboard():
cookies = request.cookies
if not ('token' in cookies and 'pubkey' in cookies):
flash('Make sure the proper cookies are set.', 'error')
return redirect(url_for('views.home'))
token = request.cookies.get('token')
pubkey = request.cookies.get('pubkey').split(',')
if not jwt.verify_token(pubkey, token):
flash('Your token could not be verified.', 'error')
return redirect(url_for('views.home'))
if not jwt.check_admin(token):
flash('Only admins can access the dashboard.', 'error')
return redirect(url_for('views.home'))
return render_template('dashboard.html', secret=open('/flag.txt').read()), 200
This endpoint requires cookies token
and pubkey
… Wait, pubkey
? Remember how signatures work, we sign with the private key and others can verify our signature with our public key. Correct, so we can sign our admin
token with a new private key and set pubkey
to the associated public key. We don’t even need to register and log in!
$ python3 -q
>>> import requests
>>>
>>> from base64 import b64encode
>>> from hashlib import sha256
>>>
>>> from fastecdsa.curve import brainpoolP256r1
>>>
>>> URL = 'http://83.136.249.46:36889'
>>>
>>> q, G = brainpoolP256r1.q, brainpoolP256r1.G
>>>
>>> x = 0xacdc
>>>
>>> admin_data = b'.'.join(map(b64encode, [b'{"alg":"EC256","typ":"JWT"}', b'{"username":"HTBAdmin1337_ZUSD3uQG4I"}']))
>>>
>>> h = int(sha256(admin_data).hexdigest(), 16)
>>> k = 1337
>>> r = (G * k).x
>>> s = pow(k, -1, q) * (h + x * r) % q
>>>
>>> admin_token = admin_data + b'.' + b64encode(r.to_bytes(32, 'big') + s.to_bytes(32, 'big'))
>>> Q = G * x
>>>
>>> res = requests.get(f'{URL}/dashboard', cookies={'token': admin_token.decode(), 'pubkey': f'{Q.x},{Q.y}'}).text
>>> print(res[res.index('HTB{'):res.index('}') + 1])
HTB{weak_randomness_is_usually_the_most_crucial_source_of_vulnerabilities}
Or even shorter, reusing code from the application:
$ python3 -q
>>> import requests
>>>
>>> from crypto.jwt import Tokenizer, ecc
>>> from models.user import User
>>>
>>> URL = 'http://83.136.249.46:36889'
>>>
>>> jwt = Tokenizer()
>>> admin = User('HTBAdmin1337_ZUSD3uQG4I', '', '')
>>> admin_token = jwt.create_token(admin)
>>> Q = ecc.Q
>>>
>>> res = requests.get(f'{URL}/dashboard', cookies={'token': admin_token.decode(), 'pubkey': f'{Q.x},{Q.y}'}).text
>>> print(res[res.index('HTB{'):res.index('}') + 1])
HTB{weak_randomness_is_usually_the_most_crucial_source_of_vulnerabilities}