Skip to content

Commit

Permalink
fix time quantum
Browse files Browse the repository at this point in the history
  • Loading branch information
gernest committed Oct 20, 2024
1 parent 3a201a3 commit afcfb41
Showing 1 changed file with 30 additions and 17 deletions.
47 changes: 30 additions & 17 deletions internal/timeseries/timeseries_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/cockroachdb/pebble"
"github.com/vinceanalytics/vince/internal/compute"
"github.com/vinceanalytics/vince/internal/encoding"
"github.com/vinceanalytics/vince/internal/models"
"github.com/vinceanalytics/vince/internal/roaring"
Expand Down Expand Up @@ -64,30 +65,27 @@ func (b *batch) save() error {
ba.Close()
return err
}
fmt.Println("saved ", b.events)
return ba.Commit(pebble.Sync)
}

func (b *batch) flush(ba *pebble.Batch) error {
key := make([]byte, encoding.BitmapKeySize)
sv := func(bm *roaring.Bitmap) error {
v, done, err := ba.Get(key)
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return ba.Set(key, bm.ToBuffer(), nil)
}
return err
}
bm.Or(roaring.FromBuffer(v))
err = ba.Set(key, bm.ToBuffer(), nil)
done.Close()
return err
sv := func(f models.Field, view uint64, bm *roaring.Bitmap) error {
ts := time.UnixMilli(int64(view)).UTC()
value := bm.ToBuffer()
return errors.Join(
ba.Merge(encoding.Bitmap(b.shard, 0, f, key), value, nil),
ba.Merge(encoding.Bitmap(b.shard, view, f, key), value, nil),
ba.Merge(encoding.Bitmap(b.shard, hour(ts), f, key), value, nil),
ba.Merge(encoding.Bitmap(b.shard, day(ts), f, key), value, nil),
ba.Merge(encoding.Bitmap(b.shard, week(ts), f, key), value, nil),
ba.Merge(encoding.Bitmap(b.shard, month(ts), f, key), value, nil),
)
}
for i := range b.mutex {
f := models.Mutex(i)
for view, bm := range b.mutex[i] {
encoding.Bitmap(b.shard, view, f, key)
err := sv(bm)
err := sv(f, view, bm)
if err != nil {
return fmt.Errorf("saving events bitmap %w", err)
}
Expand All @@ -96,8 +94,7 @@ func (b *batch) flush(ba *pebble.Batch) error {
for i := range b.bsi {
f := models.BSI(i)
for view, bm := range b.bsi[i] {
encoding.Bitmap(b.shard, view, f, key)
err := sv(bm)
err := sv(f, view, bm)
if err != nil {
return fmt.Errorf("saving events bitmap %w", err)
}
Expand Down Expand Up @@ -195,3 +192,19 @@ func (b *batch) getMutex(field models.Field) *roaring.Bitmap {
func (b *batch) tr(field models.Field, value []byte) uint64 {
return b.translate.Assign(field, value)
}

func hour(ts time.Time) uint64 {
return uint64(compute.Hour(ts).UnixMilli())
}

func day(ts time.Time) uint64 {
return uint64(compute.Date(ts).UnixMilli())
}

func week(ts time.Time) uint64 {
return uint64(compute.Week(ts).UnixMilli())
}

func month(ts time.Time) uint64 {
return uint64(compute.Month(ts).UnixMilli())
}

0 comments on commit afcfb41

Please sign in to comment.