Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cache pubsub topic and reuse it to publish future messages #290

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ Dispatchers handle vehicle data processing upon its arrival at Fleet Telemetry s
* Configure stream names directly by setting the streams config `"kinesis": { "streams": { *topic_name*: stream_name } }`
* Override stream names with env variables: KINESIS_STREAM_\*uppercase topic\* ex.: `KINESIS_STREAM_V`
* Google pubsub: Along with the required pubsub config (See ./test/integration/config.json for example), be sure to set the environment variable `GOOGLE_APPLICATION_CREDENTIALS`
* If you have already created relevant topics, you can use `topic_check_refresh_interval_seconds` to update server's behavior:-
* 0 (default) : Will always attempt to create and check existence of topic before publishing
* -1 : Will attempt to dispatch the message without checking the existence of topic. Set this to -1, if topics were created before deploying
* x : Will only attempt to check existence of topic every x seconds. If the topic was deleted, it will attempt to recreate after x seconds.
* ZMQ: Configure with the config.json file. See implementation here: [config/config.go](./config/config.go)
* Logger: This is a simple STDOUT logger that serializes the protos to json.

Expand Down
5 changes: 4 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ type Pubsub struct {
// GCP Project ID
ProjectID string `json:"gcp_project_id,omitempty"`

// TopicCheckRefreshIntervalSeconds -1: do not check if topic exists, 0: always check, non zero value signifies refresh interval for check
TopicCheckRefreshIntervalSeconds int `json:"topic_check_refresh_interval_seconds,omitempty"`

Publisher *pubsub.Client
}

Expand Down Expand Up @@ -281,7 +284,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *l
if c.Pubsub == nil {
return nil, nil, errors.New("expected Pubsub to be configured")
}
googleProducer, err := googlepubsub.NewProducer(c.prometheusEnabled(), c.Pubsub.ProjectID, c.Namespace, c.MetricCollector, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.Pubsub], logger)
googleProducer, err := googlepubsub.NewProducer(c.prometheusEnabled(), c.Pubsub.ProjectID, c.Namespace, c.Pubsub.TopicCheckRefreshIntervalSeconds, c.MetricCollector, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.Pubsub], logger)
if err != nil {
return nil, nil, err
}
Expand Down
88 changes: 59 additions & 29 deletions datastore/googlepubsub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ import (

// Producer client to handle google pubsub interactions
type Producer struct {
pubsubClient *pubsub.Client
projectID string
namespace string
metricsCollector metrics.MetricCollector
prometheusEnabled bool
logger *logrus.Logger
airbrakeHandler *airbrake.Handler
ackChan chan (*telemetry.Record)
reliableAckTxTypes map[string]interface{}
pubsubClient *pubsub.Client
projectID string
namespace string
metricsCollector metrics.MetricCollector
prometheusEnabled bool
logger *logrus.Logger
airbrakeHandler *airbrake.Handler
ackChan chan (*telemetry.Record)
reliableAckTxTypes map[string]interface{}
topicCheckRefreshIntervalSeconds int
topicCheckMap map[string]time.Time
}

// Metrics stores metrics reported from this package
Expand Down Expand Up @@ -57,45 +59,74 @@ func configurePubsub(projectID string) (*pubsub.Client, error) {
}

// NewProducer establishes the pubsub connection and define the dispatch method
func NewProducer(prometheusEnabled bool, projectID string, namespace string, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.Handler, ackChan chan (*telemetry.Record), reliableAckTxTypes map[string]interface{}, logger *logrus.Logger) (telemetry.Producer, error) {
func NewProducer(prometheusEnabled bool, projectID string, namespace string, topicCheckRefreshIntervalSeconds int, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.Handler, ackChan chan (*telemetry.Record), reliableAckTxTypes map[string]interface{}, logger *logrus.Logger) (telemetry.Producer, error) {
registerMetricsOnce(metricsCollector)
pubsubClient, err := configurePubsub(projectID)
if err != nil {
return nil, fmt.Errorf("pubsub_connect_error %s", err)
}

p := &Producer{
projectID: projectID,
namespace: namespace,
pubsubClient: pubsubClient,
prometheusEnabled: prometheusEnabled,
metricsCollector: metricsCollector,
logger: logger,
airbrakeHandler: airbrakeHandler,
ackChan: ackChan,
reliableAckTxTypes: reliableAckTxTypes,
projectID: projectID,
namespace: namespace,
pubsubClient: pubsubClient,
prometheusEnabled: prometheusEnabled,
metricsCollector: metricsCollector,
logger: logger,
airbrakeHandler: airbrakeHandler,
ackChan: ackChan,
reliableAckTxTypes: reliableAckTxTypes,
topicCheckRefreshIntervalSeconds: topicCheckRefreshIntervalSeconds,
topicCheckMap: make(map[string]time.Time, 0),
}
p.logger.ActivityLog("pubsub_registered", logrus.LogInfo{"project": projectID, "namespace": namespace})
return p, nil
}

// Produce sends the record payload to pubsub
func (p *Producer) Produce(entry *telemetry.Record) {
ctx := context.Background()
func (p *Producer) topicForRecord(ctx context.Context, entry *telemetry.Record) *pubsub.Topic {
topicName := telemetry.BuildTopicName(p.namespace, entry.TxType)
topic := p.pubsubClient.Topic(topicName)
if p.topicCheckRefreshIntervalSeconds < 0 {
return topic
}

lastCheck, ok := p.topicCheckMap[topicName]
now := time.Now()

if ok && p.topicCheckRefreshIntervalSeconds < 0 {
return topic
}

if !ok || now.Sub(lastCheck) > time.Duration(p.topicCheckRefreshIntervalSeconds)*time.Second {
return p.upsertTopic(ctx, entry)
}

return topic
}

// upsertTopic creates topic if doesn't exist
func (p *Producer) upsertTopic(ctx context.Context, entry *telemetry.Record) *pubsub.Topic {
topicName := telemetry.BuildTopicName(p.namespace, entry.TxType)
logInfo := logrus.LogInfo{"topic_name": topicName, "txid": entry.Txid}
pubsubTopic, err := p.createTopicIfNotExists(ctx, topicName)

if err != nil {
p.ReportError("pubsub_topic_creation_error", err, logInfo)
metricsRegistry.notConnectedTotal.Inc(map[string]string{"record_type": entry.TxType})
return
return nil
}
p.topicCheckMap[topicName] = time.Now()
return pubsubTopic
}

if exists, err := pubsubTopic.Exists(ctx); !exists || err != nil {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I was going to ask if we actually needed this check too.

Copy link
Collaborator Author

@agbpatro agbpatro Jan 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah doesn't looks like we need this. Check seems to be redundant

p.ReportError("pubsub_topic_check_error", err, logInfo)
metricsRegistry.notConnectedTotal.Inc(map[string]string{"record_type": entry.TxType})
// Produce sends the record payload to pubsub
func (p *Producer) Produce(entry *telemetry.Record) {
ctx := context.Background()
topicName := telemetry.BuildTopicName(p.namespace, entry.TxType)
logInfo := logrus.LogInfo{"topic_name": topicName, "txid": entry.Txid}

pubsubTopic := p.topicForRecord(ctx, entry)
if pubsubTopic == nil {
p.logger.ActivityLog("topic_not_present", logInfo)
return
}

Expand All @@ -104,7 +135,7 @@ func (p *Producer) Produce(entry *telemetry.Record) {
Data: entry.Payload(),
Attributes: entry.Metadata(),
})
if _, err = result.Get(ctx); err != nil {
if _, err := result.Get(ctx); err != nil {
p.ReportError("pubsub_err", err, logInfo)
metricsRegistry.errorCount.Inc(map[string]string{"record_type": entry.TxType})
return
Expand Down Expand Up @@ -138,7 +169,6 @@ func (p *Producer) createTopicIfNotExists(ctx context.Context, topic string) (*p
if exists {
return pubsubTopic, nil
}

return p.pubsubClient.CreateTopic(ctx, topic)
}

Expand Down
Loading