DbPool Manager

The dbpool_manager service provides centralized management of database connection pools with support for multi-tenancy and named pool configurations. It allows dynamic creation and management of multiple database pools based on tenant IDs or custom names.

Table of Contents

Overview

Service Type: dbpool_manager

Interface: serviceapi.DbPoolManager

Key Features:

✓ Dynamic Pool Creation   - Create pools on-demand
✓ Multi-Tenancy Support   - Isolate tenant databases
✓ Named Pool Management   - Custom pool configurations
✓ DSN-Based Pooling       - Share pools across tenants
✓ Thread-Safe Operations  - Concurrent-safe pool access
✓ Automatic Cleanup       - Graceful shutdown handling

Configuration

Config Struct

// No specific configuration required
// Pools are created dynamically based on DSN strings

YAML Configuration

Basic Configuration:

services:
  db-pool-manager:
    type: dbpool_manager

With Default PostgreSQL Pool:

services:
  db-pool-manager:
    type: dbpool_manager
    config:
      pool_type: pgx  # Uses PostgreSQL pgx driver

Programmatic Configuration

import (
    "github.com/primadi/lokstra/lokstra_registry"
    "github.com/primadi/lokstra/services/dbpool_manager"
)

// Register service
dbpool_manager.Register()

// Create pool manager
manager := lokstra_registry.GetService[serviceapi.DbPoolManager]("db-pool-manager")

Registration

Basic Registration

import "github.com/primadi/lokstra/services/dbpool_manager"

func init() {
    dbpool_manager.Register()
}

Bulk Registration

import "github.com/primadi/lokstra/services"

func main() {
    // Registers all services including dbpool_manager
    services.RegisterAllServices()
}

Core Concepts

Pool Management Strategy

The DbPool Manager uses three main strategies for pool management:

  1. DSN-Based Pools - Shared pools identified by connection string
  2. Tenant-Based Pools - Tenant-specific database configurations
  3. Named Pools - Custom-named pool configurations

All three strategies share the same underlying pool instances when DSNs match, ensuring efficient resource usage.

Interface Overview

type DbPoolManager interface {
    // DSN-based pool management
    GetDsnPool(dsn string) (DbPool, error)
    
    // Tenant-based pool management
    SetTenantDsn(tenant string, dsn string, schema string)
    GetTenantDsn(tenant string) (string, string, error)
    GetTenantPool(tenant string) (DbPool, error)
    RemoveTenant(tenant string)
    AcquireTenantConn(ctx context.Context, tenant string) (DbConn, error)
    
    // Named pool management
    SetNamedDsn(name string, dsn string, schema string)
    GetNamedDsn(name string) (string, string, error)
    GetNamedPool(name string) (DbPool, error)
    RemoveNamed(name string)
    AcquireNamedConn(ctx context.Context, name string) (DbConn, error)
    
    // Shutdown
    Shutdown() error
}

DSN-Based Pool Management

Get or Create Pool

The GetDsnPool method returns an existing pool or creates a new one if it doesn’t exist.

import (
    "context"
    "github.com/primadi/lokstra/serviceapi"
)

manager := lokstra_registry.GetService[serviceapi.DbPoolManager]("db-pool-manager")

// Get or create pool for DSN
dsn := "postgres://user:pass@localhost:5432/mydb?sslmode=disable"
pool, err := manager.GetDsnPool(dsn)
if err != nil {
    log.Fatal(err)
}

// Acquire connection from pool
conn, err := pool.Acquire(context.Background(), "public")
if err != nil {
    log.Fatal(err)
}
defer conn.Release()

// Use connection
rows, err := conn.Query(context.Background(), "SELECT * FROM users")

Pool Sharing

Pools with identical DSNs are automatically shared:

// First call creates the pool
pool1, _ := manager.GetDsnPool("postgres://localhost/db1")

// Second call with same DSN returns the same pool instance
pool2, _ := manager.GetDsnPool("postgres://localhost/db1")

// pool1 == pool2 (same instance)

Tenant-Based Pool Management

Setting Tenant Configuration

Configure database connections for specific tenants:

manager := lokstra_registry.GetService[serviceapi.DbPoolManager]("db-pool-manager")

// Configure tenant database
manager.SetTenantDsn(
    "tenant-123",                                           // Tenant ID
    "postgres://user:pass@localhost:5432/tenant_db",       // DSN
    "tenant_123",                                          // Schema name
)

manager.SetTenantDsn(
    "tenant-456",
    "postgres://user:pass@localhost:5432/tenant_db",
    "tenant_456",
)

Getting Tenant Pool

// Get pool for specific tenant
pool, err := manager.GetTenantPool("tenant-123")
if err != nil {
    log.Fatal(err)
}

// Acquire connection
conn, err := pool.Acquire(context.Background(), "tenant_123")
if err != nil {
    log.Fatal(err)
}
defer conn.Release()

The AcquireTenantConn method is the recommended way to get tenant connections:

// Acquire connection with tenant context automatically set
conn, err := manager.AcquireTenantConn(context.Background(), "tenant-123")
if err != nil {
    log.Fatal(err)
}
defer conn.Release()

// Connection has:
// - Correct schema set
// - Tenant context set for RLS
// - Ready to use immediately
rows, err := conn.Query(context.Background(), "SELECT * FROM users")

Getting Tenant Configuration

// Retrieve tenant DSN and schema
dsn, schema, err := manager.GetTenantDsn("tenant-123")
if err != nil {
    log.Printf("Tenant not configured: %v", err)
    return
}

log.Printf("Tenant DSN: %s, Schema: %s", dsn, schema)

Removing Tenant

// Remove tenant configuration
manager.RemoveTenant("tenant-123")

// Pool is not removed (might be used by other tenants)
// Only the tenant->DSN mapping is removed

Named Pool Management

Setting Named Configuration

Named pools allow custom configurations with meaningful names:

manager := lokstra_registry.GetService[serviceapi.DbPoolManager]("db-pool-manager")

// Configure analytics database
manager.SetNamedDsn(
    "analytics",                                     // Pool name
    "postgres://user:pass@analytics-db:5432/stats", // DSN
    "public",                                        // Schema
)

// Configure reporting database
manager.SetNamedDsn(
    "reporting",
    "postgres://user:pass@reporting-db:5432/reports",
    "public",
)

// Configure read-replica
manager.SetNamedDsn(
    "read-replica",
    "postgres://user:pass@replica:5432/mydb",
    "public",
)

Getting Named Pool

// Get pool by name
pool, err := manager.GetNamedPool("analytics")
if err != nil {
    log.Fatal(err)
}

// Acquire connection
conn, err := pool.Acquire(context.Background(), "public")
if err != nil {
    log.Fatal(err)
}
defer conn.Release()
// Acquire connection with schema automatically set
conn, err := manager.AcquireNamedConn(context.Background(), "analytics")
if err != nil {
    log.Fatal(err)
}
defer conn.Release()

// Use for analytics queries
stats, err := conn.SelectManyRowMap(context.Background(),
    "SELECT date, count, revenue FROM daily_stats WHERE date >= $1",
    startDate,
)

Getting Named Configuration

// Retrieve named pool configuration
dsn, schema, err := manager.GetNamedDsn("analytics")
if err != nil {
    log.Printf("Named pool not found: %v", err)
    return
}

log.Printf("Analytics DSN: %s, Schema: %s", dsn, schema)

Removing Named Pool

// Remove named pool configuration
manager.RemoveNamed("analytics")

// Pool is not removed (might be used by other names/tenants)
// Only the name->DSN mapping is removed

Advanced Features

Custom Pool Factory

You can create a pool manager with a custom pool factory function:

import "github.com/primadi/lokstra/services/dbpool_manager"

// Custom factory function
customFactory := func(dsn string) (serviceapi.DbPool, error) {
    // Custom pool creation logic
    log.Printf("Creating pool for: %s", dsn)
    
    // Use custom configuration
    cfg := &CustomPoolConfig{
        DSN:            dsn,
        MaxConnections: 50,
        MinConnections: 5,
    }
    
    return NewCustomPool(cfg)
}

// Create manager with custom factory
manager := dbpool_manager.NewPoolManager(customFactory)

Pool Reuse Across Strategies

Pools are automatically shared when DSNs match:

// Tenant and named pools share the same DSN
manager.SetTenantDsn("tenant-1", "postgres://localhost/db", "schema1")
manager.SetNamedDsn("main", "postgres://localhost/db", "schema2")

// Both use the same underlying pool
tenantPool, _ := manager.GetTenantPool("tenant-1")
namedPool, _ := manager.GetNamedPool("main")

// tenantPool and namedPool share the same connection pool
// Only the schema differs when acquiring connections

Graceful Shutdown

The pool manager implements graceful shutdown:

// Shutdown all managed pools
if err := manager.Shutdown(); err != nil {
    log.Printf("Error during shutdown: %v", err)
}

// All pools are closed
// All active connections are released

Thread-Safe Operations

All operations are thread-safe using sync.Map:

// Safe to call from multiple goroutines
go manager.SetTenantDsn("tenant-1", dsn1, "schema1")
go manager.SetTenantDsn("tenant-2", dsn2, "schema2")
go manager.AcquireTenantConn(ctx, "tenant-1")
go manager.AcquireTenantConn(ctx, "tenant-2")

Best Practices

Pool Management

 DO: Use tenant-based pools for multi-tenant applications
manager.SetTenantDsn("tenant-id", dsn, schema)
conn, _ := manager.AcquireTenantConn(ctx, "tenant-id")

 DO: Use named pools for different database purposes
manager.SetNamedDsn("analytics", analyticsDsn, "public")
manager.SetNamedDsn("cache", cacheDsn, "public")

 DO: Share pools with identical DSNs
// Same DSN = same pool = efficient resource usage
manager.SetTenantDsn("tenant-1", dsn, "schema1")
manager.SetTenantDsn("tenant-2", dsn, "schema2")

 DON'T: Create unnecessary pools
// BAD: Different names for same database
manager.SetNamedDsn("pool1", dsn, "public")
manager.SetNamedDsn("pool2", dsn, "public")
// Instead, use the same name or rely on DSN-based sharing

Connection Acquisition

 DO: Use AcquireTenantConn for tenant connections
conn, _ := manager.AcquireTenantConn(ctx, "tenant-id")
// Schema and tenant context automatically set

 DO: Use AcquireNamedConn for named pools
conn, _ := manager.AcquireNamedConn(ctx, "analytics")
// Schema automatically set

 DO: Always release connections
conn, _ := manager.AcquireTenantConn(ctx, "tenant-id")
defer conn.Release()

 DON'T: Manually manage schema/tenant context
// BAD: Manual management is error-prone
pool, _ := manager.GetTenantPool("tenant-id")
conn, _ := pool.Acquire(ctx, "wrong_schema") // Easy to make mistakes

Configuration Management

 DO: Configure tenants at application startup
func initializeTenants() {
    manager := lokstra_registry.GetService[serviceapi.DbPoolManager]("db-pool-manager")
    
    tenants := []Tenant{
        {ID: "tenant-1", DSN: dsn1, Schema: "schema1"},
        {ID: "tenant-2", DSN: dsn2, Schema: "schema2"},
    }
    
    for _, t := range tenants {
        manager.SetTenantDsn(t.ID, t.DSN, t.Schema)
    }
}

 DO: Configure named pools for different purposes
manager.SetNamedDsn("main", mainDsn, "public")
manager.SetNamedDsn("analytics", analyticsDsn, "public")
manager.SetNamedDsn("read-replica", replicaDsn, "public")

 DO: Remove tenant configurations when tenant is deleted
manager.RemoveTenant("deleted-tenant-id")

 DON'T: Configure tenants dynamically for every request
// BAD: Performance overhead
func handler(tenantID string) {
    manager.SetTenantDsn(tenantID, dsn, schema) // Repeated configuration
    conn, _ := manager.AcquireTenantConn(ctx, tenantID)
}
// Instead, configure once at startup or on tenant creation

Error Handling

 DO: Check for configuration errors
conn, err := manager.AcquireTenantConn(ctx, tenantID)
if err != nil {
    if err.Error() == "tenant pool not found: "+tenantID {
        return ErrTenantNotConfigured
    }
    return err
}

 DO: Validate tenant existence before use
dsn, schema, err := manager.GetTenantDsn(tenantID)
if err != nil {
    return fmt.Errorf("tenant %s not found", tenantID)
}

 DON'T: Ignore configuration errors
conn, _ := manager.AcquireTenantConn(ctx, tenantID) // BAD: Ignoring errors

Examples

Multi-Tenant Application

package main

import (
    "context"
    "fmt"
    "github.com/primadi/lokstra/lokstra_registry"
    "github.com/primadi/lokstra/serviceapi"
)

type TenantService struct {
    poolManager serviceapi.DbPoolManager
}

func NewTenantService() *TenantService {
    return &TenantService{
        poolManager: lokstra_registry.GetService[serviceapi.DbPoolManager]("db-pool-manager"),
    }
}

// Initialize tenant database configuration
func (s *TenantService) AddTenant(tenantID, dsn, schema string) error {
    s.poolManager.SetTenantDsn(tenantID, dsn, schema)
    
    // Test connection
    conn, err := s.poolManager.AcquireTenantConn(context.Background(), tenantID)
    if err != nil {
        s.poolManager.RemoveTenant(tenantID)
        return fmt.Errorf("failed to connect tenant database: %w", err)
    }
    defer conn.Release()
    
    if err := conn.Ping(context.Background()); err != nil {
        s.poolManager.RemoveTenant(tenantID)
        return fmt.Errorf("tenant database ping failed: %w", err)
    }
    
    return nil
}

// Get users for specific tenant
func (s *TenantService) GetTenantUsers(ctx context.Context, tenantID string) ([]User, error) {
    // Acquire tenant-specific connection
    conn, err := s.poolManager.AcquireTenantConn(ctx, tenantID)
    if err != nil {
        return nil, fmt.Errorf("failed to acquire tenant connection: %w", err)
    }
    defer conn.Release()
    
    // Query with automatic RLS filtering
    mapper := func(row serviceapi.Row) (any, error) {
        var user User
        err := row.Scan(&user.ID, &user.Name, &user.Email)
        return user, err
    }
    
    result, err := conn.SelectManyWithMapper(ctx, mapper,
        "SELECT id, name, email FROM users WHERE active = true",
    )
    
    if err != nil {
        return nil, err
    }
    
    return result.([]User), nil
}

// Remove tenant
func (s *TenantService) RemoveTenant(tenantID string) {
    s.poolManager.RemoveTenant(tenantID)
}

Named Pool Usage

package repository

import (
    "context"
    "github.com/primadi/lokstra/serviceapi"
)

type AnalyticsRepository struct {
    poolManager serviceapi.DbPoolManager
}

func NewAnalyticsRepository(manager serviceapi.DbPoolManager) *AnalyticsRepository {
    return &AnalyticsRepository{
        poolManager: manager,
    }
}

// Initialize analytics database
func (r *AnalyticsRepository) Initialize() error {
    // Configure analytics pool
    r.poolManager.SetNamedDsn(
        "analytics",
        "postgres://user:pass@analytics-db:5432/stats",
        "public",
    )
    
    // Test connection
    conn, err := r.poolManager.AcquireNamedConn(context.Background(), "analytics")
    if err != nil {
        return err
    }
    defer conn.Release()
    
    return conn.Ping(context.Background())
}

// Get daily statistics
func (r *AnalyticsRepository) GetDailyStats(ctx context.Context, date string) ([]Stat, error) {
    conn, err := r.poolManager.AcquireNamedConn(ctx, "analytics")
    if err != nil {
        return nil, err
    }
    defer conn.Release()
    
    rows, err := conn.SelectManyRowMap(ctx,
        `SELECT metric, value, timestamp 
         FROM daily_stats 
         WHERE date = $1 
         ORDER BY timestamp`,
        date,
    )
    
    if err != nil {
        return nil, err
    }
    
    stats := make([]Stat, len(rows))
    for i, row := range rows {
        stats[i] = Stat{
            Metric:    row["metric"].(string),
            Value:     row["value"].(float64),
            Timestamp: row["timestamp"].(time.Time),
        }
    }
    
    return stats, nil
}

Mixed Strategy Application

package main

import (
    "context"
    "github.com/primadi/lokstra/serviceapi"
)

type DatabaseService struct {
    poolManager serviceapi.DbPoolManager
}

func (s *DatabaseService) Initialize() error {
    // Configure main application database
    s.poolManager.SetNamedDsn(
        "main",
        "postgres://user:pass@localhost:5432/app_db",
        "public",
    )
    
    // Configure read replica
    s.poolManager.SetNamedDsn(
        "read-replica",
        "postgres://user:pass@replica:5432/app_db",
        "public",
    )
    
    // Configure analytics database
    s.poolManager.SetNamedDsn(
        "analytics",
        "postgres://user:pass@analytics:5432/stats_db",
        "public",
    )
    
    // Configure multi-tenant databases
    tenants := []struct {
        ID     string
        DSN    string
        Schema string
    }{
        {"tenant-1", "postgres://localhost:5432/tenant_db", "tenant_1"},
        {"tenant-2", "postgres://localhost:5432/tenant_db", "tenant_2"},
        {"tenant-3", "postgres://localhost:5432/tenant_db", "tenant_3"},
    }
    
    for _, t := range tenants {
        s.poolManager.SetTenantDsn(t.ID, t.DSN, t.Schema)
    }
    
    return nil
}

// Write to main database
func (s *DatabaseService) CreateUser(ctx context.Context, user *User) error {
    conn, err := s.poolManager.AcquireNamedConn(ctx, "main")
    if err != nil {
        return err
    }
    defer conn.Release()
    
    return conn.QueryRow(ctx,
        "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id",
        user.Name, user.Email,
    ).Scan(&user.ID)
}

// Read from replica
func (s *DatabaseService) GetUsers(ctx context.Context) ([]User, error) {
    conn, err := s.poolManager.AcquireNamedConn(ctx, "read-replica")
    if err != nil {
        return nil, err
    }
    defer conn.Release()
    
    rows, err := conn.SelectManyRowMap(ctx, "SELECT id, name, email FROM users")
    if err != nil {
        return nil, err
    }
    
    users := make([]User, len(rows))
    for i, row := range rows {
        users[i] = User{
            ID:    row["id"].(int),
            Name:  row["name"].(string),
            Email: row["email"].(string),
        }
    }
    
    return users, nil
}

// Write analytics data
func (s *DatabaseService) LogEvent(ctx context.Context, event *Event) error {
    conn, err := s.poolManager.AcquireNamedConn(ctx, "analytics")
    if err != nil {
        return err
    }
    defer conn.Release()
    
    _, err = conn.Exec(ctx,
        "INSERT INTO events (type, data, timestamp) VALUES ($1, $2, $3)",
        event.Type, event.Data, event.Timestamp,
    )
    
    return err
}

// Tenant-specific operation
func (s *DatabaseService) GetTenantData(ctx context.Context, tenantID string) ([]Data, error) {
    conn, err := s.poolManager.AcquireTenantConn(ctx, tenantID)
    if err != nil {
        return nil, err
    }
    defer conn.Release()
    
    rows, err := conn.SelectManyRowMap(ctx, "SELECT * FROM tenant_data")
    if err != nil {
        return nil, err
    }
    
    // Process rows...
    return processRows(rows), nil
}

Dynamic Tenant Onboarding

package service

import (
    "context"
    "fmt"
)

type TenantOnboardingService struct {
    poolManager serviceapi.DbPoolManager
}

// Onboard new tenant
func (s *TenantOnboardingService) OnboardTenant(ctx context.Context, tenant *Tenant) error {
    // Configure tenant database
    s.poolManager.SetTenantDsn(tenant.ID, tenant.DSN, tenant.Schema)
    
    // Acquire connection to verify
    conn, err := s.poolManager.AcquireTenantConn(ctx, tenant.ID)
    if err != nil {
        s.poolManager.RemoveTenant(tenant.ID)
        return fmt.Errorf("failed to connect: %w", err)
    }
    defer conn.Release()
    
    // Run migrations/setup
    if err := s.runTenantSetup(ctx, conn, tenant); err != nil {
        s.poolManager.RemoveTenant(tenant.ID)
        return fmt.Errorf("setup failed: %w", err)
    }
    
    return nil
}

// Offboard tenant
func (s *TenantOnboardingService) OffboardTenant(ctx context.Context, tenantID string) error {
    // Remove tenant configuration
    s.poolManager.RemoveTenant(tenantID)
    
    // Note: Underlying pool is not removed if other tenants share the same DSN
    // This is the desired behavior for resource efficiency
    
    return nil
}

// Migrate tenant to new database
func (s *TenantOnboardingService) MigrateTenant(ctx context.Context, 
    tenantID, newDSN, newSchema string) error {
    
    // Get old configuration
    oldDSN, oldSchema, err := s.poolManager.GetTenantDsn(tenantID)
    if err != nil {
        return err
    }
    
    // Update to new configuration
    s.poolManager.SetTenantDsn(tenantID, newDSN, newSchema)
    
    // Verify new connection
    conn, err := s.poolManager.AcquireTenantConn(ctx, tenantID)
    if err != nil {
        // Rollback to old configuration
        s.poolManager.SetTenantDsn(tenantID, oldDSN, oldSchema)
        return fmt.Errorf("failed to connect to new database: %w", err)
    }
    defer conn.Release()
    
    if err := conn.Ping(ctx); err != nil {
        // Rollback to old configuration
        s.poolManager.SetTenantDsn(tenantID, oldDSN, oldSchema)
        return fmt.Errorf("new database ping failed: %w", err)
    }
    
    return nil
}

func (s *TenantOnboardingService) runTenantSetup(ctx context.Context, 
    conn serviceapi.DbConn, tenant *Tenant) error {
    
    // Create tables, indexes, etc.
    migrations := []string{
        "CREATE TABLE IF NOT EXISTS users (id SERIAL PRIMARY KEY, name TEXT, email TEXT)",
        "CREATE TABLE IF NOT EXISTS orders (id SERIAL PRIMARY KEY, user_id INT, amount DECIMAL)",
        "CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)",
    }
    
    for _, migration := range migrations {
        if _, err := conn.Exec(ctx, migration); err != nil {
            return fmt.Errorf("migration failed: %w", err)
        }
    }
    
    return nil
}

Next: Metrics Service - Prometheus metrics integration