Skip to content

Commit

Permalink
[JET-1762] schemaless 增加 insert_all/4 (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
vanppo authored Aug 1, 2024
1 parent 9178b56 commit 853e083
Show file tree
Hide file tree
Showing 13 changed files with 332 additions and 14 deletions.
16 changes: 15 additions & 1 deletion .github/workflows/elixir.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,21 @@ jobs:
test:
runs-on: ubuntu-latest
needs: mix

services:
postgres:
image: postgres:15-alpine
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: jet_ext_test
ports:
- 5432:5432
# Set health checks to wait until postgres has started
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- name: Setup
uses: byzanteam/jet-actions/setup-elixir@main
Expand Down
10 changes: 10 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import Config

config :jet_ext, ecto_repos: [JetExt.Repo]

config :jet_ext, JetExt.Repo,
migration_foreign_key: [type: :binary_id],
migration_primary_key: [type: :binary_id],
migration_timestamps: [type: :utc_datetime_usec]

import_config "#{config_env()}.exs"
1 change: 1 addition & 0 deletions config/dev.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
import Config
8 changes: 8 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import Config

config :jet_ext, JetExt.Repo,
hostname: "localhost",
database: "jet_ext_test",
username: "postgres",
password: "postgres",
pool: Ecto.Adapters.SQL.Sandbox
53 changes: 53 additions & 0 deletions lib/jet_ext/ecto/schemaless/repo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ defmodule JetExt.Ecto.Schemaless.Repo do

@type row() :: map()

@typep result() :: {non_neg_integer(), nil | [term()]}

@spec insert(Ecto.Repo.t(), Schema.t(), row() | changeset, options :: keyword()) ::
{:ok, row()} | {:error, changeset | term()}
when changeset: Ecto.Changeset.t(row())
Expand All @@ -15,6 +17,57 @@ defmodule JetExt.Ecto.Schemaless.Repo do
apply_action(:insert, repo, schema, changeset, options)
end

@spec insert_all(
repo :: Ecto.Repo.t(),
schema :: Schema.t(),
entries :: [map()],
opts :: keyword()
) :: {:ok, result()} | {:error, Ecto.Changeset.t()}
def insert_all(repo, schema, entries, opts \\ []) do
options =
case schema.prefix do
nil -> opts
prefix -> Keyword.put_new(opts, :prefix, prefix)
end

with {:ok, rows} <- build_rows(schema, entries) do
{:ok, repo.insert_all(schema.source, rows, options)}
end
end

defp build_rows(schema, entries) do
entries
|> Enum.reduce_while({:ok, []}, fn entry, {:ok, acc} ->
case build_row(schema, entry) do
{:ok, values} -> {:cont, {:ok, [values | acc]}}
{:error, changeset} -> {:halt, {:error, changeset}}
end
end)
|> case do
{:ok, rows} -> {:ok, Enum.reverse(rows)}
{:error, reason} -> {:error, reason}
end
end

defp build_row(schema, entry) do
schema
|> Schema.changeset()
|> Ecto.Changeset.cast(entry, Map.keys(schema.types))
|> Map.put(:action, :insert)
|> case do
%Ecto.Changeset{valid?: true} = changeset ->
values =
schema
|> Schema.autogenerate_changes(changeset)
|> Ecto.Changeset.apply_changes()

{:ok, Schema.dump!(schema, values)}

%Ecto.Changeset{valid?: false} = changeset ->
{:error, changeset}
end
end

@spec update(Ecto.Repo.t(), Schema.t(), changeset, options :: keyword()) ::
{:ok, row()} | {:error, changeset | term()}
when changeset: Ecto.Changeset.t(row())
Expand Down
15 changes: 11 additions & 4 deletions lib/jet_ext/ecto/schemaless/schema.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,18 @@ defmodule JetExt.Ecto.Schemaless.Schema do

@spec dump!(t(), row()) :: map()
def dump!(%__MODULE__{} = schema, row) do
Map.new(schema.types, fn {field, type} ->
value = Map.get(row, field)
{:ok, value} = Ecto.Type.dump(type, value)
{field, value}
schema.types
|> Stream.flat_map(fn {field, type} ->
case Map.fetch(row, field) do
{:ok, value} ->
{:ok, value} = Ecto.Type.dump(type, value)
[{field, value}]

:error ->
[]
end
end)
|> Map.new()
end

@spec primary_key(t(), row() | Ecto.Changeset.t(row())) :: keyword()
Expand Down
17 changes: 8 additions & 9 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,24 @@ defmodule JetExt.MixProject do
end

def application do
[
extra_applications: [:logger]
]
[extra_applications: [:logger]]
end

defp deps do
[
{:ecto, ">= 3.9.5 and < 4.0.0"},
{:jason, "~> 1.4"},
{:typed_struct, "~> 0.3.0"},
{:postgrex, "~> 0.18.0", optional: true},
{:urn, "~> 1.0", optional: true},
{:plug, "~> 1.14", optional: true},
{:absinthe, "~> 1.7", optional: true},
{:absinthe_relay, "~> 1.5", optional: true},
{:plug, "~> 1.14", optional: true},
{:postgrex, "~> 0.18.0", optional: true},
{:urn, "~> 1.0", optional: true},
{:credo, "~> 1.6", only: [:dev, :test], runtime: false},
{:dialyxir, "~> 1.0", only: [:dev], runtime: false},
{:jason, "~> 1.4"},
{:mimic, "~> 1.7", only: :test},
{:dialyxir, "~> 1.0", only: :dev, runtime: false},
{:ex_doc, "~> 0.31", only: :dev, runtime: false},
{:ecto_sql, "~> 3.11", only: :test},
{:mimic, "~> 1.7", only: :test},
{:polymorphic_embed, "~> 4.1.1", only: :test}
]
end
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"dialyxir": {:hex, :dialyxir, "1.2.0", "58344b3e87c2e7095304c81a9ae65cb68b613e28340690dfe1a5597fd08dec37", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "61072136427a851674cab81762be4dbeae7679f85b1272b6d25c3a839aff8463"},
"earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"},
"ecto": {:hex, :ecto, "3.11.0", "ff8614b4e70a774f9d39af809c426def80852048440e8785d93a6e91f48fec00", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7769dad267ef967310d6e988e92d772659b11b09a0c015f101ce0fff81ce1f81"},
"ecto_sql": {:hex, :ecto_sql, "3.11.3", "4eb7348ff8101fbc4e6bbc5a4404a24fecbe73a3372d16569526b0cf34ebc195", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.11.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.6", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.16 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e5f36e3d736b99c7fee3e631333b8394ade4bafe9d96d35669fca2d81c2be928"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.33.0", "690562b153153c7e4d455dc21dab86e445f66ceba718defe64b0ef6f0bd83ba0", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "3f69adc28274cb51be37d09b03e4565232862a4b10288a3894587b0131412124"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
Expand Down
57 changes: 57 additions & 0 deletions test/jet_ext/ecto/schemaless/repo_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
defmodule JetExt.Ecto.Schemaless.RepoTest do
use JetExt.Case.Database, async: false

alias JetExt.Ecto.Schemaless.Query
alias JetExt.Ecto.Schemaless.Repo

@movie_name "movies"

@movie_columns [
{:add, :id, :uuid, primary_key: true, auto_generate: true},
{:add, :title, :text, null: false},
{:add, :likes, :numeric, null: false},
{:add, :released, :boolean, null: false},
{:add, :release_date, :date, null: false},
{:add, :created_at, :timestamp, null: false, auto_generate: true},
{:add, :tags, {:array, :text}, null: false}
]

setup :setup_tables

describe "insert_all/4" do
@describetag [tables: [{@movie_name, @movie_columns}]]

test "works" do
schema = build_schema(@movie_name, @movie_columns)

entries = [
%{
"title" => "Longlegs",
"likes" => 1024,
"released" => false,
"release_date" => "2024-12-31",
"tags" => ["Crime", "Horror", "Thriller"]
},
%{
"title" => "Twisters",
"likes" => 1024,
"released" => false,
"release_date" => "2024-12-31",
"tags" => ["Action", "Adventure", "Thriller"]
},
%{
"title" => "Find Me Falling",
"likes" => 1024,
"released" => false,
"release_date" => "2024-12-31",
"tags" => ["Comedy", "Music", "Romance"]
}
]

assert {:ok, {3, nil}} = Repo.insert_all(JetExt.Repo, schema, entries)

assert [%{title: "Longlegs"}, %{title: "Twisters"}, %{title: "Find Me Falling"}] =
JetExt.Repo.all(Query.from(schema))
end
end
end
61 changes: 61 additions & 0 deletions test/jet_ext/ecto/schemaless/schema_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
defmodule JetExt.Ecto.Schemaless.SchemaTest do
use JetExt.Case.Database, async: false

alias JetExt.Ecto.Schemaless.Schema

@movie_name "movies"

@movie_columns [
{:add, :id, :uuid, primary_key: true, auto_generate: true},
{:add, :title, :text, null: false},
{:add, :likes, :numeric, null: false},
{:add, :released, :boolean, null: false},
{:add, :release_date, :date, null: false},
{:add, :created_at, :timestamp, null: false, auto_generate: true},
{:add, :tags, {:array, :text}, null: false}
]

describe "dump!/2" do
setup :build_schema

test "works", ctx do
%{schema: schema} = ctx

row = %{
title: "Longlegs",
likes: 1024,
released: false,
release_date: ~D[2024-12-31],
created_at: ~U[2024-08-01 08:06:29.240326Z],
tags: ["Crime", "Horror", "Thriller"]
}

assert %{
title: "Longlegs",
likes: %Decimal{coef: 1024},
released: false,
release_date: ~D[2024-12-31],
created_at: ~U[2024-08-01 08:06:29.240326Z],
tags: ["Crime", "Horror", "Thriller"]
} = Schema.dump!(schema, row)
end

test "dumps only explicit keys", ctx do
%{schema: schema} = ctx

assert values = Schema.dump!(schema, %{title: "Longlegs"})
assert [:title] = Map.keys(values)
end

test "keeps explicit nil", ctx do
%{schema: schema} = ctx

assert %{title: nil} = values = Schema.dump!(schema, %{title: nil})
assert [:title] = Map.keys(values)
end
end

defp build_schema(_ctx) do
[schema: build_schema(@movie_name, @movie_columns)]
end
end
88 changes: 88 additions & 0 deletions test/support/jet_ext/case/database.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
defmodule JetExt.Case.Database do
use ExUnit.CaseTemplate

alias Ecto.Adapters.SQL.Sandbox
alias JetExt.Ecto.Schemaless.Schema, as: SchemalessSchema

@types %{
{:array, :text} => {:array, :string},
uuid: Ecto.UUID,
text: :string,
numeric: :decimal,
boolean: :boolean,
timestamp: :utc_datetime_usec,
date: :date
}

@generators %{
uuid: {Ecto.UUID, :generate, []},
timestamp: {DateTime, :utc_now, []}
}

using do
quote location: :keep do
import Ecto
import Ecto.Changeset
import Ecto.Query
import unquote(__MODULE__)
end
end

setup ctx do
JetExt.Case.Database.setup_sandbox(ctx)
end

@doc """
Sets up the sandbox based on the test context.
"""
@spec setup_sandbox(ctx :: map()) :: :ok
def setup_sandbox(%{async: async}) do
pid = Sandbox.start_owner!(JetExt.Repo, shared: not async)
on_exit(fn -> Sandbox.stop_owner(pid) end)
end

@spec setup_tables(ctx :: map()) :: :ok
def setup_tables(ctx) do
ctx
|> Map.get(:tables, [])
|> Enum.each(fn {name, columns} ->
table = Ecto.Migration.table(name)

{:ok, _result} = JetExt.Repo.execute_ddl({:create, table, columns})

on_exit({:drop_table, name}, fn ->
{:ok, _result} = JetExt.Repo.execute_ddl({:drop_if_exists, table, :restrict})
end)
end)
end

@spec build_schema(table :: binary, columns :: [term()]) :: SchemalessSchema.t()
def build_schema(table, columns) do
types =
Map.new(columns, fn {:add, name, type, _opts} ->
{name, Map.fetch!(@types, type)}
end)

options = [
source: table,
primary_key: build_primary_key(columns),
autogenerate: build_autogenerate(columns)
]

SchemalessSchema.new(types, options)
end

defp build_primary_key(columns) do
Enum.flat_map(columns, fn {:add, name, _type, opts} ->
if Keyword.get(opts, :primary_key, false), do: [name], else: []
end)
end

defp build_autogenerate(columns) do
Enum.flat_map(columns, fn {:add, name, type, opts} ->
if Keyword.get(opts, :auto_generate, false),
do: [{[name], Map.fetch!(@generators, type)}],
else: []
end)
end
end
13 changes: 13 additions & 0 deletions test/support/jet_ext/repo.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
defmodule JetExt.Repo do
@moduledoc false

use Ecto.Repo, otp_app: :jet_ext, adapter: Ecto.Adapters.Postgres

@spec execute_ddl(command :: term()) :: Postgrex.Result.t()
def execute_ddl(command) do
command
|> Ecto.Adapters.Postgres.Connection.execute_ddl()
|> Enum.map(&IO.iodata_to_binary/1)
|> query()
end
end
Loading

0 comments on commit 853e083

Please sign in to comment.