Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public DatastoreConfig convert(final Config config) {
connectionConfig.credentials(),
connectionConfig.applicationName(),
connectionConfig.connectionPoolConfig(),
connectionConfig.queryTimeout(),
connectionConfig.customParameters()) {
@Override
public String toConnectionString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public ConnectionConfig build() {
credentials,
applicationName,
connectionPoolConfig,
queryTimeout,
customParameters);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static java.util.Collections.unmodifiableList;
import static java.util.function.Predicate.not;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -36,6 +37,7 @@ public class PostgresConnectionConfig extends ConnectionConfig {

@NonNull String applicationName;
@NonNull ConnectionPoolConfig connectionPoolConfig;
@NonNull Duration queryTimeout;

public static ConnectionConfigBuilder builder() {
return ConnectionConfig.builder().type(DatabaseType.POSTGRES);
Expand All @@ -47,6 +49,7 @@ public PostgresConnectionConfig(
@Nullable final ConnectionCredentials credentials,
@NonNull final String applicationName,
@Nullable final ConnectionPoolConfig connectionPoolConfig,
@NonNull final Duration queryTimeout,
@NonNull final Map<String, String> customParameters) {
super(
ensureSingleEndpoint(endpoints),
Expand All @@ -55,6 +58,7 @@ public PostgresConnectionConfig(
customParameters);
this.applicationName = applicationName;
this.connectionPoolConfig = getConnectionPoolConfigOrDefault(connectionPoolConfig);
this.queryTimeout = queryTimeout;
}

public String toConnectionString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ public Map<String, String> getCustomParameters() {
return connectionConfig.customParameters();
}

public int getQueryTimeoutSeconds() {
return (int) connectionConfig.queryTimeout().toSeconds();
}

public void close() {
if (connection != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public PostgresCollection(final PostgresClient client, final String collectionNa
this.tableIdentifier = tableIdentifier;
this.subDocUpdater =
new PostgresSubDocumentUpdater(new PostgresQueryBuilder(this.tableIdentifier));
this.queryExecutor = new PostgresQueryExecutor();
this.queryExecutor = new PostgresQueryExecutor(client.getQueryTimeoutSeconds());
this.updateValidator = new CommonUpdateValidator();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser;
import org.hypertrace.core.documentstore.postgres.query.v1.transformer.PostgresQueryTransformer;

@Slf4j
@AllArgsConstructor
public class PostgresQueryExecutor {

private final int queryTimeoutSeconds;

public PostgresQueryExecutor(int queryTimeoutSeconds) {
this.queryTimeoutSeconds = queryTimeoutSeconds;
}

static org.hypertrace.core.documentstore.query.Query transformAndLog(
org.hypertrace.core.documentstore.query.Query query) {
log.debug("Original query before transformation: {}", query);
Expand All @@ -28,7 +32,8 @@ protected ResultSet execute(final Connection connection, PostgresQueryParser que
final String sqlQuery = queryParser.parse();
final Params params = queryParser.getParamsBuilder().build();
// this is closed when the corresponding ResultSet is closed in the iterators
PreparedStatement preparedStatement = buildPreparedStatement(sqlQuery, params, connection);
PreparedStatement preparedStatement =
buildPreparedStatement(sqlQuery, params, connection, queryTimeoutSeconds);
try {
log.debug("Executing SQL query: {}", sqlQuery);
return preparedStatement.executeQuery();
Expand All @@ -45,7 +50,16 @@ protected ResultSet execute(final Connection connection, PostgresQueryParser que

public PreparedStatement buildPreparedStatement(
String sqlQuery, Params params, Connection connection) throws SQLException {
return buildPreparedStatement(sqlQuery, params, connection, this.queryTimeoutSeconds);
}

public PreparedStatement buildPreparedStatement(
String sqlQuery, Params params, Connection connection, int queryTimeoutSeconds)
throws SQLException {
PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery);
if (queryTimeoutSeconds > 0) {
preparedStatement.setQueryTimeout(queryTimeoutSeconds);
}
enrichPreparedStatementWithParams(preparedStatement, params);
return preparedStatement;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ void testBuildPostgres() {
.poolMaxConnectionsKey(MAX_CONNECTIONS_KEY)
.poolConnectionAccessTimeoutKey(CONNECTION_ACCESS_TIMEOUT_KEY)
.poolConnectionSurrenderTimeoutKey(CONNECTION_SURRENDER_TIMEOUT_KEY)
.queryTimeoutKey(QUERY_TIMEOUT_KEY)
.extract()
.connectionConfig();
final ConnectionConfig expected =
Expand All @@ -179,6 +180,7 @@ void testBuildPostgres() {
.connectionSurrenderTimeout(surrenderTimeout)
.build())
.aggregationPipelineMode(SORT_OPTIMIZED_IF_POSSIBLE)
.queryTimeout(queryTimeout)
.build();

assertEquals(expected, config);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.hypertrace.core.documentstore.postgres;

import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
class PostgresQueryExecutorTest {

@Mock private Connection mockConnection;
@Mock private PreparedStatement mockPreparedStatement;

@Test
void testPreparedStatementUsesConfiguredTimeout() throws SQLException {
int configuredTimeoutSeconds = 45;
String sqlQuery = "SELECT * FROM test_table";
Params params = Params.newBuilder().build();

when(mockConnection.prepareStatement(sqlQuery)).thenReturn(mockPreparedStatement);

PostgresQueryExecutor executor = new PostgresQueryExecutor(configuredTimeoutSeconds);
executor.buildPreparedStatement(sqlQuery, params, mockConnection);
verify(mockPreparedStatement).setQueryTimeout(configuredTimeoutSeconds);
}

@Test
void testPreparedStatementUsesOverridenTimeout() throws SQLException {
int defaultTimeoutSeconds = 30;
String sqlQuery = "SELECT * FROM test_table";
Params params = Params.newBuilder().build();

when(mockConnection.prepareStatement(sqlQuery)).thenReturn(mockPreparedStatement);

PostgresQueryExecutor executor = new PostgresQueryExecutor(defaultTimeoutSeconds);
// override timeout to 45s
executor.buildPreparedStatement(sqlQuery, params, mockConnection, 45);
verify(mockPreparedStatement).setQueryTimeout(45);
}
}
Loading