1 change: 1 addition & 0 deletions internal/collector/postgres_common.go
@@ -23,6 +23,7 @@ const (
PostgresV16 = 160000
PostgresV17 = 170000
PostgresV18 = 180000
PostgresV19 = 190000

// Minimum required version is 9.5
PostgresVMinNum = PostgresV95
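For context: these constants follow PostgreSQL's server_version_num convention for v10 and later, where the number is major*10000 plus the minor release, so 190000 is 19.0 and 170002 would be 17.2 (pre-10 releases like 9.5 used the older three-part scheme, hence 90500). A minimal sketch of the mapping — a hypothetical helper, not part of the patch:

```go
package main

import "fmt"

// formatVersionNum renders a server_version_num such as 190000 as "19.0".
// Valid for PostgreSQL 10+, where the scheme is major*10000 + minor.
func formatVersionNum(num int) string {
	return fmt.Sprintf("%d.%d", num/10000, num%10000)
}

func main() {
	fmt.Println(formatVersionNum(190000)) // 19.0
	fmt.Println(formatVersionNum(170002)) // 17.2
}
```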
37 changes: 35 additions & 2 deletions internal/collector/postgres_stat_subscription.go
@@ -43,7 +43,7 @@ const (
"FROM pg_stat_subscription s1 JOIN pg_stat_subscription_stats s2 ON s1.subid = s2.subid " +
"WHERE s1.relid ISNULL;"

postgresStatSubscriptionQueryLatest = "SELECT s1.subid, s1.subname, COALESCE(s1.pid, 0) AS pid, " +
postgresStatSubscriptionQuery18 = "SELECT s1.subid, s1.subname, COALESCE(s1.pid, 0) AS pid, " +
"COALESCE(s1.worker_type, 'unknown') AS worker_type, " +
"COALESCE(received_lsn - '0/0', 0) AS received_lsn, " +
"COALESCE(latest_end_lsn - '0/0', 0) AS reported_lsn, " +
@@ -57,6 +57,22 @@ const (
"s2.confl_multiple_unique_conflicts " +
"FROM pg_stat_subscription s1 JOIN pg_stat_subscription_stats s2 ON s1.subid = s2.subid " +
"WHERE s1.relid ISNULL;"

postgresStatSubscriptionQueryLatest = "SELECT s1.subid, s1.subname, COALESCE(s1.pid, 0) AS pid, " +
"COALESCE(s1.worker_type, 'unknown') AS worker_type, " +
"COALESCE(received_lsn - '0/0', 0) AS received_lsn, " +
"COALESCE(latest_end_lsn - '0/0', 0) AS reported_lsn, " +
"COALESCE(EXTRACT(EPOCH FROM last_msg_send_time), 0) AS msg_send_time, " +
"COALESCE(EXTRACT(EPOCH FROM last_msg_receipt_time), 0) AS msg_recv_time, " +
"COALESCE(EXTRACT(EPOCH FROM latest_end_time), 0) AS reported_time, " +
"s2.apply_error_count, s2.sync_table_error_count, " +
"s2.confl_insert_exists, s2.confl_update_origin_differs, " +
"s2.confl_update_exists, s2.confl_update_missing, " +
"s2.confl_delete_origin_differs, s2.confl_delete_missing, " +
"s2.confl_multiple_unique_conflicts " +
"s2.sync_seq_error_count " +
"FROM pg_stat_subscription s1 JOIN pg_stat_subscription_stats s2 ON s1.subid = s2.subid " +
"WHERE s1.relid ISNULL;"
)
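A note on the LSN arithmetic in these queries: subtracting the zero LSN ('0/0') from a pg_lsn yields a numeric byte offset, which can be exported directly as a metric value. Client-side, the same offset can be computed from the textual high/low form; a standalone sketch with a hypothetical helper:

```go
package main

import "fmt"

// lsnToBytes computes the byte offset of an LSN given its two hex halves,
// e.g. "16/B374D848" -> high 0x16, low 0xB374D848. This mirrors what
// "received_lsn - '0/0'" produces on the server side.
func lsnToBytes(high, low uint64) uint64 {
	return high<<32 | low
}

func main() {
	fmt.Println(lsnToBytes(0x16, 0xB374D848))
}
```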

// postgresStatSubscriptionCollector defines metric descriptors and stats store.
@@ -192,6 +208,9 @@ func (c *postgresStatSubscriptionCollector) Update(ctx context.Context, config C
if value, ok := stat.values["confl_multiple_unique_conflicts"]; ok {
ch <- c.conflCount.newConstMetric(value, stat.SubID, stat.SubName, stat.WorkerType, "multiple_unique_conflicts")
}
if value, ok := stat.values["sync_seq_error_count"]; ok {
ch <- c.errorCount.newConstMetric(value, stat.SubID, stat.SubName, stat.WorkerType, "seq_error_count")
}
}

return nil
@@ -270,8 +289,17 @@ func parsePostgresSubscriptionStat(r *model.PGResult, labelNames []string) map[s
s.values["reported_time"] = v
case "apply_error_count":
s.values["apply_error_count"] = v
case "sync_error_count":
// PostgreSQL 19: column sync_error_count renamed to sync_table_error_count
case "sync_error_count", "sync_table_error_count":
s.values["sync_error_count"] = v
// PostgreSQL 18: the following columns were added:
// confl_insert_exists - counts INSERT operations that violate unique constraints
// confl_update_origin_differs - identifies UPDATE conflicts where rows were modified by different origins
// confl_update_exists - tracks UPDATEs that violate unique constraints
// confl_update_missing - counts attempts to UPDATE rows that don't exist
// confl_delete_origin_differs - captures DELETE operations on rows modified by another origin
// confl_delete_missing - tracks DELETE attempts on non-existent rows
// confl_multiple_unique_conflicts - detects scenarios where multiple unique constraints are violated simultaneously
case "confl_insert_exists":
s.values["confl_insert_exists"] = v
case "confl_update_origin_differs":
@@ -286,6 +314,9 @@ func parsePostgresSubscriptionStat(r *model.PGResult, labelNames []string) map[s
s.values["confl_delete_missing"] = v
case "confl_multiple_unique_conflicts":
s.values["confl_multiple_unique_conflicts"] = v
// PostgreSQL 19: column sync_seq_error_count added
case "sync_seq_error_count":
s.values["sync_seq_error_count"] = v
default:
continue
}
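Two small conventions in the switch above are worth calling out: version-renamed columns are funneled into a single stable key (so the sync_error_count series survives the PostgreSQL 19 rename), and each confl_* counter becomes one sample of the conflicts metric whose reason label is the column name minus its confl_ prefix (as with "multiple_unique_conflicts" earlier). A standalone sketch of both rules, with hypothetical helper names:

```go
package main

import (
	"fmt"
	"strings"
)

// normalizeColumn funnels version-specific column names into stable internal
// keys, mirroring the parser's switch: PostgreSQL 19 renamed
// sync_error_count to sync_table_error_count, but both land on one key.
func normalizeColumn(name string) string {
	switch name {
	case "sync_error_count", "sync_table_error_count":
		return "sync_error_count"
	default:
		return name
	}
}

// conflictReason derives the reason label for a conflict counter,
// e.g. "confl_multiple_unique_conflicts" -> "multiple_unique_conflicts".
func conflictReason(column string) string {
	return strings.TrimPrefix(column, "confl_")
}

func main() {
	fmt.Println(normalizeColumn("sync_table_error_count")) // sync_error_count
	fmt.Println(conflictReason("confl_update_missing"))    // update_missing
}
```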
@@ -306,6 +337,8 @@ func selectSubscriptionQuery(version int) string {
return postgresStatSubscriptionQuery16
case version < PostgresV18:
return postgresStatSubscriptionQuery17
case version < PostgresV19:
return postgresStatSubscriptionQuery18
default:
return postgresStatSubscriptionQueryLatest
}
3 changes: 2 additions & 1 deletion internal/collector/postgres_stat_subscription_test.go
@@ -101,7 +101,8 @@ func Test_selectSubscriptionQuery(t *testing.T) {
{version: 150000, want: postgresStatSubscriptionQuery16},
{version: 160000, want: postgresStatSubscriptionQuery16},
{version: 170000, want: postgresStatSubscriptionQuery17},
{version: 180000, want: postgresStatSubscriptionQueryLatest},
{version: 180000, want: postgresStatSubscriptionQuery18},
{version: 190000, want: postgresStatSubscriptionQueryLatest},
}

for _, tc := range testcases {
128 changes: 104 additions & 24 deletions internal/collector/postgres_statements.go
@@ -4,12 +4,13 @@ package collector
import (
"context"
"fmt"
"github.com/cherts/pgscv/internal/log"
"github.com/cherts/pgscv/internal/model"
"github.com/prometheus/client_golang/prometheus"
"strconv"
"strings"
"sync"

"github.com/cherts/pgscv/internal/log"
"github.com/cherts/pgscv/internal/model"
"github.com/prometheus/client_golang/prometheus"
)

const (
@@ -129,7 +130,7 @@ const (

// postgresStatementsQuery18 defines query for querying statements metrics on PostgreSQL 18.
// 1. use nullif(value, 0) to nullify zero values, NULLs are skipped by stats method and metrics will not be generated.
postgresStatementsQueryLatest = "SELECT d.datname AS database, pg_get_userbyid(p.userid) AS \"user\", p.queryid, " +
postgresStatementsQuery18 = "SELECT d.datname AS database, pg_get_userbyid(p.userid) AS \"user\", p.queryid, " +
"COALESCE(%s, '') AS query, p.calls, p.rows, p.total_exec_time, p.total_plan_time, p.shared_blk_read_time AS blk_read_time, " +
"p.shared_blk_write_time AS blk_write_time, NULLIF(p.shared_blks_hit, 0) AS shared_blks_hit, NULLIF(p.shared_blks_read, 0) AS shared_blks_read, " +
"NULLIF(p.shared_blks_dirtied, 0) AS shared_blks_dirtied, NULLIF(p.shared_blks_written, 0) AS shared_blks_written, " +
@@ -140,7 +141,7 @@ const (
"NULLIF(p.wal_buffers_full, 0) AS wal_buffers_full " +
"FROM %s.pg_stat_statements p JOIN pg_database d ON d.oid=p.dbid"

postgresStatementsQueryLatestTopK = "WITH stat AS (SELECT d.datname AS DATABASE, pg_get_userbyid(p.userid) AS \"user\", p.queryid, " +
postgresStatementsQuery18TopK = "WITH stat AS (SELECT d.datname AS DATABASE, pg_get_userbyid(p.userid) AS \"user\", p.queryid, " +
"COALESCE(%s, '') AS query, p.calls, p.rows, p.total_exec_time, p.total_plan_time, p.shared_blk_read_time AS blk_read_time, " +
"p.shared_blk_write_time AS blk_write_time, NULLIF(p.shared_blks_hit, 0) AS shared_blks_hit, NULLIF(p.shared_blks_read, 0) AS shared_blks_read, " +
"NULLIF(p.shared_blks_dirtied, 0) AS shared_blks_dirtied, NULLIF(p.shared_blks_written, 0) AS shared_blks_written, " +
@@ -171,29 +172,79 @@ const (
"NULLIF(SUM(COALESCE(temp_blks_written, 0)), 0), NULLIF(SUM(COALESCE(wal_records, 0)), 0), NULLIF(SUM(COALESCE(wal_fpi, 0)), 0), " +
"NULLIF(SUM(COALESCE(wal_bytes, 0)), 0), NULLIF(SUM(COALESCE(wal_buffers_full, 0)), 0) FROM stat WHERE NOT visible " +
"GROUP BY DATABASE HAVING EXISTS (SELECT 1 FROM stat WHERE NOT visible)"

// postgresStatementsQueryLatest defines query for querying statements metrics.
// 1. use nullif(value, 0) to nullify zero values, NULLs are skipped by stats method and metrics will not be generated.
postgresStatementsQueryLatest = "SELECT d.datname AS database, pg_get_userbyid(p.userid) AS \"user\", p.queryid, " +
"COALESCE(%s, '') AS query, p.calls, p.rows, p.total_exec_time, p.total_plan_time, p.shared_blk_read_time AS blk_read_time, " +
"p.shared_blk_write_time AS blk_write_time, NULLIF(p.shared_blks_hit, 0) AS shared_blks_hit, NULLIF(p.shared_blks_read, 0) AS shared_blks_read, " +
"NULLIF(p.shared_blks_dirtied, 0) AS shared_blks_dirtied, NULLIF(p.shared_blks_written, 0) AS shared_blks_written, " +
"NULLIF(p.local_blks_hit, 0) AS local_blks_hit, NULLIF(p.local_blks_read, 0) AS local_blks_read, " +
"NULLIF(p.local_blks_dirtied, 0) AS local_blks_dirtied, NULLIF(p.local_blks_written, 0) AS local_blks_written, " +
"NULLIF(p.temp_blks_read, 0) AS temp_blks_read, NULLIF(p.temp_blks_written, 0) AS temp_blks_written, " +
"NULLIF(p.wal_records, 0) AS wal_records, NULLIF(p.wal_fpi, 0) AS wal_fpi, NULLIF(p.wal_bytes, 0) AS wal_bytes, " +
"NULLIF(p.wal_buffers_full, 0) AS wal_buffers_full, NULLIF(p.generic_plan_calls, 0) AS generic_plan_calls, " +
"NULLIF(p.custom_plan_calls, 0) AS custom_plan_calls, " +
"FROM %s.pg_stat_statements p JOIN pg_database d ON d.oid=p.dbid"

postgresStatementsQueryLatestTopK = "WITH stat AS (SELECT d.datname AS DATABASE, pg_get_userbyid(p.userid) AS \"user\", p.queryid, " +
"COALESCE(%s, '') AS query, p.calls, p.rows, p.total_exec_time, p.total_plan_time, p.shared_blk_read_time AS blk_read_time, " +
"p.shared_blk_write_time AS blk_write_time, NULLIF(p.shared_blks_hit, 0) AS shared_blks_hit, NULLIF(p.shared_blks_read, 0) AS shared_blks_read, " +
"NULLIF(p.shared_blks_dirtied, 0) AS shared_blks_dirtied, NULLIF(p.shared_blks_written, 0) AS shared_blks_written, " +
"NULLIF(p.local_blks_hit, 0) AS local_blks_hit, NULLIF(p.local_blks_read, 0) AS local_blks_read, " +
"NULLIF(p.local_blks_dirtied, 0) AS local_blks_dirtied, NULLIF(p.local_blks_written, 0) AS local_blks_written, " +
"NULLIF(p.temp_blks_read, 0) AS temp_blks_read, NULLIF(p.temp_blks_written, 0) AS temp_blks_written, " +
"NULLIF(p.wal_records, 0) AS wal_records, NULLIF(p.wal_fpi, 0) AS wal_fpi, NULLIF(p.wal_bytes, 0) AS wal_bytes, " +
"NULLIF(p.wal_buffers_full, 0) AS wal_buffers_full, " +
"(ROW_NUMBER() OVER ( ORDER BY p.calls DESC NULLS LAST) < $1) OR (ROW_NUMBER() OVER ( ORDER BY p.rows DESC NULLS LAST) < $1) OR " +
"(ROW_NUMBER() OVER ( ORDER BY p.total_exec_time DESC NULLS LAST) < $1) OR (ROW_NUMBER() OVER ( ORDER BY p.total_plan_time DESC NULLS LAST) < $1) OR " +
"(ROW_NUMBER() OVER ( ORDER BY p.shared_blk_read_time DESC NULLS LAST) < $1) OR (ROW_NUMBER() OVER ( ORDER BY p.shared_blk_write_time DESC NULLS LAST) < $1) OR " +
"(ROW_NUMBER() OVER ( ORDER BY p.shared_blks_hit DESC NULLS LAST) < $1) OR (ROW_NUMBER() OVER ( ORDER BY p.shared_blks_read DESC NULLS LAST) < $1) OR " +
"(ROW_NUMBER() OVER ( ORDER BY p.shared_blks_dirtied DESC NULLS LAST) < $1) OR (ROW_NUMBER() OVER ( ORDER BY p.shared_blks_written DESC NULLS LAST) < $1) OR " +
"(ROW_NUMBER() OVER ( ORDER BY p.local_blks_hit DESC NULLS LAST) < $1) OR (ROW_NUMBER() OVER ( ORDER BY p.local_blks_read DESC NULLS LAST) < $1) OR " +
"(ROW_NUMBER() OVER ( ORDER BY p.local_blks_dirtied DESC NULLS LAST) < $1) OR (ROW_NUMBER() OVER ( ORDER BY p.local_blks_written DESC NULLS LAST) < $1) OR " +
"(ROW_NUMBER() OVER ( ORDER BY p.temp_blks_read DESC NULLS LAST) < $1) OR (ROW_NUMBER() OVER ( ORDER BY p.temp_blks_written DESC NULLS LAST) < $1) OR " +
"(ROW_NUMBER() OVER ( ORDER BY p.wal_records DESC NULLS LAST) < $1) OR (ROW_NUMBER() OVER ( ORDER BY p.wal_fpi DESC NULLS LAST) < $1) OR " +
"(ROW_NUMBER() OVER ( ORDER BY p.wal_bytes DESC NULLS LAST) < $1) OR (ROW_NUMBER() OVER ( ORDER BY p.wal_buffers_full DESC NULLS LAST) < $1) OR " +
"(ROW_NUMBER() OVER ( ORDER BY p.generic_plan_calls DESC NULLS LAST) < $1) OR (ROW_NUMBER() OVER ( ORDER BY p.custom_plan_calls DESC NULLS LAST) < $1) AS visible " +
"FROM %s.pg_stat_statements p JOIN pg_database d ON d.oid = p.dbid) " +
"SELECT DATABASE, \"user\", queryid, query, calls, rows, total_exec_time, total_plan_time, blk_read_time, blk_write_time, shared_blks_hit, " +
"shared_blks_read, shared_blks_dirtied, shared_blks_written, local_blks_hit, local_blks_read, local_blks_dirtied, local_blks_written, " +
"temp_blks_read, temp_blks_written, wal_records, wal_fpi, wal_bytes, wal_buffers_full FROM stat WHERE visible UNION ALL SELECT DATABASE, 'all_users', NULL, " +
"'all_queries', NULLIF(SUM(COALESCE(calls, 0)), 0), NULLIF(SUM(COALESCE(ROWS, 0)), 0), NULLIF(SUM(COALESCE(total_exec_time, 0)), 0), " +
"NULLIF(SUM(COALESCE(total_plan_time, 0)), 0), NULLIF(SUM(COALESCE(blk_read_time, 0)), 0), NULLIF(SUM(COALESCE(blk_write_time, 0)), 0), " +
"NULLIF(SUM(COALESCE(shared_blks_hit, 0)), 0), NULLIF(SUM(COALESCE(shared_blks_read, 0)), 0), NULLIF(SUM(COALESCE(shared_blks_dirtied, 0)), 0), " +
"NULLIF(SUM(COALESCE(shared_blks_written, 0)), 0), NULLIF(SUM(COALESCE(local_blks_hit, 0)), 0), NULLIF(SUM(COALESCE(local_blks_read, 0)), 0), " +
"NULLIF(SUM(COALESCE(local_blks_dirtied, 0)), 0), NULLIF(SUM(COALESCE(local_blks_written, 0)), 0), NULLIF(SUM(COALESCE(temp_blks_read, 0)), 0), " +
"NULLIF(SUM(COALESCE(temp_blks_written, 0)), 0), NULLIF(SUM(COALESCE(wal_records, 0)), 0), NULLIF(SUM(COALESCE(wal_fpi, 0)), 0), " +
"NULLIF(SUM(COALESCE(wal_bytes, 0)), 0), NULLIF(SUM(COALESCE(wal_buffers_full, 0)), 0), " +
"NULLIF(SUM(COALESCE(generic_plan_calls, 0)), 0), NULLIF(SUM(COALESCE(custom_plan_calls, 0)), 0) FROM stat WHERE NOT visible " +
"GROUP BY DATABASE HAVING EXISTS (SELECT 1 FROM stat WHERE NOT visible)"
)
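The TopK variants above hinge on a window-function trick: every per-statement row is ranked by each tracked counter via ROW_NUMBER(), marked visible if it makes the top K of any ranking, and the invisible remainder is folded into one synthetic 'all_queries' row per database so totals are preserved. The NULLIF(..., 0) wrappers follow the file's convention that zero values become NULL and therefore produce no metric. A stripped-down sketch of the query shape, over a hypothetical stmt_stats table with just two counters:

```go
package main

import "fmt"

// topKSketch is a simplified version of the visible/invisible split used by
// postgresStatementsQuery18TopK and postgresStatementsQueryLatestTopK;
// $1 is the top-K threshold.
const topKSketch = `
WITH stat AS (
  SELECT datname, queryid, calls, total_exec_time,
         (ROW_NUMBER() OVER (ORDER BY calls DESC NULLS LAST) < $1) OR
         (ROW_NUMBER() OVER (ORDER BY total_exec_time DESC NULLS LAST) < $1) AS visible
  FROM stmt_stats
)
SELECT datname, queryid, calls, total_exec_time FROM stat WHERE visible
UNION ALL
SELECT datname, NULL, NULLIF(SUM(COALESCE(calls, 0)), 0),
       NULLIF(SUM(COALESCE(total_exec_time, 0)), 0)
FROM stat WHERE NOT visible GROUP BY datname
HAVING EXISTS (SELECT 1 FROM stat WHERE NOT visible)`

func main() {
	// Zero aggregates become NULL via NULLIF, and NULL values never turn
	// into metrics downstream.
	fmt.Println(topKSketch)
}
```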

// postgresStatementsCollector ...
type postgresStatementsCollector struct {
query typedDesc
calls typedDesc
rows typedDesc
times typedDesc
allTimes typedDesc
sharedHit typedDesc
sharedRead typedDesc
sharedDirtied typedDesc
sharedWritten typedDesc
localHit typedDesc
localRead typedDesc
localDirtied typedDesc
localWritten typedDesc
tempRead typedDesc
tempWritten typedDesc
walRecords typedDesc
walBuffers typedDesc
walAllBytes typedDesc
walBytes typedDesc
genericPlanCalls typedDesc
customPlanCalls typedDesc
}

// NewPostgresStatementsCollector returns a new Collector exposing postgres statements stats.
@@ -314,6 +365,18 @@ func NewPostgresStatementsCollector(constLabels labels, settings model.Collector
[]string{"user", "database", "queryid", "wal"}, constLabels,
settings.Filters,
),
genericPlanCalls: newBuiltinTypedDesc(
descOpts{"postgres", "statements", "generic_plan_calls_total", "Total number of times prepared statement was executed using a generic plan.", 0},
prometheus.CounterValue,
[]string{"user", "database", "queryid"}, constLabels,
settings.Filters,
),
customPlanCalls: newBuiltinTypedDesc(
descOpts{"postgres", "statements", "custom_plan_calls_total", "Total number of times prepared statement was executed using a custom plan.", 0},
prometheus.CounterValue,
[]string{"user", "database", "queryid"}, constLabels,
settings.Filters,
),
}, nil
}
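The two new descriptors expose the generic_plan_calls and custom_plan_calls counters that newer pg_stat_statements versions (PostgreSQL 19 per this patch) keep per statement, making the plan-cache behavior of prepared statements observable. One way a consumer might combine them — a hedged sketch, not collector code:

```go
package main

import "fmt"

// genericPlanRatio returns the share of prepared-statement executions that
// used a cached generic plan; ok is false when the statement was never
// executed through either plan type.
func genericPlanRatio(generic, custom float64) (ratio float64, ok bool) {
	total := generic + custom
	if total == 0 {
		return 0, false
	}
	return generic / total, true
}

func main() {
	if ratio, ok := genericPlanRatio(42, 8); ok {
		fmt.Printf("generic plans: %.0f%%\n", ratio*100) // generic plans: 84%
	}
}
```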

@@ -435,6 +498,12 @@ func (c *postgresStatementsCollector) Update(ctx context.Context, config Config,
ch <- c.walBuffers.newConstMetric(stat.walBuffers, stat.user, stat.database, stat.queryid)
}
}
if stat.genericPlanCalls > 0 {
ch <- c.genericPlanCalls.newConstMetric(stat.genericPlanCalls, stat.user, stat.database, stat.queryid)
}
if stat.customPlanCalls > 0 {
ch <- c.customPlanCalls.newConstMetric(stat.customPlanCalls, stat.user, stat.database, stat.queryid)
}
}

return nil
@@ -466,6 +535,8 @@ type postgresStatementStat struct {
walFPI float64
walBytes float64
walBuffers float64
genericPlanCalls float64
customPlanCalls float64
}

// parsePostgresStatementsStats parses PGResult and return structs with stats values.
@@ -564,6 +635,10 @@ func parsePostgresStatementsStats(r *model.PGResult, labelNames []string) map[st
s.walBytes += v
case "wal_buffers_full":
s.walBuffers += v
case "generic_plan_calls":
s.genericPlanCalls += v
case "custom_plan_calls":
s.customPlanCalls += v
default:
continue
}
@@ -598,6 +673,11 @@ func selectStatementsQuery(version int, schema string, notrackmode bool, topK in
return fmt.Sprintf(postgresStatementsQuery17TopK, queryColumm, schema)
}
return fmt.Sprintf(postgresStatementsQuery17, queryColumm, schema)
} else if version > PostgresV17 && version < PostgresV19 {
if topK > 0 {
return fmt.Sprintf(postgresStatementsQuery18TopK, queryColumm, schema)
}
return fmt.Sprintf(postgresStatementsQuery18, queryColumm, schema)
}
if topK > 0 {
return fmt.Sprintf(postgresStatementsQueryLatestTopK, queryColumm, schema)
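Worth noting: the statements query constants are format strings, and selectStatementsQuery fills the first %s with the query-text expression (p.query in the tests below) and the second with the schema hosting the pg_stat_statements extension. An illustrative, hypothetical test of the PostgreSQL 19 path, in the style of the existing tests:

```go
package collector

import (
	"fmt"
	"testing"
)

// TestSelectStatementsQueryV19Sketch is illustrative only (it assumes the
// package internals shown in this diff): a 19.x version should resolve to
// the latest query text with both %s placeholders expanded.
func TestSelectStatementsQueryV19Sketch(t *testing.T) {
	got := selectStatementsQuery(PostgresV19, "example", false, 0)
	want := fmt.Sprintf(postgresStatementsQueryLatest, "p.query", "example")
	if got != want {
		t.Fatalf("unexpected statements query for PostgreSQL 19")
	}
}
```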
8 changes: 6 additions & 2 deletions internal/collector/postgres_statements_test.go
@@ -35,6 +35,8 @@ func TestPostgresStatementsCollector_Update(t *testing.T) {
"postgres_statements_wal_bytes_all_total",
"postgres_statements_wal_bytes_total",
"postgres_statements_wal_buffers_full",
"postgres_statements_generic_plan_calls_total",
"postgres_statements_custom_plan_calls_total",
},
collector: NewPostgresStatementsCollector,
service: model.ServiceTypePostgresql,
@@ -176,8 +178,10 @@ func Test_selectStatementsQuery(t *testing.T) {
{version: PostgresV13, want: fmt.Sprintf(postgresStatementsQuery16TopK, "p.query", "example"), topK: 100},
{version: PostgresV17, want: fmt.Sprintf(postgresStatementsQuery17, "p.query", "example"), topK: 0},
{version: PostgresV17, want: fmt.Sprintf(postgresStatementsQuery17TopK, "p.query", "example"), topK: 100},
{version: PostgresV18, want: fmt.Sprintf(postgresStatementsQueryLatest, "p.query", "example"), topK: 0},
{version: PostgresV18, want: fmt.Sprintf(postgresStatementsQueryLatestTopK, "p.query", "example"), topK: 100},
{version: PostgresV18, want: fmt.Sprintf(postgresStatementsQuery18, "p.query", "example"), topK: 0},
{version: PostgresV18, want: fmt.Sprintf(postgresStatementsQuery18TopK, "p.query", "example"), topK: 100},
{version: PostgresV19, want: fmt.Sprintf(postgresStatementsQueryLatest, "p.query", "example"), topK: 0},
{version: PostgresV19, want: fmt.Sprintf(postgresStatementsQueryLatestTopK, "p.query", "example"), topK: 100},
}

for _, tc := range testcases {