diff --git a/connectors/config.py b/connectors/config.py
index 689c28739..7376625dd 100644
--- a/connectors/config.py
+++ b/connectors/config.py
@@ -87,6 +87,13 @@ def _default_config():
             "preflight_idle": 30,
             "max_errors": 20,
             "max_errors_span": 600,
+            "error_monitor": {
+                "max_total_errors": 1000,
+                "max_consecutive_errors": 10,
+                "max_error_rate": 0.15,
+                "error_window_size": 100,
+                "error_queue_size": 10,
+            },
             "max_concurrent_content_syncs": 1,
             "max_concurrent_access_control_syncs": 1,
             "max_file_download_size": DEFAULT_MAX_FILE_SIZE,
diff --git a/connectors/es/sink.py b/connectors/es/sink.py
index 43b863e19..9e498581f 100644
--- a/connectors/es/sink.py
+++ b/connectors/es/sink.py
@@ -110,6 +110,10 @@ def __init__(self, cause=None):
         self.__cause__ = cause
 
 
+class DocumentIngestionError(Exception):
+    pass
+
+
 class Sink:
     """Send bulk operations in batches by consuming a queue.
 
@@ -136,6 +140,7 @@ def __init__(
         max_concurrency,
         max_retries,
         retry_interval,
+        error_monitor,
         logger_=None,
         enable_bulk_operations_logging=False,
     ):
@@ -145,6 +150,7 @@ def __init__(
         self.pipeline = pipeline
         self.chunk_mem_size = chunk_mem_size * 1024 * 1024
         self.bulk_tasks = ConcurrentTasks(max_concurrency=max_concurrency)
+        self.error_monitor = error_monitor
         self.max_retires = max_retries
         self.retry_interval = retry_interval
         self.error = None
@@ -272,18 +278,19 @@ async def _process_bulk_response(self, res, ids_to_ops, do_log=False):
             successful_result = result in SUCCESSFUL_RESULTS
             if not successful_result:
                 if "error" in item[action_item]:
+                    message = f"Failed to execute '{action_item}' on document with id '{doc_id}'. Error: {item[action_item].get('error')}"
+                    self.error_monitor.track_error(DocumentIngestionError(message))
                     if do_log:
-                        self._logger.debug(
-                            f"Failed to execute '{action_item}' on document with id '{doc_id}'. Error: {item[action_item].get('error')}"
-                        )
+                        self._logger.debug(message)
                     self.counters.increment(RESULT_ERROR, namespace=BULK_RESPONSES)
                 else:
+                    message = f"Executed '{action_item}' on document with id '{doc_id}', but got non-successful result: {result}"
+                    self.error_monitor.track_error(DocumentIngestionError(message))
                     if do_log:
-                        self._logger.debug(
-                            f"Executed '{action_item}' on document with id '{doc_id}', but got non-successful result: {result}"
-                        )
+                        self._logger.debug(message)
                     self.counters.increment(RESULT_UNDEFINED, namespace=BULK_RESPONSES)
             else:
+                self.error_monitor.track_success()
                 if do_log:
                     self._logger.debug(
                         f"Successfully executed '{action_item}' on document with id '{doc_id}'. Result: {result}"
@@ -424,6 +431,7 @@ def __init__(
         client,
         queue,
         index,
+        error_monitor,
         filter_=None,
         sync_rules_enabled=False,
         content_extraction_enabled=True,
@@ -449,27 +457,37 @@ def __init__(
         self.concurrent_downloads = concurrent_downloads
         self._logger = logger_ or logger
         self._canceled = False
+        self.error_monitor = error_monitor
         self.skip_unchanged_documents = skip_unchanged_documents
 
     async def _deferred_index(self, lazy_download, doc_id, doc, operation):
-        data = await lazy_download(doit=True, timestamp=doc[TIMESTAMP_FIELD])
-
-        if data is not None:
-            self.counters.increment(BIN_DOCS_DOWNLOADED)
-            data.pop("_id", None)
-            data.pop(TIMESTAMP_FIELD, None)
-            doc.update(data)
-
-        doc.pop("_original_filename", None)
-
-        await self.put_doc(
-            {
-                "_op_type": operation,
-                "_index": self.index,
-                "_id": doc_id,
-                "doc": doc,
-            }
-        )
+        try:
+            data = await lazy_download(doit=True, timestamp=doc[TIMESTAMP_FIELD])
+
+            if data is not None:
+                self.counters.increment(BIN_DOCS_DOWNLOADED)
+                data.pop("_id", None)
+                data.pop(TIMESTAMP_FIELD, None)
+                doc.update(data)
+
+            doc.pop("_original_filename", None)
+
+            await self.put_doc(
+                {
+                    "_op_type": operation,
+                    "_index": self.index,
+                    "_id": doc_id,
+                    "doc": doc,
+                }
+            )
+            self.error_monitor.track_success()
+        except ForceCanceledError:
+            raise
+        except Exception as ex:
+            self._logger.error(
+                f"Failed to do deferred index operation for doc {doc_id}: {ex}"
+            )
+            self.error_monitor.track_error(ex)
 
     def force_cancel(self):
         self._canceled = True
@@ -599,6 +617,8 @@ async def get_docs(self, generator, skip_unchanged_documents=False):
                         }
                     )
 
+                lazy_downloads.raise_any_exception()
+
                 await asyncio.sleep(0)
         finally:
             # wait for all downloads to be finished
@@ -803,7 +823,8 @@ class SyncOrchestrator:
    - once they are both over, returns totals
    """
 
-    def __init__(self, elastic_config, logger_=None):
+    def __init__(self, elastic_config, error_monitor, logger_=None):
+        self.error_monitor = error_monitor
         self._logger = logger_ or logger
         self._logger.debug(f"SyncOrchestrator connecting to {elastic_config['host']}")
         self.es_management_client = ESManagementClient(elastic_config)
@@ -884,7 +905,7 @@ def _extractor_task_running(self):
     async def cancel(self):
         if self._sink_task_running():
             self._logger.info(
-                f"Cancling the Sink task: {self._sink_task.name}"  # pyright: ignore
+                f"Canceling the Sink task: {self._sink_task.get_name()}"  # pyright: ignore
             )
             self._sink_task.cancel()
         else:
@@ -894,7 +915,7 @@ async def cancel(self):
 
         if self._extractor_task_running():
             self._logger.info(
-                f"Canceling the Extractor task: {self._extractor_task.name}"  # pyright: ignore
+                f"Canceling the Extractor task: {self._extractor_task.get_name()}"  # pyright: ignore
             )
             self._extractor_task.cancel()
         else:
@@ -1005,6 +1026,7 @@ async def async_bulk(
             self.es_management_client,
             stream,
             index,
+            error_monitor=self.error_monitor,
             filter_=filter_,
             sync_rules_enabled=sync_rules_enabled,
             content_extraction_enabled=content_extraction_enabled,
@@ -1031,6 +1053,7 @@ async def async_bulk(
             max_concurrency=max_concurrency,
             max_retries=max_bulk_retries,
             retry_interval=retry_interval,
+            error_monitor=self.error_monitor,
             logger_=self._logger,
             enable_bulk_operations_logging=enable_bulk_operations_logging,
         )
diff --git a/connectors/source.py b/connectors/source.py
index b94c73c3e..463428549 100644
--- a/connectors/source.py
+++ b/connectors/source.py
@@ -32,7 +32,6 @@
 from connectors.logger import logger
 from connectors.utils import (
     TIKA_SUPPORTED_FILETYPES,
-    ErrorMonitor,
     convert_to_b64,
     epoch_timestamp_zulu,
     get_file_extension,
@@ -410,7 +409,7 @@ def __init__(self, configuration):
         # this will be overwritten by set_framework_config()
         self.framework_config = DataSourceFrameworkConfig.Builder().build()
 
-        self.error_monitor = ErrorMonitor()
+        self.error_monitor = None
 
     def __str__(self):
         return f"Datasource `{self.__class__.name}`"
@@ -419,6 +418,9 @@ def set_logger(self, logger_):
         self._logger = logger_
         self._set_internal_logger()
 
+    def set_error_monitor(self, error_monitor):
+        self.error_monitor = error_monitor
+
     def _set_internal_logger(self):
         # no op for BaseDataSource
         # if there are internal class (e.g. Client class) to which the logger need to be set,
@@ -778,12 +780,14 @@ async def download_and_extract_file(
                 doc = await self.handle_file_content_extraction(
                     doc, source_filename, temp_filename
                 )
+                self.error_monitor.track_success()
                 return doc
         except Exception as e:
             self._logger.warning(
                 f"File download and extraction or conversion for file {source_filename} failed: {e}",
                 exc_info=True,
             )
+            self.error_monitor.track_error(e)
             if return_doc_if_failed:
                 return doc
             else:
diff --git a/connectors/sources/mongo.py b/connectors/sources/mongo.py
index ce71ff458..8e9552238 100644
--- a/connectors/sources/mongo.py
+++ b/connectors/sources/mongo.py
@@ -240,8 +240,9 @@ def _serialize(value):
                 value = _serialize(value.as_doc().to_dict())
             return value
 
-        for key, value in doc.items():
-            doc[key] = _serialize(value)
+        with self.with_error_monitoring():
+            for key, value in doc.items():
+                doc[key] = _serialize(value)
 
         return doc
diff --git a/connectors/sync_job_runner.py b/connectors/sync_job_runner.py
index 07bbfeeaa..1084d7937 100644
--- a/connectors/sync_job_runner.py
+++ b/connectors/sync_job_runner.py
@@ -35,7 +35,7 @@
     INDEXED_DOCUMENT_VOLUME,
 )
 from connectors.source import BaseDataSource
-from connectors.utils import truncate_id
+from connectors.utils import ErrorMonitor, truncate_id
 
 UTF_8 = "utf-8"
 
@@ -113,6 +113,8 @@ def __init__(
         self.es_config = es_config
         self.service_config = service_config
         self.sync_orchestrator = None
+        error_monitor_config = service_config.get("error_monitor", {})
+        self.error_monitor = ErrorMonitor(**error_monitor_config)
         self.job_reporting_task = None
         self.bulk_options = self.es_config.get("bulk", {})
         self._start_time = None
@@ -149,6 +151,7 @@ async def execute(self):
                 configuration=self.sync_job.configuration
             )
             self.data_provider.set_logger(self.sync_job.logger)
+            self.data_provider.set_error_monitor(self.error_monitor)
             self.data_provider.set_framework_config(
                 self._data_source_framework_config()
             )
@@ -183,7 +186,7 @@ async def execute(self):
                 await self._update_native_connector_authentication()
 
             self.sync_orchestrator = SyncOrchestrator(
-                self.es_config, self.sync_job.logger
+                self.es_config, self.error_monitor, self.sync_job.logger
             )
 
             if job_type in [JobType.INCREMENTAL, JobType.FULL]:
diff --git a/connectors/utils.py b/connectors/utils.py
index f979a7943..ee5c3d01d 100644
--- a/connectors/utils.py
+++ b/connectors/utils.py
@@ -1009,7 +1009,7 @@ def __init__(
         max_consecutive_errors=10,
         max_error_rate=0.15,
         error_window_size=100,
-        error_queue_size=20,
+        error_queue_size=10,
     ):
         self.max_error_rate = max_error_rate
         self.error_window_size = error_window_size
@@ -1082,13 +1082,13 @@ def _error_window_error_rate(self):
 
     def _raise_if_necessary(self):
         if self.consecutive_error_count > self.max_consecutive_errors:
-            msg = f"Exceeded maximum consecutive errors - saw {self.consecutive_error_count} errors in a row"
+            msg = f"Exceeded maximum consecutive errors - saw {self.consecutive_error_count} errors in a row. Last error: {self.last_error}"
             raise TooManyErrors(msg) from self.last_error
         elif self.total_error_count > self.max_total_errors:
-            msg = f"Exceeded maximum total error count - saw {self.total_error_count} errors"
+            msg = f"Exceeded maximum total error count - saw {self.total_error_count} errors. Last error: {self.last_error}"
             raise TooManyErrors(msg) from self.last_error
         elif self.error_window_size > 0:
             error_rate = self._error_window_error_rate()
             if error_rate > self.max_error_rate:
-                msg = f"Exceeded maximum error ratio of {self.max_error_rate} for last {self.error_window_size} operations."
+                msg = f"Exceeded maximum error ratio of {self.max_error_rate} for last {self.error_window_size} operations. Last error: {self.last_error}"
                 raise TooManyErrors(msg) from self.last_error
diff --git a/tests/sources/support.py b/tests/sources/support.py
index 61f49633e..f014754e0 100644
--- a/tests/sources/support.py
+++ b/tests/sources/support.py
@@ -4,6 +4,7 @@
 # you may not use this file except in compliance with the Elastic License 2.0.
 #
 from contextlib import asynccontextmanager
+from unittest.mock import Mock
 
 from connectors.source import DEFAULT_CONFIGURATION, DataSourceConfiguration
 
@@ -18,6 +19,7 @@ async def create_source(klass, **extras):
         config[k] = DEFAULT_CONFIGURATION.copy() | {"value": v}
 
     source = klass(configuration=DataSourceConfiguration(config))
+    source.set_error_monitor(Mock())
     try:
         yield source
     finally:
diff --git a/tests/test_sink.py b/tests/test_sink.py
index 3d4d882ea..bce481182 100644
--- a/tests/test_sink.py
+++ b/tests/test_sink.py
@@ -118,7 +118,7 @@ async def test_prepare_content_index_raise_error_when_index_creation_failed(
         status=400,
     )
 
-    es = SyncOrchestrator(config)
+    es = SyncOrchestrator(config, error_monitor=Mock())
     es._sink = Mock()
     es._extractor = Mock()
 
@@ -160,7+160,7 @@ async def test_prepare_content_index_create_index(
         headers=headers,
     )
 
-    es = SyncOrchestrator(config)
+    es = SyncOrchestrator(config, error_monitor=Mock())
     es._sink = Mock()
     es._extractor = Mock()
 
@@ -218,7 +218,7 @@ async def test_prepare_content_index(mock_responses):
         body='{"acknowledged": True}',
     )
 
-    es = SyncOrchestrator(config)
+    es = SyncOrchestrator(config, error_monitor=Mock())
     es._sink = Mock()
     es._extractor = Mock()
     with mock.patch.object(
@@ -324,7 +324,7 @@ async def test_async_bulk(mock_responses):
     config = {"host": "http://nowhere.com:9200", "user": "tarek", "password": "blah"}
     set_responses(mock_responses)
 
-    es = SyncOrchestrator(config)
+    es = SyncOrchestrator(config, error_monitor=Mock())
     pipeline = Pipeline({})
 
     async def get_docs():
@@ -478,6 +478,7 @@ async def setup_extractor(
         ESManagementClient(config),
         queue,
         INDEX,
+        error_monitor=Mock(),
         filter_=filter_mock,
         content_extraction_enabled=content_extraction_enabled,
     )
@@ -1131,6 +1132,7 @@ def test_bulk_populate_stats(res, expected_result):
     sink = Sink(
         client=None,
         queue=None,
+        error_monitor=Mock(),
         chunk_size=0,
         pipeline=None,
         chunk_mem_size=0,
@@ -1166,6 +1168,7 @@ async def test_batch_bulk_with_retry():
     sink = Sink(
         client=client,
         queue=None,
+        error_monitor=Mock(),
         chunk_size=0,
         pipeline={"name": "pipeline"},
         chunk_mem_size=0,
@@ -1200,6 +1203,7 @@ async def test_batch_bulk_with_errors(patch_logger):
     sink = Sink(
         client=client,
         queue=None,
+        error_monitor=Mock(),
         chunk_size=0,
         pipeline={"name": "pipeline"},
         chunk_mem_size=0,
@@ -1249,7 +1253,7 @@ async def test_sync_orchestrator_done_and_cleanup(
     sink_task.done.return_value = sink_task_done
     config = {"host": "http://nowhere.com:9200", "user": "tarek", "password": "blah"}
 
-    es = SyncOrchestrator(config)
+    es = SyncOrchestrator(config, error_monitor=Mock())
     es._extractor = Mock()
     es._extractor.error = None
     es._sink = Mock()
@@ -1276,6 +1280,7 @@ async def test_extractor_put_doc():
         None,
         queue,
         INDEX,
+        error_monitor=Mock()
     )
 
     await extractor.put_doc(doc)
@@ -1291,6 +1296,7 @@ async def test_force_canceled_extractor_put_doc():
         None,
         queue,
         INDEX,
+        error_monitor=Mock()
     )
 
     extractor.force_cancel()
@@ -1307,6 +1313,7 @@ async def test_force_canceled_extractor_with_other_errors(patch_logger):
         None,
         queue,
         INDEX,
+        error_monitor=Mock(),
     )
 
     generator = AsyncMock(side_effect=Exception("a non-ForceCanceledError"))
@@ -1326,6 +1333,7 @@ async def test_sink_fetch_doc():
     sink = Sink(
         None,
         queue,
+        error_monitor=Mock(),
         chunk_size=0,
         pipeline={"name": "pipeline"},
         chunk_mem_size=0,
@@ -1347,6 +1355,7 @@ async def test_force_canceled_sink_fetch_doc():
     sink = Sink(
         None,
         queue,
+        error_monitor=Mock(),
         chunk_size=0,
         pipeline={"name": "pipeline"},
         chunk_mem_size=0,
@@ -1368,6 +1377,7 @@ async def test_force_canceled_sink_with_other_errors(patch_logger):
     sink = Sink(
         None,
         queue,
+        error_monitor=Mock(),
         chunk_size=0,
         pipeline={"name": "pipeline"},
         chunk_mem_size=0,
@@ -1412,7 +1422,7 @@ async def test_force_canceled_sink_with_other_errors(patch_logger):
 @pytest.mark.asyncio
 async def test_cancel_sync(extractor_task_done, sink_task_done, force_cancel):
     config = {"host": "http://nowhere.com:9200", "user": "tarek", "password": "blah"}
-    es = SyncOrchestrator(config)
+    es = SyncOrchestrator(config, error_monitor=Mock())
 
     es._extractor = Mock()
     es._extractor.force_cancel = Mock()
@@ -1467,6 +1477,7 @@ def _put_side_effect(value):
         es_client,
         queue,
         INDEX,
+        error_monitor=Mock()
     )
 
     await extractor.run(doc_generator, JobType.FULL)
@@ -1489,6 +1500,7 @@ async def test_should_not_log_bulk_operations_if_doc_id_tracing_is_disabled(
     sink = Sink(
         client=client,
         queue=Mock(),
+        error_monitor=Mock(),
         chunk_size=0,
         pipeline={"name": "pipeline"},
         chunk_mem_size=0,
@@ -1572,6 +1584,7 @@ async def test_should_log_bulk_operations_if_doc_id_tracing_is_enabled(
     sink = Sink(
         client=client,
         queue=Mock(),
+        error_monitor=Mock(),
         chunk_size=0,
         pipeline={"name": "pipeline"},
         chunk_mem_size=0,
@@ -1602,6 +1615,7 @@ async def test_should_log_error_when_id_is_missing(patch_logger):
     sink = Sink(
         client=client,
         queue=Mock(),
+        error_monitor=Mock(),
         chunk_size=0,
         pipeline={"name": "pipeline"},
         chunk_mem_size=0,
@@ -1637,6 +1651,7 @@ async def test_should_log_error_when_unknown_action_item_returned(patch_logger):
     sink = Sink(
         client=client,
         queue=Mock(),
+        error_monitor=Mock(),
         chunk_size=0,
         pipeline={"name": "pipeline"},
         chunk_mem_size=0,