From 56fea7fb49c78ecac6cd491c319d51bfec6794c2 Mon Sep 17 00:00:00 2001 From: John W Higgins Date: Mon, 25 Nov 2024 23:40:09 -0800 Subject: [PATCH 1/4] Add GORM dsn support Preserve current SQLite config options while adding a section for DSN based connections. Currently only adding PostgreSQL, however, any GORM driver should be available to be added. config.yaml updated to show examples of both ways to specify database connections. --- cmd/smoothmq/server/server.go | 4 +- config.yaml | 14 +- config/config.go | 7 + go.mod | 7 + go.sum | 19 +++ .../sqlite.go => database/database.go} | 125 +++++++++--------- queue/database/sqlite.go | 67 ++++++++++ 7 files changed, 178 insertions(+), 65 deletions(-) rename queue/{sqlite/sqlite.go => database/database.go} (84%) create mode 100644 queue/database/sqlite.go diff --git a/cmd/smoothmq/server/server.go b/cmd/smoothmq/server/server.go index 926c595..660e0ff 100644 --- a/cmd/smoothmq/server/server.go +++ b/cmd/smoothmq/server/server.go @@ -15,7 +15,7 @@ import ( "github.com/poundifdef/smoothmq/dashboard" "github.com/poundifdef/smoothmq/models" "github.com/poundifdef/smoothmq/protocols/sqs" - "github.com/poundifdef/smoothmq/queue/sqlite" + "github.com/poundifdef/smoothmq/queue/database" "github.com/poundifdef/smoothmq/tenants/defaultmanager" "github.com/prometheus/client_golang/prometheus/promhttp" ) @@ -55,7 +55,7 @@ func Run(tm models.TenantManager, queue models.Queue, cfg config.ServerCommand) // Initialize default queue implementation if queue == nil { - queue = sqlite.NewSQLiteQueue(cfg.SQLite) + queue = database.NewQueue(cfg) } dashboardServer := dashboard.NewDashboard(queue, tm, cfg.Dashboard) diff --git a/config.yaml b/config.yaml index 07d1588..1bb6c4e 100644 --- a/config.yaml +++ b/config.yaml @@ -11,5 +11,15 @@ server: enabled: true port: 3000 - sqlite: - path: smoothmq.sqlite \ No newline at end of file +# One can use either the sqlite option or the db option to +# configure the database +# sqlite: +# path: smoothmq.sqlite + +# db: +# log-queries: false +# dsn: "host=db user=xxx password=xxx sslmode=disable TimeZone=America/Los_Angeles" +# driver: postgres +# If you use this option and wish to use sqlite please add the following options to the filename +# dsn: "smoothmq.sqlite?_journal_mode=WAL&_foreign_keys=off&_auto_vacuum=full" +# driver: sqlite \ No newline at end of file diff --git a/config/config.go b/config/config.go index 0238d72..480db4f 100644 --- a/config/config.go +++ b/config/config.go @@ -31,6 +31,7 @@ type ServerCommand struct { SQS SQSConfig `embed:"" prefix:"sqs-" envprefix:"Q_SQS_"` Dashboard DashboardConfig `embed:"" prefix:"dashboard-" envprefix:"Q_DASHBOARD_"` SQLite SQLiteConfig `embed:"" prefix:"sqlite-" envprefix:"Q_SQLITE_"` + DB DBConfig `embed:"" prefix:"db-" envprefix:"Q_DB_"` Metrics MetricsConfig `embed:"" prefix:"metrics-" name:"metrics" envprefix:"Q_METRICS_"` DisableTelemetry bool `name:"disable-telemetry" default:"false" env:"DISABLE_TELEMETRY"` @@ -53,6 +54,12 @@ type SQLiteConfig struct { Path string `name:"path" help:"Path of SQLite file" default:"smoothmq.sqlite" env:"PATH"` } +type DBConfig struct { + DSN string `name:"dsn" help:"GORM connection DSN" default:"" env:"DSN"` + Driver string `name:"driver" help:"DB driver to use" default:"sqlite" env:"DSN"` + LogQueries bool `name:"log-queries" default:"false" help:"Log queries" env:"LOG_QUERIES"` +} + type SQSConfig struct { Enabled bool `name:"enabled" default:"true" help:"Enable SQS protocol for queue" env:"ENABLED"` Port int `name:"port" default:"3001" help:"HTTP port for SQS protocol" env:"PORT"` diff --git a/go.mod b/go.mod index a9474af..b0f9497 100644 --- a/go.mod +++ b/go.mod @@ -39,6 +39,10 @@ require ( github.com/gofiber/template v1.8.3 // indirect github.com/gofiber/utils v1.1.0 // indirect github.com/google/uuid v1.5.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/pgx/v5 v5.5.5 // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/klauspost/compress v1.17.9 // indirect @@ -55,8 +59,11 @@ require ( github.com/tidwall/pretty v1.2.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/tcplisten v1.0.0 // indirect + golang.org/x/crypto v0.17.0 // indirect + golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.22.0 // indirect golang.org/x/text v0.16.0 // indirect google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + gorm.io/driver/postgres v1.5.10 // indirect ) diff --git a/go.sum b/go.sum index 2a4f8ef..3b9cb4a 100644 --- a/go.sum +++ b/go.sum @@ -44,6 +44,7 @@ github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= @@ -63,6 +64,14 @@ github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw= +github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= @@ -101,6 +110,9 @@ github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncj github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tidwall/gjson v1.17.1 h1:wlYEnwqAHgzmhNUFfw7Xalt2JzQvsMx2Se4PcoFCT/U= @@ -115,6 +127,10 @@ github.com/valyala/fasthttp v1.51.0 h1:8b30A5JlZ6C7AS81RsWjYMQmrZG6feChmgAolCl1S github.com/valyala/fasthttp v1.51.0/go.mod h1:oI2XroL+lI7vdXyYoQk03bXBThfFl2cVdIA3Xl7cH8g= github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= +golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= +golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -127,8 +143,11 @@ google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHh gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/postgres v1.5.10 h1:7Lggqempgy496c0WfHXsYWxk3Th+ZcW66/21QhVFdeE= +gorm.io/driver/postgres v1.5.10/go.mod h1:DX3GReXH+3FPWGrrgffdvCk3DQ1dwDPdmbenSkweRGI= gorm.io/driver/sqlite v1.5.6 h1:fO/X46qn5NUEEOZtnjJRWRzZMe8nqJiQ9E+0hi+hKQE= gorm.io/driver/sqlite v1.5.6/go.mod h1:U+J8craQU6Fzkcvu8oLeAQmi50TkwPEhHDEjQZXDah4= gorm.io/gorm v1.25.11 h1:/Wfyg1B/je1hnDx3sMkX+gAlxrlZpn6X0BXRlwXlvHg= diff --git a/queue/sqlite/sqlite.go b/queue/database/database.go similarity index 84% rename from queue/sqlite/sqlite.go rename to queue/database/database.go index 5964b2b..641b77d 100644 --- a/queue/sqlite/sqlite.go +++ b/queue/database/database.go @@ -1,28 +1,22 @@ -package sqlite +package database import ( "errors" - "os" + "fmt" "sync" "time" + "github.com/bwmarrin/snowflake" "github.com/poundifdef/smoothmq/config" "github.com/poundifdef/smoothmq/models" - "github.com/rs/zerolog/log" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - - "github.com/bwmarrin/snowflake" - _ "github.com/mattn/go-sqlite3" - - "gorm.io/driver/sqlite" + "gorm.io/driver/postgres" "gorm.io/gorm" "gorm.io/gorm/clause" + "gorm.io/gorm/logger" ) -type SQLiteQueue struct { +type DBQueue struct { Filename string DBG *gorm.DB Mu *sync.Mutex @@ -30,13 +24,6 @@ type SQLiteQueue struct { ticker *time.Ticker } -var queueDiskSize = promauto.NewGauge( - prometheus.GaugeOpts{ - Name: "queue_disk_size", - Help: "Size of queue data on disk", - }, -) - type Queue struct { ID int64 `gorm:"primaryKey;autoIncrement:false"` TenantID int64 `gorm:"not null;index:idx_queue_name,priority:1,unique"` @@ -100,61 +87,77 @@ type RateLimit struct { N int `gorm:"not null;default:0"` } -func NewSQLiteQueue(cfg config.SQLiteConfig) *SQLiteQueue { +func NewQueue(cfg config.ServerCommand) *DBQueue { snow, err := snowflake.NewNode(1) if err != nil { log.Fatal().Err(err).Send() } - db, err := gorm.Open(sqlite.Open(cfg.Path+"?_journal_mode=WAL&_foreign_keys=off&_auto_vacuum=full"), &gorm.Config{TranslateError: true}) - if err != nil { - log.Fatal().Err(err).Send() + gcfg := &gorm.Config{TranslateError: true} + + if cfg.DB.LogQueries { + gcfg.Logger = logger.Default.LogMode(logger.Info) + } + + dbq := &DBQueue{ + Mu: &sync.Mutex{}, + snow: snow, + } + + if cfg.DB.DSN == "" { + NewSQLiteQueue(cfg, dbq, gcfg) + } else { + switch cfg.DB.Driver { + case "postgres": + NewDBQueue(cfg.DB, dbq, gcfg) + case "sqlite": + NewSQLiteQueue(cfg, dbq, gcfg) + } + } + + return dbq +} + +func NewDBQueue(cfg config.DBConfig, dbq *DBQueue, gcfg *gorm.Config) { + var err error + + switch cfg.Driver { + case "postgres": + dbq.DBG, err = gorm.Open(postgres.Open(cfg.DSN), gcfg) + default: + err = errors.New(fmt.Sprintf("Unknown database driver %s", cfg.Driver)) } - err = db.AutoMigrate(&Queue{}) if err != nil { log.Fatal().Err(err).Send() } - err = db.AutoMigrate(&Message{}) + dbq.Migrate() +} + +func (q *DBQueue) Migrate() { + err := q.DBG.AutoMigrate(&Queue{}) if err != nil { log.Fatal().Err(err).Send() } - err = db.AutoMigrate(&KV{}) + err = q.DBG.AutoMigrate(&Message{}) if err != nil { log.Fatal().Err(err).Send() } - err = db.AutoMigrate(&RateLimit{}) + err = q.DBG.AutoMigrate(&KV{}) if err != nil { log.Fatal().Err(err).Send() } - rc := &SQLiteQueue{ - Filename: cfg.Path, - DBG: db, - Mu: &sync.Mutex{}, - snow: snow, - ticker: time.NewTicker(1 * time.Second), + err = q.DBG.AutoMigrate(&RateLimit{}) + if err != nil { + log.Fatal().Err(err).Send() } - - go func() { - for { - select { - case <-rc.ticker.C: - stat, err := os.Stat(rc.Filename) - if err == nil { - queueDiskSize.Set(float64(stat.Size())) - } - } - } - }() - - return rc } -func (q *SQLiteQueue) CreateQueue(tenantId int64, properties models.QueueProperties) error { +func (q *DBQueue) CreateQueue(tenantId int64, properties models.QueueProperties) error { q.Mu.Lock() defer q.Mu.Unlock() @@ -178,7 +181,7 @@ func (q *SQLiteQueue) CreateQueue(tenantId int64, properties models.QueuePropert return res.Error } -func (q *SQLiteQueue) UpdateQueue(tenantId int64, queueName string, properties models.QueueProperties) error { +func (q *DBQueue) UpdateQueue(tenantId int64, queueName string, properties models.QueueProperties) error { q.Mu.Lock() defer q.Mu.Unlock() @@ -200,7 +203,7 @@ func (q *SQLiteQueue) UpdateQueue(tenantId int64, queueName string, properties m return res.Error } -func (q *SQLiteQueue) GetQueue(tenantId int64, queueName string) (models.QueueProperties, error) { +func (q *DBQueue) GetQueue(tenantId int64, queueName string) (models.QueueProperties, error) { queue := models.QueueProperties{} properties, err := q.getQueue(tenantId, queueName) @@ -216,7 +219,7 @@ func (q *SQLiteQueue) GetQueue(tenantId int64, queueName string) (models.QueuePr return queue, nil } -func (q *SQLiteQueue) DeleteQueue(tenantId int64, queueName string) error { +func (q *DBQueue) DeleteQueue(tenantId int64, queueName string) error { // Delete all messages with the queue, and then the queue itself queue, err := q.getQueue(tenantId, queueName) @@ -246,7 +249,7 @@ func (q *SQLiteQueue) DeleteQueue(tenantId int64, queueName string) error { return rc } -func (q *SQLiteQueue) ListQueues(tenantId int64) ([]string, error) { +func (q *DBQueue) ListQueues(tenantId int64) ([]string, error) { var queues []Queue res := q.DBG.Where("tenant_id = ?", tenantId).Select("name").Find(&queues) if res.Error != nil { @@ -261,7 +264,7 @@ func (q *SQLiteQueue) ListQueues(tenantId int64) ([]string, error) { return rc, nil } -func (q *SQLiteQueue) getQueue(tenantId int64, queueName string) (*Queue, error) { +func (q *DBQueue) getQueue(tenantId int64, queueName string) (*Queue, error) { rc := &Queue{} res := q.DBG.Where("tenant_id = ? AND name = ?", tenantId, queueName).First(rc) if res.RowsAffected != 1 { @@ -270,7 +273,7 @@ func (q *SQLiteQueue) getQueue(tenantId int64, queueName string) (*Queue, error) return rc, res.Error } -func (q *SQLiteQueue) Enqueue(tenantId int64, queueName string, message string, kv map[string]string, delay int) (int64, error) { +func (q *DBQueue) Enqueue(tenantId int64, queueName string, message string, kv map[string]string, delay int) (int64, error) { messageSnow := q.snow.Generate() messageId := messageSnow.Int64() @@ -318,7 +321,7 @@ func (q *SQLiteQueue) Enqueue(tenantId int64, queueName string, message string, } // Calculate how many messages to allow the user to dequeue based on queue's rate limit -func (q *SQLiteQueue) calculateRateLimit(queue *Queue, now int64, numToDequeue int) (int, error) { +func (q *DBQueue) calculateRateLimit(queue *Queue, now int64, numToDequeue int) (int, error) { maxToDequeue := numToDequeue bucketToCheck := now @@ -387,7 +390,7 @@ func (q *SQLiteQueue) calculateRateLimit(queue *Queue, now int64, numToDequeue i return maxToDequeue, nil } -func (q *SQLiteQueue) Dequeue(tenantId int64, queueName string, numToDequeue int, requeueIn int) ([]*models.Message, error) { +func (q *DBQueue) Dequeue(tenantId int64, queueName string, numToDequeue int, requeueIn int) ([]*models.Message, error) { queue, err := q.getQueue(tenantId, queueName) if err != nil { return nil, err @@ -482,7 +485,7 @@ func (q *SQLiteQueue) Dequeue(tenantId int64, queueName string, numToDequeue int return rc, nil } -func (q *SQLiteQueue) Peek(tenantId int64, queueName string, messageId int64) *models.Message { +func (q *DBQueue) Peek(tenantId int64, queueName string, messageId int64) *models.Message { queue, err := q.getQueue(tenantId, queueName) if err != nil { return nil @@ -502,7 +505,7 @@ func (q *SQLiteQueue) Peek(tenantId int64, queueName string, messageId int64) *m return message.ToModel() } -func (q *SQLiteQueue) Stats(tenantId int64, queueName string) models.QueueStats { +func (q *DBQueue) Stats(tenantId int64, queueName string) models.QueueStats { queue, err := q.getQueue(tenantId, queueName) if err != nil { return models.QueueStats{} @@ -551,7 +554,7 @@ func (q *SQLiteQueue) Stats(tenantId int64, queueName string) models.QueueStats return stats } -func (q *SQLiteQueue) Filter(tenantId int64, queueName string, filterCriteria models.FilterCriteria) []int64 { +func (q *DBQueue) Filter(tenantId int64, queueName string, filterCriteria models.FilterCriteria) []int64 { var rc []int64 queue, err := q.getQueue(tenantId, queueName) @@ -603,7 +606,7 @@ func (q *SQLiteQueue) Filter(tenantId int64, queueName string, filterCriteria mo return rc } -func (q *SQLiteQueue) Delete(tenantId int64, queueName string, messageId int64) error { +func (q *DBQueue) Delete(tenantId int64, queueName string, messageId int64) error { queue, err := q.getQueue(tenantId, queueName) if err != nil { return err @@ -631,7 +634,7 @@ func (q *SQLiteQueue) Delete(tenantId int64, queueName string, messageId int64) return err } -func (q *SQLiteQueue) Shutdown() error { +func (q *DBQueue) Shutdown() error { db, err := q.DBG.DB() if err != nil { return err diff --git a/queue/database/sqlite.go b/queue/database/sqlite.go new file mode 100644 index 0000000..0c792a1 --- /dev/null +++ b/queue/database/sqlite.go @@ -0,0 +1,67 @@ +package database + +import ( + "os" + "strings" + "time" + + "github.com/poundifdef/smoothmq/config" + + "github.com/rs/zerolog/log" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + _ "github.com/mattn/go-sqlite3" + + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +var queueDiskSize = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "queue_disk_size", + Help: "Size of queue data on disk", + }, +) + +func NewSQLiteQueue(cfg config.ServerCommand, dbq *DBQueue, gcfg *gorm.Config) { + var err error + var path string + var args string + + if cfg.DB.DSN == "" { + path = cfg.SQLite.Path + args = "?_journal_mode=WAL&_foreign_keys=off&_auto_vacuum=full" + } else { + pieces := strings.Split(cfg.DB.DSN, "?") + path = pieces[0] + if len(pieces) == 2 { + args = "?" + pieces[1] + } + } + + dbq.Filename = path + + dbq.DBG, err = gorm.Open(sqlite.Open(path+args), gcfg) + + if err != nil { + log.Fatal().Err(err).Send() + } + + dbq.ticker = time.NewTicker(1 * time.Second) + + dbq.Migrate() + + go func() { + for { + select { + case <-dbq.ticker.C: + stat, err := os.Stat(dbq.Filename) + if err == nil { + queueDiskSize.Set(float64(stat.Size())) + } + } + } + }() +} From f5f9c3e78ec81b67a0387bd8f18baac8d2b070d7 Mon Sep 17 00:00:00 2001 From: John W Higgins Date: Tue, 26 Nov 2024 11:14:56 -0800 Subject: [PATCH 2/4] Fix foreign key Only need the ID/Message_ID column. The tenant_id, queue_id columns are not index and therefore PostgreSQL will not process as a foreign key. --- queue/database/database.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/queue/database/database.go b/queue/database/database.go index 641b77d..1f019c9 100644 --- a/queue/database/database.go +++ b/queue/database/database.go @@ -47,7 +47,7 @@ type Message struct { Message string `gorm:"not null"` - KV []KV `gorm:"foreignKey:TenantID,QueueID,MessageID;references:TenantID,QueueID,ID"` + KV []KV `gorm:"foreignKey:MessageID;references:ID"` } func (message *Message) ToModel() *models.Message { From cbddb8f3538f8f4344a654fb944a0d23e80cac05 Mon Sep 17 00:00:00 2001 From: John W Higgins Date: Tue, 26 Nov 2024 11:16:21 -0800 Subject: [PATCH 3/4] Wrap entire dequeue process in transaction Extend transaction to include select statement to allow for FOR UPDATE/SKIP LOCKED clauses to properly dequeue rows with PostgreSQL. --- queue/database/database.go | 57 ++++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 30 deletions(-) diff --git a/queue/database/database.go b/queue/database/database.go index 1f019c9..5a631bb 100644 --- a/queue/database/database.go +++ b/queue/database/database.go @@ -418,7 +418,10 @@ func (q *DBQueue) Dequeue(tenantId int64, queueName string, numToDequeue int, re var messages []Message - res := q.DBG.Preload("KV").Where( + tx := q.DBG.Begin() + defer tx.Rollback() + + res := tx.Clauses(clause.Locking{Strength: "UPDATE", Options: "SKIP LOCKED"}).Preload("KV").Where( "deliver_at <= ? AND delivered_at <= ? AND (tries < max_tries OR max_tries = -1) AND tenant_id = ? AND queue_id = ?", now, now, tenantId, queue.ID). Limit(maxToDequeue). @@ -443,45 +446,39 @@ func (q *DBQueue) Dequeue(tenantId int64, queueName string, numToDequeue int, re messageIDs[i] = message.ID } - err = q.DBG.Transaction(func(tx *gorm.DB) error { - res = tx.Model(&Message{}).Where("tenant_id = ? AND queue_id = ? AND id in ?", tenantId, queue.ID, messageIDs). - UpdateColumns(map[string]any{ - "tries": gorm.Expr("tries+1"), - "delivered_at": now, - "deliver_at": gorm.Expr("?", now+int64(visibilityTimeout)), - }) - - if res.Error != nil { - return res.Error - } + res = tx.Model(&Message{}).Where("tenant_id = ? AND queue_id = ? AND id in ?", tenantId, queue.ID, messageIDs). + UpdateColumns(map[string]any{ + "tries": gorm.Expr("tries+1"), + "delivered_at": now, + "deliver_at": gorm.Expr("?", now+int64(visibilityTimeout)), + }) - if queue.RateLimit > 0 { - bucket := RateLimit{ - TenantID: tenantId, - QueueID: queue.ID, - Ts: now, - N: len(messageIDs), - } - res = tx.Clauses(clause.OnConflict{ - DoUpdates: clause.Assignments(map[string]interface{}{"n": gorm.Expr("n + ?", len(messageIDs))}), - }).Create(&bucket) + if res.Error != nil { + return nil, res.Error + } - if res.Error != nil && !errors.Is(res.Error, gorm.ErrDuplicatedKey) { - return res.Error - } + if queue.RateLimit > 0 { + bucket := RateLimit{ + TenantID: tenantId, + QueueID: queue.ID, + Ts: now, + N: len(messageIDs), } + res = tx.Clauses(clause.OnConflict{ + DoUpdates: clause.Assignments(map[string]interface{}{"n": gorm.Expr("n + ?", len(messageIDs))}), + }).Create(&bucket) - return nil - }) - - if err != nil { - return nil, err + if res.Error != nil && !errors.Is(res.Error, gorm.ErrDuplicatedKey) { + return nil, res.Error + } } for _, messageId := range messageIDs { log.Debug().Int64("message_id", messageId).Msg("Dequeued message") } + tx.Commit() + return rc, nil } From 04f5736a05816f4da7860a8c9ef5a5f054b68815 Mon Sep 17 00:00:00 2001 From: John W Higgins Date: Tue, 26 Nov 2024 11:17:51 -0800 Subject: [PATCH 4/4] Fix extraneous query parameter Move parameter addition into proper if clause. SQLite apparantly ignores extra parameters where other databases are more strict. --- queue/database/database.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/queue/database/database.go b/queue/database/database.go index 5a631bb..b02882b 100644 --- a/queue/database/database.go +++ b/queue/database/database.go @@ -585,14 +585,13 @@ func (q *DBQueue) Filter(tenantId int64, queueName string, filterCriteria models sql += " ) GROUP BY message_id HAVING count(*) = ? LIMIT 10" sql += " ) " - } + for k, v := range filterCriteria.KV { + args = append(args, k, v, tenantId, queue.ID) + } - for k, v := range filterCriteria.KV { - args = append(args, k, v, tenantId, queue.ID) + args = append(args, len(filterCriteria.KV)) } - args = append(args, len(filterCriteria.KV)) - sql += "LIMIT 10" res := q.DBG.Raw(sql, args...).Scan(&rc)