Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/pgo_connection_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ init([QueueTid, SupPid, PoolPid, PoolName, PoolConfig]) ->
ChildSpecs = [#{id => pgo_connection,
start => {pgo_connection, start_link, [QueueTid, PoolPid, PoolName,
SupPid, PoolConfig]},
shutdown => 100}],
shutdown => 5000}],
{ok, {SupFlags, ChildSpecs}}.
53 changes: 9 additions & 44 deletions src/pgo_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@
-type extended_query_loop_state() ::
% expect parse_complete message
parse_complete
| {parse_complete_with_params, [any()]}
| {parse_complete_with_params, iodata(), [any()]}
% expect parameter_description
| {parameter_description_with_params, [any()]}
| {parameter_description_with_params, iodata(), [any()]}
% expect row_description or no_data
| pre_bind_row_description
% expect bind_complete
Expand Down Expand Up @@ -381,23 +381,11 @@ process_active_data(PartialHeader, Conn=#conn{socket=Socket,
end.


% This function should always return true as set or reset may only fail because
% we are within a failed transaction.
% If set failed because the transaction was aborted, the query will fail
% (unless it is a rollback).
% If set succeeded within a transaction, but the query failed, the reset may
% fail but set only applies to the transaction anyway.
%% -spec set_succeeded_or_within_failed_transaction({set, []} | {error, pgo_error:pgo_error()}) -> boolean().
%% set_succeeded_or_within_failed_transaction({set, []}) -> true;
%% set_succeeded_or_within_failed_transaction({error, {error, _} = Error}) ->
%% error:is_in_failed_sql_transaction(Error).

-spec extended_query(#conn{}, iodata(), list(), pgo:decode_opts(), any(), list()) -> pgo:result().
extended_query(Conn=#conn{socket=Socket,
socket_module=SocketModule,
pool=Pool}, Query, Parameters, DecodeOptions, PerRowFun, Acc0) ->
_ = setopts(SocketModule, Socket, [{active, false}]),
put(query, Query),
ParseMessage = pgo_protocol:encode_parse_message("", Query, []),
%% We ask for a description of parameters only if required.
PacketT = case pgo_query_cache:lookup(Pool, Query) of
Expand All @@ -411,7 +399,7 @@ extended_query(Conn=#conn{socket=Socket,
not_found ->
DescribeStatementMessage = pgo_protocol:encode_describe_message(statement, ""),
FlushMessage = pgo_protocol:encode_flush_message(),
LoopState0 = {parse_complete_with_params, Parameters},
LoopState0 = {parse_complete_with_params, Query, Parameters},
{ok, [ParseMessage, DescribeStatementMessage, FlushMessage], LoopState0}

end,
Expand Down Expand Up @@ -463,9 +451,6 @@ encode_bind_describe_execute(Conn, Parameters, ParameterDataTypes) ->
{Class, Exception}
end.

%% requires_statement_description(_Parameters) ->
%% true. %pgo_protocol:bind_requires_statement_description(Parameters).

-spec receive_loop(extended_query_loop_state(), pgo:decode_fun(), list(), list(), pgo:conn())
-> pgo:result().
receive_loop(LoopState, DecodeFun, Acc0, DecodeOptions, Conn=#conn{socket=Socket,
Expand All @@ -488,14 +473,14 @@ receive_loop0(#parse_complete{}, parse_complete, DecodeFun, Acc0, DecodeOptions,

%% Path where we ask the backend about what it expects.
%% We ignore row descriptions sent before bind as the format codes are null.
receive_loop0(#parse_complete{}, {parse_complete_with_params, Parameters}, DecodeFun, Acc0, DecodeOptions, Conn) ->
receive_loop({parameter_description_with_params, Parameters}, DecodeFun, Acc0, DecodeOptions, Conn);
receive_loop0(#parse_complete{}, {parse_complete_with_params, Query, Parameters}, DecodeFun, Acc0, DecodeOptions, Conn) ->
receive_loop({parameter_description_with_params, Query, Parameters}, DecodeFun, Acc0, DecodeOptions, Conn);
receive_loop0(#parameter_description{data_types=ParameterDataTypes},
{parameter_description_with_params, Parameters}, DecodeFun,
{parameter_description_with_params, Query, Parameters}, DecodeFun,
Acc0, DecodeOptions, Conn=#conn{socket=Socket,
socket_module=SocketModule,
pool=Pool}) ->
pgo_query_cache:insert(Pool, get(query), ParameterDataTypes),
pgo_query_cache:insert(Pool, Query, ParameterDataTypes),
%% oob_update_oid_map_if_required(Conn, ParameterDataTypes, DecodeOptions),
PacketT = encode_bind_describe_execute(Conn, Parameters, ParameterDataTypes),
case PacketT of
Expand Down Expand Up @@ -537,15 +522,6 @@ receive_loop0(#command_complete{command_tag = Tag}, _LoopState, DecodeFun, Acc0,
receive_loop({result, #{command => Command,
num_rows => NumRows,
rows => lists:reverse(Acc0)}}, DecodeFun, Acc0, DecodeOptions, Conn);
%% receive_loop0(#portal_suspended{}, LoopState, DecodeFun, Acc0, DecodeOptions, Conn={_,S}) ->
%% ExecuteMessage = pgo_protocol:encode_execute_message("", 0),
%% FlushMessage = pgo_protocol:encode_flush_message(),
%% SinglePacket = [ExecuteMessage, FlushMessage],
%% case gen_tcp:send(S, SinglePacket) of
%% ok -> receive_loop(LoopState, DecodeFun, Acc0, DecodeOptions, Conn);
%% {error, _} = SendSinglePacketError ->
%% SendSinglePacketError
%% end;
receive_loop0(#ready_for_query{}, {result, Result}, _Fun, _Acc0, _DecodeOptions, __Socket) ->
Result;
receive_loop0(#error_response{fields = Fields}, LoopState, _Fun, _Acc0, _DecodeOptions,
Expand All @@ -556,8 +532,8 @@ receive_loop0(#error_response{fields = Fields}, LoopState, _Fun, _Acc0, _DecodeO
% - when we asked for the statement description
% - when MaxRowsStep > 0
NeedSync = case LoopState of
{parse_complete_with_params, _Args} -> true;
{parameter_description_with_params, _Parameters} -> true;
{parse_complete_with_params, _Query, _Args} -> true;
{parameter_description_with_params, _Query, _Parameters} -> true;
_ -> false
end,
case NeedSync of
Expand Down Expand Up @@ -685,17 +661,6 @@ simple_query(Conn=#conn{socket_module=SocketModule,
end.


%% % This function should always return true as set or reset may only fail because
%% % we are within a failed transaction.
%% % If set failed because the transaction was aborted, the query will fail
%% % (unless it is a rollback).
%% % If set succeeded within a transaction, but the query failed, the reset may
%% % fail but set only applies to the transaction anyway.
%% -spec set_succeeded_or_within_failed_transaction({set, []} | {error, pgsql_error:pgsql_error()}) -> boolean().
%% set_succeeded_or_within_failed_transaction({set, []}) -> true;
%% set_succeeded_or_within_failed_transaction({error, {pgsql_error, _} = Error}) ->
%% pgsql_error:is_in_failed_sql_transaction(Error).

simple_query_loop(#conn{socket=Socket}=Conn, Acc) ->
case simple_receive_message(Socket, Conn, []) of
{ok, row_description} ->
Expand Down
Loading