diff --git a/cmd/smoothmq/server/server.go b/cmd/smoothmq/server/server.go index 926c595..660e0ff 100644 --- a/cmd/smoothmq/server/server.go +++ b/cmd/smoothmq/server/server.go @@ -15,7 +15,7 @@ import ( "github.com/poundifdef/smoothmq/dashboard" "github.com/poundifdef/smoothmq/models" "github.com/poundifdef/smoothmq/protocols/sqs" - "github.com/poundifdef/smoothmq/queue/sqlite" + "github.com/poundifdef/smoothmq/queue/database" "github.com/poundifdef/smoothmq/tenants/defaultmanager" "github.com/prometheus/client_golang/prometheus/promhttp" ) @@ -55,7 +55,7 @@ func Run(tm models.TenantManager, queue models.Queue, cfg config.ServerCommand) // Initialize default queue implementation if queue == nil { - queue = sqlite.NewSQLiteQueue(cfg.SQLite) + queue = database.NewQueue(cfg) } dashboardServer := dashboard.NewDashboard(queue, tm, cfg.Dashboard) diff --git a/config.yaml b/config.yaml index 07d1588..1bb6c4e 100644 --- a/config.yaml +++ b/config.yaml @@ -11,5 +11,15 @@ server: enabled: true port: 3000 - sqlite: - path: smoothmq.sqlite \ No newline at end of file +# One can use either the sqlite option or the db option to +# configure the database +# sqlite: +# path: smoothmq.sqlite + +# db: +# log-queries: false +# dsn: "host=db user=xxx password=xxx sslmode=disable TimeZone=America/Los_Angeles" +# driver: postgres +# If you use this option and wish to use sqlite please add the following options to the filename +# dsn: "smoothmq.sqlite?_journal_mode=WAL&_foreign_keys=off&_auto_vacuum=full" +# driver: sqlite \ No newline at end of file diff --git a/config/config.go b/config/config.go index 0238d72..480db4f 100644 --- a/config/config.go +++ b/config/config.go @@ -31,6 +31,7 @@ type ServerCommand struct { SQS SQSConfig `embed:"" prefix:"sqs-" envprefix:"Q_SQS_"` Dashboard DashboardConfig `embed:"" prefix:"dashboard-" envprefix:"Q_DASHBOARD_"` SQLite SQLiteConfig `embed:"" prefix:"sqlite-" envprefix:"Q_SQLITE_"` + DB DBConfig `embed:"" prefix:"db-" envprefix:"Q_DB_"` Metrics MetricsConfig `embed:"" prefix:"metrics-" name:"metrics" envprefix:"Q_METRICS_"` DisableTelemetry bool `name:"disable-telemetry" default:"false" env:"DISABLE_TELEMETRY"` @@ -53,6 +54,12 @@ type SQLiteConfig struct { Path string `name:"path" help:"Path of SQLite file" default:"smoothmq.sqlite" env:"PATH"` } +type DBConfig struct { + DSN string `name:"dsn" help:"GORM connection DSN" default:"" env:"DSN"` + Driver string `name:"driver" help:"DB driver to use" default:"sqlite" env:"DSN"` + LogQueries bool `name:"log-queries" default:"false" help:"Log queries" env:"LOG_QUERIES"` +} + type SQSConfig struct { Enabled bool `name:"enabled" default:"true" help:"Enable SQS protocol for queue" env:"ENABLED"` Port int `name:"port" default:"3001" help:"HTTP port for SQS protocol" env:"PORT"` diff --git a/go.mod b/go.mod index a9474af..b0f9497 100644 --- a/go.mod +++ b/go.mod @@ -39,6 +39,10 @@ require ( github.com/gofiber/template v1.8.3 // indirect github.com/gofiber/utils v1.1.0 // indirect github.com/google/uuid v1.5.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/pgx/v5 v5.5.5 // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/klauspost/compress v1.17.9 // indirect @@ -55,8 +59,11 @@ require ( github.com/tidwall/pretty v1.2.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/tcplisten v1.0.0 // indirect + golang.org/x/crypto v0.17.0 // indirect + golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.22.0 // indirect golang.org/x/text v0.16.0 // indirect google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + gorm.io/driver/postgres v1.5.10 // indirect ) diff --git a/go.sum b/go.sum index 2a4f8ef..3b9cb4a 100644 --- a/go.sum +++ b/go.sum @@ -44,6 +44,7 @@ github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= @@ -63,6 +64,14 @@ github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw= +github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= @@ -101,6 +110,9 @@ github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncj github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tidwall/gjson v1.17.1 h1:wlYEnwqAHgzmhNUFfw7Xalt2JzQvsMx2Se4PcoFCT/U= @@ -115,6 +127,10 @@ github.com/valyala/fasthttp v1.51.0 h1:8b30A5JlZ6C7AS81RsWjYMQmrZG6feChmgAolCl1S github.com/valyala/fasthttp v1.51.0/go.mod h1:oI2XroL+lI7vdXyYoQk03bXBThfFl2cVdIA3Xl7cH8g= github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= +golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= +golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -127,8 +143,11 @@ google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHh gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/postgres v1.5.10 h1:7Lggqempgy496c0WfHXsYWxk3Th+ZcW66/21QhVFdeE= +gorm.io/driver/postgres v1.5.10/go.mod h1:DX3GReXH+3FPWGrrgffdvCk3DQ1dwDPdmbenSkweRGI= gorm.io/driver/sqlite v1.5.6 h1:fO/X46qn5NUEEOZtnjJRWRzZMe8nqJiQ9E+0hi+hKQE= gorm.io/driver/sqlite v1.5.6/go.mod h1:U+J8craQU6Fzkcvu8oLeAQmi50TkwPEhHDEjQZXDah4= gorm.io/gorm v1.25.11 h1:/Wfyg1B/je1hnDx3sMkX+gAlxrlZpn6X0BXRlwXlvHg= diff --git a/queue/sqlite/sqlite.go b/queue/database/database.go similarity index 77% rename from queue/sqlite/sqlite.go rename to queue/database/database.go index 5964b2b..b02882b 100644 --- a/queue/sqlite/sqlite.go +++ b/queue/database/database.go @@ -1,28 +1,22 @@ -package sqlite +package database import ( "errors" - "os" + "fmt" "sync" "time" + "github.com/bwmarrin/snowflake" "github.com/poundifdef/smoothmq/config" "github.com/poundifdef/smoothmq/models" - "github.com/rs/zerolog/log" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - - "github.com/bwmarrin/snowflake" - _ "github.com/mattn/go-sqlite3" - - "gorm.io/driver/sqlite" + "gorm.io/driver/postgres" "gorm.io/gorm" "gorm.io/gorm/clause" + "gorm.io/gorm/logger" ) -type SQLiteQueue struct { +type DBQueue struct { Filename string DBG *gorm.DB Mu *sync.Mutex @@ -30,13 +24,6 @@ type SQLiteQueue struct { ticker *time.Ticker } -var queueDiskSize = promauto.NewGauge( - prometheus.GaugeOpts{ - Name: "queue_disk_size", - Help: "Size of queue data on disk", - }, -) - type Queue struct { ID int64 `gorm:"primaryKey;autoIncrement:false"` TenantID int64 `gorm:"not null;index:idx_queue_name,priority:1,unique"` @@ -60,7 +47,7 @@ type Message struct { Message string `gorm:"not null"` - KV []KV `gorm:"foreignKey:TenantID,QueueID,MessageID;references:TenantID,QueueID,ID"` + KV []KV `gorm:"foreignKey:MessageID;references:ID"` } func (message *Message) ToModel() *models.Message { @@ -100,61 +87,77 @@ type RateLimit struct { N int `gorm:"not null;default:0"` } -func NewSQLiteQueue(cfg config.SQLiteConfig) *SQLiteQueue { +func NewQueue(cfg config.ServerCommand) *DBQueue { snow, err := snowflake.NewNode(1) if err != nil { log.Fatal().Err(err).Send() } - db, err := gorm.Open(sqlite.Open(cfg.Path+"?_journal_mode=WAL&_foreign_keys=off&_auto_vacuum=full"), &gorm.Config{TranslateError: true}) - if err != nil { - log.Fatal().Err(err).Send() + gcfg := &gorm.Config{TranslateError: true} + + if cfg.DB.LogQueries { + gcfg.Logger = logger.Default.LogMode(logger.Info) + } + + dbq := &DBQueue{ + Mu: &sync.Mutex{}, + snow: snow, + } + + if cfg.DB.DSN == "" { + NewSQLiteQueue(cfg, dbq, gcfg) + } else { + switch cfg.DB.Driver { + case "postgres": + NewDBQueue(cfg.DB, dbq, gcfg) + case "sqlite": + NewSQLiteQueue(cfg, dbq, gcfg) + } + } + + return dbq +} + +func NewDBQueue(cfg config.DBConfig, dbq *DBQueue, gcfg *gorm.Config) { + var err error + + switch cfg.Driver { + case "postgres": + dbq.DBG, err = gorm.Open(postgres.Open(cfg.DSN), gcfg) + default: + err = errors.New(fmt.Sprintf("Unknown database driver %s", cfg.Driver)) } - err = db.AutoMigrate(&Queue{}) if err != nil { log.Fatal().Err(err).Send() } - err = db.AutoMigrate(&Message{}) + dbq.Migrate() +} + +func (q *DBQueue) Migrate() { + err := q.DBG.AutoMigrate(&Queue{}) if err != nil { log.Fatal().Err(err).Send() } - err = db.AutoMigrate(&KV{}) + err = q.DBG.AutoMigrate(&Message{}) if err != nil { log.Fatal().Err(err).Send() } - err = db.AutoMigrate(&RateLimit{}) + err = q.DBG.AutoMigrate(&KV{}) if err != nil { log.Fatal().Err(err).Send() } - rc := &SQLiteQueue{ - Filename: cfg.Path, - DBG: db, - Mu: &sync.Mutex{}, - snow: snow, - ticker: time.NewTicker(1 * time.Second), + err = q.DBG.AutoMigrate(&RateLimit{}) + if err != nil { + log.Fatal().Err(err).Send() } - - go func() { - for { - select { - case <-rc.ticker.C: - stat, err := os.Stat(rc.Filename) - if err == nil { - queueDiskSize.Set(float64(stat.Size())) - } - } - } - }() - - return rc } -func (q *SQLiteQueue) CreateQueue(tenantId int64, properties models.QueueProperties) error { +func (q *DBQueue) CreateQueue(tenantId int64, properties models.QueueProperties) error { q.Mu.Lock() defer q.Mu.Unlock() @@ -178,7 +181,7 @@ func (q *SQLiteQueue) CreateQueue(tenantId int64, properties models.QueuePropert return res.Error } -func (q *SQLiteQueue) UpdateQueue(tenantId int64, queueName string, properties models.QueueProperties) error { +func (q *DBQueue) UpdateQueue(tenantId int64, queueName string, properties models.QueueProperties) error { q.Mu.Lock() defer q.Mu.Unlock() @@ -200,7 +203,7 @@ func (q *SQLiteQueue) UpdateQueue(tenantId int64, queueName string, properties m return res.Error } -func (q *SQLiteQueue) GetQueue(tenantId int64, queueName string) (models.QueueProperties, error) { +func (q *DBQueue) GetQueue(tenantId int64, queueName string) (models.QueueProperties, error) { queue := models.QueueProperties{} properties, err := q.getQueue(tenantId, queueName) @@ -216,7 +219,7 @@ func (q *SQLiteQueue) GetQueue(tenantId int64, queueName string) (models.QueuePr return queue, nil } -func (q *SQLiteQueue) DeleteQueue(tenantId int64, queueName string) error { +func (q *DBQueue) DeleteQueue(tenantId int64, queueName string) error { // Delete all messages with the queue, and then the queue itself queue, err := q.getQueue(tenantId, queueName) @@ -246,7 +249,7 @@ func (q *SQLiteQueue) DeleteQueue(tenantId int64, queueName string) error { return rc } -func (q *SQLiteQueue) ListQueues(tenantId int64) ([]string, error) { +func (q *DBQueue) ListQueues(tenantId int64) ([]string, error) { var queues []Queue res := q.DBG.Where("tenant_id = ?", tenantId).Select("name").Find(&queues) if res.Error != nil { @@ -261,7 +264,7 @@ func (q *SQLiteQueue) ListQueues(tenantId int64) ([]string, error) { return rc, nil } -func (q *SQLiteQueue) getQueue(tenantId int64, queueName string) (*Queue, error) { +func (q *DBQueue) getQueue(tenantId int64, queueName string) (*Queue, error) { rc := &Queue{} res := q.DBG.Where("tenant_id = ? AND name = ?", tenantId, queueName).First(rc) if res.RowsAffected != 1 { @@ -270,7 +273,7 @@ func (q *SQLiteQueue) getQueue(tenantId int64, queueName string) (*Queue, error) return rc, res.Error } -func (q *SQLiteQueue) Enqueue(tenantId int64, queueName string, message string, kv map[string]string, delay int) (int64, error) { +func (q *DBQueue) Enqueue(tenantId int64, queueName string, message string, kv map[string]string, delay int) (int64, error) { messageSnow := q.snow.Generate() messageId := messageSnow.Int64() @@ -318,7 +321,7 @@ func (q *SQLiteQueue) Enqueue(tenantId int64, queueName string, message string, } // Calculate how many messages to allow the user to dequeue based on queue's rate limit -func (q *SQLiteQueue) calculateRateLimit(queue *Queue, now int64, numToDequeue int) (int, error) { +func (q *DBQueue) calculateRateLimit(queue *Queue, now int64, numToDequeue int) (int, error) { maxToDequeue := numToDequeue bucketToCheck := now @@ -387,7 +390,7 @@ func (q *SQLiteQueue) calculateRateLimit(queue *Queue, now int64, numToDequeue i return maxToDequeue, nil } -func (q *SQLiteQueue) Dequeue(tenantId int64, queueName string, numToDequeue int, requeueIn int) ([]*models.Message, error) { +func (q *DBQueue) Dequeue(tenantId int64, queueName string, numToDequeue int, requeueIn int) ([]*models.Message, error) { queue, err := q.getQueue(tenantId, queueName) if err != nil { return nil, err @@ -415,7 +418,10 @@ func (q *SQLiteQueue) Dequeue(tenantId int64, queueName string, numToDequeue int var messages []Message - res := q.DBG.Preload("KV").Where( + tx := q.DBG.Begin() + defer tx.Rollback() + + res := tx.Clauses(clause.Locking{Strength: "UPDATE", Options: "SKIP LOCKED"}).Preload("KV").Where( "deliver_at <= ? AND delivered_at <= ? AND (tries < max_tries OR max_tries = -1) AND tenant_id = ? AND queue_id = ?", now, now, tenantId, queue.ID). Limit(maxToDequeue). @@ -440,49 +446,43 @@ func (q *SQLiteQueue) Dequeue(tenantId int64, queueName string, numToDequeue int messageIDs[i] = message.ID } - err = q.DBG.Transaction(func(tx *gorm.DB) error { - res = tx.Model(&Message{}).Where("tenant_id = ? AND queue_id = ? AND id in ?", tenantId, queue.ID, messageIDs). - UpdateColumns(map[string]any{ - "tries": gorm.Expr("tries+1"), - "delivered_at": now, - "deliver_at": gorm.Expr("?", now+int64(visibilityTimeout)), - }) + res = tx.Model(&Message{}).Where("tenant_id = ? AND queue_id = ? AND id in ?", tenantId, queue.ID, messageIDs). + UpdateColumns(map[string]any{ + "tries": gorm.Expr("tries+1"), + "delivered_at": now, + "deliver_at": gorm.Expr("?", now+int64(visibilityTimeout)), + }) - if res.Error != nil { - return res.Error - } - - if queue.RateLimit > 0 { - bucket := RateLimit{ - TenantID: tenantId, - QueueID: queue.ID, - Ts: now, - N: len(messageIDs), - } - res = tx.Clauses(clause.OnConflict{ - DoUpdates: clause.Assignments(map[string]interface{}{"n": gorm.Expr("n + ?", len(messageIDs))}), - }).Create(&bucket) + if res.Error != nil { + return nil, res.Error + } - if res.Error != nil && !errors.Is(res.Error, gorm.ErrDuplicatedKey) { - return res.Error - } + if queue.RateLimit > 0 { + bucket := RateLimit{ + TenantID: tenantId, + QueueID: queue.ID, + Ts: now, + N: len(messageIDs), } + res = tx.Clauses(clause.OnConflict{ + DoUpdates: clause.Assignments(map[string]interface{}{"n": gorm.Expr("n + ?", len(messageIDs))}), + }).Create(&bucket) - return nil - }) - - if err != nil { - return nil, err + if res.Error != nil && !errors.Is(res.Error, gorm.ErrDuplicatedKey) { + return nil, res.Error + } } for _, messageId := range messageIDs { log.Debug().Int64("message_id", messageId).Msg("Dequeued message") } + tx.Commit() + return rc, nil } -func (q *SQLiteQueue) Peek(tenantId int64, queueName string, messageId int64) *models.Message { +func (q *DBQueue) Peek(tenantId int64, queueName string, messageId int64) *models.Message { queue, err := q.getQueue(tenantId, queueName) if err != nil { return nil @@ -502,7 +502,7 @@ func (q *SQLiteQueue) Peek(tenantId int64, queueName string, messageId int64) *m return message.ToModel() } -func (q *SQLiteQueue) Stats(tenantId int64, queueName string) models.QueueStats { +func (q *DBQueue) Stats(tenantId int64, queueName string) models.QueueStats { queue, err := q.getQueue(tenantId, queueName) if err != nil { return models.QueueStats{} @@ -551,7 +551,7 @@ func (q *SQLiteQueue) Stats(tenantId int64, queueName string) models.QueueStats return stats } -func (q *SQLiteQueue) Filter(tenantId int64, queueName string, filterCriteria models.FilterCriteria) []int64 { +func (q *DBQueue) Filter(tenantId int64, queueName string, filterCriteria models.FilterCriteria) []int64 { var rc []int64 queue, err := q.getQueue(tenantId, queueName) @@ -585,14 +585,13 @@ func (q *SQLiteQueue) Filter(tenantId int64, queueName string, filterCriteria mo sql += " ) GROUP BY message_id HAVING count(*) = ? LIMIT 10" sql += " ) " - } + for k, v := range filterCriteria.KV { + args = append(args, k, v, tenantId, queue.ID) + } - for k, v := range filterCriteria.KV { - args = append(args, k, v, tenantId, queue.ID) + args = append(args, len(filterCriteria.KV)) } - args = append(args, len(filterCriteria.KV)) - sql += "LIMIT 10" res := q.DBG.Raw(sql, args...).Scan(&rc) @@ -603,7 +602,7 @@ func (q *SQLiteQueue) Filter(tenantId int64, queueName string, filterCriteria mo return rc } -func (q *SQLiteQueue) Delete(tenantId int64, queueName string, messageId int64) error { +func (q *DBQueue) Delete(tenantId int64, queueName string, messageId int64) error { queue, err := q.getQueue(tenantId, queueName) if err != nil { return err @@ -631,7 +630,7 @@ func (q *SQLiteQueue) Delete(tenantId int64, queueName string, messageId int64) return err } -func (q *SQLiteQueue) Shutdown() error { +func (q *DBQueue) Shutdown() error { db, err := q.DBG.DB() if err != nil { return err diff --git a/queue/database/sqlite.go b/queue/database/sqlite.go new file mode 100644 index 0000000..0c792a1 --- /dev/null +++ b/queue/database/sqlite.go @@ -0,0 +1,67 @@ +package database + +import ( + "os" + "strings" + "time" + + "github.com/poundifdef/smoothmq/config" + + "github.com/rs/zerolog/log" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + _ "github.com/mattn/go-sqlite3" + + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +var queueDiskSize = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "queue_disk_size", + Help: "Size of queue data on disk", + }, +) + +func NewSQLiteQueue(cfg config.ServerCommand, dbq *DBQueue, gcfg *gorm.Config) { + var err error + var path string + var args string + + if cfg.DB.DSN == "" { + path = cfg.SQLite.Path + args = "?_journal_mode=WAL&_foreign_keys=off&_auto_vacuum=full" + } else { + pieces := strings.Split(cfg.DB.DSN, "?") + path = pieces[0] + if len(pieces) == 2 { + args = "?" + pieces[1] + } + } + + dbq.Filename = path + + dbq.DBG, err = gorm.Open(sqlite.Open(path+args), gcfg) + + if err != nil { + log.Fatal().Err(err).Send() + } + + dbq.ticker = time.NewTicker(1 * time.Second) + + dbq.Migrate() + + go func() { + for { + select { + case <-dbq.ticker.C: + stat, err := os.Stat(dbq.Filename) + if err == nil { + queueDiskSize.Set(float64(stat.Size())) + } + } + } + }() +}