Skip to content

Commit

Permalink
[pinpoint-apm#182] retry unary RPC
Browse files Browse the repository at this point in the history
  • Loading branch information
feelform committed Jun 3, 2024
1 parent 9b07a43 commit d9e22ce
Show file tree
Hide file tree
Showing 8 changed files with 22 additions and 287 deletions.
2 changes: 1 addition & 1 deletion lib/client/data-sender-factory.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

'use strict'

const GrpcDataSender = require('./grpc-data-sender2')
const GrpcDataSender = require('./grpc-data-sender')
const DataSender = require('./data-sender')

let dataSender
Expand Down
48 changes: 16 additions & 32 deletions lib/client/grpc-data-sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ const grpc = require('@grpc/grpc-js')
const log = require('../utils/logger')
const services = require('../data/v1/Service_grpc_pb')
const dataConvertor = require('../data/grpc-data-convertor')
const pingIdGenerator = require('../context/sequence-generators').pingIdGenerator
const GrpcBidirectionalStream = require('./grpc-bidirectional-stream')
const GrpcClientSideStream = require('./grpc-client-side-stream')
const GrpcUnaryRPC = require('./grpc-unary-rpc')
const Scheduler = require('../utils/scheduler')
const makeAgentInformationMetadataInterceptor = require('./grpc/make-agent-information-metadata-interceptor')
const socketIdInterceptor = require('./grpc/socketid-interceptor')
const retryUnaryRequestInterceptor = require('./grpc/retry-unary-request-interceptor')

// AgentInfoSender.java
// refresh daily
Expand All @@ -37,56 +39,38 @@ class GrpcDataSender {
agentInfo,
config
) {
const headerInterceptor = function (options, nextCall) {
return new grpc.InterceptingCall(nextCall(options), {
start: function (metadata, listener, next) {
metadata.add('agentid', agentInfo.agentId)
metadata.add('applicationname', agentInfo.applicationName)
metadata.add('starttime', String(agentInfo.agentStartTime))
next(metadata, listener, next)
},
})
}
this.initializeClients(collectorIp, collectorTcpPort, headerInterceptor)
this.initializeSpanStream(collectorIp, collectorSpanPort, headerInterceptor, config)
this.initializeStatStream(collectorIp, collectorStatPort, headerInterceptor, config)
this.agentInfo = agentInfo
this.initializeClients(collectorIp, collectorTcpPort)
this.initializeSpanStream(collectorIp, collectorSpanPort, config)
this.initializeStatStream(collectorIp, collectorStatPort, config)
this.initializePingStream()
this.initializeAgentInfoScheduler()
}

initializeClients(collectorIp, collectorTcpPort, headerInterceptor) {
this.agentClient = new services.AgentClient(
collectorIp + ":" + collectorTcpPort,
grpc.credentials.createInsecure(), {
interceptors: [headerInterceptor, function (options, nextCall) {
return new grpc.InterceptingCall(nextCall(options), {
start: function (metadata, listener, next) {
metadata.add('socketid', `${pingIdGenerator.next}`)
next(metadata, listener, next)
}
})
}]
})
initializeClients(collectorIp, collectorTcpPort) {
this.agentClient = new services.AgentClient(collectorIp + ":" + collectorTcpPort, grpc.credentials.createInsecure(),
{ interceptors: [retryUnaryRequestInterceptor, makeAgentInformationMetadataInterceptor(this.agentInfo), socketIdInterceptor] })
this.requestAgentInfo = new GrpcUnaryRPC('requestAgentInfo', this.agentClient, this.agentClient.requestAgentInfo, DEFAULT_AGENT_INFO_SEND_INTERVAL_MS, DEFAULT_MAX_TRY_COUNT_PER_ATTEMPT)

this.metadataClient = new services.MetadataClient(collectorIp + ":" + collectorTcpPort, grpc.credentials.createInsecure(), { interceptors: [headerInterceptor] })
this.metadataClient = new services.MetadataClient(collectorIp + ":" + collectorTcpPort, grpc.credentials.createInsecure(),
{ interceptors: [retryUnaryRequestInterceptor, makeAgentInformationMetadataInterceptor(this.agentInfo)] })
this.requestApiMetaData = new GrpcUnaryRPC('requestApiMetaData', this.metadataClient, this.metadataClient.requestApiMetaData, DEFAULT_METADATA_RETRY_DELAY_MILLIS, DEFAULT_METADATA_RETRY_MAX_COUNT)
this.requestStringMetaData = new GrpcUnaryRPC('requestStringMetaData', this.metadataClient, this.metadataClient.requestStringMetaData, DEFAULT_METADATA_RETRY_DELAY_MILLIS, DEFAULT_METADATA_RETRY_MAX_COUNT)
this.requestSqlMetaData = new GrpcUnaryRPC('requestSqlMetaData', this.metadataClient, this.metadataClient.requestSqlMetaData, DEFAULT_METADATA_RETRY_DELAY_MILLIS, DEFAULT_METADATA_RETRY_MAX_COUNT)
this.requestSqlUidMetaData = new GrpcUnaryRPC('requestSqlUidMetaData', this.metadataClient, this.metadataClient.requestSqlUidMetaData, DEFAULT_METADATA_RETRY_DELAY_MILLIS, DEFAULT_METADATA_RETRY_MAX_COUNT)
}

initializeSpanStream(collectorIp, collectorSpanPort, headerInterceptor, config) {
this.spanClient = new services.SpanClient(collectorIp + ":" + collectorSpanPort, grpc.credentials.createInsecure(), { interceptors: [headerInterceptor] })
initializeSpanStream(collectorIp, collectorSpanPort, config) {
this.spanClient = new services.SpanClient(collectorIp + ":" + collectorSpanPort, grpc.credentials.createInsecure(), { interceptors: [makeAgentInformationMetadataInterceptor(this.agentInfo)] })

this.spanStream = new GrpcClientSideStream('spanStream', this.spanClient, this.spanClient.sendSpan)
if (config && config.streamDeadlineMinutesClientSide) {
this.spanStream.setDeadlineMinutes(config.streamDeadlineMinutesClientSide)
}
}

initializeStatStream(collectorIp, collectorStatPort, headerInterceptor, config) {
this.statClient = new services.StatClient(collectorIp + ":" + collectorStatPort, grpc.credentials.createInsecure(), { interceptors: [headerInterceptor] })
initializeStatStream(collectorIp, collectorStatPort, config) {
this.statClient = new services.StatClient(collectorIp + ":" + collectorStatPort, grpc.credentials.createInsecure(), { interceptors: [makeAgentInformationMetadataInterceptor(this.agentInfo)] })

this.statStream = new GrpcClientSideStream('statStream', this.statClient, this.statClient.sendAgentStat)
if (config && config.streamDeadlineMinutesClientSide) {
Expand Down
249 changes: 0 additions & 249 deletions lib/client/grpc-data-sender2.js

This file was deleted.

2 changes: 1 addition & 1 deletion test/client/grpc-data-sender-bidirectional-stream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
const test = require('tape')
const services = require('../../lib/data/v1/Service_grpc_pb')
const { log } = require('../test-helper')
const GrpcDataSender = require('../../lib/client/grpc-data-sender2')
const GrpcDataSender = require('../../lib/client/grpc-data-sender')
const GrpcServer = require('./grpc-server')
var _ = require('lodash')

Expand Down
2 changes: 1 addition & 1 deletion test/client/grpc-data-sender-client-side-stream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const test = require('tape')
const services = require('../../lib/data/v1/Service_grpc_pb')
const { Empty } = require('google-protobuf/google/protobuf/empty_pb')
const { log } = require('../test-helper')
const GrpcDataSender = require('../../lib/client/grpc-data-sender2')
const GrpcDataSender = require('../../lib/client/grpc-data-sender')
const GrpcServer = require('./grpc-server')
const Span = require('../../lib/context/span')
const GrpcClientSideStream = require('../../lib/client/grpc-client-side-stream')
Expand Down
2 changes: 1 addition & 1 deletion test/client/grpc-stream-deadline.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const { log } = require('../test-helper')

var _ = require('lodash')
const GrpcServer = require('./grpc-server')
const GrpcDataSender = require('../../lib/client/grpc-data-sender2')
const GrpcDataSender = require('../../lib/client/grpc-data-sender')

const spanMessages = require('../../lib/data/v1/Span_pb')

Expand Down
2 changes: 1 addition & 1 deletion test/client/grpc-unary-rpc.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const AgentInfo = require('../../lib/data/dto/agent-info')
const ApiMetaInfo = require('../../lib/data/dto/api-meta-info')
const StringMetaInfo = require('../../lib/data/dto/string-meta-info')
const DataSender = require('../../lib/client/data-sender')
const GrpcDataSender = require('../../lib/client/grpc-data-sender2')
const GrpcDataSender = require('../../lib/client/grpc-data-sender')
const MethodDescriptorBuilder = require('../../lib/context/method-descriptor-builder')
const MethodType = require('../../lib/constant/method-type')

Expand Down
2 changes: 1 addition & 1 deletion test/client/mock-grpc-data-sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/

'use strict'
const GrpcDataSender = require('../../lib/client/grpc-data-sender2')
const GrpcDataSender = require('../../lib/client/grpc-data-sender')
const GrpcUnaryRPC = require('../../lib/client/grpc-unary-rpc')

class MockgRPCDataSender extends GrpcDataSender {
Expand Down

0 comments on commit d9e22ce

Please sign in to comment.