Skip to content

Commit d440278

Browse files
Copilotintel352
andcommitted
feat: address PR review feedback and add partition features
- Fix appendTenantFilter to insert tenant predicate before ORDER BY/ LIMIT/GROUP BY/HAVING/OFFSET clauses instead of blindly appending - Reject tenantKey for INSERT statements in step.db_exec with clear error - Remove "postgresql" from isSupportedPartitionDriver for consistency - Fix schema tenantKey placeholders to use "steps." prefix consistently - Add partitionType config (list/range) with RANGE partition DDL support - Add partitionNameFormat config ({table}_{tenant}, {tenant}_{table}, etc.) - Add PartitionTableName method to PartitionKeyProvider interface - Add sourceTable/sourceColumn config for auto-partition sync - Add SyncPartitionsFromSource method to PartitionManager interface - Add step.db_sync_partitions for triggering partition sync from source - Add comprehensive tests for all new functionality Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
1 parent 09f44e3 commit d440278

12 files changed

Lines changed: 571 additions & 52 deletions

cmd/wfctl/type_registry.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func KnownModuleTypes() map[string]ModuleTypeInfo {
5959
Type: "database.partitioned",
6060
Plugin: "storage",
6161
Stateful: true,
62-
ConfigKeys: []string{"driver", "dsn", "partitionKey", "tables", "maxOpenConns", "maxIdleConns"},
62+
ConfigKeys: []string{"driver", "dsn", "partitionKey", "tables", "partitionType", "partitionNameFormat", "sourceTable", "sourceColumn", "maxOpenConns", "maxIdleConns"},
6363
},
6464
"persistence.store": {
6565
Type: "persistence.store",
@@ -607,6 +607,11 @@ func KnownStepTypes() map[string]StepTypeInfo {
607607
Plugin: "pipelinesteps",
608608
ConfigKeys: []string{"database", "tenantKey"},
609609
},
610+
"step.db_sync_partitions": {
611+
Type: "step.db_sync_partitions",
612+
Plugin: "pipelinesteps",
613+
ConfigKeys: []string{"database"},
614+
},
610615
"step.json_response": {
611616
Type: "step.json_response",
612617
Plugin: "pipelinesteps",

module/database_partitioned.go

Lines changed: 158 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,35 @@ import (
1414
// validPartitionValue matches safe LIST partition values (alphanumeric, hyphens, underscores, dots).
1515
var validPartitionValue = regexp.MustCompile(`^[a-zA-Z0-9_.\-]+$`)
1616

17+
// Partition types supported by PostgreSQL.
18+
const (
19+
PartitionTypeList = "list"
20+
PartitionTypeRange = "range"
21+
)
22+
1723
// PartitionKeyProvider is optionally implemented by database modules that support
18-
// LIST partitioning. Steps can use PartitionKey() to determine the column name
19-
// for automatic tenant scoping.
24+
// partitioning. Steps can use PartitionKey() to determine the column name
25+
// for automatic tenant scoping, and PartitionTableName() to resolve
26+
// tenant-specific partition table names at query time.
2027
type PartitionKeyProvider interface {
2128
DBProvider
2229
PartitionKey() string
30+
// PartitionTableName resolves the partition table name for a given parent
31+
// table and tenant value, using the configured partitionNameFormat.
32+
// Returns the parent table name unchanged when no format is configured.
33+
PartitionTableName(parentTable, tenantValue string) string
2334
}
2435

2536
// PartitionManager is optionally implemented by database modules that support
26-
// runtime creation of LIST partitions. The EnsurePartition method is idempotent —
37+
// runtime creation of partitions. The EnsurePartition method is idempotent —
2738
// if the partition already exists the call succeeds without error.
2839
type PartitionManager interface {
2940
PartitionKeyProvider
3041
EnsurePartition(ctx context.Context, tenantValue string) error
42+
// SyncPartitionsFromSource queries the configured sourceTable for all
43+
// distinct tenant values and ensures that partitions exist for each one.
44+
// No-ops if sourceTable is not configured.
45+
SyncPartitionsFromSource(ctx context.Context) error
3146
}
3247

3348
// PartitionedDatabaseConfig holds configuration for the database.partitioned module.
@@ -38,9 +53,25 @@ type PartitionedDatabaseConfig struct {
3853
MaxIdleConns int `json:"maxIdleConns" yaml:"maxIdleConns"`
3954
PartitionKey string `json:"partitionKey" yaml:"partitionKey"`
4055
Tables []string `json:"tables" yaml:"tables"`
56+
// PartitionType is "list" (default) or "range".
57+
// LIST partitions are created with FOR VALUES IN ('value').
58+
// RANGE partitions are created with FOR VALUES FROM ('value') TO ('value_next').
59+
PartitionType string `json:"partitionType" yaml:"partitionType"`
60+
// PartitionNameFormat is a template for generating partition table names.
61+
// Supports {table} and {tenant} placeholders.
62+
// Default: "{table}_{tenant}" (e.g. forms_org_alpha).
63+
PartitionNameFormat string `json:"partitionNameFormat" yaml:"partitionNameFormat"`
64+
// SourceTable is the table that contains all tenant IDs.
65+
// When set, SyncPartitionsFromSource queries this table for all distinct
66+
// values in the partition key column and ensures partitions exist.
67+
// Example: "tenants" — will query "SELECT DISTINCT tenant_id FROM tenants".
68+
SourceTable string `json:"sourceTable" yaml:"sourceTable"`
69+
// SourceColumn overrides the column queried in sourceTable.
70+
// Defaults to PartitionKey if empty.
71+
SourceColumn string `json:"sourceColumn" yaml:"sourceColumn"`
4172
}
4273

43-
// PartitionedDatabase wraps WorkflowDatabase and adds PostgreSQL LIST partition
74+
// PartitionedDatabase wraps WorkflowDatabase and adds PostgreSQL partition
4475
// management. It satisfies DBProvider, DBDriverProvider, PartitionKeyProvider,
4576
// and PartitionManager.
4677
type PartitionedDatabase struct {
@@ -58,6 +89,12 @@ func NewPartitionedDatabase(name string, cfg PartitionedDatabaseConfig) *Partiti
5889
MaxOpenConns: cfg.MaxOpenConns,
5990
MaxIdleConns: cfg.MaxIdleConns,
6091
}
92+
if cfg.PartitionType == "" {
93+
cfg.PartitionType = PartitionTypeList
94+
}
95+
if cfg.PartitionNameFormat == "" {
96+
cfg.PartitionNameFormat = "{table}_{tenant}"
97+
}
6198
return &PartitionedDatabase{
6299
name: name,
63100
config: cfg,
@@ -109,22 +146,45 @@ func (p *PartitionedDatabase) DriverName() string {
109146
return p.config.Driver
110147
}
111148

112-
// PartitionKey returns the column name used for LIST partitioning (satisfies PartitionKeyProvider).
149+
// PartitionKey returns the column name used for partitioning (satisfies PartitionKeyProvider).
113150
func (p *PartitionedDatabase) PartitionKey() string {
114151
return p.config.PartitionKey
115152
}
116153

154+
// PartitionType returns the partition type ("list" or "range").
155+
func (p *PartitionedDatabase) PartitionType() string {
156+
return p.config.PartitionType
157+
}
158+
159+
// PartitionNameFormat returns the configured partition name format template.
160+
func (p *PartitionedDatabase) PartitionNameFormat() string {
161+
return p.config.PartitionNameFormat
162+
}
163+
164+
// PartitionTableName resolves the partition table name for a given parent
165+
// table and tenant value using the configured partitionNameFormat.
166+
func (p *PartitionedDatabase) PartitionTableName(parentTable, tenantValue string) string {
167+
suffix := sanitizePartitionSuffix(tenantValue)
168+
name := p.config.PartitionNameFormat
169+
name = strings.ReplaceAll(name, "{table}", parentTable)
170+
name = strings.ReplaceAll(name, "{tenant}", suffix)
171+
return name
172+
}
173+
117174
// Tables returns the list of tables managed by this partitioned database.
118175
func (p *PartitionedDatabase) Tables() []string {
119176
result := make([]string, len(p.config.Tables))
120177
copy(result, p.config.Tables)
121178
return result
122179
}
123180

124-
// EnsurePartition creates a LIST partition for the given tenant value on all
181+
// EnsurePartition creates a partition for the given tenant value on all
125182
// configured tables. The operation is idempotent — IF NOT EXISTS prevents errors
126183
// when the partition already exists.
127184
//
185+
// For LIST partitions: CREATE TABLE IF NOT EXISTS <name> PARTITION OF <table> FOR VALUES IN ('<value>')
186+
// For RANGE partitions: CREATE TABLE IF NOT EXISTS <name> PARTITION OF <table> FOR VALUES FROM ('<value>') TO ('<value>\x00')
187+
//
128188
// Only PostgreSQL (pgx, pgx/v5, postgres) is supported. The method validates
129189
// the tenant value and table/column names to prevent SQL injection.
130190
func (p *PartitionedDatabase) EnsurePartition(ctx context.Context, tenantValue string) error {
@@ -133,7 +193,7 @@ func (p *PartitionedDatabase) EnsurePartition(ctx context.Context, tenantValue s
133193
}
134194

135195
if !isSupportedPartitionDriver(p.config.Driver) {
136-
return fmt.Errorf("partitioned database %q: driver %q does not support LIST partitioning (use pgx, pgx/v5, or postgres)", p.name, p.config.Driver)
196+
return fmt.Errorf("partitioned database %q: driver %q does not support partitioning (use pgx, pgx/v5, or postgres)", p.name, p.config.Driver)
137197
}
138198

139199
if err := validateIdentifier(p.config.PartitionKey); err != nil {
@@ -153,22 +213,39 @@ func (p *PartitionedDatabase) EnsurePartition(ctx context.Context, tenantValue s
153213
return fmt.Errorf("partitioned database %q: invalid table name: %w", p.name, err)
154214
}
155215

156-
// Sanitize the partition suffix: replace hyphens and dots with underscores.
157-
partitionSuffix := sanitizePartitionSuffix(tenantValue)
158-
partitionName := table + "_" + partitionSuffix
216+
partitionName := p.PartitionTableName(table, tenantValue)
159217

160-
// Use IF NOT EXISTS to make this idempotent.
161-
// The tenant value is embedded as a quoted literal (single-quoted).
218+
// Validate the computed partition name is a safe identifier.
219+
if err := validateIdentifier(partitionName); err != nil {
220+
return fmt.Errorf("partitioned database %q: invalid partition name %q: %w", p.name, partitionName, err)
221+
}
222+
223+
var ddl string
162224
// We have already validated tenantValue against validPartitionValue so
163225
// it cannot contain single-quote characters.
164-
sql := fmt.Sprintf(
165-
"CREATE TABLE IF NOT EXISTS %s PARTITION OF %s FOR VALUES IN ('%s')",
166-
partitionName,
167-
table,
168-
strings.ReplaceAll(tenantValue, "'", ""),
169-
)
170-
171-
if _, err := db.ExecContext(ctx, sql); err != nil {
226+
safeValue := strings.ReplaceAll(tenantValue, "'", "")
227+
228+
switch p.config.PartitionType {
229+
case PartitionTypeList:
230+
ddl = fmt.Sprintf(
231+
"CREATE TABLE IF NOT EXISTS %s PARTITION OF %s FOR VALUES IN ('%s')",
232+
partitionName, table, safeValue,
233+
)
234+
case PartitionTypeRange:
235+
// RANGE partition: from the tenant value (inclusive) to the same
236+
// value followed by a null byte (exclusive). This creates a
237+
// single-value range partition, which is the closest equivalent
238+
// to LIST semantics for RANGE-partitioned tables.
239+
ddl = fmt.Sprintf(
240+
"CREATE TABLE IF NOT EXISTS %s PARTITION OF %s FOR VALUES FROM ('%s') TO ('%s\\x00')",
241+
partitionName, table, safeValue, safeValue,
242+
)
243+
default:
244+
return fmt.Errorf("partitioned database %q: unsupported partition type %q (use %q or %q)",
245+
p.name, p.config.PartitionType, PartitionTypeList, PartitionTypeRange)
246+
}
247+
248+
if _, err := db.ExecContext(ctx, ddl); err != nil {
172249
return fmt.Errorf("partitioned database %q: failed to create partition %q for table %q: %w",
173250
p.name, partitionName, table, err)
174251
}
@@ -177,10 +254,70 @@ func (p *PartitionedDatabase) EnsurePartition(ctx context.Context, tenantValue s
177254
return nil
178255
}
179256

257+
// SyncPartitionsFromSource queries the configured sourceTable for all distinct
258+
// tenant values and ensures that partitions exist for each one.
259+
// This enables automatic partition creation when new tenants are added to a
260+
// source table (e.g., a "tenants" table).
261+
//
262+
// No-ops if sourceTable is not configured.
263+
func (p *PartitionedDatabase) SyncPartitionsFromSource(ctx context.Context) error {
264+
if p.config.SourceTable == "" {
265+
return nil
266+
}
267+
268+
if err := validateIdentifier(p.config.SourceTable); err != nil {
269+
return fmt.Errorf("partitioned database %q: invalid source table: %w", p.name, err)
270+
}
271+
272+
srcCol := p.config.SourceColumn
273+
if srcCol == "" {
274+
srcCol = p.config.PartitionKey
275+
}
276+
if err := validateIdentifier(srcCol); err != nil {
277+
return fmt.Errorf("partitioned database %q: invalid source column: %w", p.name, err)
278+
}
279+
280+
db := p.base.DB()
281+
if db == nil {
282+
return fmt.Errorf("partitioned database %q: database connection is nil", p.name)
283+
}
284+
285+
// All identifiers (srcCol, SourceTable) have been validated by validateIdentifier above.
286+
query := fmt.Sprintf("SELECT DISTINCT %s FROM %s WHERE %s IS NOT NULL", //nolint:gosec // G201: identifiers validated above
287+
srcCol, p.config.SourceTable, srcCol)
288+
289+
rows, err := db.QueryContext(ctx, query)
290+
if err != nil {
291+
return fmt.Errorf("partitioned database %q: failed to query source table %q: %w",
292+
p.name, p.config.SourceTable, err)
293+
}
294+
defer rows.Close()
295+
296+
var tenants []string
297+
for rows.Next() {
298+
var val string
299+
if err := rows.Scan(&val); err != nil {
300+
return fmt.Errorf("partitioned database %q: failed to scan tenant value: %w", p.name, err)
301+
}
302+
tenants = append(tenants, val)
303+
}
304+
if err := rows.Err(); err != nil {
305+
return fmt.Errorf("partitioned database %q: row iteration error: %w", p.name, err)
306+
}
307+
308+
for _, tenant := range tenants {
309+
if err := p.EnsurePartition(ctx, tenant); err != nil {
310+
return err
311+
}
312+
}
313+
314+
return nil
315+
}
316+
180317
// isSupportedPartitionDriver returns true for PostgreSQL-compatible drivers.
181318
func isSupportedPartitionDriver(driver string) bool {
182319
switch driver {
183-
case "pgx", "pgx/v5", "postgres", "postgresql":
320+
case "pgx", "pgx/v5", "postgres":
184321
return true
185322
}
186323
return false

0 commit comments

Comments
 (0)