From 6fdbe3b3821221d4dab102b9fe5e02308f032815 Mon Sep 17 00:00:00 2001 From: gernest Date: Thu, 8 Feb 2024 15:43:40 +0300 Subject: [PATCH] working realtime visitors --- camel/case.go | 17 +++++++++++++++++ db/common.go | 3 ++- filters/filters.go | 13 +++++++++++-- lsm/lsm.go | 3 ++- staples/arrow.go | 20 ++++---------------- staples/index.go | 3 ++- stats/breakdown.go | 2 +- stats/timeseries.go | 2 +- 8 files changed, 40 insertions(+), 23 deletions(-) create mode 100644 camel/case.go diff --git a/camel/case.go b/camel/case.go new file mode 100644 index 000000000..97c60a8ea --- /dev/null +++ b/camel/case.go @@ -0,0 +1,17 @@ +package camel + +import ( + "strings" + "unicode" +) + +func Case(name string) string { + first := true + return strings.Map(func(r rune) rune { + if first { + first = false + return unicode.ToLower(r) + } + return r + }, name) +} diff --git a/db/common.go b/db/common.go index c9233dc52..58c982dc7 100644 --- a/db/common.go +++ b/db/common.go @@ -3,12 +3,13 @@ package db import ( "github.com/apache/arrow/go/v15/arrow" "github.com/apache/arrow/go/v15/arrow/array" + "github.com/vinceanalytics/vince/camel" v1 "github.com/vinceanalytics/vince/gen/go/staples/v1" ) func Timestamps(r arrow.Record) (lo, hi int64) { for i := 0; i < int(r.NumCols()); i++ { - if r.ColumnName(i) == v1.Filters_Timestamp.String() { + if r.ColumnName(i) == camel.Case(v1.Filters_Timestamp.String()) { ts := r.Column(i).(*array.Int64) lo = ts.Value(0) hi = ts.Value(ts.Len() - 1) diff --git a/filters/filters.go b/filters/filters.go index a1f32bb97..87086bb42 100644 --- a/filters/filters.go +++ b/filters/filters.go @@ -2,10 +2,11 @@ package filters import ( "github.com/blevesearch/vellum/regexp" + "github.com/vinceanalytics/vince/camel" v1 "github.com/vinceanalytics/vince/gen/go/staples/v1" ) -var PropToProjection = map[v1.Property]v1.Filters_Projection{ +var propToProjection = map[v1.Property]v1.Filters_Projection{ v1.Property_event: v1.Filters_Event, v1.Property_page: v1.Filters_Path, v1.Property_entry_page: v1.Filters_EntryPage, @@ -28,6 +29,14 @@ var PropToProjection = map[v1.Property]v1.Filters_Projection{ v1.Property_city: v1.Filters_City, } +func Column(p v1.Property) string { + return camel.Case(propToProjection[p].String()) +} + +func Projection(p v1.Property) v1.Filters_Projection { + return propToProjection[p] +} + type CompiledFilter struct { Column string Op v1.Filter_OP @@ -49,7 +58,7 @@ func CompileFilters(f *v1.Filters) ([]*CompiledFilter, error) { func compileFilter(f *v1.Filter) (*CompiledFilter, error) { o := &CompiledFilter{ - Column: PropToProjection[f.Property].String(), + Column: Column(f.Property), Op: f.Op, } o.Value = []byte(f.Value) diff --git a/lsm/lsm.go b/lsm/lsm.go index 7a10af099..a416c3188 100644 --- a/lsm/lsm.go +++ b/lsm/lsm.go @@ -17,6 +17,7 @@ import ( "github.com/apache/arrow/go/v15/arrow/util" "github.com/docker/go-units" "github.com/oklog/ulid/v2" + "github.com/vinceanalytics/vince/camel" "github.com/vinceanalytics/vince/db" "github.com/vinceanalytics/vince/filters" v1 "github.com/vinceanalytics/vince/gen/go/staples/v1" @@ -173,7 +174,7 @@ func (lsm *Tree[T]) Scan( } project := make([]int, 0, len(fs.Projection)) for _, name := range fs.Projection { - col, ok := lsm.mapping[staples.Camel(name.String())] + col, ok := lsm.mapping[camel.Case(name.String())] if !ok { return nil, fmt.Errorf("column %s does not exist", name) } diff --git a/staples/arrow.go b/staples/arrow.go index c71508eae..070c82d26 100644 --- a/staples/arrow.go +++ b/staples/arrow.go @@ -3,12 +3,11 @@ package staples import ( "fmt" "reflect" - "strings" - "unicode" "github.com/apache/arrow/go/v15/arrow" "github.com/apache/arrow/go/v15/arrow/array" "github.com/apache/arrow/go/v15/arrow/memory" + "github.com/vinceanalytics/vince/camel" ) type Arrow[T any] struct { @@ -62,7 +61,7 @@ func build(r reflect.Type) (o []arrow.Field) { } if base, ok := baseTypes[typ.Kind()]; ok { o = append(o, arrow.Field{ - Name: Camel(f.Name), + Name: camel.Case(f.Name), Type: base, Nullable: f.Type.Kind() == reflect.Ptr || typ.Kind() == reflect.String, }) @@ -73,17 +72,6 @@ func build(r reflect.Type) (o []arrow.Field) { return } -func Camel(name string) string { - first := true - return strings.Map(func(r rune) rune { - if first { - first = false - return unicode.ToLower(r) - } - return r - }, name) -} - var baseTypes = map[reflect.Kind]arrow.DataType{ reflect.Bool: arrow.FixedWidthTypes.Boolean, reflect.String: &arrow.DictionaryType{ @@ -229,8 +217,8 @@ func NewTaker(mem memory.Allocator, as *arrow.Schema) (*array.RecordBuilder, fun fields[i] = take(b.Field(i)) } return b, func(v arrow.Record, columns []int, rows []uint32) { - for _, i := range columns { - fields[i](v.Column(i), rows) + for idx, col := range columns { + fields[idx](v.Column(col), rows) } } } diff --git a/staples/index.go b/staples/index.go index 7b5f882f3..63a92b5ba 100644 --- a/staples/index.go +++ b/staples/index.go @@ -7,6 +7,7 @@ import ( "github.com/RoaringBitmap/roaring" "github.com/apache/arrow/go/v15/arrow" "github.com/apache/arrow/go/v15/arrow/array" + "github.com/vinceanalytics/vince/camel" "github.com/vinceanalytics/vince/filters" "github.com/vinceanalytics/vince/index" "github.com/vinceanalytics/vince/logger" @@ -70,7 +71,7 @@ func NewIndex() *Index { if !f.IsExported() { continue } - idx.mapping[Camel(f.Name)] = r.Field(i).Interface().(*index.ColumnImpl) + idx.mapping[camel.Case(f.Name)] = r.Field(i).Interface().(*index.ColumnImpl) } return idx } diff --git a/stats/breakdown.go b/stats/breakdown.go index c68a573c4..b0f493f79 100644 --- a/stats/breakdown.go +++ b/stats/breakdown.go @@ -63,7 +63,7 @@ func BreakDown(w http.ResponseWriter, r *http.Request) { // TODO: run this concurrently for _, prop := range req.Property { var groups []*v1.BreakDown_Response_Group - for key, bitmap := range hashProp(mapping[filters.PropToProjection[prop].String()]) { + for key, bitmap := range hashProp(mapping[filters.Column(prop)]) { b.AppendValues(bitmap.ToArray(), nil) idx := b.NewUint32Array() var values []*v1.Value diff --git a/stats/timeseries.go b/stats/timeseries.go index c0e788a80..90afbb06e 100644 --- a/stats/timeseries.go +++ b/stats/timeseries.go @@ -173,7 +173,7 @@ func metricsToProjection(f *v1.Filters, me []v1.Metric, props ...v1.Property) { m := make(map[v1.Filters_Projection]struct{}) m[v1.Filters_Timestamp] = struct{}{} for _, p := range props { - m[filters.PropToProjection[p]] = struct{}{} + m[filters.Projection(p)] = struct{}{} } for _, v := range me { switch v {