Skip to content

Commit

Permalink
Claim sync jobs with Connector APIs (#2646)
Browse files Browse the repository at this point in the history
  • Loading branch information
jedrazb authored Jun 19, 2024
1 parent 4e9c9f7 commit 6952df2
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 8 deletions.
21 changes: 21 additions & 0 deletions connectors/es/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ async def connector_activate_filtering_draft(self, connector_id):
headers={"accept": "application/json"},
)

async def connector_sync_job_claim(self, sync_job_id, worker_hostname, sync_cursor):
    """Claim a sync job through the Connector API.

    Issues ``PUT /_connector/_sync_job/{sync_job_id}/_claim`` with the
    worker hostname in the request body. ``sync_cursor`` is added to the
    body only when it is truthy; ``None`` or an empty cursor is omitted.

    Returns:
        The value returned by ``self.client.perform_request``, so that this
        method behaves like the sibling ``connector_sync_job_create`` and
        the retrying wrapper that returns this call's result has something
        to hand back to its caller.
    """
    request_body = {"worker_hostname": worker_hostname}
    if sync_cursor:
        # Only send a cursor when there is one; the key is left out otherwise.
        request_body["sync_cursor"] = sync_cursor

    return await self.client.perform_request(
        "PUT",
        f"/_connector/_sync_job/{sync_job_id}/_claim",
        headers={"accept": "application/json", "Content-Type": "application/json"},
        body=request_body,
    )

async def connector_sync_job_create(self, connector_id, job_type, trigger_method):
return await self.client.perform_request(
"POST",
Expand Down Expand Up @@ -90,6 +101,16 @@ async def connector_activate_filtering_draft(self, connector_id):
partial(self._api_wrapper.connector_activate_filtering_draft, connector_id)
)

async def connector_sync_job_claim(self, sync_job_id, worker_hostname, sync_cursor):
    """Run the API wrapper's sync-job claim call through the retrier.

    The three arguments are bound positionally to
    ``self._api_wrapper.connector_sync_job_claim`` and the resulting
    callable is handed to ``self._retrier.execute_with_retry``; whatever
    that returns is returned unchanged.
    """
    claim_call = partial(
        self._api_wrapper.connector_sync_job_claim,
        sync_job_id,
        worker_hostname,
        sync_cursor,
    )
    outcome = await self._retrier.execute_with_retry(claim_call)
    return outcome

async def connector_sync_job_create(self, connector_id, job_type, trigger_method):
return await self._retrier.execute_with_retry(
partial(
Expand Down
23 changes: 15 additions & 8 deletions connectors/protocol/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,14 +322,21 @@ def _wrap_errors(self, operation_name, e):

async def claim(self, sync_cursor=None):
    """Mark this sync job as claimed by the current host.

    When ``self.index.feature_use_connectors_api`` is truthy the claim is
    delegated to ``self.index.api.connector_sync_job_claim``. Otherwise the
    job document is updated directly with an in-progress status, start and
    last-seen timestamps, the worker hostname and the sync cursor.

    Any exception raised on either path is passed to ``self._wrap_errors``
    together with the operation name ``"claim job"``.
    """
    try:
        if not self.index.feature_use_connectors_api:
            # Legacy path: write the claim fields straight into the job document.
            claim_fields = {
                "status": JobStatus.IN_PROGRESS.value,
                "started_at": iso_utc(),
                "last_seen": iso_utc(),
                "worker_hostname": socket.gethostname(),
                "connector.sync_cursor": sync_cursor,
            }
            await self.index.update(doc_id=self.id, doc=claim_fields)
            return

        await self.index.api.connector_sync_job_claim(
            sync_job_id=self.id,
            worker_hostname=socket.gethostname(),
            sync_cursor=sync_cursor,
        )
    except Exception as err:
        self._wrap_errors("claim job", err)

Expand Down
16 changes: 16 additions & 0 deletions tests/protocol/test_connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -790,11 +790,27 @@ async def test_sync_job_claim():
}

sync_job = SyncJob(elastic_index=index, doc_source=source)
sync_job.index.feature_use_connectors_api = False
await sync_job.claim(sync_cursor=SYNC_CURSOR)

index.update.assert_called_with(doc_id=sync_job.id, doc=expected_doc_source_update)


@pytest.mark.asyncio
async def test_sync_job_claim_with_connector_api():
    """With the Connector API feature enabled, ``claim`` calls the claim endpoint wrapper."""
    source = {"_id": "1"}
    index = Mock()
    index.api.connector_sync_job_claim = AsyncMock(return_value={"result": "updated"})

    sync_job = SyncJob(elastic_index=index, doc_source=source)
    sync_job.index.feature_use_connectors_api = True
    await sync_job.claim(sync_cursor=SYNC_CURSOR)

    # Check the coroutine was awaited exactly once, not merely called:
    # assert_called_with would still pass if the await were dropped.
    # The hostname depends on the machine running the test, hence ANY.
    index.api.connector_sync_job_claim.assert_awaited_once_with(
        sync_job_id=sync_job.id, worker_hostname=ANY, sync_cursor=SYNC_CURSOR
    )


@pytest.mark.asyncio
async def test_sync_job_claim_fails():
source = {"_id": "1"}
Expand Down

0 comments on commit 6952df2

Please sign in to comment.