Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Connector APIs] Connector update last sync info, status, error #2641

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions connectors/es/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,30 @@ async def connector_check_in(self, connector_id):
headers={"accept": "application/json"},
)

async def connector_update_error(self, connector_id, error):
await self.client.perform_request(
"PUT",
f"/_connector/{connector_id}/_error",
headers={"accept": "application/json", "Content-Type": "application/json"},
body={"error": error},
)

async def connector_update_status(self, connector_id, status):
await self.client.perform_request(
"PUT",
f"/_connector/{connector_id}/_status",
headers={"accept": "application/json", "Content-Type": "application/json"},
body={"status": status},
)

async def connector_update_last_sync_info(self, connector_id, last_sync_info):
await self.client.perform_request(
"PUT",
f"/_connector/{connector_id}/_last_sync",
headers={"accept": "application/json", "Content-Type": "application/json"},
body=last_sync_info,
)

async def connector_update_filtering_draft_validation(
self, connector_id, validation_result
):
Expand Down Expand Up @@ -85,6 +109,25 @@ async def connector_check_in(self, connector_id):
partial(self._api_wrapper.connector_check_in, connector_id)
)

async def connector_update_error(self, connector_id, error):
await self._retrier.execute_with_retry(
partial(self._api_wrapper.connector_update_error, connector_id, error)
)

async def connector_update_status(self, connector_id, status):
await self._retrier.execute_with_retry(
partial(self._api_wrapper.connector_update_status, connector_id, status)
)

async def connector_update_last_sync_info(self, connector_id, last_sync_info):
await self._retrier.execute_with_retry(
partial(
self._api_wrapper.connector_update_last_sync_info,
connector_id,
last_sync_info,
)
)

async def connector_update_filtering_draft_validation(
self, connector_id, validation_result
):
Expand Down
82 changes: 58 additions & 24 deletions connectors/protocol/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -700,12 +700,17 @@ def next_sync(self, job_type, now):
return next_run(scheduling_property.get("interval"), now)

async def _update_datetime(self, field, new_ts):
await self.index.update(
doc_id=self.id,
doc={field: iso_utc(new_ts)},
if_seq_no=self._seq_no,
if_primary_term=self._primary_term,
)
if self.index.feature_use_connectors_api:
await self.index.api.connector_update_last_sync_info(
connector_id=self.id, last_sync_info={field: iso_utc(new_ts)}
)
else:
await self.index.update(
doc_id=self.id,
doc={field: iso_utc(new_ts)},
if_seq_no=self._seq_no,
if_primary_term=self._primary_term,
)

async def update_last_sync_scheduled_at_by_job_type(self, job_type, new_ts):
match job_type:
Expand Down Expand Up @@ -738,24 +743,43 @@ async def sync_starts(self, job_type):
msg = f"Unknown job type: {job_type}"
raise ValueError(msg)

doc = {
"status": Status.CONNECTED.value,
"error": None,
} | last_sync_information
if self.index.feature_use_connectors_api:
await self.index.api.connector_update_last_sync_info(
connector_id=self.id, last_sync_info=last_sync_information
)
await self.index.api.connector_update_status(
connector_id=self.id, status=Status.CONNECTED.value
)
await self.index.api.connector_update_error(
connector_id=self.id, error=None
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a little concerning - we do 3 calls to do single thing? Should we merge them into one call?

Copy link
Member Author

@jedrazb jedrazb Jun 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could unify error and status endpoint in a single call (requires small ES adjustment). Set status as a function of error being null or non-null.

However, I think we should maintain the _last_sync as a separate call. Integrating them would require expanding the last_sync_info endpoint with even more values in the request body (e.g. it would need to take error) - doable but then we are converging to _update like functionality.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, uniting status update + error update in one endpoint and leaving update_last_sync_info endpoint separately is a good way forward

else:
doc = {
"status": Status.CONNECTED.value,
"error": None,
} | last_sync_information

await self.index.update(
doc_id=self.id,
doc=doc,
if_seq_no=self._seq_no,
if_primary_term=self._primary_term,
)
await self.index.update(
doc_id=self.id,
doc=doc,
if_seq_no=self._seq_no,
if_primary_term=self._primary_term,
)

async def error(self, error):
doc = {
"status": Status.ERROR.value,
"error": str(error),
}
await self.index.update(doc_id=self.id, doc=doc)
if self.index.feature_use_connectors_api:
await self.index.api.connector_update_error(
connector_id=self.id, error=error
)
await self.index.api.connector_update_status(
connector_id=self.id, status=Status.ERROR.value
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here - this is an atomic action "mark as error" that updates status and writes error, should it be single action?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed that we could unify this in a following way:

  • we just call _error endpoint, the logic in ES could set the status depending if error is null or not

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be perfect IMO!

else:
doc = {
"status": Status.ERROR.value,
"error": str(error),
}
await self.index.update(doc_id=self.id, doc=doc)

async def sync_done(self, job, cursor=None):
job_status = JobStatus.ERROR if job is None else job.status
Expand Down Expand Up @@ -794,8 +818,6 @@ async def sync_done(self, job, cursor=None):

doc = {
"last_synced": iso_utc(),
"status": connector_status.value,
"error": job_error,
} | last_sync_information

# only update sync cursor after a successful content sync job
Expand All @@ -806,7 +828,19 @@ async def sync_done(self, job, cursor=None):
doc["last_indexed_document_count"] = job.indexed_document_count
doc["last_deleted_document_count"] = job.deleted_document_count

await self.index.update(doc_id=self.id, doc=doc)
if self.index.feature_use_connectors_api:
await self.index.api.connector_update_status(
connector_id=self.id, status=connector_status.value
)
await self.index.api.connector_update_error(
connector_id=self.id, error=job_error
)
await self.index.api.connector_update_last_sync_info(
connector_id=self.id, last_sync_info=last_sync_information
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And same thing here - is there any chance we unite these three into one?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See answer above about other 3 calls

else:
doc = doc | {"status": connector_status.value, "error": job_error}
await self.index.update(doc_id=self.id, doc=doc)

@with_concurrency_control()
async def prepare(self, config, sources):
Expand Down
2 changes: 1 addition & 1 deletion connectors/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ def _callback(self, task):
)
elif task.exception():
logger.error(
f"Exception found for task {task.get_name()}: {task.exception()}",
f"Exception found for task {task.get_name()}: {task.exception()} {task}"
)

def _add_task(self, coroutine, name=None):
Expand Down
Loading