Skip to content

Commit

Permalink
mv pkg/db pkg/pg
Browse files Browse the repository at this point in the history
Signed-off-by: hmoazzem <moazzem@edgeflare.io>
  • Loading branch information
hmoazzem committed Oct 28, 2024
1 parent fb8cbc7 commit 7296e1c
Show file tree
Hide file tree
Showing 11 changed files with 664 additions and 16 deletions.
2 changes: 1 addition & 1 deletion pkg/config/config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ peers:
- name: mqtt-default
connector: mqtt
config: # github.com/eclipse/paho.mqtt.golang.ClientOptions
Servers: ["tcp://localhost:1883"]
servers: ["tcp://localhost:1883"]
# username: ""
# password: ""
- name: kafka-default
Expand Down
2 changes: 1 addition & 1 deletion pkg/db/conn.go → pkg/pg/conn.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package db
package pg

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion pkg/db/insert.go → pkg/pg/insert.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package db
package pg

import (
"context"
Expand Down
68 changes: 68 additions & 0 deletions pkg/pg/pg_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package pg_test

import (
"context"
"os"
"testing"
"time"

"github.com/edgeflare/pgo/pkg/pg"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/stretchr/testify/require"
)

// Compile-time interface compliance checks
var (
_ pg.Conn = (*pgx.Conn)(nil)
_ pg.Conn = (*pgxpool.Pool)(nil)
)

var testConnString string

func init() {
// Use environment variable for test database connection
testConnString = os.Getenv("TEST_POSTGRES_CONN_STRING")
if testConnString == "" {
testConnString = "postgres://postgres:secret@localhost:5432/testdb?sslmode=disable"
}
}

// TestRunner holds common test resources for all pg package tests
type TestRunner struct {
ctx context.Context
conn pg.Conn
t testing.TB
}

// NewTestRunner creates a new test runner instance with a database connection
func NewTestRunner(t testing.TB) *TestRunner {
ctx := context.Background()
pm := pg.GetPoolManager()

// Setup test pool with notice handler
cfg := pg.PoolConfig{
Name: "test_pool",
ConnString: testConnString,
MaxConns: 5,
MinConns: 1,
MaxIdleTime: 5 * time.Minute,
MaxLifetime: 30 * time.Minute,
HealthCheck: 1 * time.Minute,
OnNotice: func(_ *pgconn.PgConn, n *pgconn.Notice) {
t.Logf("PostgreSQL %s: %s", n.Severity, n.Message)
},
}

require.NoError(t, pm.Add(ctx, cfg), "failed to create test pool")

pool, err := pm.Get("test_pool")
require.NoError(t, err, "failed to get test pool")

return &TestRunner{
conn: pool,
ctx: ctx,
t: t,
}
}
143 changes: 143 additions & 0 deletions pkg/pg/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package pg

import (
"context"
"fmt"
"sync"
"time"

"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
)

// PoolConfig defines parameters for a database connection pool.
type PoolConfig struct {
Name string // Unique identifier for the pool
ConnString string // PostgreSQL connection string
MaxConns int32 // Maximum number of connections (default: 4)
MinConns int32 // Minimum number of idle connections (default: 0)
MaxIdleTime time.Duration // Maximum time a connection can be idle (default: 30m)
MaxLifetime time.Duration // Maximum lifetime of a connection (default: 60m)
HealthCheck time.Duration // Interval between health checks (default: 1m)
OnNotice func(*pgconn.PgConn, *pgconn.Notice)
}

// PoolManager maintains a registry of named pools for database connections.
type PoolManager struct {
pools map[string]*pgxpool.Pool
mu sync.RWMutex
}

var (
instance *PoolManager
once sync.Once
)

// GetManager returns the singleton pool manager instance.
func GetPoolManager() *PoolManager {
once.Do(func() {
instance = &PoolManager{pools: make(map[string]*pgxpool.Pool)}
})
return instance
}

// Default pool configuration values
const (
defaultMaxConns int32 = 4 // pgxpool default
defaultMinConns int32 = 0 // pgxpool default
defaultMaxIdleTime time.Duration = 30 * time.Minute // pgxpool default
defaultMaxLifetime time.Duration = 60 * time.Minute // pgxpool default
defaultHealthCheck time.Duration = 1 * time.Minute // pgxpool default
)

// Add creates and registers a new connection pool with the given configuration.
func (m *PoolManager) Add(ctx context.Context, cfg PoolConfig) error {
m.mu.Lock()
defer m.mu.Unlock()

poolConfig, err := pgxpool.ParseConfig(cfg.ConnString)
if err != nil {
return fmt.Errorf("parse config: %w", err)
}

// Apply configuration with defaults
poolConfig.MaxConns = defaultMaxConns
if cfg.MaxConns > 0 {
poolConfig.MaxConns = cfg.MaxConns
}
poolConfig.MinConns = defaultMinConns
if cfg.MinConns >= 0 {
poolConfig.MinConns = cfg.MinConns
}
poolConfig.MaxConnIdleTime = defaultMaxIdleTime
if cfg.MaxIdleTime > 0 {
poolConfig.MaxConnIdleTime = cfg.MaxIdleTime
}
if cfg.MaxLifetime > 0 {
poolConfig.MaxConnLifetime = cfg.MaxLifetime
}
if cfg.HealthCheck > 0 {
poolConfig.HealthCheckPeriod = cfg.HealthCheck
}

pool, err := pgxpool.NewWithConfig(ctx, poolConfig)
if err != nil {
return fmt.Errorf("create pool: %w", err)
}

if err := pool.Ping(ctx); err != nil {
pool.Close()
return fmt.Errorf("ping database: %w", err)
}

m.pools[cfg.Name] = pool
return nil
}

// Get returns the connection pool for the given database name.
// It returns an error if the pool doesn't exist.
func (m *PoolManager) Get(name string) (*pgxpool.Pool, error) {
m.mu.RLock()
defer m.mu.RUnlock()

if pool, ok := m.pools[name]; ok {
return pool, nil
}
return nil, fmt.Errorf("pool %q not found", name)
}

// Close closes all managed connection pools.
func (m *PoolManager) Close() {
m.mu.Lock()
defer m.mu.Unlock()

for _, p := range m.pools {
p.Close()
}
}

// Remove closes and removes the named connection pool.
// It returns an error if the pool doesn't exist.
func (m *PoolManager) Remove(name string) error {
m.mu.Lock()
defer m.mu.Unlock()

if pool, ok := m.pools[name]; ok {
pool.Close()
delete(m.pools, name)
return nil
}
return fmt.Errorf("pool %q not found", name)
}

// List returns the names of all managed connection pools.
func (m *PoolManager) List() []string {
m.mu.RLock()
defer m.mu.RUnlock()

names := make([]string, 0, len(m.pools))
for name := range m.pools {
names = append(names, name)
}
return names
}
Loading

0 comments on commit 7296e1c

Please sign in to comment.