Skip to content

Commit

Permalink
Initial implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
yunussandikci committed Sep 24, 2023
0 parents commit 2612d2b
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
*
!*/
!*.*
.idea
53 changes: 53 additions & 0 deletions example.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package main

import (
"fmt"
"github.com/yunussandikci/squeuelite-go/squeuelite"
"strconv"
"time"
)

func main() {
squeue, queueErr := squeuelite.New(squeuelite.Config{
Database: "my-db",
Queue: "default",
RequeueDuration: time.Minute,
})
if queueErr != nil {
panic(queueErr)
}

//Subscribe for Queue
go func() {
for item := range squeue.Subscribe(time.Second) {
fmt.Printf("%s\n", string(item.Payload))
if doneErr := squeue.Done(item); doneErr != nil {
return
}
}
}()

//Subscribe for Queue
go func() {
for item := range squeue.Subscribe(time.Second) {
fmt.Printf("%s\n", string(item.Payload))
if doneErr := squeue.Done(item); doneErr != nil {
return
}
}
}()

//Put Items into Queue
for i := 1; i < 100; i++ {
if putErr := squeue.Put(&squeuelite.Message{
Id: strconv.Itoa(i),
Payload: []byte(fmt.Sprintf("my-message-%d", i)),
AvailableAfter: time.Now().Add(2 * time.Second),
Priority: 10,
}); putErr != nil {
panic(putErr)
}
}

time.Sleep(1 * time.Minute)
}
15 changes: 15 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module github.com/yunussandikci/squeuelite-go

go 1.21

require (
github.com/google/uuid v1.3.1
gorm.io/driver/sqlite v1.5.3
gorm.io/gorm v1.25.4
)

require (
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/mattn/go-sqlite3 v1.14.17 // indirect
)
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM=
github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
gorm.io/driver/sqlite v1.5.3 h1:7/0dUgX28KAcopdfbRWWl68Rflh6osa4rDh+m51KL2g=
gorm.io/driver/sqlite v1.5.3/go.mod h1:qxAuCol+2r6PannQDpOP1FP6ag3mKi4esLnB/jHed+4=
gorm.io/gorm v1.25.4 h1:iyNd8fNAe8W9dvtlgeRI5zSVZPsq3OpcTu37cYcpCmw=
gorm.io/gorm v1.25.4/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k=
42 changes: 42 additions & 0 deletions squeuelite/constructor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package squeuelite

import (
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"gorm.io/gorm/logger"
"strings"
"time"
)

var defaultRequeueDuration = time.Minute
var defaultQueue = "default"

func New(config Config) (*SQueueLite, error) {
db, openErr := gorm.Open(sqlite.Open(config.Database), &gorm.Config{
Logger: logger.Default.LogMode(logger.Silent),
})
if openErr != nil {
return nil, openErr
}

if config.RequeueDuration == 0 {
config.RequeueDuration = defaultRequeueDuration
}

if len(config.Queue) == 0 {
config.Queue = defaultQueue
}

instance := &SQueueLite{
db: db,
requeueDuration: config.RequeueDuration,
queue: config.Queue,
}

migrateErr := instance.getConnection().AutoMigrate(&Message{})
if migrateErr != nil && !strings.Contains(migrateErr.Error(), "already exist") {
return nil, migrateErr
}

return instance, nil
}
61 changes: 61 additions & 0 deletions squeuelite/squeuelite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package squeuelite

import (
"fmt"
"github.com/google/uuid"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"time"
)

func (s *SQueueLite) getConnection() *gorm.DB {
return s.db.Table(fmt.Sprintf("queue_%s", s.queue))
}

func (s *SQueueLite) Put(message *Message) error {
if message.Id == "" {
message.Id = uuid.NewString()
}

return s.getConnection().Clauses(clause.Insert{Modifier: "OR IGNORE"}).Create(message).Error
}

func (s *SQueueLite) Pop() (*Message, error) {
var messages []Message

getErr := s.getConnection().Model(&messages).
Clauses(clause.Returning{}).
Where("id = (?)",
s.getConnection().Model(Message{}).Where("available_after < ?", time.Now()).
Select("id").
Limit(1)).
Update("available_after", time.Now().Add(s.requeueDuration)).Error

if len(messages) > 0 {
return &messages[0], nil
}

return nil, getErr
}

func (s *SQueueLite) Subscribe(pollingInterval time.Duration) <-chan Message {
subscription := make(chan Message)

go func() {
for {
message, popErr := s.Pop()
if popErr != nil || message == nil {
time.Sleep(pollingInterval)
continue
}

subscription <- *message
}
}()

return subscription
}

func (s *SQueueLite) Done(message Message) error {
return s.getConnection().Delete(&message).Error
}
25 changes: 25 additions & 0 deletions squeuelite/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package squeuelite

import (
"gorm.io/gorm"
"time"
)

type SQueueLite struct {
db *gorm.DB
requeueDuration time.Duration
queue string
}

type Config struct {
Database string
Queue string
RequeueDuration time.Duration
}

type Message struct {
Id string `gorm:"primarykey"`
Payload []byte
AvailableAfter time.Time
Priority uint8
}

0 comments on commit 2612d2b

Please sign in to comment.