Skip to content

Commit

Permalink
fixed linter issues
Browse files Browse the repository at this point in the history
  • Loading branch information
pkositsyn committed Sep 1, 2020
1 parent c867e95 commit c1bb45f
Show file tree
Hide file tree
Showing 14 changed files with 76 additions and 18 deletions.
1 change: 1 addition & 0 deletions kafka/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func Decode(buf []byte, in decoder) error {
return nil
}

// RealDecoder implements PacketDecoder
type RealDecoder struct {
raw []byte
off int
Expand Down
32 changes: 23 additions & 9 deletions kafka/fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ type FetchRequest struct {
RackID string
}

// IsolationLevel is a setting for reliability
type IsolationLevel int8

// ExtractTopics returns a list of all topics from request
func (r *FetchRequest) ExtractTopics() []string {
var topics []string
for k := range r.blocks {
Expand All @@ -60,13 +62,15 @@ func (r *FetchRequest) ExtractTopics() []string {
return topics
}

// GetRequestedBlocksCount returns a total amount of blocks from fetch request
func (r *FetchRequest) GetRequestedBlocksCount() (blocksCount int) {
for _, partition := range r.blocks {
blocksCount += len(partition)
}
return
}

// Decode retrieves kafka fetch request from packet
func (r *FetchRequest) Decode(pd PacketDecoder, version int16) (err error) {
r.Version = version

Expand All @@ -85,7 +89,8 @@ func (r *FetchRequest) Decode(pd PacketDecoder, version int16) (err error) {
}
}
if r.Version >= 4 {
isolation, err := pd.getInt8()
var isolation int8
isolation, err = pd.getInt8()
if err != nil {
return err
}
Expand All @@ -110,17 +115,20 @@ func (r *FetchRequest) Decode(pd PacketDecoder, version int16) (err error) {
}
r.blocks = make(map[string]map[int32]*fetchRequestBlock)
for i := 0; i < topicCount; i++ {
topic, err := pd.getString()
var topic string
topic, err = pd.getString()
if err != nil {
return err
}
partitionCount, err := pd.getArrayLength()
var partitionCount int
partitionCount, err = pd.getArrayLength()
if err != nil {
return err
}
r.blocks[topic] = make(map[int32]*fetchRequestBlock)
for j := 0; j < partitionCount; j++ {
partition, err := pd.getInt32()
var partition int32
partition, err = pd.getInt32()
if err != nil {
return err
}
Expand All @@ -133,24 +141,28 @@ func (r *FetchRequest) Decode(pd PacketDecoder, version int16) (err error) {
}

if r.Version >= 7 {
forgottenCount, err := pd.getArrayLength()
var forgottenCount int
forgottenCount, err = pd.getArrayLength()
if err != nil {
return err
}
r.forgotten = make(map[string][]int32)
for i := 0; i < forgottenCount; i++ {
topic, err := pd.getString()
var topic string
topic, err = pd.getString()
if err != nil {
return err
}
partitionCount, err := pd.getArrayLength()
var partitionCount int
partitionCount, err = pd.getArrayLength()
if err != nil {
return err
}
r.forgotten[topic] = make([]int32, partitionCount)

for j := 0; j < partitionCount; j++ {
partition, err := pd.getInt32()
var partition int32
partition, err = pd.getInt32()
if err != nil {
return err
}
Expand All @@ -169,7 +181,8 @@ func (r *FetchRequest) Decode(pd PacketDecoder, version int16) (err error) {
return nil
}

func (r *FetchRequest) SendClientMetrics(srcHost string) {
// CollectClientMetrics collects metrics associated with client
func (r *FetchRequest) CollectClientMetrics(srcHost string) {
metrics.RequestsCount.WithLabelValues(srcHost, "fetch").Inc()

blocksCount := r.GetRequestedBlocksCount()
Expand Down Expand Up @@ -211,6 +224,7 @@ func (r *FetchRequest) requiredVersion() Version {
}
}

// AddBlock adds message block to fetch request
func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) {
if r.blocks == nil {
r.blocks = make(map[string]map[int32]*fetchRequestBlock)
Expand Down
1 change: 1 addition & 0 deletions kafka/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type Message struct {
compressedSize int // used for computing the compression ratio metrics
}

// Decode decodes message from packet
func (m *Message) Decode(pd PacketDecoder) (err error) {
crc32Decoder := acquireCrc32Field(crcIEEE)
defer releaseCrc32Field(crc32Decoder)
Expand Down
4 changes: 4 additions & 0 deletions kafka/message_set.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package kafka

// MessageBlock represents a part of request with message
type MessageBlock struct {
Offset int64
Msg *Message
Expand All @@ -14,6 +15,7 @@ func (msb *MessageBlock) Messages() []*MessageBlock {
return []*MessageBlock{msb}
}

// Decode decodes message block from packet
func (msb *MessageBlock) Decode(pd PacketDecoder) (err error) {
if msb.Offset, err = pd.getInt64(); err != nil {
return err
Expand All @@ -38,12 +40,14 @@ func (msb *MessageBlock) Decode(pd PacketDecoder) (err error) {
return nil
}

// MessageSet is a replacement for RecordBatch in older versions
type MessageSet struct {
PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
OverflowMessage bool // whether the set on the wire contained an overflow message
Messages []*MessageBlock
}

// Decode retrieves message set from packet
func (ms *MessageSet) Decode(pd PacketDecoder) (err error) {
ms.Messages = nil

Expand Down
2 changes: 2 additions & 0 deletions kafka/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type RecordHeader struct {
Value []byte
}

// Decode retrieves record header from packet
func (h *RecordHeader) Decode(pd PacketDecoder) (err error) {
if h.Key, err = pd.getVarintBytes(); err != nil {
return err
Expand All @@ -38,6 +39,7 @@ type Record struct {
length varintLengthField
}

// Decode decodes record from packet
func (r *Record) Decode(pd PacketDecoder) (err error) {
if err = pd.push(&r.length); err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions kafka/record_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func (e recordsArray) Decode(pd PacketDecoder) error {
return nil
}

// RecordBatch are records from one kafka request
type RecordBatch struct {
FirstOffset int64
PartitionLeaderEpoch int32
Expand Down
9 changes: 8 additions & 1 deletion kafka/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@ var (
MaxRequestSize int32 = 100 * 1024 * 1024
)

// ProtocolBody represents body of kafka request
type ProtocolBody interface {
versionedDecoder
metrics.ClientMetricsSender
metrics.ClientMetricsCollector
key() int16
version() int16
requiredVersion() Version
}

// Request is a kafka request
type Request struct {
// Key is a Kafka api key - it defines kind of request (why it called api key?)
// List of api keys see here: https://kafka.apache.org/protocol#protocol_api_keys
Expand All @@ -42,6 +44,7 @@ type Request struct {
UsePreparedKeyVersion bool
}

// Decode decodes request from packet
func (r *Request) Decode(pd PacketDecoder) (err error) {
if !r.UsePreparedKeyVersion {
r.Key, err = pd.getInt16() // +2 bytes
Expand Down Expand Up @@ -86,18 +89,22 @@ func (r *Request) Decode(pd PacketDecoder) (err error) {
return r.Body.Decode(pd, r.Version)
}

// DecodeLength decodes length from packet
func DecodeLength(encoded []byte) int32 {
return int32(binary.BigEndian.Uint32(encoded[:4]))
}

// DecodeKey decodes key from packet. For terminology see kafka reference
func DecodeKey(encoded []byte) int16 {
return int16(binary.BigEndian.Uint16(encoded[4:6]))
}

// DecodeVersion descodes version from packet
func DecodeVersion(encoded []byte) int16 {
return int16(binary.BigEndian.Uint16(encoded[6:]))
}

// DecodeRequest decodes request from packets delivered by reader
func DecodeRequest(r io.Reader) (*Request, int, error) {
var (
needReadBytes = 8
Expand Down
11 changes: 9 additions & 2 deletions kafka/request_produce.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package kafka

import "github.com/d-ulyanov/kafka-sniffer/metrics"
import (
"github.com/d-ulyanov/kafka-sniffer/metrics"
)

// RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements
// it must see before responding. Any of the constants defined here are valid. On broker versions
Expand All @@ -9,6 +11,7 @@ import "github.com/d-ulyanov/kafka-sniffer/metrics"
// by setting the `min.isr` value in the brokers configuration).
type RequiredAcks int16

// ProduceRequest is a type of request in kafka
type ProduceRequest struct {
TransactionalID *string
RequiredAcks RequiredAcks
Expand All @@ -17,6 +20,7 @@ type ProduceRequest struct {
records map[string]map[int32]Records
}

// Decode decodes kafka produce request from packet
func (r *ProduceRequest) Decode(pd PacketDecoder, version int16) error {
r.Version = version

Expand Down Expand Up @@ -100,6 +104,7 @@ func (r *ProduceRequest) ExtractTopics() []string {
return out
}

// RecordsLen retrieves total size in bytes of all records in message
func (r *ProduceRequest) RecordsLen() (recordsLen int) {
for _, partition := range r.records {
for _, record := range partition {
Expand All @@ -114,6 +119,7 @@ func (r *ProduceRequest) RecordsLen() (recordsLen int) {
return
}

// RecordsSize retrieves total number of records in batch
func (r *ProduceRequest) RecordsSize() (recordsSize int) {
for _, partition := range r.records {
for _, record := range partition {
Expand All @@ -130,7 +136,8 @@ func (r *ProduceRequest) RecordsSize() (recordsSize int) {
return
}

func (r *ProduceRequest) SendClientMetrics(srcHost string) {
// CollectClientMetrics collects metrics associated with client
func (r *ProduceRequest) CollectClientMetrics(srcHost string) {
metrics.RequestsCount.WithLabelValues(srcHost, "produce").Inc()

batchSize := r.RecordsSize()
Expand Down
2 changes: 2 additions & 0 deletions kafka/timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package kafka

import "time"

// Timestamp is a structure representing UNIX timestamp
type Timestamp struct {
*time.Time
}

// Decode decodes timestamp from packet
func (t Timestamp) Decode(pd PacketDecoder) error {
millis, err := pd.getInt64()
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions kafka/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ type Encoder interface {
// as the Key or Value in a ProducerMessage.
type StringEncoder string

// Encode returns encoded bytes array
func (s StringEncoder) Encode() ([]byte, error) {
return []byte(s), nil
}

// Length returns length in bytes
func (s StringEncoder) Length() int {
return len(s)
}
Expand All @@ -31,10 +33,12 @@ func (s StringEncoder) Length() int {
// as the Key or Value in a ProducerMessage.
type ByteEncoder []byte

// Encode returns encoded bytes array
func (b ByteEncoder) Encode() ([]byte, error) {
return b, nil
}

// Length returns length in bytes
func (b ByteEncoder) Length() int {
return len(b)
}
Expand Down
10 changes: 7 additions & 3 deletions metrics/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,28 @@ package metrics
import "github.com/prometheus/client_golang/prometheus"

var (
// RequestsCount is a prometheus metric. See info field
RequestsCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "typed_requests_total",
Help: "Total requests to kafka by type",
}, []string{"client_ip", "request_type"})

// ProducerBatchLen is a prometheus metric. See info field
ProducerBatchLen = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "producer_batch_length",
Help: "Length of producer request batch to kafka",
}, []string{"client_ip"})

// ProducerBatchSize is a prometheus metric. See info field
ProducerBatchSize = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "producer_batch_size",
Help: "Total size of a batch in producer request to kafka",
}, []string{"client_ip"})

// BlocksRequested is a prometheus metric. See info field
BlocksRequested = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "blocks_requested",
Expand All @@ -32,7 +36,7 @@ func init() {
prometheus.MustRegister(RequestsCount, ProducerBatchLen, ProducerBatchSize, BlocksRequested)
}

// ClientMetricsSender is an interface, which allows to send metrics for concrete client
type ClientMetricsSender interface {
SendClientMetrics(srcHost string)
// ClientMetricsCollector is an interface, which allows to collect metrics for concrete client
type ClientMetricsCollector interface {
CollectClientMetrics(srcHost string)
}
4 changes: 4 additions & 0 deletions metrics/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Storage struct {
activeConnectionsTotal *metric
}

// NewStorage creates new Storage
func NewStorage(registerer prometheus.Registerer, expireTime time.Duration) *Storage {
var s = &Storage{
producerTopicRelationInfo: newMetric(prometheus.NewGaugeVec(prometheus.GaugeOpts{
Expand Down Expand Up @@ -47,14 +48,17 @@ func NewStorage(registerer prometheus.Registerer, expireTime time.Duration) *Sto
return s
}

// AddProducerTopicRelationInfo adds (producer, topic) pair to metrics
func (s *Storage) AddProducerTopicRelationInfo(producer, topic string) {
s.producerTopicRelationInfo.set(producer, topic)
}

// AddConsumerTopicRelationInfo adds (consumer, topic) pair to metrics
func (s *Storage) AddConsumerTopicRelationInfo(consumer, topic string) {
s.consumerTopicRelationInfo.set(consumer, topic)
}

// AddActiveConnectionsTotal adds incoming connection
func (s *Storage) AddActiveConnectionsTotal(clientIP string) {
s.activeConnectionsTotal.inc(clientIP)
}
Expand Down
Loading

0 comments on commit c1bb45f

Please sign in to comment.