Retry refactor to allow for batching to retry. #617

Open
wants to merge 1 commit into
base: main

7 changes: 3 additions & 4 deletions src/pkg/egress/syslog/https.go
@@ -43,7 +43,7 @@ func NewHTTPSWriter(
 	}
 }

-func (w *HTTPSWriter) sendHttpRequest(msg []byte, msgCount float64) error {
+func (w *HTTPSWriter) sendHttpRequest(msg []byte) error {
 	req := fasthttp.AcquireRequest()
 	req.SetRequestURI(w.url.String())
 	req.Header.SetMethod("POST")
@@ -63,8 +63,6 @@ func (w *HTTPSWriter) sendHttpRequest(msg []byte, msgCount float64) error {
 		return fmt.Errorf("syslog Writer: Post responded with %d status code", resp.StatusCode())
 	}

-	w.egressMetric.Add(msgCount)
-
 	return nil
 }

@@ -76,10 +74,11 @@ func (w *HTTPSWriter) Write(env *loggregator_v2.Envelope) error {
 	}

 	for _, msg := range msgs {
-		err = w.sendHttpRequest(msg, 1)
+		err = w.sendHttpRequest(msg)
 		if err != nil {
 			return err
 		}
+		w.egressMetric.Add(1)
 	}

 	return nil

16 changes: 12 additions & 4 deletions src/pkg/egress/syslog/https_batch.go
@@ -15,6 +15,7 @@ const BATCHSIZE = 256 * 1024

 type HTTPSBatchWriter struct {
 	HTTPSWriter
+	*Retryer
Member

Why embed *Retryer rather than using a named field to encapsulate the new struct via composition?

Member

I think that I would prefer composition in this case because I don't see a good reason to expose the fields and methods of *Retryer directly.

Contributor Author

I used a pointer here so that I could inject the retryer by creating it in the writer factory layer. That way the implementation does not rely on the writers themselves to propagate the settings through to the retryer.
This might be a shortcoming due to my limited understanding of idiomatic Go.

 	msgs         chan []byte
 	batchSize    int
 	sendInterval time.Duration
@@ -27,6 +28,7 @@ func NewHTTPSBatchWriter(
 	tlsConf *tls.Config,
 	egressMetric metrics.Counter,
 	c *Converter,
+	retryer *Retryer,
 ) egress.WriteCloser {
 	client := httpClient(netConf, tlsConf)
 	binding.URL.Scheme = "https" // reset the scheme for usage to a valid http scheme
@@ -43,6 +45,7 @@ func NewHTTPSBatchWriter(
 		sendInterval: 1 * time.Second,
 		egrMsgCount:  0,
 		msgs:         make(chan []byte),
+		Retryer:      retryer,
 	}
 	go BatchWriter.startSender()
 	return BatchWriter
@@ -74,6 +77,13 @@ func (w *HTTPSBatchWriter) startSender() {
 		msgCount = 0
 		t.Reset(w.sendInterval)
 	}
+	sendBatch := func() {
+		err := w.Retryer.Retry(msgBatch.Bytes(), w.sendHttpRequest)
+		if err == nil {
+			w.egressMetric.Add(msgCount)
+		}
+		reset()
+	}
 	for {
 		select {
 		case msg := <-w.msgs:
@@ -84,14 +94,12 @@ func (w *HTTPSBatchWriter) startSender() {
 			} else {
 				msgCount++
 				if length >= w.batchSize {
-					w.sendHttpRequest(msgBatch.Bytes(), msgCount) //nolint:errcheck
-					reset()
+					sendBatch()
 				}
 			}
 		case <-t.C:
 			if msgBatch.Len() > 0 {
-				w.sendHttpRequest(msgBatch.Bytes(), msgCount) //nolint:errcheck
-				reset()
+				sendBatch()
 			}
 		}
 	}

6 changes: 6 additions & 0 deletions src/pkg/egress/syslog/https_batch_test.go
@@ -39,12 +39,18 @@ var _ = Describe("HTTPS_batch", func() {
 			"test-app-id",
 			"test-hostname",
 		)
+		retryer := syslog.NewBackoffRetryer(
+			b,
+			syslog.ExponentialDuration,
+			2,
+		)
 		writer = syslog.NewHTTPSBatchWriter(
 			b,
 			netConf,
 			skipSSLTLSConfig,
 			&metricsHelpers.SpyMetric{},
 			c,
+			retryer,
 		)
 	})

52 changes: 52 additions & 0 deletions src/pkg/egress/syslog/retryer.go
@@ -0,0 +1,52 @@
+package syslog
+
+import (
+	"log"
+	"time"
+
+	"code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress"
+)
+
+// Retryer retries a write function if the first attempt fails.
+type Retryer struct {
Member

Why not just represent Retryer as some new fields and methods in HTTPSBatchWriter since that's the only writer set to use it?

Member

I don't have a strong opinion on this, but I think I lean toward just putting all this code into HTTPSBatchWriter so that it's all more self-contained.

Contributor Author

I thought about that as well, but introducing the retryer the way I built it here has more than one benefit:

  1. Retrying after stringifying the rfsyslog message seemed beneficial performance-wise.
  2. Connection-aware retry logic for the TLS-based writers might handle connectivity issues more efficiently (retries are only needed because of connectivity issues).
  3. The problem described in #612 (the retry_writer retries parsing issues for syslog messages) is also present for the TLS/TCP writers. This could be fixed in a dedicated PR.
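
The first point above can be sketched as follows. This is a hypothetical illustration, not code from the PR: `convert`, `retry`, `flakySender`, and the message framing are all assumptions, and it assumes that a retry wrapper placed around the whole write call reruns the conversion on every attempt. The counter shows how often the conversion runs in each arrangement.

```go
package main

import (
	"errors"
	"fmt"
)

// convert is a hypothetical stand-in for the envelope-to-syslog conversion step.
// It counts invocations so the two approaches can be compared.
func convert(line string, conversions *int) []byte {
	*conversions++
	return []byte("<14>1 - host app - - - " + line + "\n") // placeholder framing
}

// retry calls fn with the same payload up to attempts times.
func retry(payload []byte, attempts int, fn func([]byte) error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(payload); err == nil {
			return nil
		}
	}
	return err
}

// flakySender returns a send function that fails its first `failures` calls.
func flakySender(failures int) func([]byte) error {
	calls := 0
	return func([]byte) error {
		calls++
		if calls <= failures {
			return errors.New("connection refused")
		}
		return nil
	}
}

// retryAfterConvert converts once and retries only the send.
func retryAfterConvert(line string, attempts, failures int) (conversions int, err error) {
	payload := convert(line, &conversions)
	err = retry(payload, attempts, flakySender(failures))
	return conversions, err
}

// retryAroundConvert redoes the conversion on every attempt, as a wrapper
// around the whole write path would under the assumption stated above.
func retryAroundConvert(line string, attempts, failures int) (conversions int, err error) {
	send := flakySender(failures)
	for i := 0; i < attempts; i++ {
		if err = send(convert(line, &conversions)); err == nil {
			return conversions, nil
		}
	}
	return conversions, err
}

func main() {
	a, _ := retryAfterConvert("hello", 3, 2)
	b, _ := retryAroundConvert("hello", 3, 2)
	fmt.Println(a, b) // prints "1 3"
}
```

With two failed sends before a success, the first arrangement converts the message once and the second converts it three times.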

+	retryDuration RetryDuration
+	maxRetries    int
+	binding       *URLBinding
+}
+
+func NewBackoffRetryer(
+	urlBinding *URLBinding,
+	retryDuration RetryDuration,
+	maxRetries int,
+) *Retryer {
nicklas-dohrn marked this conversation as resolved.
+	return &Retryer{
+		retryDuration: retryDuration,
+		maxRetries:    maxRetries,
+		binding:       urlBinding,
+	}
+}
+
+// Retry will retry writes until maxRetries has been reached.
+func (r *Retryer) Retry(message []byte, fn func(msg []byte) error) error {
+	logTemplate := "failed to write to %s, retrying in %s, err: %s"
+
+	var err error
+
+	for i := 0; i < r.maxRetries; i++ {
+		err = fn(message)
+		if err == nil {
+			return nil
+		}
+
+		if egress.ContextDone(r.binding.Context) {
+			return err
+		}
+
+		sleepDuration := r.retryDuration(i)
+		log.Printf(logTemplate, r.binding.URL.Host, sleepDuration, err)
+
+		time.Sleep(sleepDuration)
+	}
+
+	return err
+}

6 changes: 6 additions & 0 deletions src/pkg/egress/syslog/writer_factory.go
@@ -96,6 +96,11 @@ func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error) {
 	}
 	converter := NewConverter(o...)

+	retryer := NewBackoffRetryer(
+		ub,
+		ExponentialDuration,
+		maxRetries)
+
 	var w egress.WriteCloser
 	switch ub.URL.Scheme {
 	case "https":
@@ -113,6 +118,7 @@ func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error) {
 			tlsCfg,
 			egressMetric,
 			converter,
+			retryer,
 		)
 	case "syslog":
 		w = NewTCPWriter(