Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configure loggers for function #1421

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions client/qiskit_serverless/core/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,11 @@ def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None):
"""Stops job/program."""
raise NotImplementedError

def logs(self, job_id: str):
def logs(self, job_id: str, log_type: Optional[str] = None):
"""Return logs."""
raise NotImplementedError

def filtered_logs(self, job_id: str, **kwargs):
def filtered_logs(self, job_id: str, log_type: Optional[str] = None, **kwargs):
"""Return filtered logs."""
raise NotImplementedError

Expand Down Expand Up @@ -174,10 +174,10 @@ def status(self, job_id: str):
def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None):
return self._job_client.stop_job(job_id)

def logs(self, job_id: str):
def logs(self, job_id: str, log_type: Optional[str] = None):
return self._job_client.get_job_logs(job_id)

def filtered_logs(self, job_id: str, **kwargs):
def filtered_logs(self, job_id: str, log_type: Optional[str] = None, **kwargs):
raise NotImplementedError

def result(self, job_id: str):
Expand Down Expand Up @@ -248,7 +248,7 @@ def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None):
"""Stops job/program."""
return f"job:{job_id} has already stopped"

def logs(self, job_id: str):
def logs(self, job_id: str, log_type: Optional[str] = None):
return self._jobs[job_id]["logs"]

def result(self, job_id: str):
Expand All @@ -260,7 +260,7 @@ def get(self, job_id) -> Optional["Job"]:
def list(self, **kwargs) -> List["Job"]:
return [job["job"] for job in list(self._jobs.values())]

def filtered_logs(self, job_id: str, **kwargs):
def filtered_logs(self, job_id: str, log_type: Optional[str] = None, **kwargs):
"""Return filtered logs."""
raise NotImplementedError

Expand Down Expand Up @@ -481,20 +481,21 @@ def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None):

return response_data.get("message")

def logs(self, job_id: str):
def logs(self, job_id: str, log_type: Optional[str] = None):
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.logs"):
response_data = safe_json_request(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/jobs/{job_id}/logs/",
headers={"Authorization": f"Bearer {self._token}"},
params={"log_type": log_type},
timeout=REQUESTS_TIMEOUT,
)
)
return response_data.get("logs")

def filtered_logs(self, job_id: str, **kwargs):
all_logs = self.logs(job_id=job_id)
def filtered_logs(self, job_id: str, log_type: Optional[str] = None, **kwargs):
all_logs = self.logs(job_id=job_id, log_type=log_type)
included = ""
include = kwargs.get("include")
if include is not None:
Expand Down Expand Up @@ -670,17 +671,21 @@ def stop(self, service: Optional[QiskitRuntimeService] = None):
"""Stops the job from running."""
return self._job_client.stop(self.job_id, service=service)

def logs(self) -> str:
def logs( # pylint: disable=unused-argument
self, log_type: Optional[str] = None
) -> str:
"""Returns logs of the job."""
return self._job_client.logs(self.job_id)
return self._job_client.logs(self.job_id, log_type)

def filtered_logs(self, **kwargs) -> str:
def filtered_logs(self, log_type: Optional[str] = None, **kwargs) -> str:
"""Returns logs of the job.
Args:
include: rex expression finds match in the log line to be included
exclude: rex expression finds match in the log line to be excluded
"""
return self._job_client.filtered_logs(job_id=self.job_id, **kwargs)
return self._job_client.filtered_logs(
job_id=self.job_id, log_type=log_type, **kwargs
)

def result(self, wait=True, cadence=5, verbose=False, maxwait=0):
"""Return results of the job.
Expand Down
45 changes: 44 additions & 1 deletion gateway/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,19 @@ def result(self, request, pk=None): # pylint: disable=invalid-name,unused-argum
serializer = self.get_serializer(job)
return Response(serializer.data)

def read_log(self, path):
"""read log file"""
if os.path.exists(path):
with open(path, "r", encoding="UTF-8") as log_file:
log = log_file.read()
else:
log = "no log yet"
return log

@action(methods=["GET"], detail=True)
def logs(self, request, pk=None): # pylint: disable=invalid-name,unused-argument
def logs( # pylint: disable=invalid-name,unused-argument,too-many-return-statements
self, request, pk=None
):
"""Returns logs from job."""
tracer = trace.get_tracer("gateway.tracer")
ctx = TraceContextTextMapPropagator().extract(carrier=request.headers)
Expand All @@ -471,6 +482,38 @@ def logs(self, request, pk=None): # pylint: disable=invalid-name,unused-argumen
return Response(status=404)
logs = job.logs
author = self.request.user
log_type = request.query_params.get("log_type")
if log_type:
if log_type == "function" and job.program and job.program.provider:
if job.program.provider.admin_group in author.groups.all():
return Response(
{
"logs": self.read_log(
f"{settings.MEDIA_ROOT}/{job.program.provider.name}/{job.id}/function.log" # pylint: disable=line-too-long
)
}
)
return Response({"logs": "No available logs"})
if log_type == "user":
if author == job.author:
return Response(
{
"logs": self.read_log(
f"{settings.MEDIA_ROOT}/{author.username}/{job.id}/user.log"
)
}
)
if job.program and job.program.provider:
if job.program.provider.admin_group in author.groups.all():
return Response(
{
"logs": self.read_log(
f"{settings.MEDIA_ROOT}/{author.username}/{job.id}/user.log"
)
}
)
return Response({"logs": "No available logs"})
author = self.request.user
if job.program and job.program.provider:
if job.program.provider.admin_group in author.groups.all():
return Response({"logs": logs})
Expand Down
23 changes: 23 additions & 0 deletions gateway/templates/main.tmpl
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Default entrypoint."""

import logging
import os
import sys

Expand All @@ -9,6 +10,28 @@ sys.path.append("{{mount_path}}")

from {{package_name}} import Runner

job_id = os.environ.get("ENV_JOB_ID_GATEWAY", "NOJOB")
if not os.path.exists(os.path.join("/data", job_id)):
os.mkdir(os.path.join("/data", job_id))
if not os.path.exists(os.path.join("/function_data", job_id)):
os.mkdir(os.path.join("/function_data", job_id))

user_logger = logging.getLogger("user")
function_logger = logging.getLogger("function")

user_logger.setLevel(logging.INFO)
function_logger.setLevel(logging.INFO)

file_handler = logging.FileHandler(f"/data/{job_id}/user.log")
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
file_handler.setFormatter(formatter)
user_logger.addHandler(file_handler)

file_handler = logging.FileHandler(f"/function_data/{job_id}/function.log")
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
file_handler.setFormatter(formatter)
function_logger.addHandler(file_handler)

arguments = get_arguments()

runner = Runner()
Expand Down
106 changes: 106 additions & 0 deletions gateway/tests/api/test_job.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Tests jobs APIs."""

import os
from django.urls import reverse
from rest_framework import status
from rest_framework.test import APITestCase
Expand Down Expand Up @@ -136,3 +137,108 @@ def test_job_logs(self):
)
self.assertEqual(jobs_response.status_code, status.HTTP_200_OK)
self.assertEqual(jobs_response.data.get("logs"), "No available logs")

def test_job_logs_type_user_by_author(self):
"""Tests job log non-authorized."""
user = models.User.objects.get(username="test_user")
self.client.force_authenticate(user=user)

media_root = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"..",
"resources",
"fake_media",
)
media_root = os.path.normpath(os.path.join(os.getcwd(), media_root))
with self.settings(MEDIA_ROOT=media_root):
jobs_response = self.client.get(
reverse("v1:jobs-logs", args=["1a7947f9-6ae8-4e3d-ac1e-e7d608deec82"]),
{"log_type": "user"},
format="json",
)
self.assertEqual(jobs_response.status_code, status.HTTP_200_OK)
self.assertTrue("test_user user log line 1" in jobs_response.data.get("logs"))

def test_job_logs_type_user_by_provider(self):
"""Tests job log non-authorized."""
user = models.User.objects.get(username="test_user_2")
self.client.force_authenticate(user=user)

media_root = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"..",
"resources",
"fake_media",
)
media_root = os.path.normpath(os.path.join(os.getcwd(), media_root))
with self.settings(MEDIA_ROOT=media_root):
jobs_response = self.client.get(
reverse("v1:jobs-logs", args=["1a7947f9-6ae8-4e3d-ac1e-e7d608deec85"]),
{"log_type": "user"},
format="json",
)
self.assertEqual(jobs_response.status_code, status.HTTP_200_OK)
self.assertTrue("test_user_2 user log line 1" in jobs_response.data.get("logs"))

def test_job_logs_type_user(self):
"""Tests job log non-authorized."""
user = models.User.objects.get(username="test_user_3")
self.client.force_authenticate(user=user)

media_root = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"..",
"resources",
"fake_media",
)
media_root = os.path.normpath(os.path.join(os.getcwd(), media_root))
with self.settings(MEDIA_ROOT=media_root):
jobs_response = self.client.get(
reverse("v1:jobs-logs", args=["1a7947f9-6ae8-4e3d-ac1e-e7d608deec85"]),
{"log_type": "user"},
format="json",
)
self.assertEqual(jobs_response.status_code, status.HTTP_200_OK)
self.assertEqual(jobs_response.data.get("logs"), "No available logs")

def test_job_logs_type_function_with_provider_by_user(self):
"""Tests job log non-authorized."""
user = models.User.objects.get(username="test_user")
self.client.force_authenticate(user=user)

media_root = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"..",
"resources",
"fake_media",
)
media_root = os.path.normpath(os.path.join(os.getcwd(), media_root))
with self.settings(MEDIA_ROOT=media_root):
jobs_response = self.client.get(
reverse("v1:jobs-logs", args=["1a7947f9-6ae8-4e3d-ac1e-e7d608deec85"]),
{"log_type": "function"},
format="json",
)
self.assertEqual(jobs_response.status_code, status.HTTP_200_OK)
self.assertEqual(jobs_response.data.get("logs"), "No available logs")

def test_job_logs_type_function_by_provider(self):
"""Tests job log non-authorized."""
user = models.User.objects.get(username="test_user_2")
self.client.force_authenticate(user=user)

media_root = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"..",
"resources",
"fake_media",
)
media_root = os.path.normpath(os.path.join(os.getcwd(), media_root))
with self.settings(MEDIA_ROOT=media_root):
jobs_response = self.client.get(
reverse("v1:jobs-logs", args=["1a7947f9-6ae8-4e3d-ac1e-e7d608deec85"]),
{"log_type": "function"},
format="json",
)
self.assertEqual(jobs_response.status_code, status.HTTP_200_OK)
self.assertTrue("Function log line 1" in jobs_response.data.get("logs"))
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Function log line 1
Function log line 2
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
test_user user log line 1
test_user user log line 2
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
test_user_2 user log line 1
test_user_2 user log line 2
Loading