Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fleet] only show remote ES output health status if later than last updated time #177685

Merged
merged 5 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 55 additions & 2 deletions x-pack/plugins/fleet/server/services/output.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ const CONFIG_WITHOUT_ES_HOSTS = {
},
};

function mockOutputSO(id: string, attributes: any = {}) {
function mockOutputSO(id: string, attributes: any = {}, updatedAt?: string) {
return {
id: outputIdToUuid(id),
type: 'ingest-outputs',
Expand All @@ -74,6 +74,7 @@ function mockOutputSO(id: string, attributes: any = {}) {
output_id: id,
...attributes,
},
updated_at: updatedAt,
};
}

Expand Down Expand Up @@ -146,7 +147,9 @@ function getMockedSoClient(
}

default:
throw new Error('not found: ' + id);
return mockOutputSO(id, {
type: 'remote_elasticsearch',
});
}
});
soClient.update.mockImplementation(async (type, id, data) => {
Expand Down Expand Up @@ -1868,6 +1871,11 @@ describe('Output Service', () => {
});

describe('getLatestOutputHealth', () => {
let soClient: any;
beforeEach(() => {
soClient = getMockedSoClient();
});

it('should return unknown state if no hits', async () => {
esClientMock.search.mockResolvedValue({
hits: {
Expand Down Expand Up @@ -1907,6 +1915,51 @@ describe('Output Service', () => {
timestamp: '2023-11-30T14:25:31Z',
});
});

it('should apply range filter if updated_at available', async () => {
const updatedAt = '2023-11-30T14:25:31Z';
soClient.get.mockResolvedValue(
mockOutputSO(
'id',
{
type: 'remote_elasticsearch',
},
updatedAt
)
);

await outputService.getLatestOutputHealth(esClientMock, 'id');

expect((esClientMock.search.mock.lastCall?.[0] as any)?.query.bool.must).toEqual([
{
range: {
'@timestamp': {
gte: updatedAt,
},
},
},
]);
});

it('should not apply range filter if updated_at is not available', async () => {
soClient.get.mockResolvedValue(
mockOutputSO('id', {
type: 'remote_elasticsearch',
})
);

await outputService.getLatestOutputHealth(esClientMock, 'id');

expect((esClientMock.search.mock.lastCall?.[0] as any)?.query.bool.must).toEqual([]);
});

it('should not apply range filter if output query returns error', async () => {
soClient.get.mockResolvedValue({ error: { message: 'error' } });

await outputService.getLatestOutputHealth(esClientMock, 'id');

expect((esClientMock.search.mock.lastCall?.[0] as any)?.query.bool.must).toEqual([]);
});
});

describe('backfillAllOutputPresets', () => {
Expand Down
33 changes: 32 additions & 1 deletion x-pack/plugins/fleet/server/services/output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1085,10 +1085,23 @@ class OutputService {
}

async getLatestOutputHealth(esClient: ElasticsearchClient, id: string): Promise<OutputHealth> {
const lastUpdateTime = await this.getOutputLastUpdateTime(id);

const mustFilter = [];
if (lastUpdateTime) {
mustFilter.push({
range: {
'@timestamp': {
gte: lastUpdateTime,
},
},
});
}

const response = await esClient.search(
{
index: OUTPUT_HEALTH_DATA_STREAM,
query: { bool: { filter: { term: { output: id } } } },
query: { bool: { filter: { term: { output: id } }, must: mustFilter } },
sort: { '@timestamp': 'desc' },
size: 1,
},
Expand All @@ -1109,6 +1122,24 @@ class OutputService {
timestamp: latestHit['@timestamp'],
};
}

async getOutputLastUpdateTime(id: string): Promise<string | undefined> {
const outputSO = await this.encryptedSoClient.get<OutputSOAttributes>(
SAVED_OBJECT_TYPE,
outputIdToUuid(id)
);

if (outputSO.error) {
appContextService
.getLogger()
.debug(
`Error getting output ${id} SO, using updated_at:undefined, cause: ${outputSO.error.message}`
);
return undefined;
}

return outputSO.updated_at;
}
}

interface OutputHealth {
Expand Down
13 changes: 10 additions & 3 deletions x-pack/test/fleet_api_integration/apis/outputs/crud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ export default function (providerContext: FtrProviderContext) {
document: {
state: 'HEALTHY',
message: '',
'@timestamp': '' + Date.parse('2023-11-29T14:25:31Z'),
'@timestamp': new Date(Date.now() - 1).toISOString(),
output: defaultOutputId,
},
});
Expand All @@ -285,7 +285,7 @@ export default function (providerContext: FtrProviderContext) {
document: {
state: 'DEGRADED',
message: 'connection error',
'@timestamp': '' + Date.parse('2023-11-30T14:25:31Z'),
'@timestamp': new Date().toISOString(),
output: defaultOutputId,
},
});
Expand All @@ -297,7 +297,7 @@ export default function (providerContext: FtrProviderContext) {
state: 'HEALTHY',
message: '',
'@timestamp': '' + Date.parse('2023-11-31T14:25:31Z'),
output: 'remote2',
output: ESOutputId,
},
});
});
Expand All @@ -310,6 +310,13 @@ export default function (providerContext: FtrProviderContext) {
expect(outputHealth.message).to.equal('connection error');
expect(outputHealth.timestamp).not.to.be.empty();
});
it('should not return output health if older than output last updated time', async () => {
const { body: outputHealth } = await supertest
.get(`/api/fleet/outputs/${ESOutputId}/health`)
.expect(200);

expect(outputHealth.state).to.equal('UNKNOWN');
});
});

describe('PUT /outputs/{outputId}', () => {
Expand Down
Loading