Stop swallowing errors when paginating #2903

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
connectors/sources/confluence.py: 65 additions & 38 deletions (103 changed lines)
@@ -12,7 +12,10 @@
 from urllib.parse import urljoin
 
 import aiohttp
-from aiohttp.client_exceptions import ClientResponseError, ServerDisconnectedError
+from aiohttp.client_exceptions import (
+    ClientResponseError,
+    ServerDisconnectedError,
+)
 
 from connectors.access_control import ACCESS_CONTROL
 from connectors.logger import logger
@@ -193,7 +196,7 @@ async def _handle_client_errors(self, url, exception):
                 try:
                     retry_seconds = int(response_headers["Retry-After"])
                 except (TypeError, ValueError) as exception:
-                    self._logger.error(
+                    self._logger.warning(
                         f"Rate limit reached but an unexpected error occurred while reading value of 'Retry-After' header: {exception}. Retrying in {DEFAULT_RETRY_SECONDS} seconds..."
                     )
                 else:
@@ -214,7 +217,7 @@ async def _handle_client_errors(self, url, exception):
             raise InternalServerError
         else:
             self._logger.error(
-                f"Error while making a GET call for URL: {url}. Error details: {exception}"
+                f"Error while making a GET call for URL: {url}.", exc_info=exception
             )
             raise
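
The pattern in this hunk recurs throughout the PR: the exception is handed to the logger via exc_info instead of being interpolated into the message, which preserves the full traceback in the log output. A minimal sketch of the difference, using the standard library logging module with hypothetical names (not the connector's logger wrapper):

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("demo")

def fetch(url):
    raise TimeoutError(f"no response from {url}")

try:
    fetch("https://confluence.example/rest/api/content")
except Exception as e:
    # Interpolated: only str(e) survives; the traceback is lost.
    logger.error(f"GET failed: {e}")
    # exc_info: the handler formats and appends the full traceback.
    logger.error("GET failed", exc_info=e)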

@@ -265,7 +268,7 @@ async def paginated_api_call(self, url_name, **url_kwargs):
                 self._logger.debug(f"Starting pagination for API endpoint {url}")
                 response = await self.api_call(url=url)
                 json_response = await response.json()
-                links = json_response.get("_links")
+                links = json_response.get("_links", {})
                 yield json_response
                 if links.get("next") is None:
                     return
@@ -275,9 +278,10 @@ async def paginated_api_call(self, url_name, **url_kwargs):
                 )
             except Exception as exception:
                 self._logger.warning(
-                    f"Skipping data for type {url_name} from {url}. Exception: {exception}."
+                    f"Error raised while paginating {url_name} from {url}.",
+                    exc_info=exception,
                 )
-                break
+                raise exception
 
     async def paginated_api_call_for_datacenter_syncrule(self, url_name, **url_kwargs):
         """Make a paginated API call for datacenter using the passed url_name.
@@ -305,9 +309,10 @@ async def paginated_api_call_for_datacenter_syncrule(self, url_name, **url_kwargs):
                     break
             except Exception as exception:
                 self._logger.warning(
-                    f"Skipping data for type {url_name} from {url}. Exception: {exception}."
+                    f"Error raised while paginating {url_name} from {url}.",
+                    exc_info=exception,
                 )
-                break
+                raise exception
 
     async def download_func(self, url):
         yield await self.api_call(url)
@@ -345,36 +350,57 @@ async def fetch_server_space_permission(self, url):
             return permission
         except ClientResponseError as exception:
             self._logger.warning(
-                f"Something went wrong. Make sure you have installed Extender for running confluence datacenter/server DLS. Exception: {exception}."
+                "Something went wrong. Make sure you have installed Extender for running confluence datacenter/server DLS.",
+                exc_info=exception,
             )
             return {}
 
     async def fetch_page_blog_documents(self, api_query):
-        async for response in self.paginated_api_call(
-            url_name=CONTENT,
-            api_query=api_query,
-        ):
-            attachment_count = 0
-            for document in response.get("results", []):
-                if document.get("children").get("attachment"):
-                    attachment_count = (
-                        document.get("children", {})
-                        .get("attachment", {})
-                        .get("size", 0)
-                    )
-                if self.index_labels:
-                    labels = await self.fetch_label(document["id"])
-                    document["labels"] = labels
-                yield document, attachment_count
+        try:
+            async for response in self.paginated_api_call(
+                url_name=CONTENT,
+                api_query=api_query,
+            ):
+                for document in response.get("results", []):
+                    try:
+                        if document.get("children").get("attachment"):
+                            attachment_count = (
+                                document.get("children", {})
+                                .get("attachment", {})
+                                .get("size", 0)
+                            )
+                        if self.index_labels:
+                            labels = await self.fetch_label(document["id"])
+                            document["labels"] = labels
+                        yield document, attachment_count
+                    except Exception as e:
+                        # avoid letting single-document issues bubble up to interrupt the iteration
+                        self._logger.warning(
+                            f"Encountered error while handling content documents: '{document.get('id')}'. Skipping it.",
+                            exc_info=e,
+                        )
+        except Exception as e:
+            # avoid letting errors while paginating content cause a sync failure
+            self._logger.warning(
+                f"Encountered an error while fetching content with query: '{api_query}'. Any remaining content from this query will be skipped.",
+                exc_info=e,
+            )
 
     async def fetch_attachments(self, content_id):
-        async for response in self.paginated_api_call(
-            url_name=ATTACHMENT,
-            api_query=ATTACHMENT_QUERY,
-            id=content_id,
-        ):
-            for attachment in response.get("results", []):
-                yield attachment
+        try:
+            async for response in self.paginated_api_call(
+                url_name=ATTACHMENT,
+                api_query=ATTACHMENT_QUERY,
+                id=content_id,
+            ):
+                for attachment in response.get("results", []):
+                    yield attachment
+        except Exception as e:
+            # swallow attachment exceptions during pagination, rather than let them fail the sync.
+            self._logger.warning(
+                f"Encountered an error while fetching attachments for '{content_id}'. Any remaining attachments will be skipped.",
+                exc_info=e,
+            )
 
     async def ping(self):
         await self.api_call(
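
Because the except now lives inside the generator, a pagination failure is logged and the generator simply stops: everything yielded before the failure is still processed, and the consumer sees normal exhaustion rather than an exception. A self-contained sketch of that producer-side pattern, with a stub page source and hypothetical names:

import asyncio
import logging

logger = logging.getLogger("demo")

async def flaky_pages():
    # Stands in for paginated_api_call: one good page, then a failure.
    yield {"results": [{"id": "att-1"}, {"id": "att-2"}]}
    raise RuntimeError("next page unavailable")

async def fetch_attachments_safe():
    try:
        async for page in flaky_pages():
            for item in page.get("results", []):
                yield item
    except Exception as e:
        # Log and stop; items already yielded are unaffected.
        logger.warning("Pagination failed; remaining attachments skipped", exc_info=e)

async def main():
    print([a["id"] async for a in fetch_attachments_safe()])  # ['att-1', 'att-2']

asyncio.run(main())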
@@ -856,7 +882,7 @@ async def ping(self):
         try:
             await self.confluence_client.ping()
         except Exception as e:
-            self._logger.warning(f"Error while connecting to Confluence: {e}")
+            self._logger.warning("Error while connecting to Confluence", exc_info=e)
             raise
 
     def get_permission(self, permission):
@@ -1096,10 +1122,10 @@ async def _attachment_coro(self, document, access_control):
                 )
             )
         except Exception as exception:
-            self._logger.exception(
-                f"Error while fetching attachments of {document.get('title')} with id {document.get('_id')}, type: {document.get('type')} in space {document.get('space')}: {exception}"
+            self._logger.warning(
+                f"Error while fetching attachments of {document.get('title')} with id {document.get('_id')}, type: {document.get('type')} in space {document.get('space')}",
+                exc_info=exception,
             )
-            raise
         finally:
             await self.queue.put(END_SIGNAL)  # pyright: ignore
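
Note the severity downgrade here: logger.exception always logs at ERROR and implies exc_info=True, while logger.warning with an explicit exc_info records the same traceback at WARNING; dropping the raise is what actually keeps one bad attachment from failing the whole sync. The logging difference in isolation (standard library behavior):

import logging

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger("demo")

try:
    1 / 0
except ZeroDivisionError as exc:
    log.exception("boom")              # ERROR level, traceback attached
    log.warning("boom", exc_info=exc)  # WARNING level, same traceback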

@@ -1154,7 +1180,7 @@ async def _space_coro(self):
                 )
                 yield space
         except Exception as exception:
-            self._logger.exception(f"Error while fetching spaces: {exception}")
+            self._logger.exception("Error while fetching spaces", exc_info=exception)
             raise
 
     async def _page_blog_coro(self, api_query, target_type):
@@ -1218,7 +1244,8 @@ async def _page_blog_coro(self, api_query, target_type):
                 self.fetcher_count += 1
         except Exception as exception:
             self._logger.exception(
-                f"Error while fetching pages and blogposts with query '{api_query}': {exception}"
+                f"Error while fetching pages and blogposts with query '{api_query}'",
+                exc_info=exception,
             )
             raise
         finally: