Skip to content

Commit

Permalink
Update m3x and use code generated maps instead of ident.Hash based ma…
Browse files Browse the repository at this point in the history
…ps (#528)
  • Loading branch information
robskillington authored Apr 17, 2018
1 parent e05c089 commit 7e637f0
Show file tree
Hide file tree
Showing 98 changed files with 3,078 additions and 379 deletions.
1 change: 1 addition & 0 deletions .excludecoverage
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
_mock.go
_gen.go
_matcher.go
generated/
tools/
Expand Down
1 change: 1 addition & 0 deletions .excludefmt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
(gen-go/)
(.pb.go)
(_mock.go)
(_gen.go)
1 change: 1 addition & 0 deletions .excludelint
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
(vendor/)
(generated/)
(_mock.go)
(_gen.go)
1 change: 1 addition & 0 deletions .excludemetalint
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
vendor/
generated/
_mock.go
_gen.go
_todo.go
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ gopath_prefix := $(GOPATH)/src
license_dir := .ci/uber-licence
license_node_modules := $(license_dir)/node_modules
m3db_package := github.com/m3db/m3db
m3db_package_path := $(gopath_prefix)/$(m3db_package)
metalint_check := .ci/metalint.sh
metalint_config := .metalinter.json
metalint_exclude := .excludemetalint
Expand All @@ -23,6 +24,8 @@ thrift_rules_dir := generated/thrift
vendor_prefix := vendor
cache_policy ?= recently_read

include $(SELF_DIR)/generated-source-files.mk

BUILD := $(abspath ./bin)
GO_BUILD_LDFLAGS := $(shell $(abspath ./.ci/go-build-ldflags.sh) $(m3db_package))
LINUX_AMD64_ENV := GOOS=linux GOARCH=amd64 CGO_ENABLED=0
Expand Down Expand Up @@ -138,7 +141,7 @@ proto-gen: install-proto-bin install-license-bin
.PHONY: all-gen
# NB(prateek): order matters here, mock-gen needs to be last because we sometimes
# generate mocks for thrift/proto generated code.
all-gen: thrift-gen proto-gen mock-gen
all-gen: thrift-gen proto-gen mock-gen map-all-gen genny-all

.PHONY: metalint
metalint: install-metalinter install-linter-badtime
Expand Down
210 changes: 154 additions & 56 deletions client/host_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,64 +183,56 @@ func (q *queue) rotateOpsWithLock() []op {

func (q *queue) drain() {
var (
currWriteOpsByNamespace = make(map[ident.Hash][]op)
currBatchElementsByNamespace = make(map[ident.Hash][]*rpc.WriteBatchRawRequestElement)
currTaggedWriteOpsByNamespace = make(map[ident.Hash][]op)
currTaggedBatchElementsByNamespace = make(map[ident.Hash][]*rpc.WriteTaggedBatchRawRequestElement)
writeBatchSize = q.opts.WriteBatchSize()
currWriteOpsByNamespace namespaceWriteBatchOpsSlice
currTaggedWriteOpsByNamespace namespaceWriteTaggedBatchOpsSlice
writeBatchSize = q.opts.WriteBatchSize()
)

for ops := range q.drainIn {
var (
currWriteOps []op
currBatchElements []*rpc.WriteBatchRawRequestElement
currTaggedWriteOps []op
currTaggedBatchElements []*rpc.WriteTaggedBatchRawRequestElement
opsLen = len(ops)
)
opsLen := len(ops)
for i := 0; i < opsLen; i++ {
switch v := ops[i].(type) {
case *writeOperation:
namespace := v.namespace
namespaceKey := namespace.Hash()
currWriteOps = currWriteOpsByNamespace[namespaceKey]
currBatchElements = currBatchElementsByNamespace[namespaceKey]
if currWriteOps == nil {
currWriteOps = q.opsArrayPool.Get()
currBatchElements = q.writeBatchRawRequestElementArrayPool.Get()
idx := currWriteOpsByNamespace.indexOf(namespace)
if idx == -1 {
value := namespaceWriteBatchOps{
namespace: namespace,
opsArrayPool: q.opsArrayPool,
writeBatchRawRequestElementArrayPool: q.writeBatchRawRequestElementArrayPool,
}
idx = len(currWriteOpsByNamespace)
currWriteOpsByNamespace = append(currWriteOpsByNamespace, value)
}

currWriteOps = append(currWriteOps, ops[i])
currBatchElements = append(currBatchElements, &v.request)
currWriteOpsByNamespace[namespaceKey] = currWriteOps
currBatchElementsByNamespace[namespaceKey] = currBatchElements
currWriteOpsByNamespace.appendAt(idx, ops[i], &v.request)

if len(currWriteOps) == writeBatchSize {
if currWriteOpsByNamespace.lenAt(idx) == writeBatchSize {
// Reached write batch limit, write async and reset
q.asyncWrite(namespace, currWriteOps, currBatchElements)
currWriteOpsByNamespace[namespaceKey] = nil
currBatchElementsByNamespace[namespaceKey] = nil
q.asyncWrite(namespace, currWriteOpsByNamespace[idx].ops,
currWriteOpsByNamespace[idx].elems)
currWriteOpsByNamespace.resetAt(idx)
}
case *writeTaggedOperation:
namespace := v.namespace
namespaceKey := namespace.Hash()
currTaggedWriteOps = currTaggedWriteOpsByNamespace[namespaceKey]
currTaggedBatchElements = currTaggedBatchElementsByNamespace[namespaceKey]
if currTaggedWriteOps == nil {
currTaggedWriteOps = q.opsArrayPool.Get()
currTaggedBatchElements = q.writeTaggedBatchRawRequestElementArrayPool.Get()
idx := currTaggedWriteOpsByNamespace.indexOf(namespace)
if idx == -1 {
value := namespaceWriteTaggedBatchOps{
namespace: namespace,
opsArrayPool: q.opsArrayPool,
writeTaggedBatchRawRequestElementArrayPool: q.writeTaggedBatchRawRequestElementArrayPool,
}
idx = len(currTaggedWriteOpsByNamespace)
currTaggedWriteOpsByNamespace = append(currTaggedWriteOpsByNamespace, value)
}

currTaggedWriteOps = append(currTaggedWriteOps, ops[i])
currTaggedBatchElements = append(currTaggedBatchElements, &v.request)
currTaggedWriteOpsByNamespace[namespaceKey] = currTaggedWriteOps
currTaggedBatchElementsByNamespace[namespaceKey] = currTaggedBatchElements
currTaggedWriteOpsByNamespace.appendAt(idx, ops[i], &v.request)

if len(currTaggedWriteOps) == writeBatchSize {
if currTaggedWriteOpsByNamespace.lenAt(idx) == writeBatchSize {
// Reached write batch limit, write async and reset
q.asyncTaggedWrite(namespace, currTaggedWriteOps, currTaggedBatchElements)
currTaggedWriteOpsByNamespace[namespaceKey] = nil
currTaggedBatchElementsByNamespace[namespaceKey] = nil
q.asyncTaggedWrite(namespace, currTaggedWriteOpsByNamespace[idx].ops,
currTaggedWriteOpsByNamespace[idx].elems)
currTaggedWriteOpsByNamespace.resetAt(idx)
}
case *fetchBatchOp:
q.asyncFetch(v)
Expand All @@ -253,26 +245,28 @@ func (q *queue) drain() {
}

// If any outstanding write ops, async write
for _, writeOperations := range currWriteOpsByNamespace {
if len(writeOperations) > 0 {
namespace := writeOperations[0].(*writeOperation).namespace
namespaceKey := namespace.Hash()
q.asyncWrite(namespace, writeOperations, currBatchElementsByNamespace[namespaceKey])
currWriteOpsByNamespace[namespaceKey] = nil
currBatchElementsByNamespace[namespaceKey] = nil
for i, writeOps := range currWriteOpsByNamespace {
if len(writeOps.ops) > 0 {
q.asyncWrite(writeOps.namespace, writeOps.ops,
writeOps.elems)
}
// Zero the element
currWriteOpsByNamespace[i] = namespaceWriteBatchOps{}
}
// Reset the slice
currWriteOpsByNamespace = currWriteOpsByNamespace[:0]

// If any outstanding tagged write ops, async write
for _, writeOps := range currTaggedWriteOpsByNamespace {
if len(writeOps) > 0 {
namespace := writeOps[0].(*writeTaggedOperation).namespace
namespaceKey := namespace.Hash()
q.asyncTaggedWrite(namespace, writeOps, currTaggedBatchElementsByNamespace[namespaceKey])
currTaggedWriteOpsByNamespace[namespaceKey] = nil
currTaggedBatchElementsByNamespace[namespaceKey] = nil
for i, writeOps := range currTaggedWriteOpsByNamespace {
if len(writeOps.ops) > 0 {
q.asyncTaggedWrite(writeOps.namespace, writeOps.ops,
writeOps.elems)
}
// Zero the element
currTaggedWriteOpsByNamespace[i] = namespaceWriteTaggedBatchOps{}
}
// Reset the slice
currTaggedWriteOpsByNamespace = currTaggedWriteOpsByNamespace[:0]

if ops != nil {
q.opsArrayPool.Put(ops)
Expand All @@ -293,7 +287,7 @@ func (q *queue) asyncTaggedWrite(
// TODO(r): Use a worker pool to avoid creating new go routines for async writes
go func() {
req := q.writeTaggedBatchRawRequestPool.Get()
req.NameSpace = namespace.Data().Get()
req.NameSpace = namespace.Data().Bytes()
req.Elements = elems

// NB(r): Defer is slow in the hot path unfortunately
Expand Down Expand Up @@ -358,7 +352,7 @@ func (q *queue) asyncWrite(
// TODO(r): Use a worker pool to avoid creating new go routines for async writes
go func() {
req := q.writeBatchRawRequestPool.Get()
req.NameSpace = namespace.Data().Get()
req.NameSpace = namespace.Data().Bytes()
req.Elements = elems

// NB(r): Defer is slow in the hot path unfortunately
Expand Down Expand Up @@ -584,3 +578,107 @@ func errQueueUnknownOperation(hostID string) error {
func errQueueFetchNoResponse(hostID string) error {
return fmt.Errorf("host operation queue did not receive response for given fetch for host: %s", hostID)
}

// ops container types

type namespaceWriteBatchOps struct {
namespace ident.ID
opsArrayPool *opArrayPool
writeBatchRawRequestElementArrayPool writeBatchRawRequestElementArrayPool
ops []op
elems []*rpc.WriteBatchRawRequestElement
}

type namespaceWriteBatchOpsSlice []namespaceWriteBatchOps

func (s namespaceWriteBatchOpsSlice) indexOf(
namespace ident.ID,
) int {
idx := -1
for i := range s {
if s[i].namespace.Equal(namespace) {
return i
}
}
return idx
}

func (s namespaceWriteBatchOpsSlice) appendAt(
index int,
op op,
elem *rpc.WriteBatchRawRequestElement,
) {
if s[index].ops == nil {
s[index].ops = s[index].opsArrayPool.Get()
}
if s[index].elems == nil {
s[index].elems = s[index].writeBatchRawRequestElementArrayPool.Get()
}
s[index].ops = append(s[index].ops, op)
s[index].elems = append(s[index].elems, elem)
}

func (s namespaceWriteBatchOpsSlice) lenAt(
index int,
) int {
return len(s[index].ops)
}

func (s namespaceWriteBatchOpsSlice) resetAt(
index int,
) {
s[index].ops = nil
s[index].elems = nil
}

// TODO: use genny to make namespaceWriteBatchOps and namespaceWriteTaggedBatchOps
// share code (https://github.com/m3db/m3db/issues/531)
type namespaceWriteTaggedBatchOps struct {
namespace ident.ID
opsArrayPool *opArrayPool
writeTaggedBatchRawRequestElementArrayPool writeTaggedBatchRawRequestElementArrayPool
ops []op
elems []*rpc.WriteTaggedBatchRawRequestElement
}

type namespaceWriteTaggedBatchOpsSlice []namespaceWriteTaggedBatchOps

func (s namespaceWriteTaggedBatchOpsSlice) indexOf(
namespace ident.ID,
) int {
idx := -1
for i := range s {
if s[i].namespace.Equal(namespace) {
return i
}
}
return idx
}

func (s namespaceWriteTaggedBatchOpsSlice) appendAt(
index int,
op op,
elem *rpc.WriteTaggedBatchRawRequestElement,
) {
if s[index].ops == nil {
s[index].ops = s[index].opsArrayPool.Get()
}
if s[index].elems == nil {
s[index].elems = s[index].writeTaggedBatchRawRequestElementArrayPool.Get()
}
s[index].ops = append(s[index].ops, op)
s[index].elems = append(s[index].elems, elem)
}

func (s namespaceWriteTaggedBatchOpsSlice) lenAt(
index int,
) int {
return len(s[index].ops)
}

func (s namespaceWriteTaggedBatchOpsSlice) resetAt(
index int,
) {
s[index].ops = nil
s[index].elems = nil
}
Loading

0 comments on commit 7e637f0

Please sign in to comment.