diff --git a/src/pgsql_connection.erl b/src/pgsql_connection.erl index 81c3c17..84372ec 100644 --- a/src/pgsql_connection.erl +++ b/src/pgsql_connection.erl @@ -14,8 +14,8 @@ -export([startup/3, auth/2, initializing/2, ready/2, ready/3]). -export([querying/2, parsing/2, binding/2, describing/2]). --export([executing/2, closing/2, synchronizing/2, timeout/2]). --export([aborted/3]). +-export([executing/2, closing/2, synchronizing/2, synchronizing/3]). +-export([timeout/2]). -include("pgsql.hrl"). @@ -27,6 +27,8 @@ reply, reply_to, async, + autosync=false, + callers=[], backend, statement, txstatus}). @@ -46,31 +48,31 @@ connect(C, Host, Username, Password, Opts) -> gen_fsm:sync_send_event(C, {connect, Host, Username, Password, Opts}, infinity). get_parameter(C, Name) -> - gen_fsm:sync_send_event(C, {get_parameter, to_binary(Name)}). + sync_send_event(C, {get_parameter, to_binary(Name)}, infinity). squery(C, Sql) -> - gen_fsm:sync_send_event(C, {squery, Sql}, infinity). + sync_send_event(C, {squery, Sql}, infinity). equery(C, Statement, Parameters) -> - gen_fsm:sync_send_event(C, {equery, Statement, Parameters}, infinity). + sync_send_event(C, {equery, Statement, Parameters}, infinity). parse(C, Name, Sql, Types) -> - gen_fsm:sync_send_event(C, {parse, Name, Sql, Types}, infinity). + sync_send_event(C, {parse, Name, Sql, Types}, infinity). bind(C, Statement, PortalName, Parameters) -> - gen_fsm:sync_send_event(C, {bind, Statement, PortalName, Parameters}, infinity). + sync_send_event(C, {bind, Statement, PortalName, Parameters}, infinity). execute(C, Statement, PortalName, MaxRows) -> - gen_fsm:sync_send_event(C, {execute, Statement, PortalName, MaxRows}, infinity). + sync_send_event(C, {execute, Statement, PortalName, MaxRows}, infinity). describe(C, Type, Name) -> - gen_fsm:sync_send_event(C, {describe, Type, Name}, infinity). + sync_send_event(C, {describe, Type, Name}, infinity). close(C, Type, Name) -> - gen_fsm:sync_send_event(C, {close, Type, Name}, infinity). + sync_send_event(C, {close, Type, Name}, infinity). sync(C) -> - gen_fsm:sync_send_event(C, sync, infinity). + sync_send_event(C, sync, infinity). %% -- gen_fsm implementation -- @@ -295,6 +297,12 @@ ready(sync, From, State) -> State2 = State#state{reply = ok, reply_to = From}, {next_state, synchronizing, State2, Timeout}. +%% If a caller sends a request while the connection is autosyncing +%% store the caller's pid and tell them to wait. +synchronizing(_Msg, {Pid, _Tag}, #state{callers = Callers, autosync = true} = State) -> + {reply, {wait, synchronizing}, synchronizing, State#state{callers = [Pid|Callers]}}. + + %% BindComplete querying({$2, <<>>}, State) -> #state{timeout = Timeout, statement = #statement{columns = Columns}} = State, @@ -470,7 +478,11 @@ executing(timeout, State) -> executing({error, E}, State) -> #state{timeout = Timeout} = State, notify(State, {error, E}), - {next_state, aborted, State, Timeout}. + %% Send sync command to database and transition immediately + %% to synchronizing state. This automatically handles the + %% case where the driver needs to checkpoint after an error + send(State, $S, []), + {next_state, synchronizing, State#state{autosync = true}, Timeout}. %% CloseComplete closing({$3, <<>>}, State) -> @@ -500,9 +512,8 @@ synchronizing(timeout, State) -> %% ReadyForQuery synchronizing({$Z, <>}, State) -> - #state{reply = Reply, reply_to = Reply_To} = State, - gen_fsm:reply(Reply_To, Reply), - {next_state, ready, State#state{reply = undefined, txstatus = Status}}. + State1 = maybe_reply(State), + {next_state, ready, State1#state{reply = undefined, txstatus = Status, autosync = false}}. timeout({$Z, <>}, State) -> notify(State, timeout), @@ -516,16 +527,6 @@ timeout(_Event, State) -> #state{timeout = Timeout} = State, {next_state, timeout, State, Timeout}. -aborted(sync, From, State) -> - #state{timeout = Timeout} = State, - send(State, $S, []), - State2 = State#state{reply = ok, reply_to = From}, - {next_state, synchronizing, State2, Timeout}; - -aborted(_Msg, _From, State) -> - #state{timeout = Timeout} = State, - {reply, {error, sync_required}, aborted, State, Timeout}. - %% -- internal functions -- %% decode data @@ -659,3 +660,25 @@ hex(Bin) -> send(#state{sock = Sock}, Type, Data) -> pgsql_sock:send(Sock, Type, Data). + +%% If autosyncing, tell callers we are done so they can +%% retry their request +maybe_reply(#state{autosync = true, callers = Callers} = State) -> + [Caller ! {ok, proceed} || Caller <- Callers], + State#state{callers = []}; +%% If not autosyncing, send FSM reply as usual +maybe_reply(#state{reply = Reply, reply_to = Reply_To, autosync = false} = State) -> + gen_fsm:reply(Reply_To, Reply), + State. + +%% send event and handle connection autosync +sync_send_event(C, Event, Timeout) -> + case gen_fsm:sync_send_event(C, Event, Timeout) of + {wait, synchronizing} -> + receive + {ok, proceed} -> + sync_send_event(C, Event, Timeout) + end; + R -> + R + end. diff --git a/test_src/pgsql_tests.erl b/test_src/pgsql_tests.erl index e3c0031..8404549 100644 --- a/test_src/pgsql_tests.erl +++ b/test_src/pgsql_tests.erl @@ -266,7 +266,7 @@ execute_error_test() -> {ok, S} = pgsql:parse(C, "insert into test_table1 (id, value) values ($1, $2)"), ok = pgsql:bind(C, S, [1, <<"foo">>]), {error, #error{code = <<"23505">>}} = pgsql:execute(C, S, 0), - {error, sync_required} = pgsql:bind(C, S, [3, <<"quux">>]), + ok = pgsql:bind(C, S, [3, <<"quux">>]), ok = pgsql:sync(C), ok = pgsql:bind(C, S, [3, <<"quux">>]), {ok, _} = pgsql:execute(C, S, 0),