diff --git a/pkg/config/config.example.yaml b/pkg/config/config.example.yaml
index a09f39c..01c3ebf 100644
--- a/pkg/config/config.example.yaml
+++ b/pkg/config/config.example.yaml
@@ -8,7 +8,7 @@ peers:
   - name: mqtt-default
     connector: mqtt
     config: # github.com/eclipse/paho.mqtt.golang.ClientOptions
-      Servers: ["tcp://localhost:1883"]
+      servers: ["tcp://localhost:1883"]
       # username: ""
       # password: ""
   - name: kafka-default
diff --git a/pkg/db/conn.go b/pkg/pg/conn.go
similarity index 99%
rename from pkg/db/conn.go
rename to pkg/pg/conn.go
index c1291ea..6f67578 100644
--- a/pkg/db/conn.go
+++ b/pkg/pg/conn.go
@@ -1,4 +1,4 @@
-package db
+package pg
 
 import (
     "context"
diff --git a/pkg/db/insert.go b/pkg/pg/insert.go
similarity index 99%
rename from pkg/db/insert.go
rename to pkg/pg/insert.go
index dc02bca..88c8754 100644
--- a/pkg/db/insert.go
+++ b/pkg/pg/insert.go
@@ -1,4 +1,4 @@
-package db
+package pg
 
 import (
     "context"
diff --git a/pkg/pg/pg_test.go b/pkg/pg/pg_test.go
new file mode 100644
index 0000000..540a7c3
--- /dev/null
+++ b/pkg/pg/pg_test.go
@@ -0,0 +1,68 @@
+package pg_test
+
+import (
+    "context"
+    "os"
+    "testing"
+    "time"
+
+    "github.com/edgeflare/pgo/pkg/pg"
+    "github.com/jackc/pgx/v5"
+    "github.com/jackc/pgx/v5/pgconn"
+    "github.com/jackc/pgx/v5/pgxpool"
+    "github.com/stretchr/testify/require"
+)
+
+// Compile-time interface compliance checks
+var (
+    _ pg.Conn = (*pgx.Conn)(nil)
+    _ pg.Conn = (*pgxpool.Pool)(nil)
+)
+
+var testConnString string
+
+func init() {
+    // Use environment variable for test database connection
+    testConnString = os.Getenv("TEST_POSTGRES_CONN_STRING")
+    if testConnString == "" {
+        testConnString = "postgres://postgres:secret@localhost:5432/testdb?sslmode=disable"
+    }
+}
+
+// TestRunner holds common test resources for all pg package tests
+type TestRunner struct {
+    ctx  context.Context
+    conn pg.Conn
+    t    testing.TB
+}
+
+// NewTestRunner creates a new test runner instance with a database connection
+func NewTestRunner(t testing.TB) *TestRunner {
+    ctx := context.Background()
+    pm := pg.GetPoolManager()
+
+    // Setup test pool with notice handler
+    cfg := pg.PoolConfig{
+        Name:        "test_pool",
+        ConnString:  testConnString,
+        MaxConns:    5,
+        MinConns:    1,
+        MaxIdleTime: 5 * time.Minute,
+        MaxLifetime: 30 * time.Minute,
+        HealthCheck: 1 * time.Minute,
+        OnNotice: func(_ *pgconn.PgConn, n *pgconn.Notice) {
+            t.Logf("PostgreSQL %s: %s", n.Severity, n.Message)
+        },
+    }
+
+    require.NoError(t, pm.Add(ctx, cfg), "failed to create test pool")
+
+    pool, err := pm.Get("test_pool")
+    require.NoError(t, err, "failed to get test pool")
+
+    return &TestRunner{
+        conn: pool,
+        ctx:  ctx,
+        t:    t,
+    }
+}
diff --git a/pkg/pg/pool.go b/pkg/pg/pool.go
new file mode 100644
index 0000000..474616c
--- /dev/null
+++ b/pkg/pg/pool.go
@@ -0,0 +1,143 @@
+package pg
+
+import (
+    "context"
+    "fmt"
+    "sync"
+    "time"
+
+    "github.com/jackc/pgx/v5/pgconn"
+    "github.com/jackc/pgx/v5/pgxpool"
+)
+
+// PoolConfig defines parameters for a database connection pool.
+type PoolConfig struct {
+    Name        string        // Unique identifier for the pool
+    ConnString  string        // PostgreSQL connection string
+    MaxConns    int32         // Maximum number of connections (default: 4)
+    MinConns    int32         // Minimum number of idle connections (default: 0)
+    MaxIdleTime time.Duration // Maximum time a connection can be idle (default: 30m)
+    MaxLifetime time.Duration // Maximum lifetime of a connection (default: 60m)
+    HealthCheck time.Duration // Interval between health checks (default: 1m)
+    OnNotice    func(*pgconn.PgConn, *pgconn.Notice)
+}
+
+// PoolManager maintains a registry of named pools for database connections.
+type PoolManager struct {
+    pools map[string]*pgxpool.Pool
+    mu    sync.RWMutex
+}
+
+var (
+    instance *PoolManager
+    once     sync.Once
+)
+
+// GetPoolManager returns the singleton pool manager instance.
+func GetPoolManager() *PoolManager {
+    once.Do(func() {
+        instance = &PoolManager{pools: make(map[string]*pgxpool.Pool)}
+    })
+    return instance
+}
+
+// Default pool configuration values
+const (
+    defaultMaxConns    int32         = 4                // pgxpool default
+    defaultMinConns    int32         = 0                // pgxpool default
+    defaultMaxIdleTime time.Duration = 30 * time.Minute // pgxpool default
+    defaultMaxLifetime time.Duration = 60 * time.Minute // pgxpool default
+    defaultHealthCheck time.Duration = 1 * time.Minute  // pgxpool default
+)
+
+// Add creates and registers a new connection pool with the given configuration.
+func (m *PoolManager) Add(ctx context.Context, cfg PoolConfig) error {
+    m.mu.Lock()
+    defer m.mu.Unlock()
+
+    poolConfig, err := pgxpool.ParseConfig(cfg.ConnString)
+    if err != nil {
+        return fmt.Errorf("parse config: %w", err)
+    }
+
+    // Apply configuration with defaults
+    poolConfig.MaxConns = defaultMaxConns
+    if cfg.MaxConns > 0 {
+        poolConfig.MaxConns = cfg.MaxConns
+    }
+    poolConfig.MinConns = defaultMinConns
+    if cfg.MinConns >= 0 {
+        poolConfig.MinConns = cfg.MinConns
+    }
+    poolConfig.MaxConnIdleTime = defaultMaxIdleTime
+    if cfg.MaxIdleTime > 0 {
+        poolConfig.MaxConnIdleTime = cfg.MaxIdleTime
+    }
+    if cfg.MaxLifetime > 0 {
+        poolConfig.MaxConnLifetime = cfg.MaxLifetime
+    }
+    if cfg.HealthCheck > 0 {
+        poolConfig.HealthCheckPeriod = cfg.HealthCheck
+    }
+
+    pool, err := pgxpool.NewWithConfig(ctx, poolConfig)
+    if err != nil {
+        return fmt.Errorf("create pool: %w", err)
+    }
+
+    if err := pool.Ping(ctx); err != nil {
+        pool.Close()
+        return fmt.Errorf("ping database: %w", err)
+    }
+
+    m.pools[cfg.Name] = pool
+    return nil
+}
+
+// Get returns the connection pool for the given database name.
+// It returns an error if the pool doesn't exist.
+func (m *PoolManager) Get(name string) (*pgxpool.Pool, error) {
+    m.mu.RLock()
+    defer m.mu.RUnlock()
+
+    if pool, ok := m.pools[name]; ok {
+        return pool, nil
+    }
+    return nil, fmt.Errorf("pool %q not found", name)
+}
+
+// Close closes all managed connection pools.
+func (m *PoolManager) Close() {
+    m.mu.Lock()
+    defer m.mu.Unlock()
+
+    for _, p := range m.pools {
+        p.Close()
+    }
+}
+
+// Remove closes and removes the named connection pool.
+// It returns an error if the pool doesn't exist.
+func (m *PoolManager) Remove(name string) error {
+    m.mu.Lock()
+    defer m.mu.Unlock()
+
+    if pool, ok := m.pools[name]; ok {
+        pool.Close()
+        delete(m.pools, name)
+        return nil
+    }
+    return fmt.Errorf("pool %q not found", name)
+}
+
+// List returns the names of all managed connection pools.
+func (m *PoolManager) List() []string {
+    m.mu.RLock()
+    defer m.mu.RUnlock()
+
+    names := make([]string, 0, len(m.pools))
+    for name := range m.pools {
+        names = append(names, name)
+    }
+    return names
+}
diff --git a/pkg/pg/pool_test.go b/pkg/pg/pool_test.go
new file mode 100644
index 0000000..eebd1bc
--- /dev/null
+++ b/pkg/pg/pool_test.go
@@ -0,0 +1,176 @@
+package pg_test
+
+import (
+    "context"
+    "fmt"
+    "sync"
+    "testing"
+    "time"
+
+    "github.com/edgeflare/pgo/pkg/pg"
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
+)
+
+const (
+    testConnStr    = "postgres://postgres:secret@localhost:5432/testdb"
+    invalidConnStr = "postgres://invalid:invalid@localhost:5432/invalid_db"
+)
+
+// getTestConfig returns a test configuration
+func getTestConfig(name string) pg.PoolConfig {
+    return pg.PoolConfig{
+        Name:        name,
+        ConnString:  testConnStr,
+        MaxConns:    5,
+        MinConns:    1,
+        MaxIdleTime: time.Minute * 5,
+        MaxLifetime: time.Minute * 30,
+        HealthCheck: time.Minute * 1,
+    }
+}
+
+// PoolSuite holds resources for pool manager tests
+type PoolSuite struct {
+    ctx context.Context
+    pm  *pg.PoolManager
+    t   testing.TB
+}
+
+// NewPoolSuite creates a new pool test suite instance
+func NewPoolSuite(t testing.TB) *PoolSuite {
+    return &PoolSuite{
+        ctx: context.Background(),
+        pm:  pg.GetPoolManager(),
+        t:   t,
+    }
+}
+
+func TestPoolManager(t *testing.T) {
+    suite := NewPoolSuite(t)
+    // defer suite.Cleanup()
+
+    t.Run("singleton pattern", func(t *testing.T) {
+        manager1 := pg.GetPoolManager()
+        manager2 := pg.GetPoolManager()
+
+        assert.Same(t, manager1, manager2, "should return same instance")
+    })
+
+    t.Run("pool lifecycle", func(t *testing.T) {
+        // Test pool creation
+        cfg := getTestConfig("lifecycle_test")
+        require.NoError(t, suite.pm.Add(suite.ctx, cfg))
+
+        // Test pool retrieval
+        p, err := suite.pm.Get("lifecycle_test")
+        require.NoError(t, err)
+        require.NotNil(t, p)
+
+        // Verify pool is functional
+        var result int
+        require.NoError(t, p.QueryRow(suite.ctx, "SELECT 1").Scan(&result))
+        assert.Equal(t, 1, result)
+
+        // Test pool removal
+        require.NoError(t, suite.pm.Remove("lifecycle_test"))
+        _, err = suite.pm.Get("lifecycle_test")
+        assert.Error(t, err)
+    })
+
+    t.Run("configuration validation", func(t *testing.T) {
+        tests := []struct {
+            name    string
+            cfg     pg.PoolConfig
+            wantErr bool
+        }{
+            {
+                name:    "valid config",
+                cfg:     getTestConfig("valid_config"),
+                wantErr: false,
+            },
+            {
+                name: "invalid connection string",
+                cfg: pg.PoolConfig{
+                    Name:       "invalid_conn",
+                    ConnString: invalidConnStr,
+                },
+                wantErr: true,
+            },
+            {
+                name: "zero connections",
+                cfg: pg.PoolConfig{
+                    Name:       "zero_conns",
+                    ConnString: testConnStr,
+                    MaxConns:   0,
+                },
+                wantErr: false, // Should use default
+            },
+        }
+
+        for _, tt := range tests {
+            t.Run(tt.name, func(t *testing.T) {
+                err := suite.pm.Add(suite.ctx, tt.cfg)
+                if tt.wantErr {
+                    assert.Error(t, err)
+                } else {
+                    assert.NoError(t, err)
+                }
+            })
+        }
+    })
+
+    t.Run("concurrent operations", func(t *testing.T) {
+        var wg sync.WaitGroup
+        numGoroutines := 10
+        errChan := make(chan error, numGoroutines)
+
+        for i := 0; i < numGoroutines; i++ {
+            wg.Add(1)
+            go func(id int) {
+                defer wg.Done()
+
+                name := fmt.Sprintf("concurrent_%d", id)
+                cfg := getTestConfig(name)
+
+                // Test concurrent add
+                if err := suite.pm.Add(suite.ctx, cfg); err != nil {
+                    errChan <- fmt.Errorf("failed to add pool %s: %w", name, err)
+                    return
+                }
+
+                // Test concurrent get
+                if _, err := suite.pm.Get(name); err != nil {
+                    errChan <- fmt.Errorf("failed to get pool %s: %w", name, err)
pool %s: %w", name, err) + return + } + }(i) + } + + wg.Wait() + close(errChan) + + for err := range errChan { + t.Errorf("concurrent operation error: %v", err) + } + + // Verify final state + pools := suite.pm.List() + assert.Equal(t, numGoroutines, len(pools)) + }) + + t.Run("pool listing", func(t *testing.T) { + // Add multiple pools + poolNames := []string{"list_test1", "list_test2", "list_test3"} + for _, name := range poolNames { + require.NoError(t, suite.pm.Add(suite.ctx, getTestConfig(name))) + } + + // Verify listing + list := suite.pm.List() + assert.Equal(t, len(poolNames), len(list)) + for _, name := range poolNames { + assert.Contains(t, list, name) + } + }) +} diff --git a/pkg/role/role.go b/pkg/pg/role/role.go similarity index 93% rename from pkg/role/role.go rename to pkg/pg/role/role.go index d07acef..4b75650 100644 --- a/pkg/role/role.go +++ b/pkg/pg/role/role.go @@ -8,7 +8,7 @@ import ( "strings" "time" - "github.com/edgeflare/pgo/pkg/db" + "github.com/edgeflare/pgo/pkg/pg" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" ) @@ -52,7 +52,7 @@ type Role struct { // List retrieves all PostgreSQL roles from the database. // // It queries the database using `SELECT * FROM pg_roles;` and returns a slice of Role, or any error encountered. -func List(ctx context.Context, conn db.Conn) ([]Role, error) { +func List(ctx context.Context, conn pg.Conn) ([]Role, error) { rows, err := conn.Query(ctx, roleSelectQuery) if err != nil { return nil, fmt.Errorf("failed to query roles: %w", err) @@ -77,19 +77,19 @@ func List(ctx context.Context, conn db.Conn) ([]Role, error) { // Create adds a new PostgreSQL role to the database based on the provided Role struct. // It returns an error if role already exists or the creation process fails -func Create(ctx context.Context, conn db.Conn, role Role) error { +func Create(ctx context.Context, conn pg.Conn, role Role) error { return createOrAlterRole(ctx, conn, role, true) } // Update modifies an existing PostgreSQL role in the database. // It updates the role's attributes based on the provided Role struct. // If the role does not exist, an error is returned. -func Update(ctx context.Context, conn db.Conn, role Role) error { +func Update(ctx context.Context, conn pg.Conn, role Role) error { return createOrAlterRole(ctx, conn, role, false) } // Get returns the role identified by roleName, or ErrRoleNotFound if it doesn't exists -func Get(ctx context.Context, conn db.Conn, roleName string) (*Role, error) { +func Get(ctx context.Context, conn pg.Conn, roleName string) (*Role, error) { var role *Role role, err := scanRole(conn.QueryRow(ctx, roleSelectQuery+` WHERE rolname = $1`, roleName)) if err != nil { @@ -103,7 +103,7 @@ func Get(ctx context.Context, conn db.Conn, roleName string) (*Role, error) { } // Delete removes a PostgreSQL role from the database. 
-func Delete(ctx context.Context, conn db.Conn, roleName string) error {
+func Delete(ctx context.Context, conn pg.Conn, roleName string) error {
     deleteQuery := fmt.Sprintf("DROP ROLE IF EXISTS %s", pgx.Identifier{roleName}.Sanitize())
 
     _, err := conn.Exec(ctx, deleteQuery)
@@ -169,7 +169,7 @@ func addRoleAttributes(builder *strings.Builder, role Role) {
     }
 }
 
-func alterRoleConfigAndPassword(ctx context.Context, conn db.Conn, role Role) error {
+func alterRoleConfigAndPassword(ctx context.Context, conn pg.Conn, role Role) error {
     if len(role.Config) > 0 {
         configStr := strings.Join(role.Config, ", ")
         alterQuery := fmt.Sprintf("ALTER ROLE %s SET %s", pgx.Identifier{role.Name}.Sanitize(), configStr)
@@ -189,7 +189,7 @@ func alterRoleConfigAndPassword(ctx context.Context, conn db.Conn, role Role) er
 }
 
 // createOrAlterRole constructs and executes a CREATE or ALTER ROLE query.
-func createOrAlterRole(ctx context.Context, conn db.Conn, role Role, isCreate bool) error {
+func createOrAlterRole(ctx context.Context, conn pg.Conn, role Role, isCreate bool) error {
     var queryBuilder strings.Builder
     if isCreate {
         queryBuilder.WriteString(fmt.Sprintf("CREATE ROLE %s", pgx.Identifier{role.Name}.Sanitize()))
diff --git a/pkg/role/role_test.go b/pkg/pg/role/role_test.go
similarity index 99%
rename from pkg/role/role_test.go
rename to pkg/pg/role/role_test.go
index 9337e28..633a8f4 100644
--- a/pkg/role/role_test.go
+++ b/pkg/pg/role/role_test.go
@@ -7,7 +7,7 @@ import (
     "testing"
     "time"
 
-    "github.com/edgeflare/pgo/pkg/role"
+    "github.com/edgeflare/pgo/pkg/pg/role"
     "github.com/jackc/pgx/v5"
     "github.com/jackc/pgx/v5/pgconn"
     "github.com/jackc/pgx/v5/pgxtest"
diff --git a/pkg/pg/schema.go b/pkg/pg/schema.go
new file mode 100644
index 0000000..547e0c2
--- /dev/null
+++ b/pkg/pg/schema.go
@@ -0,0 +1,151 @@
+package pg
+
+import (
+    "context"
+    "fmt"
+)
+
+// Table represents a database table.
+type Table struct {
+    Schema      string
+    Name        string
+    Columns     []Column
+    PrimaryKey  []string
+    ForeignKeys []ForeignKey
+}
+
+// Column represents a column in a table.
+type Column struct {
+    Name         string
+    DataType     string
+    IsNullable   bool
+    IsPrimaryKey bool
+}
+
+// ForeignKey represents a foreign key relationship.
+type ForeignKey struct {
+    Column           string
+    ReferencedTable  string
+    ReferencedColumn string
+}
+
+// LoadSchema queries and returns the tables in the given schema.
+func LoadSchema(ctx context.Context, conn Conn, schemaName string) (map[string]Table, error) {
+    cache := make(map[string]Table)
+
+    // Query tables
+    rows, err := conn.Query(ctx, `
+        SELECT table_schema, table_name
+        FROM information_schema.tables
+        WHERE table_schema = $1 AND table_type = 'BASE TABLE';
+    `, schemaName)
+    if err != nil {
+        return nil, fmt.Errorf("failed to query tables: %w", err)
+    }
+    defer rows.Close()
+
+    // Process tables
+    for rows.Next() {
+        var schema, tableName string
+        if err := rows.Scan(&schema, &tableName); err != nil {
+            return nil, err
+        }
+
+        // Fetch columns for the table
+        columns, primaryKey, err := getColumns(ctx, conn, schema, tableName)
+        if err != nil {
+            return nil, err
+        }
+
+        // Fetch foreign keys
+        foreignKeys, err := getForeignKeys(ctx, conn, schema, tableName)
+        if err != nil {
+            return nil, err
+        }
+
+        cache[tableName] = Table{
+            Schema:      schema,
+            Name:        tableName,
+            Columns:     columns,
+            PrimaryKey:  primaryKey,
+            ForeignKeys: foreignKeys,
+        }
+    }
+
+    return cache, nil
+}
+
+func getColumns(ctx context.Context, conn Conn, schema, table string) ([]Column, []string, error) {
+    rows, err := conn.Query(ctx, `
+        SELECT
+            c.column_name,
+            c.data_type,
+            c.is_nullable = 'YES',
+            (EXISTS (
+                SELECT 1
+                FROM information_schema.table_constraints tc
+                JOIN information_schema.key_column_usage kcu
+                    ON tc.constraint_name = kcu.constraint_name
+                    AND tc.table_schema = kcu.table_schema
+                WHERE tc.constraint_type = 'PRIMARY KEY'
+                    AND tc.table_schema = $1
+                    AND tc.table_name = $2
+                    AND kcu.column_name = c.column_name
+            )) AS is_primary_key
+        FROM information_schema.columns c
+        WHERE c.table_schema = $1 AND c.table_name = $2;
+    `, schema, table)
+    if err != nil {
+        return nil, nil, err
+    }
+    defer rows.Close()
+
+    var columns []Column
+    var primaryKey []string
+    for rows.Next() {
+        var col Column
+        if err := rows.Scan(&col.Name, &col.DataType, &col.IsNullable, &col.IsPrimaryKey); err != nil {
+            return nil, nil, err
+        }
+        columns = append(columns, col)
+        if col.IsPrimaryKey {
+            primaryKey = append(primaryKey, col.Name)
+        }
+    }
+
+    return columns, primaryKey, nil
+}
+
+func getForeignKeys(ctx context.Context, conn Conn, schema, table string) ([]ForeignKey, error) {
+    rows, err := conn.Query(ctx, `
+        SELECT
+            kcu.column_name,
+            ccu.table_name AS referenced_table,
+            ccu.column_name AS referenced_column
+        FROM information_schema.table_constraints AS tc
+        JOIN information_schema.key_column_usage AS kcu
+            ON tc.constraint_name = kcu.constraint_name
+            AND tc.table_schema = kcu.table_schema
+        JOIN information_schema.constraint_column_usage AS ccu
+            ON ccu.constraint_name = tc.constraint_name
+            AND ccu.table_schema = tc.table_schema
+        WHERE tc.constraint_type = 'FOREIGN KEY'
+            AND tc.table_schema = $1
+            AND tc.table_name = $2;
+    `, schema, table)
+    if err != nil {
+        return nil, err
+    }
+    defer rows.Close()
+
+    var foreignKeys []ForeignKey
+    for rows.Next() {
+        var fk ForeignKey
+        if err := rows.Scan(&fk.Column, &fk.ReferencedTable, &fk.ReferencedColumn); err != nil {
+            return nil, err
+        }
+        foreignKeys = append(foreignKeys, fk)
+    }
+
+    return foreignKeys, nil
+}
diff --git a/pkg/pg/schema_test.go b/pkg/pg/schema_test.go
new file mode 100644
index 0000000..a0a12d8
--- /dev/null
+++ b/pkg/pg/schema_test.go
@@ -0,0 +1,110 @@
+package pg_test
+
+import (
+    "context"
+    "testing"
+
+    "github.com/edgeflare/pgo/pkg/pg"
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
+)
+
+func TestLoadSchema(t *testing.T) {
+    suite := NewTestRunner(t)
+
+    // Create test tables
+    _, err := suite.conn.Exec(suite.ctx, `
+        DROP TABLE IF EXISTS test_orders;
+        DROP TABLE IF EXISTS test_users;
+
+        CREATE TABLE test_users (
+            id SERIAL PRIMARY KEY,
+            username VARCHAR(50) NOT NULL,
+            email VARCHAR(100) UNIQUE NOT NULL,
+            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+        );
+
+        CREATE TABLE test_orders (
+            id SERIAL PRIMARY KEY,
+            user_id INTEGER NOT NULL,
+            amount DECIMAL(10,2) NOT NULL,
+            status VARCHAR(20) NOT NULL,
+            FOREIGN KEY (user_id) REFERENCES test_users(id)
+        );
+    `)
+    require.NoError(t, err)
+
+    t.Run("load schema successfully", func(t *testing.T) {
+        tables, err := pg.LoadSchema(suite.ctx, suite.conn, "public")
+        require.NoError(t, err)
+
+        // Verify test_users table
+        usersTable, exists := tables["test_users"]
+        require.True(t, exists)
+        assert.Equal(t, "public", usersTable.Schema)
+        assert.Equal(t, "test_users", usersTable.Name)
+        assert.Equal(t, []string{"id"}, usersTable.PrimaryKey)
+
+        // Verify test_users columns
+        expectedUserColumns := map[string]pg.Column{
+            "id": {
+                Name:         "id",
+                DataType:     "integer",
+                IsNullable:   false,
+                IsPrimaryKey: true,
+            },
+            "username": {
+                Name:         "username",
+                DataType:     "character varying",
+                IsNullable:   false,
+                IsPrimaryKey: false,
+            },
+            "email": {
+                Name:         "email",
+                DataType:     "character varying",
+                IsNullable:   false,
+                IsPrimaryKey: false,
+            },
+            "created_at": {
+                Name:         "created_at",
+                DataType:     "timestamp without time zone",
+                IsNullable:   true,
+                IsPrimaryKey: false,
+            },
+        }
+
+        for _, col := range usersTable.Columns {
+            expected, exists := expectedUserColumns[col.Name]
+            require.True(t, exists)
+            assert.Equal(t, expected, col)
+        }
+
+        // Verify test_orders table
+        ordersTable, exists := tables["test_orders"]
+        require.True(t, exists)
+        assert.Equal(t, "public", ordersTable.Schema)
+        assert.Equal(t, "test_orders", ordersTable.Name)
+        assert.Equal(t, []string{"id"}, ordersTable.PrimaryKey)
+
+        // Verify foreign keys
+        require.Len(t, ordersTable.ForeignKeys, 1)
+        assert.Equal(t, pg.ForeignKey{
+            Column:           "user_id",
+            ReferencedTable:  "test_users",
+            ReferencedColumn: "id",
+        }, ordersTable.ForeignKeys[0])
+    })
+
+    t.Run("load schema with invalid schema name", func(t *testing.T) {
+        tables, err := pg.LoadSchema(suite.ctx, suite.conn, "nonexistent_schema")
+        require.NoError(t, err)
+        assert.Empty(t, tables)
+    })
+
+    t.Run("load schema with context cancellation", func(t *testing.T) {
+        ctx, cancel := context.WithCancel(suite.ctx)
+        cancel() // Cancel immediately
+        _, err := pg.LoadSchema(ctx, suite.conn, "public")
+        assert.Error(t, err)
+    })
+}
diff --git a/pkg/pipeline/mqtt/postgres.go b/pkg/pipeline/mqtt/postgres.go
index c35bd54..828d8db 100644
--- a/pkg/pipeline/mqtt/postgres.go
+++ b/pkg/pipeline/mqtt/postgres.go
@@ -9,7 +9,7 @@ import (
 
     mqtt "github.com/eclipse/paho.mqtt.golang"
     "github.com/edgeflare/pgo"
-    "github.com/edgeflare/pgo/pkg/db"
+    "github.com/edgeflare/pgo/pkg/pg"
     "github.com/edgeflare/pgo/pkg/x/logrepl"
     "go.uber.org/zap"
 )
@@ -26,7 +26,7 @@ func (c *Client) MessageToPostgres(client mqtt.Client, msg mqtt.Message) {
         return
     }
-    pgConn, connErr := pool.Acquire(context.Background())
+    conn, connErr := pool.Acquire(context.Background())
     if connErr != nil {
         c.logger.Error("Failed to acquire PostgreSQL connection", zap.Error(connErr))
         return
     }
@@ -59,8 +59,8 @@ func (c *Client) MessageToPostgres(client mqtt.Client, msg mqtt.Message) {
     var err error
     switch operation {
     case logrepl.OperationInsert:
-        // err = c.insertRecord(ctx, pgConn, tableName, payload)
-        err = db.InsertRow(ctx, pgConn, tableName, msg.Payload())
+        // err = c.insertRecord(ctx, conn, tableName, payload)
+        err = pg.InsertRow(ctx, conn, tableName, msg.Payload())
         if err != nil {
             c.logger.Error("Failed to insert record", zap.Error(err))
             return