
Add logging #2329

Closed · wants to merge 6 commits
6 changes: 6 additions & 0 deletions connectors/sources/azure_blob_storage.py
@@ -185,6 +185,7 @@ async def get_content(self, blob, timestamp=None, doit=None):
if not self.can_file_be_downloaded(file_extension, filename, file_size):
return

self._logger.debug(f"Downloading content for file: {filename}")
document = {"_id": blob["id"], "_timestamp": blob["_timestamp"]}
return await self.download_and_extract_file(
document,
@@ -194,6 +195,9 @@ async def get_content(self, blob, timestamp=None, doit=None):
)

async def blob_download_func(self, blob_name, container_name):
self._logger.debug(
f"Downloading content for blob: {blob_name} from {container_name} container"
)
async with BlobClient.from_connection_string(
conn_str=self.connection_string,
container_name=container_name,
Expand All @@ -212,6 +216,7 @@ async def get_container(self, container_list):
Yields:
dictionary: Container document with name & metadata
"""
self._logger.debug("Fetching containers")
container_set = set(container_list)
async with BlobServiceClient.from_connection_string(
conn_str=self.connection_string, retry_total=self.retry_count
@@ -247,6 +252,7 @@ async def get_blob(self, container):
Yields:
dictionary: Formatted blob document
"""
self._logger.info(f"Fetching blobs for '{container['name']}' container")
async with ContainerClient.from_connection_string(
conn_str=self.connection_string,
container_name=container["name"],
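
The level split in this file is worth calling out: once-per-container events log at INFO, while once-per-blob events log at DEBUG, so default log levels stay readable on large containers. Below is a minimal sketch of that convention, assuming the connector's _logger behaves like a standard logging.Logger; the sync_container helper and the logger name are illustrative, not part of the connector.

import logging

logging.basicConfig(level=logging.DEBUG)
# Hypothetical stand-in for the connector's injected _logger.
logger = logging.getLogger("azure_blob_storage")

def sync_container(container_name, blob_names):
    # Coarse, once-per-container progress logs at INFO (see get_blob above).
    logger.info("Fetching blobs for '%s' container", container_name)
    for blob_name in blob_names:
        # High-volume, once-per-blob detail logs at DEBUG
        # (see blob_download_func above).
        logger.debug(
            "Downloading content for blob: %s from %s container",
            blob_name,
            container_name,
        )

sync_container("docs", ["report.pdf", "notes.txt"])
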
25 changes: 17 additions & 8 deletions connectors/sources/confluence.py
@@ -81,6 +81,10 @@
WILDCARD = "*"


class InvalidConfluenceDataSourceTypeError(ValueError):
pass


class ConfluenceClient:
"""Confluence client to handle API calls made to Confluence"""

@@ -114,7 +118,7 @@ def _get_session(self):
if self.session:
return self.session

self._logger.debug("Creating a client session")
self._logger.debug(f"Creating a '{self.data_source_type}' client session")
if self.data_source_type == CONFLUENCE_CLOUD:
auth = (
self.configuration["account_email"],
@@ -125,11 +129,16 @@ def _get_session(self):
self.configuration["username"],
self.configuration["password"],
)
-        else:
+        elif self.data_source_type == CONFLUENCE_DATA_CENTER:
auth = (
self.configuration["data_center_username"],
self.configuration["data_center_password"],
)
else:
msg = f"Unknown data source type '{self.data_source_type}' for Confluence connector"
self._logger.error(msg)

raise InvalidConfluenceDataSourceTypeError(msg)

basic_auth = aiohttp.BasicAuth(login=auth[0], password=auth[1])
timeout = aiohttp.ClientTimeout(total=None) # pyright: ignore
@@ -795,9 +804,7 @@ async def fetch_server_space_permission(self, space_key):
return {}

url = URLS[SPACE_PERMISSION].format(space_key=space_key)
-        self._logger.debug(
-            f"Fetching permissions for space '{space_key} from Confluence server'"
-        )
+        self._logger.info(f"Fetching permissions for '{space_key}' space")
return await self.confluence_client.fetch_server_space_permission(url=url)

async def fetch_documents(self, api_query):
@@ -857,7 +864,7 @@ async def fetch_attachments(
String: Download link to get the content of the attachment
"""
self._logger.info(
f"Fetching attachments for '{parent_name}' from '{parent_space}' space"
f"Fetching attachments for '{parent_name}' {parent_type} from '{parent_space}' space"
)
async for attachment in self.confluence_client.fetch_attachments(
content_id=content_id,
@@ -941,7 +948,7 @@ async def download_attachment(self, url, attachment, timestamp=None, doit=False)
if not self.can_file_be_downloaded(file_extension, filename, file_size):
return

self._logger.info(f"Downloading content for file: {filename}")
self._logger.debug(f"Downloading content for file: {filename}")
document = {"_id": attachment["_id"], "_timestamp": attachment["_timestamp"]}
return await self.download_and_extract_file(
document,
@@ -1123,7 +1130,9 @@ async def get_docs(self, filtering=None):
advanced_rules = filtering.get_advanced_rules()
for query_info in advanced_rules:
query = query_info.get("query")
logger.debug(f"Fetching confluence content using custom query: {query}")
self._logger.debug(
f"Fetching confluence content using custom query: {query}"
)
async for document, download_link in self.search_by_query(query):
if download_link:
yield document, partial(
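
One behavioral note on the _get_session change: the old code treated any unrecognized data_source_type as Data Center via the bare else, while the new elif plus InvalidConfluenceDataSourceTypeError makes a misconfiguration fail fast instead of authenticating with the wrong credentials. Below is a minimal sketch of the resulting branching; the constant values, the api_token key, and the CONFLUENCE_SERVER condition are assumptions, since those branches are collapsed in the diff above.

class InvalidConfluenceDataSourceTypeError(ValueError):
    pass

# Illustrative values; the real module defines these constants elsewhere.
CONFLUENCE_CLOUD = "confluence_cloud"
CONFLUENCE_SERVER = "confluence_server"
CONFLUENCE_DATA_CENTER = "confluence_data_center"

def resolve_auth(data_source_type, configuration):
    # Mirrors the _get_session() branching: each known deployment type
    # maps to a credential pair; anything else raises a typed error.
    if data_source_type == CONFLUENCE_CLOUD:
        # "api_token" is an assumed key; the diff collapses this branch.
        return configuration["account_email"], configuration["api_token"]
    elif data_source_type == CONFLUENCE_SERVER:
        return configuration["username"], configuration["password"]
    elif data_source_type == CONFLUENCE_DATA_CENTER:
        return (
            configuration["data_center_username"],
            configuration["data_center_password"],
        )
    msg = f"Unknown data source type '{data_source_type}' for Confluence connector"
    raise InvalidConfluenceDataSourceTypeError(msg)

print(resolve_auth("confluence_data_center", {
    "data_center_username": "admin",
    "data_center_password": "secret",
}))
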
40 changes: 31 additions & 9 deletions connectors/sources/mssql.py
Expand Up @@ -255,6 +255,12 @@ async def ping(self):
async def get_tables_to_fetch(self, is_filtering=False):
tables = configured_tables(self.tables)
if is_wildcard(tables) or is_filtering:
msg = (
"Fetching all tables as the configuration field 'tables' is set to '*'"
if not is_filtering
else "Fetching all tables as the advanced sync rules are enabled."
)
self._logger.info(msg)
async for row in fetch(
cursor_func=partial(
self.get_cursor,
@@ -268,6 +274,7 @@ ):
):
yield row[0]
else:
self._logger.info(f"Fetching user configured tables: {tables}")
for table in tables:
yield table

@@ -302,9 +309,13 @@ async def get_table_primary_key(self, table):
retry_count=self.retry_count,
)
]

self._logger.debug(f"Found primary keys for '{table}' table")

return primary_keys

async def get_table_last_update_time(self, table):
self._logger.debug(f"Fetching last updated time for table: {table}")
[last_update_time] = await anext(
fetch(
cursor_func=partial(
@@ -332,15 +343,20 @@ async def data_streamer(self, table=None, query=None):
Yields:
list: It will first yield the column names, then data in each row
"""
if query is not None:
cursor_query = query
msg = f"Streaming records from database for using query: {query}"
else:
cursor_query = self.queries.table_data(
schema=self.schema,
table=table,
)
msg = f"Streaming records from database for table: {table}"
self._logger.debug(msg)
async for data in fetch(
cursor_func=partial(
self.get_cursor,
-                self.queries.table_data(
-                    schema=self.schema,
-                    table=table,
-                )
-                if query is None
-                else query,
+                cursor_query,
),
fetch_columns=True,
fetch_size=self.fetch_size,
@@ -496,6 +512,7 @@ def row2doc(self, row, doc_id, table, timestamp):
return row

async def get_primary_key(self, tables):
self._logger.debug(f"Extracting primary keys for tables: {tables}")
primary_key_columns = []
for table in tables:
primary_key_columns.extend(
@@ -534,6 +551,7 @@ async def fetch_documents_from_table(self, table):
Yields:
Dict: Document to be indexed
"""
self._logger.info(f"Fetching records for the table: {table}")
try:
docs_generator = self._yield_all_docs_from_tables(table=table)
async for doc in docs_generator:
Expand All @@ -553,6 +571,9 @@ async def fetch_documents_from_query(self, tables, query):
Yields:
Dict: Document to be indexed
"""
self._logger.info(
f"Fetching records for {tables} tables using the custom query: {query}"
)
try:
docs_generator = self._yield_docs_custom_query(tables=tables, query=query)
async for doc in docs_generator:
@@ -597,6 +618,7 @@ async def _yield_docs_custom_query(self, tables, query):
async def _yield_all_docs_from_tables(self, table):
row_count = await self.mssql_client.get_table_row_count(table=table)
if row_count > 0:
self._logger.debug(f"Total '{row_count}' rows found in '{table}' table")
# Query to get the table's primary key
keys = await self.get_primary_key(tables=[table])
if keys:
@@ -641,6 +663,9 @@ async def get_docs(self, filtering=None):
"""
if filtering and filtering.has_advanced_rules():
advanced_rules = filtering.get_advanced_rules()
self._logger.info(
f"Fetching records from the database using advanced sync rules: {advanced_rules}"
)
for rule in advanced_rules:
query = rule.get("query")
tables = rule.get("tables")
@@ -661,9 +686,6 @@ async def get_docs(self, filtering=None):
)
continue

-                    self._logger.debug(
-                        f"Found table: {table} in database: {self.database}."
-                    )
table_count += 1
async for row in self.fetch_documents_from_table(table=table):
yield row, None
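
The data_streamer rewrite is more than a logging change: hoisting the query selection into cursor_query means the exact string that is logged is also the one executed, where previously the query was rebuilt inline inside partial(...). Below is a minimal sketch of the hoist, with table_data_query and the return value standing in for self.queries.table_data(...) and the fetch(...) call.

import asyncio
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("mssql")

def table_data_query(schema, table):
    # Hypothetical stand-in for self.queries.table_data(...).
    return f"SELECT * FROM [{schema}].[{table}]"

async def data_streamer(schema, table=None, query=None):
    # Resolve the query once, up front, so the logged value and the
    # executed value can never drift apart.
    if query is not None:
        cursor_query = query
        msg = f"Streaming records from database using query: {query}"
    else:
        cursor_query = table_data_query(schema, table)
        msg = f"Streaming records from database for table: {table}"
    logger.debug(msg)
    # The real method hands cursor_query to fetch(...); returning it
    # here keeps the sketch self-contained.
    return cursor_query

print(asyncio.run(data_streamer("dbo", table="orders")))
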
12 changes: 11 additions & 1 deletion connectors/sources/oracle.py
@@ -130,6 +130,7 @@ async def get_cursor(self, query):
Returns:
cursor: Synchronous cursor
"""
self._logger.debug(f"Retrieving the cursor for query: {query}")
try:
loop = asyncio.get_running_loop()
if self.connection is None:
@@ -159,6 +160,9 @@ async def ping(self):
async def get_tables_to_fetch(self):
tables = configured_tables(self.tables)
if is_wildcard(tables):
self._logger.info(
"Fetching all tables as the configuration field 'tables' is set to '*'"
)
async for row in fetch(
cursor_func=partial(
self.get_cursor,
@@ -171,6 +175,7 @@ async def get_tables_to_fetch(self):
):
yield row[0]
else:
self._logger.info(f"Fetching user configured tables: {tables}")
for table in tables:
yield table

@@ -190,6 +195,7 @@ def get_table_row_count(self, table):
return row_count

async def get_table_primary_key(self, table):
self._logger.debug(f"Extracting primary keys for table: {table}")
primary_keys = [
key
async for [key] in fetch(
@@ -204,9 +210,11 @@ async def get_table_primary_key(self, table):
retry_count=self.retry_count,
)
]
self._logger.debug(f"Found primary keys for '{table}' table")
return primary_keys

async def get_table_last_update_time(self, table):
self._logger.debug(f"Fetching last updated time for table: {table}")
[last_update_time] = await anext(
fetch(
cursor_func=partial(
@@ -233,6 +241,7 @@ async def data_streamer(self, table):
Yields:
list: It will first yield the column names, then data in each row
"""
self._logger.debug(f"Streaming records from database for table: {table}")
async for data in fetch(
cursor_func=partial(
self.get_cursor,
@@ -388,10 +397,12 @@ async def fetch_documents(self, table):
Yields:
Dict: Document to be indexed
"""
self._logger.info(f"Fetching records for the table: {table}")
try:
row_count = await self.oracle_client.get_table_row_count(table=table)
if row_count > 0:
# Query to get the table's primary key
self._logger.debug(f"Total '{row_count}' rows found in '{table}' table")
keys = await self.oracle_client.get_table_primary_key(table=table)
keys = map_column_names(column_names=keys, tables=[table])
if keys:
@@ -444,7 +455,6 @@ async def get_docs(self, filtering=None):
"""
table_count = 0
async for table in self.oracle_client.get_tables_to_fetch():
self._logger.debug(f"Found table: {table} in database: {self.database}.")
table_count += 1
async for row in self.fetch_documents(table=table):
yield row, None
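
Both this file and mssql.py gain the same up-front log line explaining why a given set of tables is being synced, since '*' and an explicit list take very different code paths. Below is a minimal sketch of that branch, with is_wildcard and discover_tables as hypothetical stand-ins for the connector's helpers.

import asyncio
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("oracle")

def is_wildcard(tables):
    # Hypothetical stand-in for the helper used by the connector.
    return tables == ["*"]

async def get_tables_to_fetch(tables, discover_tables):
    # Say up front which code path was taken and why.
    if is_wildcard(tables):
        logger.info(
            "Fetching all tables as the configuration field 'tables' is set to '*'"
        )
        for table in await discover_tables():
            yield table
    else:
        logger.info("Fetching user configured tables: %s", tables)
        for table in tables:
            yield table

async def main():
    async def discover_tables():
        return ["EMPLOYEES", "ORDERS"]
    async for table in get_tables_to_fetch(["*"], discover_tables):
        print(table)

asyncio.run(main())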