From a29448e74309748221ad846d1f2db784968ad9ad Mon Sep 17 00:00:00 2001 From: alxtkr77 <3098237+alxtkr77@users.noreply.github.com> Date: Thu, 11 Jul 2019 11:04:49 +0300 Subject: [PATCH] Add ngx binary response handling for GetItems (#21) --- go.mod | 1 + go.sum | 5 + pkg/dataplane/http/context.go | 260 +++++++++++++++--- pkg/dataplane/http/headers.go | 7 + pkg/dataplane/requestresponse.go | 5 + pkg/dataplane/schemas/build | 12 + pkg/dataplane/schemas/clean | 3 + pkg/dataplane/schemas/go.capnp | 27 ++ pkg/dataplane/schemas/java/java.capnp | 28 ++ .../schemas/node/common/ExtAttrValue.capnp | 25 ++ .../schemas/node/common/StringWrapper.capnp | 16 ++ .../schemas/node/common/TimeSpec.capnp | 15 + .../node/common/VnObjectAttributeKeyMap.capnp | 13 + .../common/VnObjectAttributeValueMap.capnp | 17 ++ .../common/VnObjectItemsGetResponse.capnp | 54 ++++ .../node/common/VnObjectItemsScanCookie.capnp | 16 ++ 16 files changed, 463 insertions(+), 41 deletions(-) mode change 100644 => 100755 go.mod mode change 100644 => 100755 go.sum mode change 100644 => 100755 pkg/dataplane/http/context.go mode change 100644 => 100755 pkg/dataplane/http/headers.go mode change 100644 => 100755 pkg/dataplane/requestresponse.go create mode 100755 pkg/dataplane/schemas/build create mode 100755 pkg/dataplane/schemas/clean create mode 100755 pkg/dataplane/schemas/go.capnp create mode 100755 pkg/dataplane/schemas/java/java.capnp create mode 100755 pkg/dataplane/schemas/node/common/ExtAttrValue.capnp create mode 100755 pkg/dataplane/schemas/node/common/StringWrapper.capnp create mode 100755 pkg/dataplane/schemas/node/common/TimeSpec.capnp create mode 100755 pkg/dataplane/schemas/node/common/VnObjectAttributeKeyMap.capnp create mode 100755 pkg/dataplane/schemas/node/common/VnObjectAttributeValueMap.capnp create mode 100755 pkg/dataplane/schemas/node/common/VnObjectItemsGetResponse.capnp create mode 100755 pkg/dataplane/schemas/node/common/VnObjectItemsScanCookie.capnp diff --git a/go.mod b/go.mod old mode 100644 new mode 100755 index efa0f3a..cd1f92f --- a/go.mod +++ b/go.mod @@ -15,4 +15,5 @@ require ( github.com/valyala/fasthttp v1.2.0 go.uber.org/atomic v1.3.2 // indirect go.uber.org/multierr v1.1.0 // indirect + zombiezen.com/go/capnproto2 v2.17.0+incompatible ) diff --git a/go.sum b/go.sum old mode 100644 new mode 100755 index 4312369..2f1bf22 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/capnproto/go-capnproto2 v2.17.0+incompatible h1:vPbYlc2CBNdjzOMzHfwo7TbFNRBDaRKitlWiRs1riTw= +github.com/capnproto/go-capnproto2 v2.17.0+incompatible/go.mod h1:T3/pxeK0qevFRlAASYZe90Ozs+JmlQTNY+VLc6+lJHw= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -36,6 +38,9 @@ go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3 h1:czFLhve3vsQetD6JOJ8NZZvGQIXlnN3/yXxbT6/awxI= golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +zombiezen.com/go/capnproto2 v2.17.0+incompatible h1:sIoKPFGNlM38Qh+PBLa9Wzg1j99oInS/Qlk+5N/CHa4= +zombiezen.com/go/capnproto2 v2.17.0+incompatible/go.mod h1:XO5Pr2SbXgqZwn0m0Ru54QBqpOf4K5AYBO+8LAOBQEQ= diff --git a/pkg/dataplane/http/context.go b/pkg/dataplane/http/context.go old mode 100644 new mode 100755 index 8affbea..99bd3d5 --- a/pkg/dataplane/http/context.go +++ b/pkg/dataplane/http/context.go @@ -16,13 +16,15 @@ import ( "strings" "sync/atomic" "time" + "io" "github.com/v3io/v3io-go/pkg/dataplane" "github.com/v3io/v3io-go/pkg/errors" - + "github.com/v3io/v3io-go/pkg/dataplane/schemas/node/common" "github.com/nuclio/errors" "github.com/nuclio/logger" "github.com/valyala/fasthttp" + "zombiezen.com/go/capnproto2" ) // TODO: Request should have a global pool @@ -206,6 +208,11 @@ func (c *context) GetItem(getItemInput *v3io.GetItemInput, return c.sendRequestToWorker(getItemInput, context, responseChan) } +type attributeValuesSection struct { + accumulatedPreviousSectionsLength int + data node_common_capnp.VnObjectAttributeValuePtr_List +} + // GetItemSync func (c *context) GetItemSync(getItemInput *v3io.GetItemInput) (*v3io.Response, error) { @@ -256,6 +263,7 @@ func (c *context) GetItems(getItemsInput *v3io.GetItemsInput, return c.sendRequestToWorker(getItemsInput, context, responseChan) } + // GetItemSync func (c *context) GetItemsSync(getItemsInput *v3io.GetItemsInput) (*v3io.Response, error) { @@ -308,7 +316,7 @@ func (c *context) GetItemsSync(getItemsInput *v3io.GetItemsInput) (*v3io.Respons "PUT", getItemsInput.Path, "", - getItemsHeaders, + getItemsHeadersCapnp, marshalledBody, false) @@ -316,47 +324,15 @@ func (c *context) GetItemsSync(getItemsInput *v3io.GetItemsInput) (*v3io.Respons return nil, err } - c.logger.DebugWithCtx(getItemsInput.Ctx, "Body", "body", string(response.Body())) - - getItemsResponse := struct { - Items []map[string]map[string]interface{} - NextMarker string - LastItemIncluded string - }{} - - // unmarshal the body into an ad hoc structure - err = json.Unmarshal(response.Body(), &getItemsResponse) - if err != nil { - return nil, err - } - - //validate getItems response to avoid infinite loop - if getItemsResponse.LastItemIncluded != "TRUE" && (getItemsResponse.NextMarker == "" || getItemsResponse.NextMarker == getItemsInput.Marker) { - errMsg := fmt.Sprintf("Invalid getItems response: lastItemIncluded=false and nextMarker='%s', "+ - "startMarker='%s', probably due to object size bigger than 2M. Query is: %+v", getItemsResponse.NextMarker, getItemsInput.Marker, getItemsInput) - c.logger.Warn(errMsg) - } - - getItemsOutput := v3io.GetItemsOutput{ - NextMarker: getItemsResponse.NextMarker, - Last: getItemsResponse.LastItemIncluded == "TRUE", - } - - // iterate through the items and decode them - for _, typedItem := range getItemsResponse.Items { - - item, err := c.decodeTypedAttributes(typedItem) - if err != nil { - return nil, err - } + contentType := string(response.HeaderPeek("Content-Type")) - getItemsOutput.Items = append(getItemsOutput.Items, item) + if contentType != "application/octet-capnp" { + c.logger.DebugWithCtx(getItemsInput.Ctx, "Body", "body", string(response.Body())) + response.Output, err = c.getItemsParseJSONResponse(response, getItemsInput) + } else { + response.Output, err = c.getItemsParseCAPNPResponse(response) } - - // attach the output to the response - response.Output = &getItemsOutput - - return response, nil + return response, err } // PutItem @@ -1141,3 +1117,205 @@ func (c *context) workerEntry(workerIndex int) { request.ResponseChan <- &request.RequestResponse.Response } } + +func readAllCapnpMessages(reader io.Reader) ([]*capnp.Message){ + var capnpMessages []*capnp.Message + for { + msg, err := capnp.NewDecoder(reader).Decode() + if err != nil { + break + } + capnpMessages = append(capnpMessages, msg) + } + return capnpMessages +} + +func getSectionAndIndex(values []attributeValuesSection, idx int) (section int, resIdx int){ + if len(values) == 1 { + return 0, idx + } + for i := 1; i idx { + return i,idx - values[i-1].accumulatedPreviousSectionsLength + } + } + return 0, idx +} + +func decodeCapnpAttributes(keyValues node_common_capnp.VnObjectItemsGetMappedKeyValuePair_List, values []attributeValuesSection, attributeNames []string) (map[string]interface{}, error) { + attributes := map[string]interface{}{} + for j := 0; j < keyValues.Len(); j++ { + attrPtr := keyValues.At(j) + valIdx := int(attrPtr.ValueMapIndex()) + attrIdx := int(attrPtr.KeyMapIndex()) + + attributeName := attributeNames[attrIdx] + sectIdx, valIdx := getSectionAndIndex(values, valIdx) + value, err := values[sectIdx].data.At(valIdx).Value() + if err != nil { + return attributes, errors.Wrapf(err,"values[%d].data.At(%d).Value",sectIdx, valIdx ) + } + switch value.Which() { + case node_common_capnp.ExtAttrValue_Which_qword: + attributes[attributeName] = int(value.Qword()) + case node_common_capnp.ExtAttrValue_Which_uqword: + attributes[attributeName] = int(value.Uqword()) + case node_common_capnp.ExtAttrValue_Which_blob: + attributes[attributeName], err = value.Blob() + case node_common_capnp.ExtAttrValue_Which_str: + attributes[attributeName], err = value.Str() + case node_common_capnp.ExtAttrValue_Which_dfloat: + attributes[attributeName] = value.Dfloat() + case node_common_capnp.ExtAttrValue_Which_boolean: + attributes[attributeName] = value.Boolean() + case node_common_capnp.ExtAttrValue_Which_notExists: + {} + default: + return attributes, errors.Errorf("getItemsCapnp: %s type for %s attribute is not expected", value.Which().String(), attributeName) + } + } + return attributes, nil +} + +func (c *context) getItemsParseJSONResponse(response *v3io.Response, getItemsInput *v3io.GetItemsInput) (*v3io.GetItemsOutput, error){ + + getItemsResponse := struct { + Items []map[string]map[string]interface{} + NextMarker string + LastItemIncluded string + }{} + + // unmarshal the body into an ad hoc structure + err := json.Unmarshal(response.Body(), &getItemsResponse) + if err != nil { + return nil, err + } + + //validate getItems response to avoid infinite loop + if getItemsResponse.LastItemIncluded != "TRUE" && (getItemsResponse.NextMarker == "" || getItemsResponse.NextMarker == getItemsInput.Marker) { + errMsg := fmt.Sprintf("Invalid getItems response: lastItemIncluded=false and nextMarker='%s', "+ + "startMarker='%s', probably due to object size bigger than 2M. Query is: %+v", getItemsResponse.NextMarker, getItemsInput.Marker, getItemsInput) + c.logger.Warn(errMsg) + } + + getItemsOutput := v3io.GetItemsOutput{ + NextMarker: getItemsResponse.NextMarker, + Last: getItemsResponse.LastItemIncluded == "TRUE", + } + + // iterate through the items and decode them + for _, typedItem := range getItemsResponse.Items { + + item, err := c.decodeTypedAttributes(typedItem) + if err != nil { + return nil, err + } + + getItemsOutput.Items = append(getItemsOutput.Items, item) + } + // attach the output to the response + return &getItemsOutput, nil +} + +func (c *context) getItemsParseCAPNPResponse(response *v3io.Response) (*v3io.GetItemsOutput, error){ + responseBodyReader := bytes.NewReader(response.Body()) + capnpSections := readAllCapnpMessages(responseBodyReader ) + if len(capnpSections) < 2 { + return nil, errors.Errorf("getItemsCapnp: Got only %v capnp sections. Expecting at least 2", len(capnpSections)) + } + cookie := string(response.HeaderPeek("X-v3io-cookie")) + getItemsOutput := v3io.GetItemsOutput{ + NextMarker: cookie, + Last: len(cookie) == 0, + } + if len(capnpSections) < 2 { + return nil, errors.Errorf("getItemsCapnp: Got only %v capnp sections. Expecting at least 2", len(capnpSections)) + } + + metadataPayload, err := node_common_capnp.ReadRootVnObjectItemsGetResponseMetadataPayload(capnpSections[len(capnpSections) - 1]) + if err != nil { + return nil, errors.Wrap(err,"ReadRootVnObjectItemsGetResponseMetadataPayload") + } + //Keys + attributeMap, err := metadataPayload.KeyMap() + if err != nil { + return nil, errors.Wrap(err,"metadataPayload.KeyMap") + } + attributeMapNames,err := attributeMap.Names() + if err != nil { + return nil, errors.Wrap(err,"attributeMap.Names") + } + attributeNamesPtr,err := attributeMapNames.Arr() + if err != nil { + return nil, errors.Wrap(err,"attributeMapNames.Arr") + } + //Values + valueMap, err := metadataPayload.ValueMap() + if err != nil { + return nil, errors.Wrap(err,"metadataPayload.ValueMap") + } + values, err := valueMap.Values() + if err != nil { + return nil, errors.Wrap(err,"valueMap.Values") + } + + // Items + items, err := metadataPayload.Items() + if err != nil { + return nil, errors.Wrap(err,"metadataPayload.Items") + } + valuesSections := make([]attributeValuesSection , len(capnpSections) - 1) + + accLength := 0 + //Additional data sections "in between" + for capnpSectionIndex:= 1;capnpSectionIndex < len(capnpSections) - 1; capnpSectionIndex++ { + data, err := node_common_capnp.ReadRootVnObjectAttributeValueMap(capnpSections[capnpSectionIndex]) + if err != nil { + return nil, errors.Wrap(err,"node_common_capnp.ReadRootVnObjectAttributeValueMap") + } + dv, err := data.Values() + if err != nil { + return nil, errors.Wrap(err,"data.Values") + } + accLength = accLength + dv.Len() + valuesSections[capnpSectionIndex-1].data = dv + valuesSections[capnpSectionIndex-1].accumulatedPreviousSectionsLength = accLength + } + accLength = accLength + values.Len() + valuesSections[len(capnpSections) - 2].data = values + valuesSections[len(capnpSections) - 2].accumulatedPreviousSectionsLength = accLength + + //Read in all the attribute names + attributeNamesNumber := attributeNamesPtr.Len() + attributeNames := make([]string,attributeNamesNumber) + for attributeIndex := 0; attributeIndex < attributeNamesNumber; attributeIndex++ { + attributeNames[attributeIndex], err = attributeNamesPtr.At(attributeIndex).Str() + if err != nil { + return nil, errors.Wrapf(err,"attributeNamesPtr.At(%d) size %d", attributeIndex, attributeNamesNumber) + } + } + + // iterate through the items and decode them + for itemIndex := 0; itemIndex < items.Len(); itemIndex++ { + itemPtr := items.At(itemIndex) + item, err := itemPtr.Item() + if err != nil { + return nil, errors.Wrap(err,"itemPtr.Item") + } + name, err := item.Name() + if err != nil { + return nil, errors.Wrap(err,"item.Name") + } + itemAttributes, err := item.Attrs() + if err != nil { + return nil, errors.Wrap(err,"item.Attrs") + } + ditem, err := decodeCapnpAttributes(itemAttributes, valuesSections, attributeNames) + if err != nil { + return nil, errors.Wrap(err,"decodeCapnpAttributes") + } + ditem["__name"] = name + getItemsOutput.Items = append(getItemsOutput.Items, ditem) + } + return &getItemsOutput, nil +} diff --git a/pkg/dataplane/http/headers.go b/pkg/dataplane/http/headers.go old mode 100644 new mode 100755 index bad2efa..7e5dae7 --- a/pkg/dataplane/http/headers.go +++ b/pkg/dataplane/http/headers.go @@ -36,6 +36,13 @@ var getItemsHeaders = map[string]string{ "X-v3io-function": getItemsFunctionName, } +// headers for update item +var getItemsHeadersCapnp = map[string]string{ + "Content-Type": "application/json", + "X-v3io-response-content-type": "capnp", + "X-v3io-function": getItemsFunctionName, +} + // headers for create stream var createStreamHeaders = map[string]string{ "Content-Type": "application/json", diff --git a/pkg/dataplane/requestresponse.go b/pkg/dataplane/requestresponse.go old mode 100644 new mode 100755 index 955cbec..d367ff8 --- a/pkg/dataplane/requestresponse.go +++ b/pkg/dataplane/requestresponse.go @@ -68,6 +68,11 @@ func (r *Response) Body() []byte { return r.HTTPResponse.Body() } +func (r *Response) HeaderPeek(key string) []byte { + return r.HTTPResponse.Header.Peek(key) +} + + func (r *Response) Request() *Request { return &r.RequestResponse.Request } diff --git a/pkg/dataplane/schemas/build b/pkg/dataplane/schemas/build new file mode 100755 index 0000000..9bcaaf9 --- /dev/null +++ b/pkg/dataplane/schemas/build @@ -0,0 +1,12 @@ +#!/bin/bash +# put the capnpc-go binary in /bin +export PATH=$PATH:${GOPATH}/bin + +capnp compile -I$GOPATH/src/github.com/iguazio/go-capnproto2/std -I$(pwd) -ogo node/common/ExtAttrValue.capnp +capnp compile -I$GOPATH/src/github.com/iguazio/go-capnproto2/std -I$(pwd) -ogo node/common/StringWrapper.capnp +capnp compile -I$GOPATH/src/github.com/iguazio/go-capnproto2/std -I$(pwd) -ogo node/common/TimeSpec.capnp +capnp compile -I$GOPATH/src/github.com/iguazio/go-capnproto2/std -I$(pwd) -ogo node/common/VnObjectAttributeKeyMap.capnp +capnp compile -I$GOPATH/src/github.com/iguazio/go-capnproto2/std -I$(pwd) -ogo node/common/VnObjectAttributeValueMap.capnp +capnp compile -I$GOPATH/src/github.com/iguazio/go-capnproto2/std -I$(pwd) -ogo node/common/VnObjectItemsGetResponse.capnp +capnp compile -I$GOPATH/src/github.com/iguazio/go-capnproto2/std -I$(pwd) -ogo node/common/VnObjectItemsScanCookie.capnp + diff --git a/pkg/dataplane/schemas/clean b/pkg/dataplane/schemas/clean new file mode 100755 index 0000000..71a01ee --- /dev/null +++ b/pkg/dataplane/schemas/clean @@ -0,0 +1,3 @@ +#!/bin/bash + +find . -name *.go -delete diff --git a/pkg/dataplane/schemas/go.capnp b/pkg/dataplane/schemas/go.capnp new file mode 100755 index 0000000..e37889a --- /dev/null +++ b/pkg/dataplane/schemas/go.capnp @@ -0,0 +1,27 @@ +@0xd12a1c51fedd6c88; + +annotation package(file) :Text; +# The Go package name for the generated file. + +annotation import(file) :Text; +# The Go import path that the generated file is accessible from. +# Used to generate import statements and check if two types are in the +# same package. + +annotation doc(struct, field, enum) :Text; +# Adds a doc comment to the generated code. + +# annotation tag(enumerant) :Text; +# Changes the string representation of the enum in the generated code. + +# annotation notag(enumerant) :Void; +# Removes the string representation of the enum in the generated code. + +# annotation customtype(field) :Text; +# OBSOLETE, not used by code generator. + +# annotation name(struct, field, union, enum, enumerant, interface, method, param, annotation, const, group) :Text; +# Used to rename the element in the generated code. + +$package("capnp"); + diff --git a/pkg/dataplane/schemas/java/java.capnp b/pkg/dataplane/schemas/java/java.capnp new file mode 100755 index 0000000..101bdb3 --- /dev/null +++ b/pkg/dataplane/schemas/java/java.capnp @@ -0,0 +1,28 @@ +# Copyright (c) 2013-2015 Sandstorm Development Group, Inc. and contributors +# Licensed under the MIT License: +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +@0xc5f1af96651f70ea; + +annotation package @0x9ee4c8f803b3b596 (file) : Text; +# Name of the package, such as "org.example.foo", in which the generated code will reside. + +annotation outerClassname @0x9b066bb4881f7cd3 (file) : Text; +# Name of the outer class that will wrap the generated code. \ No newline at end of file diff --git a/pkg/dataplane/schemas/node/common/ExtAttrValue.capnp b/pkg/dataplane/schemas/node/common/ExtAttrValue.capnp new file mode 100755 index 0000000..16544d7 --- /dev/null +++ b/pkg/dataplane/schemas/node/common/ExtAttrValue.capnp @@ -0,0 +1,25 @@ +@0x8a6e4e6e3e2db81e; +using Go = import "/go.capnp"; +$Go.package("node_common_capnp"); +$Go.import("github.com/v3io/v3io-go/internal/schemas/node/common"); + +using Java = import "/java/java.capnp"; +$Java.package("io.iguaz.v3io.daemon.client.api.capnp"); +$Java.outerClassname("ExtAttrValueOuter"); + +using import "/node/common/TimeSpec.capnp".TimeSpec; + +struct ExtAttrValue{ + union { + qword @0 : Int64; + uqword @1 : UInt64; + blob @2 : Data; + notExists @3 : Void; + str @4 : Text; + qwordIncrement @5 : Int64; + time @6 : TimeSpec; + dfloat @7 : Float64; + floatIncrement @8 : Float64; + boolean @9 : Bool; + } +} diff --git a/pkg/dataplane/schemas/node/common/StringWrapper.capnp b/pkg/dataplane/schemas/node/common/StringWrapper.capnp new file mode 100755 index 0000000..d738558 --- /dev/null +++ b/pkg/dataplane/schemas/node/common/StringWrapper.capnp @@ -0,0 +1,16 @@ +@0xdf50359faf84cbef; +using Go = import "/go.capnp"; +$Go.package("node_common_capnp"); +$Go.import("github.com/v3io/v3io-go/internal/schemas/node/common"); + +using Java = import "/java/java.capnp"; +$Java.package("io.iguaz.v3io.daemon.client.api.capnp"); +$Java.outerClassname("StringWrapperOuter"); + +struct StringWrapper { + str @0 : Text; +} + +struct StringWrapperList{ + arr @0 : List(StringWrapper); +} diff --git a/pkg/dataplane/schemas/node/common/TimeSpec.capnp b/pkg/dataplane/schemas/node/common/TimeSpec.capnp new file mode 100755 index 0000000..f52d481 --- /dev/null +++ b/pkg/dataplane/schemas/node/common/TimeSpec.capnp @@ -0,0 +1,15 @@ +@0xbcbc7bd29390d6e8; +using Go = import "/go.capnp"; +$Go.package("node_common_capnp"); +$Go.import("github.com/v3io/v3io-go/internal/schemas/node/common"); + +# Imports & Namespace settings +using Java = import "/java/java.capnp"; +$Java.package("io.iguaz.v3io.daemon.client.api.capnp"); +$Java.outerClassname("V3ioTimeSpec"); + +struct TimeSpec { + tvSec @0 : Int64; + tvNsec @1 : Int64; +} + diff --git a/pkg/dataplane/schemas/node/common/VnObjectAttributeKeyMap.capnp b/pkg/dataplane/schemas/node/common/VnObjectAttributeKeyMap.capnp new file mode 100755 index 0000000..158ab82 --- /dev/null +++ b/pkg/dataplane/schemas/node/common/VnObjectAttributeKeyMap.capnp @@ -0,0 +1,13 @@ +@0x986bf57944c8b89f; +using Go = import "/go.capnp"; +$Go.package("node_common_capnp"); +$Go.import("github.com/v3io/v3io-go/internal/schemas/node/common"); + +using Java = import "/java/java.capnp"; +using import "/node/common/StringWrapper.capnp".StringWrapperList; +$Java.package("io.iguaz.v3io.daemon.client.api.capnp"); +$Java.outerClassname("VnObjectAttributeKeyMapOuter"); + +struct VnObjectAttributeKeyMap { + names @0 : StringWrapperList; +} diff --git a/pkg/dataplane/schemas/node/common/VnObjectAttributeValueMap.capnp b/pkg/dataplane/schemas/node/common/VnObjectAttributeValueMap.capnp new file mode 100755 index 0000000..6741a9e --- /dev/null +++ b/pkg/dataplane/schemas/node/common/VnObjectAttributeValueMap.capnp @@ -0,0 +1,17 @@ +@0x90687959836864ab; +using Go = import "/go.capnp"; +$Go.package("node_common_capnp"); +$Go.import("github.com/v3io/v3io-go/internal/schemas/node/common"); + +using Java = import "/java/java.capnp"; +using import "/node/common/ExtAttrValue.capnp".ExtAttrValue; +$Java.package("io.iguaz.v3io.daemon.client.api.capnp"); +$Java.outerClassname("VnObjectAttributeValueMapOuter"); + +struct VnObjectAttributeValuePtr { + value @0 : ExtAttrValue; +} + +struct VnObjectAttributeValueMap { + values @0 : List(VnObjectAttributeValuePtr); +} \ No newline at end of file diff --git a/pkg/dataplane/schemas/node/common/VnObjectItemsGetResponse.capnp b/pkg/dataplane/schemas/node/common/VnObjectItemsGetResponse.capnp new file mode 100755 index 0000000..d3e7e75 --- /dev/null +++ b/pkg/dataplane/schemas/node/common/VnObjectItemsGetResponse.capnp @@ -0,0 +1,54 @@ +@0xdfe00955984fcb17; +using Go = import "/go.capnp"; +$Go.package("node_common_capnp"); +$Go.import("github.com/v3io/v3io-go/internal/schemas/node/common"); + + +# Imports & Namespace settings +using Java = import "/java/java.capnp"; +$Java.package("io.iguaz.v3io.daemon.client.api.capnp"); +$Java.outerClassname("VnObjectItemsGetResponse"); + +using import "/node/common/VnObjectItemsScanCookie.capnp".VnObjectItemsScanCookie; +using import "/node/common/VnObjectAttributeKeyMap.capnp".VnObjectAttributeKeyMap; +using import "/node/common/VnObjectAttributeValueMap.capnp".VnObjectAttributeValueMap; + +struct VnObjectItemsGetResponseHeader{ + marker @0 : Text; + scanState @1 : VnObjectItemsScanCookie; + hasMore @2 : Bool; + numItems @3 : UInt64; + numKeys @4 : UInt64; + numValues @5 : UInt64; +} + +struct VnObjectItemsGetMappedKeyValuePair { + keyMapIndex @0 :UInt64; + valueMapIndex @1 :UInt64; +} + +struct VnObjectItemsGetItem{ + name @0 :Text; + attrs @1 :List(VnObjectItemsGetMappedKeyValuePair); +} + +# Wrapper so that we can create orphan VnObjectItemsGetItem objects and then fill out a list of pointers +# to them. See https://capnproto.org/faq.html under "How do I resize a list?" (28/08/2016): +# "Keep in mind that you can use orphans to allocate sub-objects before you have a place to put them. But, also +# note that you cannot allocate elements of a struct list as orphans and then put them together as a list later, +# because struct lists are encoded as a flat array of struct values, not an array of pointers to struct values. +# You can, however, allocate any inner objects embedded within those structs as orphans." + +struct VnObjectItemsGetItemPtr{ + item @0: VnObjectItemsGetItem; +} + +struct VnObjectItemsGetResponseDataPayload{ + valueMap @0 :VnObjectAttributeValueMap; +} + +struct VnObjectItemsGetResponseMetadataPayload{ + valueMap @0 :VnObjectAttributeValueMap; + keyMap @1 :VnObjectAttributeKeyMap; + items @2 :List(VnObjectItemsGetItemPtr); +} \ No newline at end of file diff --git a/pkg/dataplane/schemas/node/common/VnObjectItemsScanCookie.capnp b/pkg/dataplane/schemas/node/common/VnObjectItemsScanCookie.capnp new file mode 100755 index 0000000..8731248 --- /dev/null +++ b/pkg/dataplane/schemas/node/common/VnObjectItemsScanCookie.capnp @@ -0,0 +1,16 @@ +@0xb56ec2d13b48b7cb; +using Go = import "/go.capnp"; +$Go.package("node_common_capnp"); +$Go.import("github.com/v3io/v3io-go/internal/schemas/node/common"); + +# Imports & Namespace settings +using Java = import "/java/java.capnp"; +$Java.package("io.iguaz.v3io.daemon.client.api.capnp"); +$Java.outerClassname("VnObjectItemsScanCookieOuter"); + +struct VnObjectItemsScanCookie { + sliceId @0 :UInt16; + inodeNumber @1 :UInt32; + clientSliceListPos @2 :UInt64; + clientSliceListEndPos @3 :UInt64; +}