From 897fed88823d67c67a9a7f1d64114fb29b573d88 Mon Sep 17 00:00:00 2001 From: ilyasabdellaoui Date: Mon, 23 Sep 2024 01:29:27 +0000 Subject: [PATCH 1/4] [Outlook] Feature: Office365 multi-user support Introduced BaseOffice365User abstract base class to standardize Office 365 user handling. Added MultiOffice365Users to manage multiple emails from config. Added client_emails (comma-separated) in OutlookDataSource config. Resolved issue with fetching too many users causing SMTP server not found error. --- connectors/sources/outlook.py | 119 ++++++++++++++++++++++++++++++---- tests/sources/test_outlook.py | 99 +++++++++++++++++++++++----- 2 files changed, 189 insertions(+), 29 deletions(-) diff --git a/connectors/sources/outlook.py b/connectors/sources/outlook.py index 47a495162..4ea42c555 100644 --- a/connectors/sources/outlook.py +++ b/connectors/sources/outlook.py @@ -7,9 +7,11 @@ import asyncio import os +from abc import ABC, abstractmethod from copy import copy from datetime import date from functools import cached_property, partial +from typing import List import aiofiles import aiohttp @@ -348,13 +350,13 @@ async def get_user_accounts(self): yield user_account -class Office365Users: - """Fetch users from Office365 Active Directory""" +class BaseOffice365User(ABC): + """Abstract base class for Office 365 user management""" def __init__(self, client_id, client_secret, tenant_id): - self.tenant_id = tenant_id self.client_id = client_id self.client_secret = client_secret + self.tenant_id = tenant_id @cached_property def _get_session(self): @@ -403,6 +405,21 @@ async def _fetch_token(self): except Exception as exception: self._check_errors(response=exception) + @abstractmethod + async def get_users(self): + pass + + @abstractmethod + async def get_user_accounts(self): + pass + + +class Office365Users(BaseOffice365User): + """Fetch users from Office365 Active Directory""" + + def __init__(self, client_id, client_secret, tenant_id): + super().__init__(client_id, client_secret, tenant_id) + @retryable( retries=RETRIES, interval=RETRY_INTERVAL, @@ -456,6 +473,57 @@ async def get_user_accounts(self): yield user_account +class MultiOffice365Users(BaseOffice365User): + """Fetch multiple Office365 users based on a list of email addresses.""" + + def __init__(self, client_id, client_secret, tenant_id, client_emails: List[str]): + super().__init__(client_id, client_secret, tenant_id) + self.client_emails = client_emails + + async def get_users(self): + access_token = await self._fetch_token() + for email in self.client_emails: + url = f"https://graph.microsoft.com/v1.0/users/{email}" + try: + async with self._get_session.get( + url=url, + headers={ + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/json", + }, + ) as response: + json_response = await response.json() + yield json_response + except Exception: + raise + + async def get_user_accounts(self): + async for user in self.get_users(): + mail = user.get("mail") + if mail is None: + continue + + credentials = OAuth2Credentials( + client_id=self.client_id, + tenant_id=self.tenant_id, + client_secret=self.client_secret, + identity=Identity(primary_smtp_address=mail), + ) + configuration = Configuration( + credentials=credentials, + auth_type=OAUTH2, + service_endpoint=EWS_ENDPOINT, + retry_policy=FaultTolerance(max_wait=120), + ) + user_account = Account( + primary_smtp_address=mail, + config=configuration, + autodiscover=False, + access_type=IMPERSONATION, + ) + yield user_account + + class OutlookDocFormatter: """Format Outlook object documents to Elasticsearch document""" @@ -583,6 +651,27 @@ def attachment_doc_formatter(self, attachment, attachment_type, timezone): } +class UserFactory: + """Factory class for creating Office365 user instances""" + + @staticmethod + def create_user(configuration: dict) -> BaseOffice365User: + if configuration.get("client_emails"): + client_emails = [email.strip() for email in configuration["client_emails"].split(",")] + return MultiOffice365Users( + client_id=configuration["client_id"], + client_secret=configuration["client_secret"], + tenant_id=configuration["tenant_id"], + client_emails=client_emails + ) + else: + return Office365Users( + client_id=configuration["client_id"], + client_secret=configuration["client_secret"], + tenant_id=configuration["tenant_id"] + ) + + class OutlookClient: """Outlook client to handle API calls made to Outlook""" @@ -605,11 +694,7 @@ def set_logger(self, logger_): @cached_property def _get_user_instance(self): if self.is_cloud: - return Office365Users( - client_id=self.configuration["client_id"], - client_secret=self.configuration["client_secret"], - tenant_id=self.configuration["tenant_id"], - ) + return UserFactory.create_user(self.configuration) return ExchangeUsers( ad_server=self.configuration["active_directory_server"], @@ -666,9 +751,12 @@ async def get_tasks(self, account): yield task async def get_contacts(self, account): - folder = account.root / "Top of Information Store" / "Contacts" - for contact in await asyncio.to_thread(folder.all().only, *CONTACT_FIELDS): - yield contact + try: + folder = account.root / "Top of Information Store" / "Contacts" + for contact in await asyncio.to_thread(folder.all().only, *CONTACT_FIELDS): + yield contact + except Exception: + raise class OutlookDataSource(BaseDataSource): @@ -735,6 +823,13 @@ def get_default_configuration(cls): "sensitive": True, "type": "str", }, + "client_emails": { + "depends_on": [{"field": "data_source", "value": OUTLOOK_CLOUD}], + "label": "Client Email Addresses (comma-separated)", + "order": 5, + "required": False, + "type": "str", + }, "exchange_server": { "depends_on": [{"field": "data_source", "value": OUTLOOK_SERVER}], "label": "Exchange Server", @@ -1072,9 +1167,11 @@ async def get_docs(self, filtering=None): dictionary: dictionary containing meta-data of the files. """ async for account in self.client._get_user_instance.get_user_accounts(): + self._logger.debug(f"Processing account: {account}") timezone = account.default_timezone or DEFAULT_TIMEZONE async for mail in self._fetch_mails(account=account, timezone=timezone): + self._logger.debug(f"Fetched mail: {mail}") yield mail async for contact in self._fetch_contacts( diff --git a/tests/sources/test_outlook.py b/tests/sources/test_outlook.py index 31cc507c2..6020e8e92 100644 --- a/tests/sources/test_outlook.py +++ b/tests/sources/test_outlook.py @@ -374,6 +374,7 @@ async def create_outlook_source( tenant_id="foo", client_id="bar", client_secret="faa", + client_emails=None, exchange_server="127.0.0.1", active_directory_server="127.0.0.1", username="fee", @@ -383,12 +384,16 @@ async def create_outlook_source( ssl_ca="", use_text_extraction_service=False, ): + if client_emails is None: + client_emails = "" + async with create_source( OutlookDataSource, data_source=data_source, tenant_id=tenant_id, client_id=client_id, client_secret=client_secret, + client_emails=client_emails, exchange_server=exchange_server, active_directory_server=active_directory_server, username=username, @@ -415,26 +420,36 @@ def get_stream_reader(): return async_mock -def side_effect_function(url, headers): +def side_effect_function(client_emails=None): """Dynamically changing return values for API calls Args: url, ssl: Params required for get call + client_emails: Optional string of comma-separated email addresses """ - if url == "https://graph.microsoft.com/v1.0/users?$top=999": - return get_json_mock( - mock_response={ - "@odata.nextLink": "https://graph.microsoft.com/v1.0/users?$top=999&$skipToken=fake-skip-token", - "value": [{"mail": "test.user@gmail.com"}], - }, - status=200, - ) - elif ( - url - == "https://graph.microsoft.com/v1.0/users?$top=999&$skipToken=fake-skip-token" - ): - return get_json_mock( - mock_response={"value": [{"mail": "dummy.user@gmail.com"}]}, status=200 - ) + def inner(url, headers): + if client_emails: + emails = [email.strip() for email in client_emails.split(",")] + for email in emails: + if url == f"https://graph.microsoft.com/v1.0/users/{email}": + users_response = {"value": [{"mail": email}]} + return get_json_mock(mock_response=users_response, status=200) + elif url == "https://graph.microsoft.com/v1.0/users?$top=999": + return get_json_mock( + mock_response={ + "@odata.nextLink": "https://graph.microsoft.com/v1.0/users?$top=999&$skipToken=fake-skip-token", + "value": [{"mail": "test.user@gmail.com"}], + }, + status=200, + ) + elif ( + url + == "https://graph.microsoft.com/v1.0/users?$top=999&$skipToken=fake-skip-token" + ): + return get_json_mock( + mock_response={"value": [{"mail": "dummy.user@gmail.com"}]}, status=200 + ) + + return inner @pytest.mark.asyncio @@ -459,6 +474,7 @@ def side_effect_function(url, headers): "tenant_id": "foo", "client_id": "bar", "client_secret": "", + "client_emails": None, } ), ], @@ -497,6 +513,17 @@ async def test_validate_configuration_with_invalid_dependency_fields_raises_erro "tenant_id": "foo", "client_id": "bar", "client_secret": "foo.bar", + "client_emails": None + } + ), + ( + # Outlook Cloud with non-blank dependent fields & client_emails provided + { + "data_source": OUTLOOK_CLOUD, + "tenant_id": "foo", + "client_id": "bar", + "client_secret": "foo.bar", + "client_emails": "test.user@gmail.com" } ), ], @@ -552,7 +579,7 @@ async def test_ping_for_cloud(): ): with mock.patch( "aiohttp.ClientSession.get", - side_effect=side_effect_function, + side_effect=side_effect_function(), ): await source.ping() @@ -597,13 +624,49 @@ async def test_get_users_for_cloud(): ): with mock.patch( "aiohttp.ClientSession.get", - side_effect=side_effect_function, + side_effect=side_effect_function(), ): async for response in source.client._get_user_instance.get_users(): user_mails = [user["mail"] for user in response["value"]] users.extend(user_mails) assert users == ["test.user@gmail.com", "dummy.user@gmail.com"] + client_emails = "one.user@gmail.com" + async with create_outlook_source(client_emails=client_emails) as source: + users = [] + with mock.patch( + "aiohttp.ClientSession.post", + return_value=get_json_mock( + mock_response={"access_token": "fake-token"}, status=200 + ), + ): + with mock.patch( + "aiohttp.ClientSession.get", + side_effect=side_effect_function(client_emails), + ): + async for response in source.client._get_user_instance.get_users(): + user_mails = [user["mail"] for user in response["value"]] + users.extend(user_mails) + assert users == ["one.user@gmail.com"] + + client_emails = "first.user@gmail.com, second.user@gmail.com" + async with create_outlook_source(client_emails=client_emails) as source: + users = [] + with mock.patch( + "aiohttp.ClientSession.post", + return_value=get_json_mock( + mock_response={"access_token": "fake-token"}, status=200 + ), + ): + with mock.patch( + "aiohttp.ClientSession.get", + side_effect=side_effect_function(client_emails), + ): + async for response in source.client._get_user_instance.get_users(): + user_mails = [user["mail"] for user in response["value"]] + users.extend(user_mails) + assert set(users) == {"first.user@gmail.com", "second.user@gmail.com"} + @pytest.mark.asyncio @patch("connectors.sources.outlook.Connection") From 8076809afe10795539ec4d4fbb0d138b23a977ac Mon Sep 17 00:00:00 2001 From: ilyasabdellaoui Date: Sun, 29 Sep 2024 19:31:12 +0000 Subject: [PATCH 2/4] [Outlook] Fix: Update order values and add tooltip for client_emails --- connectors/sources/outlook.py | 29 ++++++++++++++++------------- tests/sources/test_outlook.py | 7 ++++--- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/connectors/sources/outlook.py b/connectors/sources/outlook.py index 4ea42c555..3c299ad99 100644 --- a/connectors/sources/outlook.py +++ b/connectors/sources/outlook.py @@ -411,7 +411,7 @@ async def get_users(self): @abstractmethod async def get_user_accounts(self): - pass + pass class Office365Users(BaseOffice365User): @@ -657,18 +657,20 @@ class UserFactory: @staticmethod def create_user(configuration: dict) -> BaseOffice365User: if configuration.get("client_emails"): - client_emails = [email.strip() for email in configuration["client_emails"].split(",")] + client_emails = [ + email.strip() for email in configuration["client_emails"].split(",") + ] return MultiOffice365Users( client_id=configuration["client_id"], client_secret=configuration["client_secret"], tenant_id=configuration["tenant_id"], - client_emails=client_emails + client_emails=client_emails, ) else: return Office365Users( client_id=configuration["client_id"], client_secret=configuration["client_secret"], - tenant_id=configuration["tenant_id"] + tenant_id=configuration["tenant_id"], ) @@ -827,40 +829,41 @@ def get_default_configuration(cls): "depends_on": [{"field": "data_source", "value": OUTLOOK_CLOUD}], "label": "Client Email Addresses (comma-separated)", "order": 5, + "tooltip": "Specify the email addresses to limit data fetching to specific clients. If left empty, data will be fetched for all users.", "required": False, "type": "str", }, "exchange_server": { "depends_on": [{"field": "data_source", "value": OUTLOOK_SERVER}], "label": "Exchange Server", - "order": 5, + "order": 6, "tooltip": "Exchange server's IP address. E.g. 127.0.0.1", "type": "str", }, "active_directory_server": { "depends_on": [{"field": "data_source", "value": OUTLOOK_SERVER}], "label": "Active Directory Server", - "order": 6, + "order": 7, "tooltip": "Active Directory server's IP address. E.g. 127.0.0.1", "type": "str", }, "username": { "depends_on": [{"field": "data_source", "value": OUTLOOK_SERVER}], "label": "Exchange server username", - "order": 7, + "order": 8, "type": "str", }, "password": { "depends_on": [{"field": "data_source", "value": OUTLOOK_SERVER}], "label": "Exchange server password", - "order": 8, + "order": 9, "sensitive": True, "type": "str", }, "domain": { "depends_on": [{"field": "data_source", "value": OUTLOOK_SERVER}], "label": "Exchange server domain name", - "order": 9, + "order": 10, "tooltip": "Domain name such as gmail.com, outlook.com", "type": "str", }, @@ -868,7 +871,7 @@ def get_default_configuration(cls): "depends_on": [{"field": "data_source", "value": OUTLOOK_SERVER}], "display": "toggle", "label": "Enable SSL", - "order": 10, + "order": 11, "type": "bool", "value": False, }, @@ -878,13 +881,13 @@ def get_default_configuration(cls): {"field": "ssl_enabled", "value": True}, ], "label": "SSL certificate", - "order": 11, + "order": 12, "type": "str", }, "use_text_extraction_service": { "display": "toggle", "label": "Use text extraction service", - "order": 12, + "order": 13, "tooltip": "Requires a separate deployment of the Elastic Text Extraction Service. Requires that pipeline settings disable text extraction.", "type": "bool", "ui_restrictions": ["advanced"], @@ -893,7 +896,7 @@ def get_default_configuration(cls): "use_document_level_security": { "display": "toggle", "label": "Enable document level security", - "order": 13, + "order": 14, "tooltip": "Document level security ensures identities and permissions set in Outlook are maintained in Elasticsearch. This enables you to restrict and personalize read-access users and groups have to documents in this index. Access control syncs ensure this metadata is kept up to date in your Elasticsearch documents.", "type": "bool", "value": False, diff --git a/tests/sources/test_outlook.py b/tests/sources/test_outlook.py index 6020e8e92..8d408ea23 100644 --- a/tests/sources/test_outlook.py +++ b/tests/sources/test_outlook.py @@ -426,6 +426,7 @@ def side_effect_function(client_emails=None): url, ssl: Params required for get call client_emails: Optional string of comma-separated email addresses """ + def inner(url, headers): if client_emails: emails = [email.strip() for email in client_emails.split(",")] @@ -448,7 +449,7 @@ def inner(url, headers): return get_json_mock( mock_response={"value": [{"mail": "dummy.user@gmail.com"}]}, status=200 ) - + return inner @@ -513,7 +514,7 @@ async def test_validate_configuration_with_invalid_dependency_fields_raises_erro "tenant_id": "foo", "client_id": "bar", "client_secret": "foo.bar", - "client_emails": None + "client_emails": None, } ), ( @@ -523,7 +524,7 @@ async def test_validate_configuration_with_invalid_dependency_fields_raises_erro "tenant_id": "foo", "client_id": "bar", "client_secret": "foo.bar", - "client_emails": "test.user@gmail.com" + "client_emails": "test.user@gmail.com", } ), ], From 104e29915bb37f4e7b753968b993311ea3c73b46 Mon Sep 17 00:00:00 2001 From: ilyasabdellaoui Date: Sun, 29 Sep 2024 21:53:13 +0000 Subject: [PATCH 3/4] [Outlook] Feat: Implement JSON batching for user fetching in Outlook Cloud connector - Refactored the method to utilize Microsoft Graph's JSON batching feature. - The method now processes client emails in batches of up to 20 to optimize API requests. - Added error handling for individual user fetch errors within the batch response. - If any batch request fails, the method collects the errors and raises an exception with details. --- connectors/sources/outlook.py | 50 +++++++++++++++++++++++++++++------ 1 file changed, 42 insertions(+), 8 deletions(-) diff --git a/connectors/sources/outlook.py b/connectors/sources/outlook.py index 3c299ad99..968e2bee7 100644 --- a/connectors/sources/outlook.py +++ b/connectors/sources/outlook.py @@ -214,6 +214,18 @@ class SSLFailed(Exception): pass +class OutlookUserFetchFailed(Exception): + """Exception class to notify that fetching a specific user from Outlook failed.""" + + pass + + +class BatchRequestFailed(Exception): + """Exception class to notify that a batch request to fetch users failed.""" + + pass + + class ManageCertificate: async def store_certificate(self, certificate): async with aiofiles.open(CERT_FILE, "w") as file: @@ -482,20 +494,41 @@ def __init__(self, client_id, client_secret, tenant_id, client_emails: List[str] async def get_users(self): access_token = await self._fetch_token() - for email in self.client_emails: - url = f"https://graph.microsoft.com/v1.0/users/{email}" + errors = [] + for i in range(0, len(self.client_emails), 20): + batch_emails = self.client_emails[i : i + 20] + requests = [ + {"id": str(index + 1), "method": "GET", "url": f"/users/{email}"} + for index, email in enumerate(batch_emails) + ] + batch_request_body = {"requests": requests} try: - async with self._get_session.get( - url=url, + async with self._get_session.post( + url="https://graph.microsoft.com/v1.0/$batch", headers={ "Authorization": f"Bearer {access_token}", "Content-Type": "application/json", }, + json=batch_request_body, ) as response: json_response = await response.json() - yield json_response - except Exception: - raise + for res in json_response.get("responses", []): + user_id = res.get("id") + status = res.get("status") + if status == 200: + yield res.get("body") + else: + msg = f"Error for user {user_id}: {res.get('body')}" + errors.append(OutlookUserFetchFailed(msg)) + except Exception as e: + msg = f"Batch request failed: {str(e)}" + errors.append(BatchRequestFailed(msg)) + + if errors: + msg = "Errors occurred while fetching users: " + "\n".join( + str(e) for e in errors + ) + raise Exception(msg) async def get_user_accounts(self): async for user in self.get_users(): @@ -714,7 +747,8 @@ async def _fetch_all_users(self): yield user async def ping(self): - await anext(self._get_user_instance.get_users()) + async for _user in self._get_user_instance.get_users(): + return async def get_mails(self, account): for mail_type in MAIL_TYPES: From 71a30874d8cd83b41adb268beede67d052603650 Mon Sep 17 00:00:00 2001 From: ilyasabdellaoui Date: Sun, 29 Sep 2024 23:51:20 +0000 Subject: [PATCH 4/4] [Outlook] refactor: update client_emails to use list type in config --- connectors/sources/outlook.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/connectors/sources/outlook.py b/connectors/sources/outlook.py index 968e2bee7..98fa50572 100644 --- a/connectors/sources/outlook.py +++ b/connectors/sources/outlook.py @@ -690,9 +690,7 @@ class UserFactory: @staticmethod def create_user(configuration: dict) -> BaseOffice365User: if configuration.get("client_emails"): - client_emails = [ - email.strip() for email in configuration["client_emails"].split(",") - ] + client_emails = [email.strip() for email in configuration["client_emails"]] return MultiOffice365Users( client_id=configuration["client_id"], client_secret=configuration["client_secret"], @@ -865,7 +863,7 @@ def get_default_configuration(cls): "order": 5, "tooltip": "Specify the email addresses to limit data fetching to specific clients. If left empty, data will be fetched for all users.", "required": False, - "type": "str", + "type": "list", }, "exchange_server": { "depends_on": [{"field": "data_source", "value": OUTLOOK_SERVER}],