Skip to content

Commit

Permalink
Merge pull request #107 from rebuy-de/add-lokiutil
Browse files Browse the repository at this point in the history
CLOUD-3028 add lokiutil
  • Loading branch information
svenwltr authored Mar 15, 2022
2 parents c5f6f36 + 25e5ce0 commit a6b7b46
Show file tree
Hide file tree
Showing 3 changed files with 250 additions and 0 deletions.
74 changes: 74 additions & 0 deletions pkg/lokiutil/buf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package lokiutil

import (
"encoding/json"
"fmt"
"sort"
"strings"

"github.com/afiskon/promtail-client/logproto"
"github.com/rebuy-de/rebuy-go-sdk/v3/pkg/cmdutil"
"google.golang.org/protobuf/types/known/timestamppb"
)

func makeBatch(buffer map[string][]string) Batch {
result := []*logproto.Stream{}

for labels, messages := range buffer {
stream := logproto.Stream{
Labels: labels,
}

for _, message := range messages {
entry := logproto.Entry{
// We need to use "now" and not the message timestamp, because
// we need to guarantee message order.
Timestamp: timestamppb.Now(),
Line: message,
}
stream.Entries = append(stream.Entries, &entry)
}

result = append(result, &stream)
}

return result
}

func splitLabels(m Message, hostname string, keys []string) (string, string) {
labels := map[string]interface{}{
"project": cmdutil.Name,
"source": hostname,
}

for _, k := range keys {
value, ok := m[k]
if ok {
labels[k] = value
delete(m, k)
}
}

l := encodeLabels(labels)

p, err := json.Marshal(m)
if err != nil {
panic(err)
}

return string(l), string(p)
}

// Loki uses some weird format for their labels. Therefore we have to marshal
// it by ourselves.
func encodeLabels(labels map[string]interface{}) string {
parts := []string{}

for k, v := range labels {
parts = append(parts, fmt.Sprintf("%s=%#v", k, v))
}

sort.Strings(parts)

return fmt.Sprintf("{%s}", strings.Join(parts, ","))
}
169 changes: 169 additions & 0 deletions pkg/lokiutil/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package lokiutil

import (
"bytes"
"io/ioutil"
"net/http"
"os"
"time"

"github.com/afiskon/promtail-client/logproto"
"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
"github.com/pkg/errors"
)

type Client struct {
url string
http *http.Client

maxSize int
maxWait time.Duration
keys []string

messages chan Message
errc chan error
stopped chan struct{}
}

func New(url string, maxSize int, maxWait time.Duration, keys []string) *Client {
c := &Client{
url: url,
http: new(http.Client),
maxWait: maxWait,
maxSize: maxSize,
keys: keys,

messages: make(chan Message, maxSize*2),
errc: make(chan error, 100),
stopped: make(chan struct{}),
}

return c
}

func (c *Client) Log(m Message) {
select {
case c.messages <- m:
default:
}
}

func (c *Client) Errc() <-chan error {
return c.errc
}

func (c *Client) err(err error) {
if err == nil {
return
}

select {
case c.errc <- err:
default:
}
}

// Stop sends the remaining messages and stops processing new ones.
func (c *Client) Stop() {
close(c.messages)
close(c.errc)
}

// Run processes the messages in the background. It needs to get stopped by
// Stop() to not lose messages. It is not controlled by a context, because
// logging should be the last component that gets stopped.
func (c *Client) Run() error {
var (
buffer = map[string][]string{}
size = 0
done = false
timer = time.NewTimer(c.maxWait)
)

defer close(c.stopped)

hostname, err := os.Hostname()
if err != nil {
return errors.WithStack(err)
}

for !done {
send := false
select {
case message, ok := <-c.messages:
if !ok {
// Channel is closed
done = true
break
}

l, p := splitLabels(message, hostname, c.keys)
messages := buffer[l]
messages = append(messages, p)
buffer[l] = messages

size++
if size >= c.maxSize {
send = true
}
case <-timer.C:
send = true
}

if send {
timer.Stop()
select {
case <-timer.C:
default:
}

if size > 0 {
batch := makeBatch(buffer)
err := c.sendBatch(batch)
c.err(err)
}

size = 0
buffer = map[string][]string{}
timer.Reset(c.maxWait)
}
}

return nil
}

func (c *Client) sendBatch(batch Batch) error {
buf, err := proto.Marshal(&logproto.PushRequest{
Streams: batch,
})
if err != nil {
return errors.WithStack(err)
}

buf = snappy.Encode(nil, buf)

req, err := http.NewRequest("POST", c.url, bytes.NewBuffer(buf))
if err != nil {
return errors.WithStack(err)
}

req.Header.Set("Content-Type", "application/x-protobuf")

resp, err := c.http.Do(req)
if err != nil {
return errors.WithStack(err)
}
defer resp.Body.Close()

resBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return errors.WithStack(err)
}

if resp.StatusCode != 204 {
return errors.Errorf("unexpected HTTP status code %d: %s", resp.StatusCode, string(resBody))
}

return nil
}
7 changes: 7 additions & 0 deletions pkg/lokiutil/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package lokiutil

import "github.com/afiskon/promtail-client/logproto"

type Message = map[string]interface{}

type Batch = []*logproto.Stream

0 comments on commit a6b7b46

Please sign in to comment.