Skip to content

Commit

Permalink
[#223] Supports Echo and Active Thread Count
Browse files Browse the repository at this point in the history
* active thread count matcher
  • Loading branch information
feelform committed Sep 12, 2024
1 parent f18dccf commit 23d64fa
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 45 deletions.
89 changes: 73 additions & 16 deletions lib/client/grpc-data-sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ class GrpcDataSender {
this.commandEchoCallArguments = callArguments
}

setCommandStreamActiveThreadCountCallArguments(callArguments) {
this.commandStreamActiveThreadCountCallArguments = callArguments
}

close() {
this.closeScheduler()
if (this.spanStream) {
Expand All @@ -66,8 +70,11 @@ class GrpcDataSender {
if (this.statClient) {
this.statClient.close()
}
if (this.profilerStream) {
this.profilerStream.end()
if (this.commandStream) {
this.commandStream.end()
}
if (this.activeThreadCountStream) {
this.activeThreadCountStream.end()
}
if (this.profilerClient) {
this.profilerClient.close()
Expand Down Expand Up @@ -136,8 +143,12 @@ class GrpcDataSender {
profilerBuilder.setGrpcServiceConfig(config.grpcServiceConfig.getProfiler())
}
this.profilerClient = new services.ProfilerCommandServiceClient(collectorIp + ":" + collectorTcpPort, grpc.credentials.createInsecure(), profilerBuilder.build())
this.profilerStream = new GrpcReadableStream(() => {
}

makeCommandStream() {
this.commandStream = new GrpcReadableStream(() => {
const writable = this.profilerClient.handleCommandV2()

writable.on('data', (cmdRequest) => {
const requestId = cmdRequest.getRequestid()
const command = cmdRequest.getCommandCase()
Expand All @@ -155,27 +166,47 @@ class GrpcDataSender {
const message = cmdRequest.getCommandecho().getMessage()
cmdEchoResponse.setMessage(message)

const callArguments = guardCallArguments(this.commandEchoCallArguments)
const metadata = callArguments.getMetadata()
let options = callArguments.getOptions()
const callback = callArguments.getCallback()
this.profilerClient.commandEcho(cmdEchoResponse, metadata, options, (err, response) => {
if (err) {
log.error(err)
}
if (callback) {
callback(err, response)
}
})
this.sendCommandEcho(cmdEchoResponse, this.commandEchoCallArguments)
},
'ACTIVE_THREAD_COUNT': () => {
const commonStreamResponse = new cmdMessages.PCmdStreamResponse()
commonStreamResponse.setResponseid(requestId)
commonStreamResponse.setSequenceid(1)
const stringValue = new wrappers.StringValue()
stringValue.setValue('')
commonStreamResponse.setMessage(stringValue)

const commandActiveThreadCountResponse = new cmdMessages.PCmdActiveThreadCountRes()
commandActiveThreadCountResponse.setCommonstreamresponse(commonStreamResponse)
commandActiveThreadCountResponse.setHistogramschematype(2)
commandActiveThreadCountResponse.addActivethreadcount(1)
commandActiveThreadCountResponse.setTimestamp(Date.now())

this.sendActiveThreadCount(commandActiveThreadCountResponse)
}
})
})
return writable
})
}

makeActiveThreadCountStream() {
this.activeThreadCountStream = new GrpcReadableStream(() => {
const callArguments = guardCallArguments(this.commandStreamActiveThreadCountCallArguments)
const metadata = callArguments.getMetadata()
let options = callArguments.getOptions()
const callback = callArguments.getCallback()
return this.profilerClient.commandStreamActiveThreadCount(metadata, options, (err, response) => {
if (err) {
log.error(err)
}
if (callback) {
callback(err, response)
}
})
})
}

agentInfoRefreshInterval() {
return DEFAULT_AGENT_INFO_REFRESH_INTERVAL_MS
}
Expand Down Expand Up @@ -331,11 +362,37 @@ class GrpcDataSender {
}

sendSupportedServicesCommand() {
if (!this.commandStream) {
this.makeCommandStream()
}

const pCmdMessage = CommandType.supportedServicesCommandMessage()
if (log.isDebug()) {
log.debug(`sendControlHandshake pCmdMessage: ${JSON.stringify(pCmdMessage.toObject())}`)
}
this.profilerStream.push(pCmdMessage)
this.commandStream.push(pCmdMessage)
}

sendCommandEcho(commandEchoResponse, callArguments) {
callArguments = guardCallArguments(callArguments)
const metadata = callArguments.getMetadata()
let options = callArguments.getOptions()
const callback = callArguments.getCallback()
this.profilerClient.commandEcho(commandEchoResponse, metadata, options, (err, response) => {
if (err) {
log.error(err)
}
if (callback) {
callback(err, response)
}
})
}

sendActiveThreadCount(commandActiveThreadCountResponse) {
if (!this.activeThreadCountStream) {
this.makeActiveThreadCountStream()
}
this.activeThreadCountStream.push(commandActiveThreadCountResponse)
}

sendPing() {
Expand Down
6 changes: 3 additions & 3 deletions lib/client/grpc-readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ class GrpcReadableStream {
// If an error occurs, it will be necessary to manually close each stream
// in order to prevent memory leaks.`
// for readable steam error memory leak prevention
if (this.writableSteam && typeof this.writableSteam.end === 'function') {
this.writableSteam.end()
if (this.writableStream && typeof this.writableStream.end === 'function') {
this.writableStream.end()
}
})

Expand All @@ -56,7 +56,7 @@ class GrpcReadableStream {
})

this.readableStream.pipe(writableStream)
this.writableSteam = writableStream
this.writableStream = writableStream
}

end() {
Expand Down
124 changes: 99 additions & 25 deletions test/client/grpc-data-sender.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,20 @@ function sendSpan(call, callback) {
callMetadata.push(call.metadata)
}

test('Should send span ', function (t) {
class DataSource extends DataSourceCallCountable {
constructor(collectorIp, collectorTcpPort, collectorStatPort, collectorSpanPort, agentInfo, config) {
super(collectorIp, collectorTcpPort, collectorStatPort, collectorSpanPort, agentInfo, config)
}

initializeClients() { }
initializeMetadataClients() { }
initializeStatStream() { }
initializePingStream() { }
initializeAgentInfoScheduler() { }
initializeProfilerClients() { }
}

test.skip('Should send span ', function (t) {
const expectedSpan = {
'traceId': {
'transactionId': {
Expand Down Expand Up @@ -159,7 +172,7 @@ test('Should send span ', function (t) {

const grpcDataSender = new MockGrpcDataSender('', 0, 0, 0, { agentId: 'agent', applicationName: 'applicationName', agentStartTime: 1234344 })

test('sendSpanChunk redis.SET.end', function (t) {
test.skip('sendSpanChunk redis.SET.end', function (t) {
let expectedSpanChunk = {
'agentId': 'express-node-sample-id',
'applicationName': 'express-node-sample-name',
Expand Down Expand Up @@ -288,7 +301,7 @@ test('sendSpanChunk redis.SET.end', function (t) {
})
})

test('sendSpanChunk redis.GET.end', (t) => {
test.skip('sendSpanChunk redis.GET.end', (t) => {
let expectedSpanChunk = {
'agentId': 'express-node-sample-id',
'applicationName': 'express-node-sample-name',
Expand Down Expand Up @@ -407,7 +420,7 @@ test('sendSpanChunk redis.GET.end', (t) => {
})
})

test('sendSpan', (t) => {
test.skip('sendSpan', (t) => {
let expectedSpanChunk = {
'traceId': {
'transactionId': {
Expand Down Expand Up @@ -712,7 +725,7 @@ test('sendSpan', (t) => {
})
})

test('sendStat', (t) => {
test.skip('sendStat', (t) => {
let expectedStat = {
'agentId': 'express-node-sample-id',
'agentStartTime': 1593058531421,
Expand Down Expand Up @@ -754,65 +767,91 @@ test('sendStat', (t) => {
t.equal(pCpuLoad.getSystemcpuload(), 0, 'cpu.system')
})

let requestId = 1
let requestId = 0
const handleCommandV2Service = (call) => {
const callRequests = getCallRequests()
const callMetadata = getMetadata()
callRequests.push(call.request)
callMetadata.push(call.metadata)

handleCommandCall = call

requestId++
serverCallWriter(CommandType.echo)
}

let handleCommandCall
const serverCallWriter = (commandType) => {
const result = new cmdMessage.PCmdRequest()
result.setRequestid(requestId++)
const message = new cmdMessage.PCmdEcho()
message.setMessage('echo')
result.setCommandecho(message)
call.write(result)
result.setRequestid(requestId)

if (commandType === CommandType.activeThreadCount) {
const commandActiveThreadCount = new cmdMessage.PCmdActiveThreadCount()
result.setCommandactivethreadcount(commandActiveThreadCount)
} else {
const message = new cmdMessage.PCmdEcho()
message.setMessage('echo')
result.setCommandecho(message)
}

handleCommandCall.write(result)
}

const commandEchoService = (call, callback) => {
let dataCallbackOnServerCall
const emptyResponseService = (call, callback) => {
call.on('data', (data) => {
if (typeof dataCallbackOnServerCall === 'function') {
dataCallbackOnServerCall(data)
}
})


const succeedOnRetryAttempt = call.metadata.get('succeed-on-retry-attempt')
const previousAttempts = call.metadata.get('grpc-previous-rpc-attempts')
const callRequests = getCallRequests()
const callMetadata = getMetadata()
// console.debug(`succeed-on-retry-attempt: ${succeedOnRetryAttempt[0]}, grpc-previous-rpc-attempts: ${previousAttempts[0]}`)
if (succeedOnRetryAttempt.length === 0 || (previousAttempts.length > 0 && previousAttempts[0] === succeedOnRetryAttempt[0])) {
callRequests.push(call.request)
callMetadata.push(call.metadata)
callback(null, new Empty())
callRequests.push(call.request)
callMetadata.push(call.metadata)
callback(null, new Empty())
} else {
const statusCode = call.metadata.get('respond-with-status')
const code = statusCode[0] ? Number.parseInt(statusCode[0]) : grpc.status.UNKNOWN
callback({ code: code, details: `Failed on retry ${previousAttempts[0] ?? 0}` })
const statusCode = call.metadata.get('respond-with-status')
const code = statusCode[0] ? Number.parseInt(statusCode[0]) : grpc.status.UNKNOWN
callback({ code: code, details: `Failed on retry ${previousAttempts[0] ?? 0}` })
}
}

class DataSource extends DataSourceCallCountable {
class ProfilerDataSource extends DataSourceCallCountable {
constructor(collectorIp, collectorTcpPort, collectorStatPort, collectorSpanPort, agentInfo, config) {
super(collectorIp, collectorTcpPort, collectorStatPort, collectorSpanPort, agentInfo, config)
}

initializeClients() { }
initializeMetadataClients() { }
initializeSpanStream() { }
initializeStatStream() { }
initializePingStream() { }
initializeAgentInfoScheduler() { }
}

test('sendSupportedServicesCommand', (t) => {
test('sendSupportedServicesCommand and commandEcho', (t) => {
t.plan(4)
dataCallbackOnServerCall = null
const server = new grpc.Server()
server.addService(services.ProfilerCommandServiceService, {
handleCommandV2: handleCommandV2Service,
commandEcho: commandEchoService
commandEcho: emptyResponseService
})

let dataSender
server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => {
dataSender = beforeSpecificOne(port, DataSource)
dataSender.sendSupportedServicesCommand()
dataSender = beforeSpecificOne(port, ProfilerDataSource)

const callArguments = new CallArgumentsBuilder(function (error, response) {
const callRequests = getCallRequests()
const commonResponse = callRequests[1].getCommonresponse()
t.equal(commonResponse.getResponseid(), 1, 'response id matches request id')
t.equal(commonResponse.getResponseid(), requestId, 'response id matches request id')
t.equal(commonResponse.getStatus(), 0, 'status is success')
t.equal(commonResponse.getMessage().getValue(), '', 'message is empty')

Expand All @@ -821,8 +860,43 @@ test('sendSupportedServicesCommand', (t) => {
afterOne(t)
}).build()
dataSender.setCommandEchoCallArguments(callArguments)
dataSender.sendSupportedServicesCommand()
})

t.teardown(() => {
dataSender.close()
server.forceShutdown()
})
})

test('CommandStreamActiveThreadCount', (t) => {
const server = new grpc.Server()
server.addService(services.ProfilerCommandServiceService, {
handleCommandV2: handleCommandV2Service,
commandEcho: emptyResponseService,
commandStreamActiveThreadCount: emptyResponseService
})
let dataSender
server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => {
dataSender = beforeSpecificOne(port, ProfilerDataSource)

dataCallbackOnServerCall = (data) => {
const commonStreamResponse = data.getCommonstreamresponse()
t.equal(commonStreamResponse.getResponseid(), requestId, 'response id matches request id')
t.equal(commonStreamResponse.getSequenceid(), 1, 'sequenceid is 1')
t.equal(commonStreamResponse.getMessage().getValue(), '', 'message is empty')

t.equal(data.getHistogramschematype(), 2, 'histogram schema type')
t.equal(data.getActivethreadcountList()[0], 1, 'active thread count')
afterOne(t)
}

const callArguments = new CallArgumentsBuilder(function (error, response) {
serverCallWriter(CommandType.activeThreadCount)
}).build()
dataSender.setCommandEchoCallArguments(callArguments)
dataSender.sendSupportedServicesCommand()
})

t.teardown(() => {
dataSender.close()
server.forceShutdown()
Expand Down
2 changes: 1 addition & 1 deletion test/client/mock-grpc-data-sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class MockGrpcDataSender extends GrpcDataSender {

initializeProfilerClients() {
let self = this
this.profilerStream = {
this.commandStream = {
write: function (pmessage) {
self.actualPCmdMessage = pmessage
},
Expand Down

0 comments on commit 23d64fa

Please sign in to comment.