DbPool Manager
The dbpool_manager service provides centralized management of database connection pools with support for multi-tenancy and named pool configurations. It allows dynamic creation and management of multiple database pools based on tenant IDs or custom names.
Table of Contents
- Overview
- Configuration
- Registration
- Core Concepts
- DSN-Based Pool Management
- Tenant-Based Pool Management
- Named Pool Management
- Advanced Features
- Best Practices
- Examples
Overview
Service Type: dbpool_manager
Interface: serviceapi.DbPoolManager
Key Features:
✓ Dynamic Pool Creation - Create pools on-demand
✓ Multi-Tenancy Support - Isolate tenant databases
✓ Named Pool Management - Custom pool configurations
✓ DSN-Based Pooling - Share pools across tenants
✓ Thread-Safe Operations - Concurrent-safe pool access
✓ Automatic Cleanup - Graceful shutdown handling
Configuration
Config Struct
// No specific configuration required
// Pools are created dynamically based on DSN strings
YAML Configuration
Basic Configuration:
services:
  db-pool-manager:
    type: dbpool_manager
With Default PostgreSQL Pool:
services:
  db-pool-manager:
    type: dbpool_manager
    config:
      pool_type: pgx # Uses PostgreSQL pgx driver
Programmatic Configuration
import (
	"github.com/primadi/lokstra/lokstra_registry"
	"github.com/primadi/lokstra/serviceapi"
	"github.com/primadi/lokstra/services/dbpool_manager"
)
// Register the service type
dbpool_manager.Register()
// Retrieve the pool manager from the registry
manager := lokstra_registry.GetService[serviceapi.DbPoolManager]("db-pool-manager")
Registration
Basic Registration
import "github.com/primadi/lokstra/services/dbpool_manager"
func init() {
dbpool_manager.Register()
}
Bulk Registration
import "github.com/primadi/lokstra/services"
func main() {
// Registers all services including dbpool_manager
services.RegisterAllServices()
}
Core Concepts
Pool Management Strategy
The DbPool Manager uses three main strategies for pool management:
- DSN-Based Pools - Shared pools identified by connection string
- Tenant-Based Pools - Tenant-specific database configurations
- Named Pools - Custom-named pool configurations
All three strategies share the same underlying pool instances when DSNs match, ensuring efficient resource usage.
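The sharing rule above can be pictured as a two-level lookup: tenant IDs and pool names resolve to a (DSN, schema) pair, and each DSN resolves to exactly one pool. The following is a minimal sketch of that idea, not the library's implementation; `registry`, `fakePool`, and `target` are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// fakePool stands in for a real connection pool (hypothetical type).
type fakePool struct{ dsn string }

// target is the (DSN, schema) pair that a tenant ID or pool name resolves to.
type target struct{ dsn, schema string }

// registry keeps one pool per DSN and lets many keys point at the same DSN.
type registry struct {
	mu      sync.Mutex
	pools   map[string]*fakePool // DSN -> pool
	tenants map[string]target    // tenant ID -> DSN and schema
	named   map[string]target    // pool name -> DSN and schema
}

func newRegistry() *registry {
	return &registry{
		pools:   map[string]*fakePool{},
		tenants: map[string]target{},
		named:   map[string]target{},
	}
}

// poolFor returns the pool for a DSN, creating it on first use.
func (r *registry) poolFor(dsn string) *fakePool {
	r.mu.Lock()
	defer r.mu.Unlock()
	p, ok := r.pools[dsn]
	if !ok {
		p = &fakePool{dsn: dsn}
		r.pools[dsn] = p
	}
	return p
}

func (r *registry) setTenant(id, dsn, schema string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.tenants[id] = target{dsn: dsn, schema: schema}
}

func (r *registry) setNamed(name, dsn, schema string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.named[name] = target{dsn: dsn, schema: schema}
}

// tenantPool resolves tenant ID -> DSN -> pool.
func (r *registry) tenantPool(id string) (*fakePool, error) {
	r.mu.Lock()
	t, ok := r.tenants[id]
	r.mu.Unlock()
	if !ok {
		return nil, fmt.Errorf("tenant %q is not configured", id)
	}
	return r.poolFor(t.dsn), nil
}

// namedPool resolves pool name -> DSN -> pool.
func (r *registry) namedPool(name string) (*fakePool, error) {
	r.mu.Lock()
	t, ok := r.named[name]
	r.mu.Unlock()
	if !ok {
		return nil, fmt.Errorf("named pool %q is not configured", name)
	}
	return r.poolFor(t.dsn), nil
}

func main() {
	r := newRegistry()
	r.setTenant("tenant-1", "postgres://localhost/db", "schema1")
	r.setNamed("main", "postgres://localhost/db", "schema2")

	tp, _ := r.tenantPool("tenant-1")
	np, _ := r.namedPool("main")
	fmt.Println(tp == np) // prints "true": both keys resolve to the same pool
}
```

In this sketch the schema lives with the key rather than the pool, which is why two keys with different schemas can still share one pool.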
Interface Overview
type DbPoolManager interface {
// DSN-based pool management
GetDsnPool(dsn string) (DbPool, error)
// Tenant-based pool management
SetTenantDsn(tenant string, dsn string, schema string)
GetTenantDsn(tenant string) (string, string, error)
GetTenantPool(tenant string) (DbPool, error)
RemoveTenant(tenant string)
AcquireTenantConn(ctx context.Context, tenant string) (DbConn, error)
// Named pool management
SetNamedDsn(name string, dsn string, schema string)
GetNamedDsn(name string) (string, string, error)
GetNamedPool(name string) (DbPool, error)
RemoveNamed(name string)
AcquireNamedConn(ctx context.Context, name string) (DbConn, error)
// Shutdown
Shutdown() error
}
DSN-Based Pool Management
Get or Create Pool
The GetDsnPool method returns an existing pool or creates a new one if it doesn’t exist.
import (
	"context"
	"log"

	"github.com/primadi/lokstra/lokstra_registry"
	"github.com/primadi/lokstra/serviceapi"
)
manager := lokstra_registry.GetService[serviceapi.DbPoolManager]("db-pool-manager")
// Get or create pool for DSN
dsn := "postgres://user:pass@localhost:5432/mydb?sslmode=disable"
pool, err := manager.GetDsnPool(dsn)
if err != nil {
log.Fatal(err)
}
// Acquire connection from pool
conn, err := pool.Acquire(context.Background(), "public")
if err != nil {
log.Fatal(err)
}
defer conn.Release()
// Use connection
rows, err := conn.Query(context.Background(), "SELECT * FROM users")
Pool Sharing
Pools with identical DSNs are automatically shared:
// First call creates the pool
pool1, _ := manager.GetDsnPool("postgres://localhost/db1")
// Second call with same DSN returns the same pool instance
pool2, _ := manager.GetDsnPool("postgres://localhost/db1")
// pool1 == pool2 (same instance)
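To illustrate the get-or-create behaviour described in this section, here is a small sketch of a DSN-keyed cache that calls its factory at most once per DSN, even under concurrent access. It is an assumption about how such a cache could be built, not lokstra's actual code; `poolCache` and `pool` are hypothetical types.

```go
package main

import (
	"fmt"
	"sync"
)

// pool stands in for a real connection pool (hypothetical type).
type pool struct{ dsn string }

// poolCache creates pools lazily and hands out the same instance per DSN.
type poolCache struct {
	mu      sync.Mutex
	pools   map[string]*pool
	factory func(dsn string) (*pool, error)
}

func newPoolCache(factory func(dsn string) (*pool, error)) *poolCache {
	return &poolCache{pools: map[string]*pool{}, factory: factory}
}

// get returns the cached pool for dsn, or creates and caches a new one.
// The lock is held across the factory call so a DSN is never created twice.
func (c *poolCache) get(dsn string) (*pool, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if p, ok := c.pools[dsn]; ok {
		return p, nil
	}
	p, err := c.factory(dsn)
	if err != nil {
		return nil, err // failed creations are not cached
	}
	c.pools[dsn] = p
	return p, nil
}

func main() {
	created := 0
	cache := newPoolCache(func(dsn string) (*pool, error) {
		created++ // only runs while the cache lock is held
		return &pool{dsn: dsn}, nil
	})

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := cache.get("postgres://localhost/db1"); err != nil {
				panic(err)
			}
		}()
	}
	wg.Wait()
	fmt.Println("pools created:", created) // prints "pools created: 1"
}
```

Holding the lock during creation keeps the sketch simple, at the cost of serializing creation of unrelated pools; a per-DSN lock would avoid that.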
Tenant-Based Pool Management
Setting Tenant Configuration
Configure database connections for specific tenants:
manager := lokstra_registry.GetService[serviceapi.DbPoolManager]("db-pool-manager")
// Configure tenant database
manager.SetTenantDsn(
"tenant-123", // Tenant ID
"postgres://user:pass@localhost:5432/tenant_db", // DSN
"tenant_123", // Schema name
)
manager.SetTenantDsn(
"tenant-456",
"postgres://user:pass@localhost:5432/tenant_db",
"tenant_456",
)
Getting Tenant Pool
// Get pool for specific tenant
pool, err := manager.GetTenantPool("tenant-123")
if err != nil {
log.Fatal(err)
}
// Acquire connection
conn, err := pool.Acquire(context.Background(), "tenant_123")
if err != nil {
log.Fatal(err)
}
defer conn.Release()
Acquiring Tenant Connection (Recommended)
The AcquireTenantConn method is the recommended way to get tenant connections:
// Acquire connection with tenant context automatically set
conn, err := manager.AcquireTenantConn(context.Background(), "tenant-123")
if err != nil {
log.Fatal(err)
}
defer conn.Release()
// Connection has:
// - Correct schema set
// - Tenant context set for RLS
// - Ready to use immediately
rows, err := conn.Query(context.Background(), "SELECT * FROM users")
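The source does not show how the schema and tenant context are applied to the connection. Purely as an illustration, one common PostgreSQL pattern is to set `search_path` and a custom session setting that row-level security policies can read. The sketch below only builds such statements; the setting name `app.current_tenant` and the helpers `quoteIdent` and `tenantSessionSetup` are assumptions, not part of the lokstra API.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteIdent quotes a PostgreSQL identifier by wrapping it in double quotes
// and doubling any embedded double quotes. SET cannot take bind parameters,
// so the schema name has to be quoted into the statement text.
func quoteIdent(name string) string {
	return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`
}

// sessionStmt is one SQL statement plus its bind arguments.
type sessionStmt struct {
	SQL  string
	Args []any
}

// tenantSessionSetup returns statements that could be run right after a
// connection is acquired (hypothetical helper).
func tenantSessionSetup(schema, tenantID string) []sessionStmt {
	return []sessionStmt{
		// Unqualified table names now resolve inside the tenant schema.
		{SQL: "SET search_path TO " + quoteIdent(schema)},
		// RLS policies can read this value with current_setting('app.current_tenant').
		{SQL: "SELECT set_config('app.current_tenant', $1, false)", Args: []any{tenantID}},
	}
}

func main() {
	for _, s := range tenantSessionSetup("tenant_123", "tenant-123") {
		fmt.Println(s.SQL, s.Args)
	}
}
```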
Getting Tenant Configuration
// Retrieve tenant DSN and schema
dsn, schema, err := manager.GetTenantDsn("tenant-123")
if err != nil {
log.Printf("Tenant not configured: %v", err)
return
}
log.Printf("Tenant DSN: %s, Schema: %s", dsn, schema)
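A DSN usually embeds a password, so logging it verbatim as in the snippet above can leak credentials. A minimal sketch of redacting a URL-style DSN with the standard library follows; `redactDSN` is a hypothetical helper, and it does not cover passwords passed as query parameters.

```go
package main

import (
	"fmt"
	"net/url"
)

// redactDSN returns a loggable form of a URL-style DSN.
// Anything that does not parse as scheme://host/... is hidden entirely,
// which also covers key=value style DSNs.
func redactDSN(dsn string) string {
	u, err := url.Parse(dsn)
	if err != nil || u.Scheme == "" || u.Host == "" {
		return "<redacted DSN>"
	}
	// Redacted replaces the password in the userinfo part with "xxxxx".
	return u.Redacted()
}

func main() {
	fmt.Println(redactDSN("postgres://user:pass@localhost:5432/mydb?sslmode=disable"))
	// prints "postgres://user:xxxxx@localhost:5432/mydb?sslmode=disable"
}
```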
Removing Tenant
// Remove tenant configuration
manager.RemoveTenant("tenant-123")
// Pool is not removed (might be used by other tenants)
// Only the tenant->DSN mapping is removed
Named Pool Management
Setting Named Configuration
Named pools allow custom configurations with meaningful names:
manager := lokstra_registry.GetService[serviceapi.DbPoolManager]("db-pool-manager")
// Configure analytics database
manager.SetNamedDsn(
"analytics", // Pool name
"postgres://user:pass@analytics-db:5432/stats", // DSN
"public", // Schema
)
// Configure reporting database
manager.SetNamedDsn(
"reporting",
"postgres://user:pass@reporting-db:5432/reports",
"public",
)
// Configure read-replica
manager.SetNamedDsn(
"read-replica",
"postgres://user:pass@replica:5432/mydb",
"public",
)
Getting Named Pool
// Get pool by name
pool, err := manager.GetNamedPool("analytics")
if err != nil {
log.Fatal(err)
}
// Acquire connection
conn, err := pool.Acquire(context.Background(), "public")
if err != nil {
log.Fatal(err)
}
defer conn.Release()
Acquiring Named Connection (Recommended)
// Acquire connection with schema automatically set
conn, err := manager.AcquireNamedConn(context.Background(), "analytics")
if err != nil {
log.Fatal(err)
}
defer conn.Release()
// Use for analytics queries
stats, err := conn.SelectManyRowMap(context.Background(),
"SELECT date, count, revenue FROM daily_stats WHERE date >= $1",
startDate,
)
Getting Named Configuration
// Retrieve named pool configuration
dsn, schema, err := manager.GetNamedDsn("analytics")
if err != nil {
log.Printf("Named pool not found: %v", err)
return
}
log.Printf("Analytics DSN: %s, Schema: %s", dsn, schema)
Removing Named Pool
// Remove named pool configuration
manager.RemoveNamed("analytics")
// Pool is not removed (might be used by other names/tenants)
// Only the name->DSN mapping is removed
Advanced Features
Custom Pool Factory
You can create a pool manager with a custom pool factory function:
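import (
	"log"

	"github.com/primadi/lokstra/serviceapi"
	"github.com/primadi/lokstra/services/dbpool_manager"
)
// Custom factory function
customFactory := func(dsn string) (serviceapi.DbPool, error) {
	// Custom pool creation logic.
	// Note: a DSN usually embeds credentials; avoid logging it verbatim in production.
	log.Printf("Creating pool for: %s", dsn)
	// CustomPoolConfig and NewCustomPool stand for your own pool implementation
	cfg := &CustomPoolConfig{
		DSN:            dsn,
		MaxConnections: 50,
		MinConnections: 5,
	}
	return NewCustomPool(cfg)
}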
// Create manager with custom factory
manager := dbpool_manager.NewPoolManager(customFactory)
Pool Reuse Across Strategies
Pools are automatically shared when DSNs match:
// Tenant and named pools share the same DSN
manager.SetTenantDsn("tenant-1", "postgres://localhost/db", "schema1")
manager.SetNamedDsn("main", "postgres://localhost/db", "schema2")
// Both use the same underlying pool
tenantPool, _ := manager.GetTenantPool("tenant-1")
namedPool, _ := manager.GetNamedPool("main")
// tenantPool and namedPool share the same connection pool
// Only the schema differs when acquiring connections
Graceful Shutdown
The pool manager implements graceful shutdown:
// Shutdown all managed pools
if err := manager.Shutdown(); err != nil {
log.Printf("Error during shutdown: %v", err)
}
// All pools are closed
// All active connections are released
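As an illustration of what a shutdown over many pools has to handle, the sketch below closes every pool even when some fail and reports all failures together. It assumes Go 1.20+ for `errors.Join`; the `closer` interface and the pool types are hypothetical stand-ins, and the source does not say how `Shutdown` aggregates errors.

```go
package main

import (
	"errors"
	"fmt"
)

// closer is the only capability this sketch needs from a pool.
type closer interface{ Close() error }

// okPool closes cleanly and records that it was closed.
type okPool struct{ closed bool }

func (p *okPool) Close() error { p.closed = true; return nil }

// badPool always fails to close.
type badPool struct{}

func (badPool) Close() error { return errors.New("close failed") }

// shutdownAll closes every pool, continuing past failures, and returns
// the collected errors joined into one (nil if every pool closed cleanly).
func shutdownAll(pools map[string]closer) error {
	var errs []error
	for label, p := range pools {
		if err := p.Close(); err != nil {
			errs = append(errs, fmt.Errorf("closing pool %q: %w", label, err))
		}
	}
	return errors.Join(errs...)
}

func main() {
	pools := map[string]closer{
		"main":    &okPool{},
		"replica": badPool{},
	}
	if err := shutdownAll(pools); err != nil {
		fmt.Println("shutdown finished with errors:", err)
	}
}
```

The map is keyed by a label rather than the DSN so that error messages do not expose credentials.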
Thread-Safe Operations
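All operations are safe for concurrent use; internally the manager relies on sync.Map:
// Safe to call from multiple goroutines
go manager.SetTenantDsn("tenant-1", dsn1, "schema1")
go manager.SetTenantDsn("tenant-2", dsn2, "schema2")
// Concurrency safety does not order these calls: an acquire that runs
// before its tenant is configured returns an error, so check it.
go manager.AcquireTenantConn(ctx, "tenant-1")
go manager.AcquireTenantConn(ctx, "tenant-2")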
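For readers unfamiliar with `sync.Map`, here is a small self-contained sketch of a tenant configuration store built on it. The `tenantStore` and `tenantConfig` types are hypothetical and only mirror the tenant-to-DSN mapping described in this document.

```go
package main

import (
	"fmt"
	"sync"
)

// tenantConfig is the value stored per tenant ID.
type tenantConfig struct{ DSN, Schema string }

// tenantStore wraps sync.Map so callers get typed access without extra locking.
type tenantStore struct {
	m sync.Map // tenant ID (string) -> tenantConfig
}

// Set stores or overwrites the configuration for a tenant.
func (s *tenantStore) Set(id, dsn, schema string) {
	s.m.Store(id, tenantConfig{DSN: dsn, Schema: schema})
}

// Get returns the configuration and whether the tenant is known.
func (s *tenantStore) Get(id string) (tenantConfig, bool) {
	v, ok := s.m.Load(id)
	if !ok {
		return tenantConfig{}, false
	}
	return v.(tenantConfig), true
}

// Remove deletes the mapping; it is a no-op for unknown tenants.
func (s *tenantStore) Remove(id string) { s.m.Delete(id) }

func main() {
	var store tenantStore
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			id := fmt.Sprintf("tenant-%d", n)
			store.Set(id, "postgres://localhost/db", fmt.Sprintf("schema_%d", n))
		}(i)
	}
	wg.Wait() // all writes are complete before reading

	cfg, ok := store.Get("tenant-42")
	fmt.Println(cfg.Schema, ok) // prints "schema_42 true"
}
```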
Best Practices
Pool Management
✓ DO: Use tenant-based pools for multi-tenant applications
manager.SetTenantDsn("tenant-id", dsn, schema)
conn, _ := manager.AcquireTenantConn(ctx, "tenant-id")
✓ DO: Use named pools for different database purposes
manager.SetNamedDsn("analytics", analyticsDsn, "public")
manager.SetNamedDsn("cache", cacheDsn, "public")
✓ DO: Share pools with identical DSNs
// Same DSN = same pool = efficient resource usage
manager.SetTenantDsn("tenant-1", dsn, "schema1")
manager.SetTenantDsn("tenant-2", dsn, "schema2")
✗ DON'T: Register redundant names for the same database
// BAD: Two names for the same DSN and schema
manager.SetNamedDsn("pool1", dsn, "public")
manager.SetNamedDsn("pool2", dsn, "public")
// Both names resolve to the same shared pool, so the second name adds nothing.
// Use a single name instead.
Connection Acquisition
✓ DO: Use AcquireTenantConn for tenant connections
conn, _ := manager.AcquireTenantConn(ctx, "tenant-id")
// Schema and tenant context automatically set
✓ DO: Use AcquireNamedConn for named pools
conn, _ := manager.AcquireNamedConn(ctx, "analytics")
// Schema automatically set
✓ DO: Always release connections
conn, _ := manager.AcquireTenantConn(ctx, "tenant-id")
defer conn.Release()
✗ DON'T: Manually manage schema/tenant context
// BAD: Manual management is error-prone
pool, _ := manager.GetTenantPool("tenant-id")
conn, _ := pool.Acquire(ctx, "wrong_schema") // Easy to make mistakes
Configuration Management
✓ DO: Configure tenants at application startup
func initializeTenants() {
manager := lokstra_registry.GetService[serviceapi.DbPoolManager]("db-pool-manager")
tenants := []Tenant{
{ID: "tenant-1", DSN: dsn1, Schema: "schema1"},
{ID: "tenant-2", DSN: dsn2, Schema: "schema2"},
}
for _, t := range tenants {
manager.SetTenantDsn(t.ID, t.DSN, t.Schema)
}
}
✓ DO: Configure named pools for different purposes
manager.SetNamedDsn("main", mainDsn, "public")
manager.SetNamedDsn("analytics", analyticsDsn, "public")
manager.SetNamedDsn("read-replica", replicaDsn, "public")
✓ DO: Remove tenant configurations when tenant is deleted
manager.RemoveTenant("deleted-tenant-id")
✗ DON'T: Configure tenants dynamically for every request
// BAD: Performance overhead
func handler(tenantID string) {
manager.SetTenantDsn(tenantID, dsn, schema) // Repeated configuration
conn, _ := manager.AcquireTenantConn(ctx, tenantID)
}
// Instead, configure once at startup or on tenant creation
Error Handling
✓ DO: Check for configuration errors
conn, err := manager.AcquireTenantConn(ctx, tenantID)
if err != nil {
if err.Error() == "tenant pool not found: "+tenantID {
return ErrTenantNotConfigured
}
return err
}
✓ DO: Validate tenant existence before use
// Discard the values when only existence matters; unused variables do not compile in Go
_, _, err := manager.GetTenantDsn(tenantID)
if err != nil {
	return fmt.Errorf("tenant %s not configured: %w", tenantID, err)
}
✗ DON'T: Ignore configuration errors
conn, _ := manager.AcquireTenantConn(ctx, tenantID) // BAD: Ignoring errors
Examples
Multi-Tenant Application
package main
import (
"context"
"fmt"
"github.com/primadi/lokstra/lokstra_registry"
"github.com/primadi/lokstra/serviceapi"
)
type TenantService struct {
poolManager serviceapi.DbPoolManager
}
func NewTenantService() *TenantService {
return &TenantService{
poolManager: lokstra_registry.GetService[serviceapi.DbPoolManager]("db-pool-manager"),
}
}
// Initialize tenant database configuration
func (s *TenantService) AddTenant(tenantID, dsn, schema string) error {
s.poolManager.SetTenantDsn(tenantID, dsn, schema)
// Test connection
conn, err := s.poolManager.AcquireTenantConn(context.Background(), tenantID)
if err != nil {
s.poolManager.RemoveTenant(tenantID)
return fmt.Errorf("failed to connect tenant database: %w", err)
}
defer conn.Release()
if err := conn.Ping(context.Background()); err != nil {
s.poolManager.RemoveTenant(tenantID)
return fmt.Errorf("tenant database ping failed: %w", err)
}
return nil
}
// Get users for specific tenant
func (s *TenantService) GetTenantUsers(ctx context.Context, tenantID string) ([]User, error) {
// Acquire tenant-specific connection
conn, err := s.poolManager.AcquireTenantConn(ctx, tenantID)
if err != nil {
return nil, fmt.Errorf("failed to acquire tenant connection: %w", err)
}
defer conn.Release()
// Query with automatic RLS filtering
mapper := func(row serviceapi.Row) (any, error) {
var user User
err := row.Scan(&user.ID, &user.Name, &user.Email)
return user, err
}
result, err := conn.SelectManyWithMapper(ctx, mapper,
"SELECT id, name, email FROM users WHERE active = true",
)
if err != nil {
return nil, err
}
return result.([]User), nil
}
// Remove tenant
func (s *TenantService) RemoveTenant(tenantID string) {
s.poolManager.RemoveTenant(tenantID)
}
Named Pool Usage
package repository
import (
	"context"
	"time"

	"github.com/primadi/lokstra/serviceapi"
)
type AnalyticsRepository struct {
poolManager serviceapi.DbPoolManager
}
func NewAnalyticsRepository(manager serviceapi.DbPoolManager) *AnalyticsRepository {
return &AnalyticsRepository{
poolManager: manager,
}
}
// Initialize analytics database
func (r *AnalyticsRepository) Initialize() error {
// Configure analytics pool
r.poolManager.SetNamedDsn(
"analytics",
"postgres://user:pass@analytics-db:5432/stats",
"public",
)
// Test connection
conn, err := r.poolManager.AcquireNamedConn(context.Background(), "analytics")
if err != nil {
return err
}
defer conn.Release()
return conn.Ping(context.Background())
}
// Get daily statistics
func (r *AnalyticsRepository) GetDailyStats(ctx context.Context, date string) ([]Stat, error) {
conn, err := r.poolManager.AcquireNamedConn(ctx, "analytics")
if err != nil {
return nil, err
}
defer conn.Release()
rows, err := conn.SelectManyRowMap(ctx,
`SELECT metric, value, timestamp
FROM daily_stats
WHERE date = $1
ORDER BY timestamp`,
date,
)
if err != nil {
return nil, err
}
stats := make([]Stat, len(rows))
for i, row := range rows {
stats[i] = Stat{
Metric: row["metric"].(string),
Value: row["value"].(float64),
Timestamp: row["timestamp"].(time.Time),
}
}
return stats, nil
}
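The row conversion above uses unchecked type assertions such as `row["metric"].(string)`, which panic if a column is missing or the driver returns a different Go type. A checked alternative can be sketched with a small generic helper (Go 1.18+). It assumes rows are `map[string]any`, as the indexing in the example suggests; `field` is a hypothetical helper name.

```go
package main

import "fmt"

// field reads a column from a row map and returns an error instead of
// panicking when the column is absent or has an unexpected type.
func field[T any](row map[string]any, key string) (T, error) {
	var zero T
	v, ok := row[key]
	if !ok {
		return zero, fmt.Errorf("column %q missing", key)
	}
	t, ok := v.(T)
	if !ok {
		return zero, fmt.Errorf("column %q has type %T, want %T", key, v, zero)
	}
	return t, nil
}

func main() {
	row := map[string]any{"metric": "signups", "value": 42.0}

	metric, err := field[string](row, "metric")
	fmt.Println(metric, err)

	// Asking for the wrong type yields an error rather than a panic.
	_, err = field[int](row, "value")
	fmt.Println(err)
}
```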
Mixed Strategy Application
package main
import (
"context"
"github.com/primadi/lokstra/serviceapi"
)
type DatabaseService struct {
poolManager serviceapi.DbPoolManager
}
func (s *DatabaseService) Initialize() error {
// Configure main application database
s.poolManager.SetNamedDsn(
"main",
"postgres://user:pass@localhost:5432/app_db",
"public",
)
// Configure read replica
s.poolManager.SetNamedDsn(
"read-replica",
"postgres://user:pass@replica:5432/app_db",
"public",
)
// Configure analytics database
s.poolManager.SetNamedDsn(
"analytics",
"postgres://user:pass@analytics:5432/stats_db",
"public",
)
// Configure multi-tenant databases
tenants := []struct {
ID string
DSN string
Schema string
}{
{"tenant-1", "postgres://localhost:5432/tenant_db", "tenant_1"},
{"tenant-2", "postgres://localhost:5432/tenant_db", "tenant_2"},
{"tenant-3", "postgres://localhost:5432/tenant_db", "tenant_3"},
}
for _, t := range tenants {
s.poolManager.SetTenantDsn(t.ID, t.DSN, t.Schema)
}
return nil
}
// Write to main database
func (s *DatabaseService) CreateUser(ctx context.Context, user *User) error {
conn, err := s.poolManager.AcquireNamedConn(ctx, "main")
if err != nil {
return err
}
defer conn.Release()
return conn.QueryRow(ctx,
"INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id",
user.Name, user.Email,
).Scan(&user.ID)
}
// Read from replica
func (s *DatabaseService) GetUsers(ctx context.Context) ([]User, error) {
conn, err := s.poolManager.AcquireNamedConn(ctx, "read-replica")
if err != nil {
return nil, err
}
defer conn.Release()
rows, err := conn.SelectManyRowMap(ctx, "SELECT id, name, email FROM users")
if err != nil {
return nil, err
}
users := make([]User, len(rows))
for i, row := range rows {
users[i] = User{
ID: row["id"].(int),
Name: row["name"].(string),
Email: row["email"].(string),
}
}
return users, nil
}
// Write analytics data
func (s *DatabaseService) LogEvent(ctx context.Context, event *Event) error {
conn, err := s.poolManager.AcquireNamedConn(ctx, "analytics")
if err != nil {
return err
}
defer conn.Release()
_, err = conn.Exec(ctx,
"INSERT INTO events (type, data, timestamp) VALUES ($1, $2, $3)",
event.Type, event.Data, event.Timestamp,
)
return err
}
// Tenant-specific operation
func (s *DatabaseService) GetTenantData(ctx context.Context, tenantID string) ([]Data, error) {
conn, err := s.poolManager.AcquireTenantConn(ctx, tenantID)
if err != nil {
return nil, err
}
defer conn.Release()
rows, err := conn.SelectManyRowMap(ctx, "SELECT * FROM tenant_data")
if err != nil {
return nil, err
}
// Process rows...
return processRows(rows), nil
}
Dynamic Tenant Onboarding
package service
import (
	"context"
	"fmt"

	"github.com/primadi/lokstra/serviceapi"
)
type TenantOnboardingService struct {
poolManager serviceapi.DbPoolManager
}
// Onboard new tenant
func (s *TenantOnboardingService) OnboardTenant(ctx context.Context, tenant *Tenant) error {
// Configure tenant database
s.poolManager.SetTenantDsn(tenant.ID, tenant.DSN, tenant.Schema)
// Acquire connection to verify
conn, err := s.poolManager.AcquireTenantConn(ctx, tenant.ID)
if err != nil {
s.poolManager.RemoveTenant(tenant.ID)
return fmt.Errorf("failed to connect: %w", err)
}
defer conn.Release()
// Run migrations/setup
if err := s.runTenantSetup(ctx, conn, tenant); err != nil {
s.poolManager.RemoveTenant(tenant.ID)
return fmt.Errorf("setup failed: %w", err)
}
return nil
}
// Offboard tenant
func (s *TenantOnboardingService) OffboardTenant(ctx context.Context, tenantID string) error {
// Remove tenant configuration
s.poolManager.RemoveTenant(tenantID)
// Note: Underlying pool is not removed if other tenants share the same DSN
// This is the desired behavior for resource efficiency
return nil
}
// Migrate tenant to new database
func (s *TenantOnboardingService) MigrateTenant(ctx context.Context,
tenantID, newDSN, newSchema string) error {
// Get old configuration
oldDSN, oldSchema, err := s.poolManager.GetTenantDsn(tenantID)
if err != nil {
return err
}
// Update to new configuration
s.poolManager.SetTenantDsn(tenantID, newDSN, newSchema)
// Verify new connection
conn, err := s.poolManager.AcquireTenantConn(ctx, tenantID)
if err != nil {
// Rollback to old configuration
s.poolManager.SetTenantDsn(tenantID, oldDSN, oldSchema)
return fmt.Errorf("failed to connect to new database: %w", err)
}
defer conn.Release()
if err := conn.Ping(ctx); err != nil {
// Rollback to old configuration
s.poolManager.SetTenantDsn(tenantID, oldDSN, oldSchema)
return fmt.Errorf("new database ping failed: %w", err)
}
return nil
}
func (s *TenantOnboardingService) runTenantSetup(ctx context.Context,
conn serviceapi.DbConn, tenant *Tenant) error {
// Create tables, indexes, etc.
migrations := []string{
"CREATE TABLE IF NOT EXISTS users (id SERIAL PRIMARY KEY, name TEXT, email TEXT)",
"CREATE TABLE IF NOT EXISTS orders (id SERIAL PRIMARY KEY, user_id INT, amount DECIMAL)",
"CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)",
}
for _, migration := range migrations {
if _, err := conn.Exec(ctx, migration); err != nil {
return fmt.Errorf("migration failed: %w", err)
}
}
return nil
}
Related Documentation
- DbPool (PostgreSQL) - PostgreSQL connection pooling
- Services Overview - Service architecture and patterns
- Configuration - YAML configuration system
- Multi-Tenancy Guide - Multi-tenant application patterns