Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/ch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ defmodule Ch do
| {:username, String.t()}
| {:password, String.t()}
| {:settings, Keyword.t()}
| {:session_id, String.t()}
| {:session_timeout, pos_integer()}
| {:timeout, timeout}

@typedoc """
Expand Down
137 changes: 128 additions & 9 deletions lib/ch/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@ defmodule Ch.Connection do
alias Mint.HTTP1, as: HTTP

@user_agent "ch/" <> Mix.Project.config()[:version]
@default_session_timeout 300
@session_id_key :session_id
@session_timeout_key :session_timeout
@transaction_status_key :transaction_status

@typep conn :: HTTP.t()

@impl true
@spec connect([Ch.start_option()]) :: {:ok, conn} | {:error, Error.t() | Mint.Types.error()}
def connect(opts) do
opts = put_default_session_opts(opts)
scheme = String.to_existing_atom(opts[:scheme] || "http")
address = opts[:hostname] || "localhost"
port = opts[:port] || 8123
Expand All @@ -25,6 +30,9 @@ defmodule Ch.Connection do
|> maybe_put_private(:username, opts[:username])
|> maybe_put_private(:password, opts[:password])
|> maybe_put_private(:settings, opts[:settings])
|> maybe_put_private(@session_id_key, opts[@session_id_key])
|> maybe_put_private(@session_timeout_key, opts[@session_timeout_key])
|> put_transaction_status(:idle)

handshake = Query.build("select 1, version()")
params = DBConnection.Query.encode(handshake, _params = [], _opts = [])
Expand Down Expand Up @@ -93,17 +101,53 @@ defmodule Ch.Connection do
@spec checkout(conn) :: {:ok, conn}
def checkout(conn), do: {:ok, conn}

# we "support" these four tx callbacks for Repo.checkout
# even though ClickHouse doesn't support txs

@impl true
def handle_begin(_opts, conn), do: {:ok, %{}, conn}
def handle_begin(opts, conn) do
case {Keyword.get(opts, :mode, :transaction), transaction_status(conn)} do
{:transaction, :idle} ->
execute_transaction_command("BEGIN TRANSACTION", :transaction, conn)

{:transaction, status} when status in [:transaction, :error] ->
{:error, Error.exception("nested transactions are not supported"), conn}

{:savepoint, _status} ->
{:error, Error.exception("savepoints are not supported"), conn}
end
end

@impl true
def handle_commit(_opts, conn), do: {:ok, %{}, conn}
def handle_commit(opts, conn) do
case {Keyword.get(opts, :mode, :transaction), transaction_status(conn)} do
{:transaction, :transaction} ->
execute_transaction_command("COMMIT", :idle, conn)

{:transaction, :error} ->
{:error, conn}

{:transaction, :idle} ->
{:idle, conn}

{:savepoint, _status} ->
{:error, Error.exception("savepoints are not supported"), conn}
end
end

@impl true
def handle_rollback(_opts, conn), do: {:ok, %{}, conn}
def handle_rollback(opts, conn) do
case {Keyword.get(opts, :mode, :transaction), transaction_status(conn)} do
{:transaction, status} when status in [:transaction, :error] ->
execute_transaction_command("ROLLBACK", :idle, conn)

{:transaction, :idle} ->
{:idle, conn}

{:savepoint, _status} ->
{:error, Error.exception("savepoints are not supported"), conn}
end
end

@impl true
def handle_status(_opts, conn), do: {:idle, conn}
def handle_status(_opts, conn), do: {transaction_status(conn), conn}

@impl true
def handle_prepare(_query, _opts, conn) do
Expand Down Expand Up @@ -424,6 +468,13 @@ defmodule Ch.Connection do
String.to_integer(code)
end

conn =
if transaction_status(conn) == :transaction do
put_transaction_status(conn, :error)
else
conn
end

{:error, Error.exception(code: code, message: message), conn}
end
end
Expand Down Expand Up @@ -474,7 +525,10 @@ defmodule Ch.Connection do
end

defp get_opts_or_private(conn, opts, key) do
Keyword.get(opts, key) || HTTP.get_private(conn, key)
case Keyword.fetch(opts, key) do
{:ok, value} -> value
:error -> HTTP.get_private(conn, key)
end
end

defp maybe_put_new_header(headers, _name, _no_value = nil), do: headers
Expand All @@ -496,7 +550,72 @@ defmodule Ch.Connection do

defp path(conn, query_params, opts) do
settings = settings(conn, opts)
"/?" <> URI.encode_query(settings ++ query_params)
"/?" <> URI.encode_query(settings ++ session_query_params(conn, opts) ++ query_params)
end

defp execute_transaction_command(statement, next_status, conn) do
opts = session_opts(conn)

case request(conn, "POST", path(conn, [], opts), headers(conn, [], opts), statement, opts) do
{:ok, conn, _responses} ->
{:ok, %{}, put_transaction_status(conn, next_status)}

{:error, error, conn} ->
{:error, error, conn}

{:disconnect, reason, conn} ->
{:disconnect, reason, conn}
end
end

defp session_query_params(conn, opts) do
if transaction_status(conn) != :idle or
Keyword.has_key?(opts, @session_id_key) or
Keyword.has_key?(opts, @session_timeout_key) do
session_id = get_opts_or_private(conn, opts, @session_id_key)
session_timeout = get_opts_or_private(conn, opts, @session_timeout_key)

[]
|> maybe_put_query_param("session_id", session_id)
|> maybe_put_query_param("session_timeout", session_timeout)
else
[]
end
end

defp maybe_put_query_param(params, _key, nil), do: params
defp maybe_put_query_param(params, key, value), do: [{key, value} | params]

defp transaction_status(conn) do
HTTP.get_private(conn, @transaction_status_key, :idle)
end

defp put_transaction_status(conn, status) do
HTTP.put_private(conn, @transaction_status_key, status)
end

defp new_session_id do
"ch_" <> Base.url_encode64(:crypto.strong_rand_bytes(12), padding: false)
end

defp put_default_session_opts(opts) do
opts
|> Keyword.put_new(@session_timeout_key, @default_session_timeout)
|> put_default_session_id()
end

defp put_default_session_id(opts) do
case Keyword.has_key?(opts, @session_id_key) do
true -> opts
false -> Keyword.put(opts, @session_id_key, new_session_id())
end
end

defp session_opts(conn) do
[
session_id: HTTP.get_private(conn, @session_id_key),
session_timeout: HTTP.get_private(conn, @session_timeout_key)
]
end

@server_display_name_key :server_display_name
Expand Down
106 changes: 94 additions & 12 deletions test/ch/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1665,22 +1665,90 @@ defmodule Ch.ConnectionTest do
end
end

describe "transactions" do
test "commit", ctx do
DBConnection.transaction(ctx.conn, fn conn ->
ctx = Map.put(ctx, :conn, conn)
parameterize_query!(ctx, "select 1 + 1")
end)
describe "transactions when supported" do
setup %{conn: conn} do
if transactions_supported?(conn) do
table = "transaction_t_#{System.unique_integer([:positive])}"
Ch.query!(conn, "create table #{table}(id UInt8) engine = MergeTree order by tuple()")
{:ok, table: table, transactions_supported?: true}
else
{:ok, table: nil, transactions_supported?: false}
end
end

test "rollback", ctx do
DBConnection.transaction(ctx.conn, fn conn ->
DBConnection.rollback(conn, :some_reason)
end)
test "commit persists rows and resets status", %{
conn: conn,
table: table,
transactions_supported?: transactions_supported?
} do
if transactions_supported? do
assert DBConnection.status(conn) == :idle

assert {:ok, :committed} =
DBConnection.transaction(conn, fn conn ->
assert DBConnection.status(conn) == :transaction

assert {:ok, %{num_rows: 1}} =
Ch.query(conn, "insert into #{table} values (1)")

assert Ch.query!(conn, "select count() from #{table}").rows == [[1]]
:committed
end)

assert DBConnection.status(conn) == :idle
assert Ch.query!(conn, "select count() from #{table}").rows == [[1]]
end
end

test "status", ctx do
assert DBConnection.status(ctx.conn) == :idle
test "rollback discards rows and resets status", %{
conn: conn,
table: table,
transactions_supported?: transactions_supported?
} do
if transactions_supported? do
assert DBConnection.status(conn) == :idle

assert {:error, :rolled_back} =
DBConnection.transaction(conn, fn conn ->
assert DBConnection.status(conn) == :transaction

assert {:ok, %{num_rows: 1}} =
Ch.query(conn, "insert into #{table} values (1)")

assert Ch.query!(conn, "select count() from #{table}").rows == [[1]]
DBConnection.rollback(conn, :rolled_back)
end)

assert DBConnection.status(conn) == :idle
assert Ch.query!(conn, "select count() from #{table}").rows == [[0]]
end
end

test "query errors mark the transaction as failed", %{
conn: conn,
transactions_supported?: transactions_supported?
} do
if transactions_supported? do
assert_raise Ch.Error, "cannot commit a failed transaction; rollback is required", fn ->
DBConnection.transaction(conn, fn conn ->
assert DBConnection.status(conn) == :transaction

assert {:error, %Ch.Error{}} =
Ch.query(conn, "select missing_transaction_column")

assert DBConnection.status(conn) == :error
:ok
end)
end

assert DBConnection.status(conn) == :idle
end
end
end

describe "transaction status" do
test "is idle outside of a transaction", %{conn: conn} do
assert DBConnection.status(conn) == :idle
end
end

Expand Down Expand Up @@ -1827,4 +1895,18 @@ defmodule Ch.ConnectionTest do
assert List.last(row) == 1000
end
end

defp transactions_supported?(conn) do
case Ch.query(conn, "BEGIN TRANSACTION") do
{:ok, _result} ->
Ch.query!(conn, "ROLLBACK")
true

{:error, %Ch.Error{code: 48}} ->
false

{:error, error} ->
raise error
end
end
end
Loading
Loading