Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions src/sync/pg-connector.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import { test, expect } from "vitest";
import { PostgreSqlContainer } from "@testcontainers/postgresql";
import { ConnectionManager } from "./connection-manager.ts";
import { Connectable } from "./connectable.ts";

test("getRecentQueries resolves pg_stat_statements in a non-default schema", async () => {
const pg = await new PostgreSqlContainer("postgres:17")
.withCopyContentToContainer([
{
content: `
CREATE SCHEMA monitoring;
CREATE EXTENSION pg_stat_statements SCHEMA monitoring;

CREATE TABLE users(id int, name text);
INSERT INTO users (id, name) VALUES (1, 'alice');
SELECT * FROM users WHERE id = 1;
`,
target: "/docker-entrypoint-initdb.d/init.sql",
},
])
.withCommand([
"-c",
"shared_preload_libraries=pg_stat_statements",
"-c",
"autovacuum=off",
"-c",
"track_counts=off",
"-c",
"track_io_timing=off",
"-c",
"track_activities=off",
])
.start();

const manager = ConnectionManager.forLocalDatabase();
const conn = Connectable.fromString(pg.getConnectionUri());
const connector = manager.getConnectorFor(conn);

try {
const recentQueries = await connector.getRecentQueries();
const userQuery = recentQueries.find((q) =>
q.query.includes("users")
);
expect(userQuery, "Expected to find a query involving 'users' table").toBeTruthy();
} finally {
await manager.closeAll();
await pg.stop();
}
});

test("resetPgStatStatements works with a non-default schema", async () => {
const pg = await new PostgreSqlContainer("postgres:17")
.withCopyContentToContainer([
{
content: `
CREATE SCHEMA monitoring;
CREATE EXTENSION pg_stat_statements SCHEMA monitoring;

CREATE TABLE users(id int, name text);
INSERT INTO users (id, name) VALUES (1, 'alice');
SELECT * FROM users WHERE id = 1;
`,
target: "/docker-entrypoint-initdb.d/init.sql",
},
])
.withCommand([
"-c",
"shared_preload_libraries=pg_stat_statements",
"-c",
"autovacuum=off",
"-c",
"track_counts=off",
"-c",
"track_io_timing=off",
"-c",
"track_activities=off",
])
.start();

const manager = ConnectionManager.forLocalDatabase();
const conn = Connectable.fromString(pg.getConnectionUri());
const connector = manager.getConnectorFor(conn);

try {
const before = await connector.getRecentQueries();
expect(before.length, "Expected queries before reset").toBeGreaterThan(0);

await connector.resetPgStatStatements();

const after = await connector.getRecentQueries();
expect(after.length, "Expected 0 queries after reset").toEqual(0);
} finally {
await manager.closeAll();
await pg.stop();
}
});
27 changes: 22 additions & 5 deletions src/sync/pg-connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ export type ResetPgStatStatementsResult =
export class PostgresConnector implements DatabaseConnector<PostgresTuple> {
private static readonly QUERY_DOCTOR_USER = "query_doctor_db_link";
private readonly tupleEstimates = new Map<TableName, number>();
private pssSchema: PgIdentifier | null = null;
/**
* The minimum size for a table to be considered for sampling.
* Otherwise we use the `order by random()` instead.
Expand Down Expand Up @@ -471,13 +472,29 @@ ORDER BY
};
}

private async getPssSchema(): Promise<PgIdentifier> {
if (this.pssSchema) return this.pssSchema;
const result = await this.db.exec<{ schema: string }>(`
SELECT n.nspname as schema
FROM pg_extension e
JOIN pg_namespace n ON n.oid = e.extnamespace
WHERE e.extname = 'pg_stat_statements'
`);
if (result.length === 0) {
throw new ExtensionNotInstalledError("pg_stat_statements");
}
this.pssSchema = PgIdentifier.fromString(result[0].schema);
return this.pssSchema;
}

/**
* Get the latest queries using pg_stat_statements
* @throws {ExtensionNotInstalledError} - pg_stat_statements is not installed
* @throws {PostgresError} - Not regular Error
*/
public async getRecentQueries(): Promise<RecentQuery[]> {
try {
const pssSchema = await this.getPssSchema();
const results = await this.db.exec<RawRecentQuery>(`
SELECT
'unknown_user' as "username",
Expand All @@ -486,7 +503,7 @@ ORDER BY
calls,
rows,
toplevel as "topLevel"
FROM pg_stat_statements
FROM ${pssSchema}.pg_stat_statements
WHERE query not like '%pg_stat_statements%'
-- and dbid = (select oid from pg_database where datname = current_database())
and query not like '%@qd_introspection%'
Expand All @@ -498,9 +515,7 @@ ORDER BY
results,
);
} catch (err) {
if (err instanceof ExtensionNotInstalledError) {
throw err;
}
if (err instanceof ExtensionNotInstalledError) throw err;
if (
err instanceof Error &&
err.message.includes('relation "pg_stat_statements" does not exist')
Expand All @@ -518,12 +533,14 @@ ORDER BY
*/
public async resetPgStatStatements(): Promise<void> {
try {
const pssSchema = await this.getPssSchema();
await this.db.exec(`
SELECT pg_stat_statements_reset(); -- @qd_introspection
SELECT ${pssSchema}.pg_stat_statements_reset(); -- @qd_introspection
`);

this.segmentedQueryCache.reset(this.db);
} catch (err) {
if (err instanceof ExtensionNotInstalledError) throw err;
if (
err instanceof Error &&
err.message.includes(
Expand Down