Skip to content

Commit

Permalink
Merge pull request #436 from EduardSchwarzkopf/improve-tests
Browse files Browse the repository at this point in the history
Improve tests
  • Loading branch information
DaveYesland authored Aug 3, 2024
2 parents 470b96a + b99ed79 commit 6de1e7e
Show file tree
Hide file tree
Showing 12 changed files with 402 additions and 219 deletions.
86 changes: 86 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import os
import pytest
from pacu import settings, Main
from pacu import core
from pacu.core.models import PacuSession
from sqlalchemy import orm, Column, create_engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy_utils import JSONType
from sqlalchemy.engine import Engine

from pacu.modules.cognito__attack.tests.dataclasses import AWSCredentials

settings.DATABASE_CONNECTION_PATH = "sqlite:///:memory:"


@pytest.fixture(scope="function")
def db() -> core.base:
core.base.engine: Engine = create_engine(settings.DATABASE_CONNECTION_PATH)
core.base.Session: sessionmaker = sessionmaker(bind=core.base.engine)
core.base.Base.metadata.create_all(core.base.engine)
yield core.base.Session()


@pytest.fixture(scope="function")
def pacu(db, aws_credentials: AWSCredentials, active_session: PacuSession):
pacu = Main()
pacu.database = db

pacu.set_keys(
key_alias="pytest",
access_key_id=aws_credentials.access_key_id,
secret_access_key=aws_credentials.secret_access_key,
session_token=aws_credentials.session_token,
)

yield pacu


@pytest.fixture(scope="function")
def active_session(db, pacu_session: PacuSession):
pacu_session.activate(db)
yield pacu_session


@pytest.fixture(scope="function")
def pacu_session(db: orm.session.Session):
query: orm.Query = db.query(PacuSession)
assert query.count() == 0

pacu_session = PacuSession(name="test")
db.add(pacu_session)
yield pacu_session


@pytest.fixture(scope="function")
def db_new_column(db: Session):
PacuSession.TestSvc = Column(JSONType, nullable=False, default=dict)
PacuSession.aws_data_field_names = PacuSession.aws_data_field_names + ("TestSvc",)
core.base.Session: sessionmaker = sessionmaker(bind=core.base.engine)
yield core.base.Session()


@pytest.fixture(scope="function")
def pacu_with_data(pacu: Main, active_session: PacuSession):
active_session.update(pacu.database, CloudWatch={"test_key": "test_value"})
yield pacu


@pytest.fixture(autouse=True)
def aws_credentials():
"""Mocked AWS Credentials for moto."""

value = "testing"
creds = AWSCredentials(
access_key_id=value,
secret_access_key=value,
session_token=value,
security_token=value,
)

os.environ["AWS_ACCESS_KEY_ID"] = creds.access_key_id
os.environ["AWS_SECRET_ACCESS_KEY"] = creds.secret_access_key
os.environ["AWS_SECURITY_TOKEN"] = creds.session_token
os.environ["AWS_SESSION_TOKEN"] = creds.session_token

yield creds
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import pytest
import moto
import boto3
import os


@pytest.fixture(scope="function")
def s3():
with moto.mock_s3():
yield boto3.client("s3", region_name="us-east-1")


@pytest.fixture
def environ():
os.environ["PRINCIPAL"] = "asdf"
Original file line number Diff line number Diff line change
@@ -1,66 +1,61 @@
import os
import re
import boto3
import moto

import pytest

from chalice.test import Client
from pacu.modules.cfn__resource_injection.cfn__resource_injection_lambda.app import app, add_role_dict, add_role_bytes, fetch, update
from pacu.modules.cfn__resource_injection.cfn__resource_injection_lambda.app import (
add_role_dict,
add_role_bytes,
update,
)


@pytest.fixture
def environ():
os.environ['PRINCIPAL'] = 'asdf'
def upload_json(environ, s3):
s3.create_bucket(Bucket="test-bucket")
s3.put_object(Bucket="test-bucket", Key="test-key", Body=new_cfn_json)
return "test-bucket", "test-key", new_cfn_json


def test_add_role_dict(environ):
@pytest.mark.usefixtures("environ")
def test_add_role_dict():
cfn = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "Test CloudFormation Template",
"Globals": {},
"Outputs": {},
"Resources": {
"OurBucket": {
"Properties": {},
"Type": "AWS::S3::Bucket"
}
}
"Resources": {"OurBucket": {"Properties": {}, "Type": "AWS::S3::Bucket"}},
}

assert {
'AWSTemplateFormatVersion': '2010-09-09',
'Description': 'Test CloudFormation Template',
'Globals': {},
'Outputs': {},
'Resources': {
'MaintenanceRole': {
'Properties': {
'AssumeRolePolicyDocument': '{"Version": "2012-10-17", "Statement": [{"Effect": "Allow", '
'"Principal": {"AWS": "asdf"}, "Action": "sts:AssumeRole"}]}',
'Policies': [
{
'PolicyDocument': {
'Statement': [
{
'Action': '*',
'Effect': 'Allow',
'Resource': '*'
}
],
'Version': '2012-10-17',
},
'PolicyName': 'default',
}
]
},
'Type': 'AWS::IAM::Role',
},
'OurBucket': {
'Properties': {},
'Type': 'AWS::S3::Bucket',
}
}
} == add_role_dict(cfn)
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "Test CloudFormation Template",
"Globals": {},
"Outputs": {},
"Resources": {
"MaintenanceRole": {
"Properties": {
"AssumeRolePolicyDocument": '{"Version": "2012-10-17", "Statement": [{"Effect": "Allow", '
'"Principal": {"AWS": "asdf"}, "Action": "sts:AssumeRole"}]}',
"Policies": [
{
"PolicyDocument": {
"Statement": [
{"Action": "*", "Effect": "Allow", "Resource": "*"}
],
"Version": "2012-10-17",
},
"PolicyName": "default",
}
],
},
"Type": "AWS::IAM::Role",
},
"OurBucket": {
"Properties": {},
"Type": "AWS::S3::Bucket",
},
},
} == add_role_dict(cfn)


new_cfn_json = b"""
Expand Down Expand Up @@ -134,49 +129,27 @@ def test_add_role_bytes_yaml(environ):
add_role_bytes(new_cfn_yaml)


@pytest.fixture
def upload_json(environ, s3):
s3.create_bucket(Bucket='test-bucket')
s3.put_object(Bucket='test-bucket', Key='test-key', Body=new_cfn_json)
return 'test-bucket', 'test-key', new_cfn_json


@pytest.fixture
def aws_credentials():
"""Mocked AWS Credentials for moto."""
os.environ['AWS_ACCESS_KEY_ID'] = 'testing'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'testing'
os.environ['AWS_SECURITY_TOKEN'] = 'testing'
os.environ['AWS_SESSION_TOKEN'] = 'testing'


@pytest.fixture(scope='function')
def s3(aws_credentials):
with moto.mock_s3():
yield boto3.client('s3', region_name='us-east-1')


def test_update(s3, upload_json):
bucket, key, _ = upload_json[0], upload_json[1], upload_json[2]

body = s3.get_object(Bucket=bucket, Key=key)['Body'].read()
body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
assert clean_json(new_cfn_json.decode()) == clean_json(body.decode())

update(s3, bucket, key)

body = s3.get_object(Bucket=bucket, Key=key)['Body'].read()
body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
assert clean_json(modified_cfn_json.decode()) == clean_json(body.decode())


def test_update_second_time(s3, upload_json):
bucket, key, body = upload_json[0], upload_json[1], upload_json[2]
bucket, key = upload_json[0], upload_json[1]
update(s3, bucket, key)
resp = s3.get_object(Bucket=bucket, Key=key)
a = clean_json(modified_cfn_json.decode())
b = clean_json(resp['Body'].read().decode())
b = clean_json(resp["Body"].read().decode())
assert a == b


def clean_json(s: str):
c = re.compile(r'\s')
return c.sub('', s)
c = re.compile(r"\s")
return c.sub("", s)
37 changes: 22 additions & 15 deletions pacu/modules/cognito__attack/main.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import re
import argparse
import base64
import json
import re
import webbrowser
from copy import deepcopy
from dataclasses import dataclass
from typing import Dict, List, Optional

import qrcode
import argparse
import json
from botocore.client import BaseClient
from botocore.exceptions import ClientError
from pycognito.aws_srp import AWSSRP
from dataclasses import dataclass
from typing import List, Dict, Optional
from pycognito.exceptions import SoftwareTokenMFAChallengeException
from copy import deepcopy
from botocore.exceptions import ClientError

from pacu import Main
from botocore.client import BaseClient

# Using Spencer's iam_enum.py as a template

Expand Down Expand Up @@ -500,9 +502,12 @@ def main(args, pacu_main: Main):
"Your refresh token is: "
+ tokens["AuthenticationResult"]["RefreshToken"]
)
print(
"Your token type is: " + tokens["AuthenticationResult"]["TokenType"]
)

if "TokenType" in tokens["AuthenticationResult"]:
print(
"Your token type is: "
+ tokens["AuthenticationResult"]["TokenType"]
)
attack_user["Username"] = username
attack_user["Region"] = up_client["Region"]
attack_user["UserPoolId"] = up_client["UserPoolId"]
Expand All @@ -516,9 +521,11 @@ def main(args, pacu_main: Main):
attack_user["Tokens"]["RefreshToken"] = tokens["AuthenticationResult"][
"RefreshToken"
]
attack_user["Tokens"]["TokenType"] = tokens["AuthenticationResult"][
"TokenType"
]

if "TokenType" in tokens["AuthenticationResult"]:
attack_user["Tokens"]["TokenType"] = tokens["AuthenticationResult"][
"TokenType"
]
credentials = get_identity_credentials(
cognito_identity_pools,
identity_client,
Expand Down Expand Up @@ -944,7 +951,7 @@ def main(args, pacu_main: Main):
for var in vars(args):
if var == "regions":
continue
if not getattr(args, var) and ARG_FIELD_MAPPER[var] in gathered_data:
if not getattr(args, var) and ARG_FIELD_MAPPER.get(var) in gathered_data:
del gathered_data[ARG_FIELD_MAPPER[var]]

cognito_data = deepcopy(session.Cognito)
Expand Down
48 changes: 48 additions & 0 deletions pacu/modules/cognito__attack/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import boto3
import moto
import pytest

from pacu.modules.cognito__attack.tests.dataclasses import CognitoServiceConfig
from pacu.settings import REGION


@pytest.fixture
def mock_cognito_user_pool():

with moto.mock_cognitoidp():

client = boto3.client(
"cognito-idp",
region_name=REGION,
)

response = client.create_user_pool(
PoolName="TestUserPool", UsernameAttributes=["email"]
)

user_pool_id = response["UserPool"]["Id"]

client_response = client.create_user_pool_client(
ClientName="AppClient",
GenerateSecret=False,
UserPoolId=user_pool_id,
)

with moto.mock_cognitoidentity():
c = boto3.client(
"cognito-identity",
region_name=REGION,
)

c_resposnse = c.create_identity_pool(
IdentityPoolName="TestIdentityPool",
AllowUnauthenticatedIdentities=False,
)

yield CognitoServiceConfig(
client=client,
user_pool_id=user_pool_id,
client_id=client_response["UserPoolClient"]["ClientId"],
client_name=client_response["UserPoolClient"]["ClientName"],
identity_pool_id=c_resposnse["IdentityPoolId"],
)
Loading

0 comments on commit 6de1e7e

Please sign in to comment.