From aaed3f651e2025412d1521bdcf50dbdf47fee0c0 Mon Sep 17 00:00:00 2001 From: Aditya Choudhari Date: Wed, 11 Feb 2026 09:54:36 -0800 Subject: [PATCH] perf: batch changelog insertion and deletion pg queries --- apps/workspace-engine/pkg/db/batch.go | 129 ++++++++++++ apps/workspace-engine/pkg/db/changelog.sql.go | 52 +++++ apps/workspace-engine/pkg/db/db.go | 1 + apps/workspace-engine/pkg/db/models.go | 8 + .../pkg/db/persistence/store.go | 196 ++++++++++-------- .../pkg/db/queries/changelog.sql | 15 ++ .../pkg/db/queries/schema.sql | 9 + apps/workspace-engine/pkg/db/sqlc.yaml | 1 + 8 files changed, 326 insertions(+), 85 deletions(-) create mode 100644 apps/workspace-engine/pkg/db/batch.go create mode 100644 apps/workspace-engine/pkg/db/changelog.sql.go create mode 100644 apps/workspace-engine/pkg/db/queries/changelog.sql diff --git a/apps/workspace-engine/pkg/db/batch.go b/apps/workspace-engine/pkg/db/batch.go new file mode 100644 index 000000000..2299b9ab2 --- /dev/null +++ b/apps/workspace-engine/pkg/db/batch.go @@ -0,0 +1,129 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.30.0 +// source: batch.go + +package db + +import ( + "context" + "errors" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" +) + +var ( + ErrBatchAlreadyClosed = errors.New("batch already closed") +) + +const deleteChangelogEntry = `-- name: DeleteChangelogEntry :batchexec +DELETE FROM changelog_entry +WHERE workspace_id = $1 AND entity_type = $2 AND entity_id = $3 +` + +type DeleteChangelogEntryBatchResults struct { + br pgx.BatchResults + tot int + closed bool +} + +type DeleteChangelogEntryParams struct { + WorkspaceID uuid.UUID + EntityType string + EntityID string +} + +func (q *Queries) DeleteChangelogEntry(ctx context.Context, arg []DeleteChangelogEntryParams) *DeleteChangelogEntryBatchResults { + batch := &pgx.Batch{} + for _, a := range arg { + vals := []interface{}{ + a.WorkspaceID, + a.EntityType, + a.EntityID, + } + batch.Queue(deleteChangelogEntry, vals...) + } + br := q.db.SendBatch(ctx, batch) + return &DeleteChangelogEntryBatchResults{br, len(arg), false} +} + +func (b *DeleteChangelogEntryBatchResults) Exec(f func(int, error)) { + defer b.br.Close() + for t := 0; t < b.tot; t++ { + if b.closed { + if f != nil { + f(t, ErrBatchAlreadyClosed) + } + continue + } + _, err := b.br.Exec() + if f != nil { + f(t, err) + } + } +} + +func (b *DeleteChangelogEntryBatchResults) Close() error { + b.closed = true + return b.br.Close() +} + +const upsertChangelogEntry = `-- name: UpsertChangelogEntry :batchexec +INSERT INTO changelog_entry (workspace_id, entity_type, entity_id, entity_data, created_at) +VALUES ($1, $2, $3, $4, $5) +ON CONFLICT (workspace_id, entity_type, entity_id) +DO UPDATE SET entity_data = EXCLUDED.entity_data +` + +type UpsertChangelogEntryBatchResults struct { + br pgx.BatchResults + tot int + closed bool +} + +type UpsertChangelogEntryParams struct { + WorkspaceID uuid.UUID + EntityType string + EntityID string + EntityData []byte + CreatedAt pgtype.Timestamptz +} + +func (q *Queries) UpsertChangelogEntry(ctx context.Context, arg []UpsertChangelogEntryParams) *UpsertChangelogEntryBatchResults { + batch := &pgx.Batch{} + for _, a := range arg { + vals := []interface{}{ + a.WorkspaceID, + a.EntityType, + a.EntityID, + a.EntityData, + a.CreatedAt, + } + batch.Queue(upsertChangelogEntry, vals...) + } + br := q.db.SendBatch(ctx, batch) + return &UpsertChangelogEntryBatchResults{br, len(arg), false} +} + +func (b *UpsertChangelogEntryBatchResults) Exec(f func(int, error)) { + defer b.br.Close() + for t := 0; t < b.tot; t++ { + if b.closed { + if f != nil { + f(t, ErrBatchAlreadyClosed) + } + continue + } + _, err := b.br.Exec() + if f != nil { + f(t, err) + } + } +} + +func (b *UpsertChangelogEntryBatchResults) Close() error { + b.closed = true + return b.br.Close() +} diff --git a/apps/workspace-engine/pkg/db/changelog.sql.go b/apps/workspace-engine/pkg/db/changelog.sql.go new file mode 100644 index 000000000..722ab9a01 --- /dev/null +++ b/apps/workspace-engine/pkg/db/changelog.sql.go @@ -0,0 +1,52 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.30.0 +// source: changelog.sql + +package db + +import ( + "context" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgtype" +) + +const listChangelogEntriesByWorkspace = `-- name: ListChangelogEntriesByWorkspace :many +SELECT entity_type, entity_id, entity_data, created_at +FROM changelog_entry +WHERE workspace_id = $1 +ORDER BY created_at ASC +` + +type ListChangelogEntriesByWorkspaceRow struct { + EntityType string + EntityID string + EntityData []byte + CreatedAt pgtype.Timestamptz +} + +func (q *Queries) ListChangelogEntriesByWorkspace(ctx context.Context, workspaceID uuid.UUID) ([]ListChangelogEntriesByWorkspaceRow, error) { + rows, err := q.db.Query(ctx, listChangelogEntriesByWorkspace, workspaceID) + if err != nil { + return nil, err + } + defer rows.Close() + var items []ListChangelogEntriesByWorkspaceRow + for rows.Next() { + var i ListChangelogEntriesByWorkspaceRow + if err := rows.Scan( + &i.EntityType, + &i.EntityID, + &i.EntityData, + &i.CreatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} diff --git a/apps/workspace-engine/pkg/db/db.go b/apps/workspace-engine/pkg/db/db.go index 9d485b5f1..696f2287c 100644 --- a/apps/workspace-engine/pkg/db/db.go +++ b/apps/workspace-engine/pkg/db/db.go @@ -15,6 +15,7 @@ type DBTX interface { Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error) Query(context.Context, string, ...interface{}) (pgx.Rows, error) QueryRow(context.Context, string, ...interface{}) pgx.Row + SendBatch(context.Context, *pgx.Batch) pgx.BatchResults } func New(db DBTX) *Queries { diff --git a/apps/workspace-engine/pkg/db/models.go b/apps/workspace-engine/pkg/db/models.go index 8b387200c..cd1cc263b 100644 --- a/apps/workspace-engine/pkg/db/models.go +++ b/apps/workspace-engine/pkg/db/models.go @@ -56,6 +56,14 @@ func (ns NullDeploymentVersionStatus) Value() (driver.Value, error) { return string(ns.DeploymentVersionStatus), nil } +type ChangelogEntry struct { + WorkspaceID uuid.UUID + EntityType string + EntityID string + EntityData []byte + CreatedAt pgtype.Timestamptz +} + type Deployment struct { ID uuid.UUID Name string diff --git a/apps/workspace-engine/pkg/db/persistence/store.go b/apps/workspace-engine/pkg/db/persistence/store.go index 2592dc162..46bf5623f 100644 --- a/apps/workspace-engine/pkg/db/persistence/store.go +++ b/apps/workspace-engine/pkg/db/persistence/store.go @@ -4,12 +4,13 @@ import ( "context" "encoding/json" "fmt" - "time" + "workspace-engine/pkg/concurrency" "workspace-engine/pkg/db" "workspace-engine/pkg/persistence" "workspace-engine/pkg/workspace/store/repository/memory" - "github.com/jackc/pgx/v5" + "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -18,6 +19,8 @@ import ( var tracer = otel.Tracer("persistence/store") +const saveBatchSize = 500 + var _ persistence.Store = (*Store)(nil) type Store struct { @@ -32,70 +35,95 @@ func NewStore(ctx context.Context) (*Store, error) { return &Store{conn: conn}, nil } -func (s *Store) upsertChangelogEntry(ctx context.Context, tx pgx.Tx, change persistence.Change) error { - ctx, span := tracer.Start(ctx, "upsertChangelogEntry") +func (s *Store) upsertChangelogEntries(ctx context.Context, queries *db.Queries, changes []persistence.Change) error { + ctx, span := tracer.Start(ctx, "upsertChangelogEntries") defer span.End() - span.SetAttributes(attribute.String("change.type", string(change.ChangeType))) - span.SetAttributes(attribute.String("change.entity", fmt.Sprintf("%T: %+v", change.Entity, change.Entity))) - span.SetAttributes(attribute.Int64("change.timestamp", change.Timestamp.Unix())) + span.SetAttributes(attribute.Int("batch.size", len(changes))) - entityType, entityID := change.Entity.CompactionKey() + if len(changes) == 0 { + return nil + } - entityData, err := json.Marshal(change.Entity) - if err != nil { - return fmt.Errorf("failed to marshal entity: %w", err) - } - - sql := ` - INSERT INTO changelog_entry - (workspace_id, entity_type, entity_id, entity_data, created_at) - VALUES - ($1, $2, $3, $4, $5) - ON CONFLICT (workspace_id, entity_type, entity_id) - DO UPDATE SET - entity_data = EXCLUDED.entity_data - ` - - _, err = tx.Exec(ctx, sql, - change.Namespace, - entityType, - entityID, - entityData, - change.Timestamp, - ) - if err != nil { - span.RecordError(err) + params := make([]db.UpsertChangelogEntryParams, 0, len(changes)) + for _, change := range changes { + workspaceID, err := uuid.Parse(change.Namespace) + if err != nil { + return fmt.Errorf("failed to parse workspace ID: %w", err) + } + + entityType, entityID := change.Entity.CompactionKey() + + entityData, err := json.Marshal(change.Entity) + if err != nil { + return fmt.Errorf("failed to marshal entity: %w", err) + } + + params = append(params, db.UpsertChangelogEntryParams{ + WorkspaceID: workspaceID, + EntityType: entityType, + EntityID: entityID, + EntityData: entityData, + CreatedAt: pgtype.Timestamptz{Time: change.Timestamp, Valid: true}, + }) + } + + results := queries.UpsertChangelogEntry(ctx, params) + var batchErr error + results.Exec(func(i int, err error) { + if err != nil && batchErr == nil { + batchErr = err + } + }) + if batchErr != nil { + span.RecordError(batchErr) span.SetStatus(codes.Error, "failed to upsert changelog entry") + return fmt.Errorf("failed to upsert changelog entry: %w", batchErr) } - return err + + return nil } -func (s *Store) deleteChangelogEntry(ctx context.Context, tx pgx.Tx, change persistence.Change) error { - ctx, span := tracer.Start(ctx, "deleteChangelogEntry") +func (s *Store) deleteChangelogEntries(ctx context.Context, queries *db.Queries, changes []persistence.Change) error { + ctx, span := tracer.Start(ctx, "deleteChangelogEntries") defer span.End() - span.SetAttributes(attribute.String("change.type", string(change.ChangeType))) - span.SetAttributes(attribute.String("change.entity", fmt.Sprintf("%T: %+v", change.Entity, change.Entity))) - span.SetAttributes(attribute.Int64("change.timestamp", change.Timestamp.Unix())) + span.SetAttributes(attribute.Int("batch.size", len(changes))) + + if len(changes) == 0 { + return nil + } + + params := make([]db.DeleteChangelogEntryParams, 0, len(changes)) + for _, change := range changes { + workspaceID, err := uuid.Parse(change.Namespace) + if err != nil { + return fmt.Errorf("failed to parse workspace ID: %w", err) + } - entityType, entityID := change.Entity.CompactionKey() + entityType, entityID := change.Entity.CompactionKey() - sql := ` - DELETE FROM changelog_entry - WHERE workspace_id = $1 AND entity_type = $2 AND entity_id = $3 - ` + params = append(params, db.DeleteChangelogEntryParams{ + WorkspaceID: workspaceID, + EntityType: entityType, + EntityID: entityID, + }) + } - _, err := tx.Exec(ctx, sql, - change.Namespace, - entityType, - entityID, - ) - if err != nil { - span.RecordError(err) + results := queries.DeleteChangelogEntry(ctx, params) + var batchErr error + results.Exec(func(i int, err error) { + if err != nil && batchErr == nil { + batchErr = err + } + }) + if batchErr != nil { + span.RecordError(batchErr) span.SetStatus(codes.Error, "failed to delete changelog entry") + return fmt.Errorf("failed to delete changelog entry: %w", batchErr) } - return err + + return nil } func (s *Store) Save(ctx context.Context, changes persistence.Changes) error { @@ -113,20 +141,15 @@ func (s *Store) Save(ctx context.Context, changes persistence.Changes) error { } defer func() { _ = tx.Rollback(ctx) }() + setEntries := make([]persistence.Change, 0) + unsetEntries := make([]persistence.Change, 0) + for _, change := range changes { switch change.ChangeType { case persistence.ChangeTypeSet: - if err := s.upsertChangelogEntry(ctx, tx, change); err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, "failed to upsert change") - return fmt.Errorf("failed to upsert change: %w", err) - } + setEntries = append(setEntries, change) case persistence.ChangeTypeUnset: - if err := s.deleteChangelogEntry(ctx, tx, change); err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, "failed to delete change") - return fmt.Errorf("failed to delete change: %w", err) - } + unsetEntries = append(unsetEntries, change) default: span.RecordError(fmt.Errorf("unknown change type: %s", change.ChangeType)) span.SetStatus(codes.Error, "unknown change type") @@ -134,6 +157,24 @@ func (s *Store) Save(ctx context.Context, changes persistence.Changes) error { } } + queries := db.New(tx) + + for _, chunk := range concurrency.Chunk(setEntries, saveBatchSize) { + if err := s.upsertChangelogEntries(ctx, queries, chunk); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "failed to upsert changes") + return fmt.Errorf("failed to upsert changes: %w", err) + } + } + + for _, chunk := range concurrency.Chunk(unsetEntries, saveBatchSize) { + if err := s.deleteChangelogEntries(ctx, queries, chunk); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "failed to delete changes") + return fmt.Errorf("failed to delete changes: %w", err) + } + } + if err := tx.Commit(ctx); err != nil { span.RecordError(err) span.SetStatus(codes.Error, "failed to commit transaction") @@ -149,33 +190,22 @@ func (s *Store) Load(ctx context.Context, namespace string) (persistence.Changes span.SetAttributes(attribute.String("namespace", namespace)) - sql := ` - SELECT entity_type, entity_id, entity_data, created_at - FROM changelog_entry - WHERE workspace_id = $1 - ORDER BY created_at ASC - ` + workspaceID, err := uuid.Parse(namespace) + if err != nil { + return nil, fmt.Errorf("failed to parse workspace ID: %w", err) + } - rows, err := s.conn.Query(ctx, sql, namespace) + queries := db.New(s.conn) + rows, err := queries.ListChangelogEntriesByWorkspace(ctx, workspaceID) if err != nil { return nil, fmt.Errorf("failed to query changelog entries: %w", err) } - defer rows.Close() var changes persistence.Changes - jsonEntityRegistry := memory.GlobalRegistry() - for rows.Next() { - var entityType, entityID string - var entityData []byte - var createdAt time.Time - - if err := rows.Scan(&entityType, &entityID, &entityData, &createdAt); err != nil { - return nil, fmt.Errorf("failed to scan row: %w", err) - } - - entity, err := jsonEntityRegistry.Unmarshal(entityType, entityData) + for _, row := range rows { + entity, err := jsonEntityRegistry.Unmarshal(row.EntityType, row.EntityData) if err != nil { continue } @@ -184,14 +214,10 @@ func (s *Store) Load(ctx context.Context, namespace string) (persistence.Changes Namespace: namespace, ChangeType: persistence.ChangeTypeSet, // All loaded entities are "set" type Entity: entity, - Timestamp: createdAt, + Timestamp: row.CreatedAt.Time, }) } - if err := rows.Err(); err != nil { - return nil, fmt.Errorf("error iterating rows: %w", err) - } - return changes, nil } diff --git a/apps/workspace-engine/pkg/db/queries/changelog.sql b/apps/workspace-engine/pkg/db/queries/changelog.sql new file mode 100644 index 000000000..85966afd5 --- /dev/null +++ b/apps/workspace-engine/pkg/db/queries/changelog.sql @@ -0,0 +1,15 @@ +-- name: UpsertChangelogEntry :batchexec +INSERT INTO changelog_entry (workspace_id, entity_type, entity_id, entity_data, created_at) +VALUES ($1, $2, $3, $4, $5) +ON CONFLICT (workspace_id, entity_type, entity_id) +DO UPDATE SET entity_data = EXCLUDED.entity_data; + +-- name: DeleteChangelogEntry :batchexec +DELETE FROM changelog_entry +WHERE workspace_id = $1 AND entity_type = $2 AND entity_id = $3; + +-- name: ListChangelogEntriesByWorkspace :many +SELECT entity_type, entity_id, entity_data, created_at +FROM changelog_entry +WHERE workspace_id = $1 +ORDER BY created_at ASC; diff --git a/apps/workspace-engine/pkg/db/queries/schema.sql b/apps/workspace-engine/pkg/db/queries/schema.sql index bc51809fb..8b7e436ca 100644 --- a/apps/workspace-engine/pkg/db/queries/schema.sql +++ b/apps/workspace-engine/pkg/db/queries/schema.sql @@ -62,4 +62,13 @@ CREATE TABLE environment ( resource_selector JSONB DEFAULT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), CONSTRAINT environment_uniq UNIQUE (system_id, name) +); + +CREATE TABLE changelog_entry ( + workspace_id UUID NOT NULL REFERENCES workspace(id) ON DELETE CASCADE, + entity_type TEXT NOT NULL, + entity_id TEXT NOT NULL, + entity_data JSONB NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY (workspace_id, entity_type, entity_id) ); \ No newline at end of file diff --git a/apps/workspace-engine/pkg/db/sqlc.yaml b/apps/workspace-engine/pkg/db/sqlc.yaml index a8596c138..ad0c5c1db 100644 --- a/apps/workspace-engine/pkg/db/sqlc.yaml +++ b/apps/workspace-engine/pkg/db/sqlc.yaml @@ -6,6 +6,7 @@ sql: queries: - queries/workspaces.sql - queries/deployment_versions.sql + - queries/changelog.sql gen: go: package: db