Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DOP-20962] Add scheduler integration test #118

Merged
merged 11 commits into from
Nov 6, 2024
79 changes: 79 additions & 0 deletions .github/workflows/scheduler-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
name: Scheduler Tests
on:
workflow_call:

env:
DEFAULT_PYTHON: '3.12'

jobs:
tests:
name: Run Scheduler tests
runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Set up QEMU
uses: docker/setup-qemu-action@v3

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3

- name: Cache jars
uses: actions/cache@v4
with:
path: ./cached_jars
key: ${{ runner.os }}-python-${{ env.DEFAULT_PYTHON }}-test-scheduler
restore-keys: |
${{ runner.os }}-python-${{ env.DEFAULT_PYTHON }}-test-scheduler
${{ runner.os }}-python-

- name: Build Worker Image
uses: docker/build-push-action@v6
with:
context: .
tags: mtsrus/syncmaster-worker:${{ github.sha }}
target: test
file: docker/Dockerfile.worker
load: true
cache-from: mtsrus/syncmaster-worker:develop

- name: Docker compose up
run: |
docker compose -f docker-compose.test.yml --profile all down -v --remove-orphans
docker compose -f docker-compose.test.yml --profile worker up -d --wait --wait-timeout 200
env:
WORKER_IMAGE_TAG: ${{ github.sha }}

# This is important, as coverage is exported after receiving SIGTERM
- name: Run Scheduler Tests
run: |
docker compose -f ./docker-compose.test.yml --profile worker exec -T worker coverage run -m pytest -vvv -s -m "worker and scheduler_integration"

- name: Dump worker logs on failure
if: failure()
uses: jwalton/gh-docker-logs@v2
with:
images: mtsrus/syncmaster-worker
dest: ./logs

- name: Shutdown
if: always()
run: |
docker compose -f docker-compose.test.yml --profile all down -v --remove-orphans

- name: Upload worker logs
uses: actions/upload-artifact@v4
if: failure()
with:
name: worker-logs-scheduler
path: logs/*

- name: Upload coverage results
uses: actions/upload-artifact@v4
with:
name: coverage-scheduler
path: reports/*
# https://github.com/actions/upload-artifact/issues/602
include-hidden-files: true
4 changes: 4 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ jobs:
name: S3 tests
uses: ./.github/workflows/s3-tests.yml

scheduler_tests:
name: Scheduler tests
uses: ./.github/workflows/scheduler-tests.yml

unit_tests:
name: Unit tests
uses: ./.github/workflows/unit-test.yml
Expand Down
3 changes: 2 additions & 1 deletion docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ services:
context: .
target: test
command: --loglevel=info -Q test_queue
entrypoint: [python, -m, celery, -A, tests.test_integration.celery_test, worker, --max-tasks-per-child=1]
env_file: .env.docker
volumes:
- ./syncmaster:/app/syncmaster
Expand All @@ -90,7 +91,7 @@ services:
condition: service_healthy
rabbitmq:
condition: service_healthy
profiles: [worker, s3, oracle, hdfs, hive, all]
profiles: [worker, scheduler, s3, oracle, hdfs, hive, all]

test-postgres:
image: postgres
Expand Down
2 changes: 2 additions & 0 deletions syncmaster/scheduler/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from syncmaster.scheduler.transfer_fetcher import TransferFetcher
from syncmaster.scheduler.transfer_job_manager import TransferJobManager
2 changes: 0 additions & 2 deletions syncmaster/settings/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ class Settings(BaseSettings):
SCHEDULER_TRANSFER_FETCHING_TIMEOUT: int = 180 # seconds
SCHEDULER_MISFIRE_GRACE_TIME: int = 300 # seconds

CORRELATION_CELERY_HEADER_ID: str = "CORRELATION_CELERY_HEADER_ID"

TOKEN_EXPIRED_TIME: int = 60 * 60 * 10 # 10 hours
CREATE_SPARK_SESSION_FUNCTION: ImportString = "syncmaster.worker.spark.get_worker_spark_session"

Expand Down
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"tests.test_unit.test_runs.run_fixtures",
"tests.test_unit.test_connections.connection_fixtures",
"tests.test_unit.test_scheduler.scheduler_fixtures",
"tests.test_integration.test_scheduler.scheduler_fixtures",
]


Expand Down
3 changes: 3 additions & 0 deletions tests/test_integration/celery_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from syncmaster.worker.config import celery

celery.conf.update(imports=list(celery.conf.imports) + ["tests.test_integration.test_scheduler.test_task"])
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from tests.test_integration.test_scheduler.scheduler_fixtures.mocker_fixtures import (
mock_add_job,
mock_send_task_to_tick,
)
from tests.test_integration.test_scheduler.scheduler_fixtures.transfer_fixture import (
group_transfer_integration_mock,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import asyncio

import pytest
from apscheduler.triggers.cron import CronTrigger
from pytest_mock import MockerFixture, MockType

from syncmaster.scheduler.transfer_job_manager import TransferJobManager
from syncmaster.worker.config import celery


@pytest.fixture
def mock_send_task_to_tick(mocker: MockerFixture) -> MockType:
original_to_thread = asyncio.to_thread
return mocker.patch(
"asyncio.to_thread",
new=lambda func, *args, **kwargs: original_to_thread(celery.send_task, "tick", *args[1:], **kwargs),
)


@pytest.fixture
def mock_add_job(mocker: MockerFixture, transfer_job_manager: TransferJobManager) -> MockType:
original_add_job = transfer_job_manager.scheduler.add_job
return mocker.patch.object(
transfer_job_manager.scheduler,
"add_job",
side_effect=lambda func, id, trigger, misfire_grace_time, args: original_add_job(
func=func,
id=id,
trigger=CronTrigger(second="*"),
misfire_grace_time=misfire_grace_time,
args=args,
),
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
from collections.abc import AsyncGenerator

import pytest_asyncio
from sqlalchemy.ext.asyncio import AsyncSession

from syncmaster.backend.api.v1.auth.utils import sign_jwt
from syncmaster.db.repositories.utils import decrypt_auth_data
from syncmaster.settings import Settings
from tests.mocks import (
MockConnection,
MockCredentials,
MockGroup,
MockTransfer,
MockUser,
UserTestRoles,
)
from tests.test_unit.conftest import create_group_member
from tests.test_unit.utils import (
create_connection,
create_credentials,
create_group,
create_queue,
create_transfer,
create_user,
)


@pytest_asyncio.fixture
async def group_transfer_integration_mock(
session: AsyncSession,
settings: Settings,
create_connection_data: dict | None,
create_transfer_data: dict | None,
) -> AsyncGenerator[MockTransfer, None]:
group_owner = await create_user(
session=session,
username="group_transfer_owner",
is_active=True,
)
group = await create_group(
session=session,
name="group_for_group_transfer",
owner_id=group_owner.id,
)

queue = await create_queue(
session=session,
name="test_queue",
group_id=group.id,
)

members: list[MockUser] = []
for username in (
"transfer_group_member_maintainer",
"transfer_group_member_developer",
"transfer_group_member_guest",
):
members.append(
await create_group_member(
username=username,
group_id=group.id,
session=session,
settings=settings,
),
)

await session.commit()
mock_group = MockGroup(
group=group,
owner=MockUser(
user=group_owner,
auth_token=sign_jwt(group_owner.id, settings),
role=UserTestRoles.Owner,
),
members=members,
)

source_connection = await create_connection(
session=session,
name="group_transfer_source_connection",
group_id=group.id,
data=create_connection_data,
)
source_connection_creds = await create_credentials(
session=session,
settings=settings,
connection_id=source_connection.id,
)
target_connection = await create_connection(
session=session,
name="group_transfer_target_connection",
group_id=group.id,
data=create_connection_data,
)
target_connection_creds = await create_credentials(
session=session,
settings=settings,
connection_id=target_connection.id,
)

transfer = await create_transfer(
session=session,
name="group_transfer",
group_id=group.id,
source_connection_id=source_connection.id,
target_connection_id=target_connection.id,
queue_id=queue.id,
source_params=create_transfer_data,
target_params=create_transfer_data,
)

yield MockTransfer(
transfer=transfer,
source_connection=MockConnection(
connection=source_connection,
owner_group=mock_group,
credentials=MockCredentials(
value=decrypt_auth_data(source_connection_creds.value, settings=settings),
connection_id=source_connection.id,
),
),
target_connection=MockConnection(
connection=target_connection,
owner_group=mock_group,
credentials=MockCredentials(
value=decrypt_auth_data(target_connection_creds.value, settings=settings),
connection_id=target_connection.id,
),
),
owner_group=mock_group,
)
await session.delete(transfer)
await session.delete(source_connection)
await session.delete(target_connection)
await session.delete(group)
await session.delete(group_owner)
await session.delete(queue)
for member in members:
await session.delete(member.user)
await session.commit()
51 changes: 51 additions & 0 deletions tests/test_integration/test_scheduler/test_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import asyncio

import pytest
from pytest_mock import MockType
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from syncmaster.db.models import Run, Status
from syncmaster.scheduler import TransferFetcher, TransferJobManager
from syncmaster.settings import Settings
from tests.mocks import MockTransfer

pytestmark = [pytest.mark.asyncio, pytest.mark.worker, pytest.mark.scheduler_integration]


async def test_scheduler(
session: AsyncSession,
settings: Settings,
group_transfer_integration_mock: MockTransfer,
transfer_job_manager: TransferJobManager,
mock_send_task_to_tick: MockType,
mock_add_job: MockType,
):
group_transfer = group_transfer_integration_mock
transfer_fetcher = TransferFetcher(settings)
transfers = await transfer_fetcher.fetch_updated_jobs()
assert transfers
assert group_transfer.transfer.id in {t.id for t in transfers}

transfer_job_manager.update_jobs(transfers)

job = transfer_job_manager.scheduler.get_job(str(group_transfer.id))
assert job is not None

await asyncio.sleep(1.5) # make sure that created job with every-second cron worked

run = await session.scalar(
select(Run).filter_by(transfer_id=group_transfer.id).order_by(Run.created_at.desc()),
)
assert run is not None
assert run.status in [Status.CREATED, Status.STARTED]

for _ in range(3):
await asyncio.sleep(2)
await session.refresh(run)
run = await session.scalar(select(Run, run.id))
if run.status == Status.FINISHED:
break

assert run.status == Status.FINISHED
assert run.ended_at is not None
28 changes: 28 additions & 0 deletions tests/test_integration/test_scheduler/test_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import time
from datetime import datetime, timezone

from sqlalchemy.orm import Session

from syncmaster.db.models.run import Run, Status
from syncmaster.exceptions.run import RunNotFoundError
from syncmaster.worker.base import WorkerTask
from syncmaster.worker.config import celery


@celery.task(name="tick", bind=True, track_started=True)
def tick(self: WorkerTask, run_id: int) -> None:
with Session(self.engine) as session:
run = session.get(Run, run_id)
if run is None:
raise RunNotFoundError

run.started_at = datetime.now(tz=timezone.utc)
run.status = Status.STARTED
session.add(run)
session.commit()

time.sleep(2) # to make sure that previous status is handled in test
run.status = Status.FINISHED
run.ended_at = datetime.now(tz=timezone.utc)
session.add(run)
session.commit()
Loading
Loading