diff --git a/lib/mongo/mongo_db_connection.ex b/lib/mongo/mongo_db_connection.ex index 9998f2fe..deb07953 100644 --- a/lib/mongo/mongo_db_connection.ex +++ b/lib/mongo/mongo_db_connection.ex @@ -37,6 +37,7 @@ defmodule Mongo.MongoDBConnection do wire_version: 0, auth_mechanism: opts[:auth_mechanism] || nil, connection_type: Keyword.fetch!(opts, :connection_type), + server_pid: Keyword.get(opts, :server_pid), topology_pid: Keyword.fetch!(opts, :topology_pid), stable_api: Keyword.get(opts, :stable_api), use_op_msg: Keyword.get(opts, :stable_api) != nil, @@ -48,8 +49,24 @@ defmodule Mongo.MongoDBConnection do end @impl true - def disconnect(_error, %{connection: {mod, socket}, connection_type: type, topology_pid: pid, host: host}) do - GenServer.cast(pid, {:disconnect, type, host}) + ## the stream monitor disconnects, we change the mode of the parent monitor + def disconnect(_error, %{connection: {mod, socket}, connection_type: :stream_monitor, parent_pid: parent_pid}) do + ## Logger.debug("MongoDB-Connection: disconnected stream monitor: #{inspect(error)}") + GenServer.cast(parent_pid, :stop_streaming_mode) + mod.close(socket) + :ok + end + + def disconnect(_error, %{connection: {mod, socket}, connection_type: :monitor, topology_pid: topology_pid, host: host, server_pid: server_pid}) do + ## Logger.debug("MongoDB-Connection: disconnected: #{inspect(error)}, #{inspect(server_pid)}, #{inspect(host)}, cast disconnect :monitor") + GenServer.cast(server_pid, :stop_streaming_mode) + GenServer.cast(topology_pid, {:disconnect, :monitor, host, server_pid}) + mod.close(socket) + :ok + end + + def disconnect(_error, %{connection: {mod, socket}}) do + ## Logger.debug("MongoDB-Connection: disconnected: #{inspect error}, #{inspect type}, #{inspect host} #{inspect server_pid}") mod.close(socket) :ok end diff --git a/lib/mongo/monitor.ex b/lib/mongo/monitor.ex index d80e984d..75ece731 100644 --- a/lib/mongo/monitor.ex +++ b/lib/mongo/monitor.ex @@ -47,6 +47,10 @@ defmodule Mongo.Monitor do GenServer.cast(pid, :update) end + def stop_streaming_mode(pid) do + GenServer.cast(pid, :stop_streaming_mode) + end + def set_heartbeat_frequency_ms(pid, heartbeat_frequency_ms) do GenServer.cast(pid, {:update, heartbeat_frequency_ms}) end @@ -59,7 +63,7 @@ defmodule Mongo.Monitor do Initialize the monitor process """ def init([address, topology_pid, heartbeat_frequency_ms, connection_opts]) do - ## debug info("Starting monitor process with pid #{inspect self()}, #{inspect address}") + ## Logger.info("Starting monitor process with pid #{inspect(self())}, #{inspect(address)}") # monitors don't authenticate and use the "admin" database opts = @@ -73,6 +77,7 @@ defmodule Mongo.Monitor do |> Keyword.put(:topology_pid, topology_pid) |> Keyword.put(:pool_size, 1) |> Keyword.put(:idle_interval, 5_000) + |> Keyword.put(:server_pid, self()) with {:ok, pid} <- DBConnection.start_link(Mongo.MongoDBConnection, opts) do {:ok, @@ -97,18 +102,17 @@ defmodule Mongo.Monitor do end @doc """ - In case of terminating we stop the our linked processes as well: + In case of terminating we stop our linked processes as well: * connection * streaming process """ def terminate(reason, %{connection_pid: connection_pid, streaming_pid: nil}) do - ## debug info("Terminating monitor for reason #{inspect reason}") + ## Logger.debug("Terminating monitor #{inspect(self())} for reason #{inspect(reason)}") GenServer.stop(connection_pid, reason) end def terminate(reason, %{connection_pid: connection_pid, streaming_pid: streaming_pid}) do - ## debug info("Terminating monitor for reason #{inspect reason}, #{inspect self()}, #{inspect streaming_pid}") - + ## Logger.debug("Terminating monitor #{inspect(self())} for reason #{inspect(reason)}, #{inspect(streaming_pid)}") GenServer.stop(connection_pid, reason) GenServer.stop(streaming_pid, reason) end @@ -117,6 +121,7 @@ defmodule Mongo.Monitor do Report the connection event, so the topology process can now create the connection pool. """ def connected(_connection, me, topology_pid) do + ## Logger.info("Monitor #{inspect(me)} connected to server! ") Topology.monitor_connected(topology_pid, me) GenServer.cast(me, :update) end @@ -125,6 +130,15 @@ defmodule Mongo.Monitor do {:reply, Map.put(state, :pid, self()), state} end + def handle_cast(:stop_streaming_mode, %{streaming_pid: streaming_pid} = state) when streaming_pid != nil do + spawn(fn -> GenServer.stop(streaming_pid) end) + {:noreply, %{state | mode: :polling_mode, streaming_pid: nil}} + end + + def handle_cast(:stop_streaming_mode, state) do + {:noreply, %{state | mode: :polling_mode}} + end + ## # Update the server description or the rrt value ## @@ -207,11 +221,11 @@ defmodule Mongo.Monitor do # Starts the streaming mode ## defp start_streaming_mode(%{address: address, topology_pid: topology_pid, opts: opts} = state, _server_description) do - args = [topology_pid, address, opts] + args = [self(), topology_pid, address, opts] case StreamingHelloMonitor.start_link(args) do {:ok, pid} -> - ## debug info("Starting streaming mode: #{inspect self()}") + ## Logger.debug("Starting streaming mode: #{inspect(pid)}") %{state | mode: :streaming_mode, streaming_pid: pid, heartbeat_frequency_ms: 10_000} error -> diff --git a/lib/mongo/password_safe.ex b/lib/mongo/password_safe.ex index a9b55c2e..c52eb90f 100644 --- a/lib/mongo/password_safe.ex +++ b/lib/mongo/password_safe.ex @@ -10,7 +10,7 @@ defmodule Mongo.PasswordSafe do use GenServer - def new() do + def start_link() do GenServer.start_link(@me, []) end diff --git a/lib/mongo/repo.ex b/lib/mongo/repo.ex index f3c04a40..1b4d007b 100644 --- a/lib/mongo/repo.ex +++ b/lib/mongo/repo.ex @@ -457,7 +457,7 @@ defmodule Mongo.Repo do @callback update(doc :: Mongo.Collection.t(), opts :: Keyword.t()) :: {:ok, Mongo.Collection.t()} | {:error, any()} @doc """ - Same as `c:update/1` but raises an error. + Same as `c:update/2` but raises an error. """ @callback update!(doc :: Mongo.Collection.t(), opts :: Keyword.t()) :: Mongo.Collection.t() @@ -471,7 +471,7 @@ defmodule Mongo.Repo do @callback insert_or_update(doc :: Mongo.Collection.t(), opts :: Keyword.t()) :: {:ok, Mongo.Collection.t()} | {:error, any()} @doc """ - Same as `c:insert_or_update/1` but raises an error. + Same as `c:insert_or_update/2` but raises an error. """ @callback insert_or_update!(doc :: Mongo.Collection.t(), opts :: Keyword.t()) :: Mongo.Collection.t() diff --git a/lib/mongo/streaming_hello_monitor.ex b/lib/mongo/streaming_hello_monitor.ex index 05d7dc5a..df861c0a 100644 --- a/lib/mongo/streaming_hello_monitor.ex +++ b/lib/mongo/streaming_hello_monitor.ex @@ -30,7 +30,7 @@ defmodule Mongo.StreamingHelloMonitor do @doc """ Initialize the monitor process """ - def init([topology_pid, address, opts]) do + def init([monitor_pid, topology_pid, address, opts]) do heartbeat_frequency_ms = 10_000 opts = @@ -38,6 +38,8 @@ defmodule Mongo.StreamingHelloMonitor do |> Keyword.drop([:after_connect]) |> Keyword.put(:after_connect, {StreamingHelloMonitor, :connected, [self()]}) |> Keyword.put(:connection_type, :stream_monitor) + |> Keyword.put(:server_pid, self()) + |> Keyword.put(:monitor_pid, monitor_pid) ## debug info("Starting stream hello monitor with options #{inspect(opts, pretty: true)}") @@ -65,7 +67,7 @@ defmodule Mongo.StreamingHelloMonitor do In this case we stop the DBConnection. """ def terminate(reason, %{connection_pid: connection_pid}) do - ## debug info("Terminating streaming hello monitor for reason #{inspect reason}") + ## Logger.debug("Terminating streaming hello monitor for reason #{inspect(reason)}") GenServer.stop(connection_pid, reason) end @@ -84,7 +86,7 @@ defmodule Mongo.StreamingHelloMonitor do end def handle_info({:EXIT, _pid, reason}, state) do - ## debug Logger.warn("Stopped with reason #{inspect reason}") + Logger.warning("Stopped with reason #{inspect(reason)}") {:stop, reason, state} end diff --git a/lib/mongo/topology.ex b/lib/mongo/topology.ex index 1a133974..05581b12 100644 --- a/lib/mongo/topology.ex +++ b/lib/mongo/topology.ex @@ -79,7 +79,6 @@ defmodule Mongo.Topology do GenServer.call(pid, :get_state) end - # 97 def select_server(pid, type, opts \\ []) do timeout = Keyword.get(opts, :checkout_timeout, @default_checkout_timeout) GenServer.call(pid, {:select_server, type, opts}, timeout) @@ -109,7 +108,7 @@ defmodule Mongo.Topology do end def stop(pid) do - GenServer.stop(pid) + GenServer.stop(pid, :stop) end ## GenServer Callbacks @@ -163,14 +162,7 @@ defmodule Mongo.Topology do end end - def terminate(_reason, state) do - case state.opts[:pw_safe] do - nil -> nil - pid -> GenServer.stop(pid) - end - - Enum.each(state.connection_pools, fn {_address, pid} -> GenServer.stop(pid) end) - Enum.each(state.monitors, fn {_address, pid} -> GenServer.stop(pid) end) + def terminate(_reason, _state) do Mongo.Events.notify(%TopologyClosedEvent{topology_pid: self()}) end @@ -204,18 +196,25 @@ defmodule Mongo.Topology do ## # In case of :monitor or :stream_monitor we mark the server description of the address as unknown ## - def handle_cast({:disconnect, kind, address}, state) when kind in [:monitor, :stream_monitor] do - server_description = ServerDescription.parse_hello_response(address, "#{inspect(kind)} disconnected") + def handle_cast({:disconnect, :monitor, address, pid}, state) do + server_description = ServerDescription.parse_hello_response(address, "monitor disconnected") + ## Logger.debug("Disconnect monitor with #{inspect(pid)}") new_state = address - |> remove_address(state) + |> close_connection_pool(pid, state) |> maybe_reinit() handle_cast({:server_description, server_description}, new_state) end - def handle_cast({:disconnect, _kind, _host}, state) do + def handle_cast({:disconnect, :stream_monitor, _host, _pid}, state) do + ## IO.inspect("ignored: kind stream_monitor with #{inspect pid}") + {:noreply, state} + end + + def handle_cast({:disconnect, _kind, _host, _pid}, state) do + ## IO.inspect("ignored: kind #{inspect kind}") {:noreply, state} end @@ -233,6 +232,7 @@ defmodule Mongo.Topology do {host, ^monitor_pid} -> arbiters = fetch_arbiters(state) + Mongo.Events.notify(%ServerOpeningEvent{address: host, topology_pid: self()}) if host in arbiters do state @@ -243,8 +243,9 @@ defmodule Mongo.Topology do |> Keyword.put(:topology_pid, self()) |> connect_opts_from_address(host) + ## Logger.debug("Starting connection pool for #{inspect(host)}") {:ok, pool} = DBConnection.start_link(Mongo.MongoDBConnection, conn_opts) - connection_pools = Map.put(state.connection_pools, host, pool) + connection_pools = replace_pool(state.connection_pools, host, pool) Process.send_after(self(), {:new_connection, state.waiting_pids}, 10) @@ -279,6 +280,49 @@ defmodule Mongo.Topology do {:noreply, state} end + ## remove the address only if the pid is the same + defp close_connection_pool(address, pid, state) do + ## Logger.debug("Closing connection pool by pid: #{inspect(state.monitors[address] == pid)}, #{inspect(pid)}, #{inspect(state.monitors[address])}") + + case state.monitors[address] == pid do + true -> + Mongo.Events.notify(%ServerClosedEvent{address: address, topology_pid: self()}) + ## stopping the connection pool + case state.connection_pools[address] do + nil -> + :ok + + pid -> + if Process.alive?(pid) do + ## Logger.debug("Stopping the connection pool #{inspect(pid)} für #{inspect(address)}") + GenServer.stop(pid) + end + end + + %{state | connection_pools: Map.delete(state.connection_pools, address)} + + false -> + state + end + end + + ## replaces a pool for the host address + defp replace_pool(connection_pools, host, pool) do + ## if we found an existing pool, we will stop it first + case Map.get(connection_pools, host) do + nil -> + :noop + + pid -> + if Process.alive?(pid) do + ## Logger.debug("Stopping the connection pool #{inspect(pid)}") + GenServer.stop(pid) + end + end + + Map.put(connection_pools, host, pool) + end + ## # Update server description: in case of logical session the function creates a session pool for the `deployment`. # @@ -510,9 +554,6 @@ defmodule Mongo.Topology do Enum.reduce(added, state, fn address, state -> server_description = state.topology.servers[address] connopts = connect_opts_from_address(state.opts, address) - - Mongo.Events.notify(%ServerOpeningEvent{address: address, topology_pid: self()}) - args = [server_description.address, self(), heartbeat_frequency_ms, Keyword.put(connopts, :pool, DBConnection.ConnectionPool)] {:ok, pid} = Monitor.start_link(args) @@ -549,16 +590,23 @@ defmodule Mongo.Topology do end defp remove_address(address, state) do - Mongo.Events.notify(%ServerClosedEvent{address: address, topology_pid: self()}) - case state.monitors[address] do - nil -> :ok - pid -> GenServer.stop(pid) + nil -> + :ok + + pid -> + Mongo.Events.notify(%ServerClosedEvent{address: address, topology_pid: self()}) + ## Logger.debug("Stopping: #{inspect(pid)} for #{inspect(address)}") + GenServer.stop(pid) end case state.connection_pools[address] do - nil -> :ok - pid -> GenServer.stop(pid) + nil -> + :ok + + pid -> + ## Logger.debug("Connection pool: #{inspect(address)}") + GenServer.stop(pid) end %{state | monitors: Map.delete(state.monitors, address), connection_pools: Map.delete(state.connection_pools, address)} diff --git a/lib/mongo/url_parser.ex b/lib/mongo/url_parser.ex index 0fe9a5e6..41cd6b71 100644 --- a/lib/mongo/url_parser.ex +++ b/lib/mongo/url_parser.ex @@ -168,7 +168,7 @@ defmodule Mongo.UrlParser do value -> ## start GenServer and put id - with {:ok, pid} <- Mongo.PasswordSafe.new(), + with {:ok, pid} <- Mongo.PasswordSafe.start_link(), :ok <- Mongo.PasswordSafe.set_password(pid, value) do opts |> Keyword.put(:password, "*****") diff --git a/mix.exs b/mix.exs index 7cdbaaa9..60398961 100644 --- a/mix.exs +++ b/mix.exs @@ -37,7 +37,7 @@ defmodule Mongodb.Mixfile do {:patch, "~> 0.12.0", only: [:dev, :test]}, {:jason, "~> 1.3", only: [:dev, :test]}, {:credo, "~> 1.7.0", only: [:dev, :test], runtime: false}, - {:ex_doc, ">= 0.0.0", only: :dev, runtime: false} + {:ex_doc, "== 0.24.1", only: :dev, runtime: false} ] end diff --git a/mix.lock b/mix.lock index 163ce3b2..3dcb94ec 100644 --- a/mix.lock +++ b/mix.lock @@ -4,7 +4,7 @@ "db_connection": {:hex, :db_connection, "2.6.0", "77d835c472b5b67fc4f29556dee74bf511bbafecdcaf98c27d27fa5918152086", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c2f992d15725e721ec7fbc1189d4ecdb8afef76648c746a8e1cad35e3b8a35f3"}, "decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"}, "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, - "ex_doc": {:hex, :ex_doc, "0.31.1", "8a2355ac42b1cc7b2379da9e40243f2670143721dd50748bf6c3b1184dae2089", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "3178c3a407c557d8343479e1ff117a96fd31bafe52a039079593fb0524ef61b0"}, + "ex_doc": {:hex, :ex_doc, "0.24.1", "15673de99154f93ca7f05900e4e4155ced1ee0cd34e0caeee567900a616871a4", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "07972f17bdf7dc7b5bd76ec97b556b26178ed3f056e7ec9288eb7cea7f91cce2"}, "file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"}, diff --git a/test/mongo/password_safe_test.exs b/test/mongo/password_safe_test.exs index 316c885c..58699161 100644 --- a/test/mongo/password_safe_test.exs +++ b/test/mongo/password_safe_test.exs @@ -7,7 +7,7 @@ defmodule Mongo.PasswordSafeTest do test "encrypted password" do pw = "my-secret-password" - {:ok, pid} = PasswordSafe.new() + {:ok, pid} = PasswordSafe.start_link() PasswordSafe.set_password(pid, pw) %{key: _key, pw: enc_pw} = :sys.get_state(pid) assert enc_pw != pw