Skip to content

Commit

Permalink
map bags to indices
Browse files Browse the repository at this point in the history
  • Loading branch information
nics committed Jan 24, 2019
1 parent 4fc376e commit 6c4b39c
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 56 deletions.
33 changes: 5 additions & 28 deletions lib/Catmandu/Store/ElasticSearch.pm
Original file line number Diff line number Diff line change
Expand Up @@ -4,49 +4,26 @@ use Catmandu::Sane;

our $VERSION = '0.0512';

use Moo;
use Search::Elasticsearch;
use Catmandu::Util qw(is_instance);
use Catmandu::Store::ElasticSearch::Bag;
use Moo;
use namespace::clean;

with 'Catmandu::Store';
with 'Catmandu::Droppable';

has index_name => (is => 'ro', required => 1);
has index_settings => (is => 'ro', lazy => 1, default => sub {+{}});
has index_mappings => (is => 'ro', lazy => 1, default => sub {+{}});
has _es_args => (is => 'rw', lazy => 1, default => sub {+{}});
has _es_args => (is => 'rw', lazy => 1, default => sub {+{}});
has es => (is => 'lazy');

# used internally
has is_es_1_or_2 => (is => 'lazy');

sub _build_es {
my ($self) = @_;
my $es = Search::Elasticsearch->new($self->_es_args);
unless ($es->indices->exists(index => $self->index_name)) {
$es->indices->create(
index => $self->index_name,
body => {
settings => $self->index_settings,
mappings => $self->index_mappings,
},
);
}
$es;
}
has is_es_1_or_2 => (is => 'lazy', init_arg => undef);

sub BUILD {
my ($self, $args) = @_;

# TODO filter out own args
$self->_es_args($args);
}

sub drop {
sub _build_es {
my ($self) = @_;
$self->es->indices->delete(index => $self->index_name);
Search::Elasticsearch->new($self->_es_args);
}

sub _build_is_es_1_or_2 {
Expand Down
70 changes: 52 additions & 18 deletions lib/Catmandu/Store/ElasticSearch/Bag.pm
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,41 @@ use Catmandu::Sane;

our $VERSION = '0.0512';

use Moo;
use Catmandu::Hits;
use Cpanel::JSON::XS qw(encode_json decode_json);
use Catmandu::Store::ElasticSearch::Searcher;
use Catmandu::Store::ElasticSearch::CQL;
use Catmandu::Util qw(is_code_ref is_string);
use Moo;
use namespace::clean;

with 'Catmandu::Bag';
with 'Catmandu::Droppable';
with 'Catmandu::CQLSearchable';

has type => (is => 'ro', lazy => 1);
has buffer_size => (is => 'ro', lazy => 1, builder => 'default_buffer_size');
has _bulk => (is => 'ro', lazy => 1);
has cql_mapping => (is => 'ro');
has on_error => (is => 'ro', default => sub {'log'});
has index => (is => 'lazy');
has settings => (is => 'lazy');
has mapping => (is => 'lazy');
has type => (is => 'lazy');
has buffer_size => (is => 'lazy', builder => 'default_buffer_size');
has _bulk => (is => 'lazy');
has cql_mapping => (is => 'ro');
has on_error => (is => 'lazy');

sub BUILD {
my ($self) = @_;
my $es = $self->store->es;
unless ($es->indices->exists(index => $self->index)) {
$es->indices->create(
index => $self->index,
body => {
settings => $self->settings,
mappings => { $self->type => $self->mapping },
},
);
}
$es;
}

sub default_buffer_size {100}

Expand Down Expand Up @@ -49,6 +68,22 @@ sub _coerce_on_error {
"on_error should be code ref, 'throw', 'log', or 'ignore'");
}

sub _build_on_error {
'log';
}

sub _build_settings {
+{};
}

sub _build_mapping {
+{};
}

sub _build_index {
$_[0]->name;
}

sub _build_type {
$_[0]->name;
}
Expand All @@ -57,7 +92,7 @@ sub _build__bulk {
my ($self) = @_;
my $on_error = $self->_coerce_on_error($self->on_error);
my %args = (
index => $self->store->index_name,
index => $self->index,
type => $self->type,
max_count => $self->buffer_size,
on_error => $on_error,
Expand All @@ -77,7 +112,7 @@ sub generator {
sub {
state $scroll = do {
my %args = (
index => $self->store->index_name,
index => $self->index,
type => $self->type,
size => $self->buffer_size, # TODO divide by number of shards
body => {query => {match_all => {}},},
Expand All @@ -100,7 +135,7 @@ sub generator {
sub count {
my ($self) = @_;
$self->store->es->count(
index => $self->store->index_name,
index => $self->index,
type => $self->type,
)->{count};
}
Expand All @@ -109,7 +144,7 @@ sub get {
my ($self, $id) = @_;
try {
my $data = $self->store->es->get_source(
index => $self->store->index_name,
index => $self->index,
type => $self->type,
id => $id,
);
Expand All @@ -136,7 +171,7 @@ sub delete_all {
my $es = $self->store->es;
if ($es->can('delete_by_query')) {
$es->delete_by_query(
index => $self->store->index_name,
index => $self->index,
type => $self->type,
body => {query => {match_all => {}},},
);
Expand All @@ -145,7 +180,7 @@ sub delete_all {
$es->transport->perform_request(
method => 'DELETE',
path => '/'
. $self->store->index_name . '/'
. $self->index . '/'
. $self->type
. '/_query',
body => {query => {match_all => {}},}
Expand All @@ -158,7 +193,7 @@ sub delete_by_query {
my $es = $self->store->es;
if ($es->can('delete_by_query')) {
$es->delete_by_query(
index => $self->store->index_name,
index => $self->index,
type => $self->type,
body => {query => $args{query},},
);
Expand All @@ -167,7 +202,7 @@ sub delete_by_query {
$es->transport->perform_request(
method => 'DELETE',
path => '/'
. $self->store->index_name . '/'
. $self->index . '/'
. $self->type
. '/_query',
body => {query => $args{query},}
Expand All @@ -180,7 +215,7 @@ sub commit {
$self->_bulk->flush;
$self->store->es->transport->perform_request(
method => 'POST',
path => '/' . $self->store->index_name . '/_refresh',
path => '/' . $self->index . '/_refresh',
);
}

Expand All @@ -198,7 +233,7 @@ sub search {
}

my $res = $self->store->es->search(
index => $self->store->index_name,
index => $self->index,
type => $self->type,
body => {%args, from => $start, size => $limit,},
);
Expand Down Expand Up @@ -309,8 +344,7 @@ sub normalize_sort {

sub drop {
my ($self) = @_;
$self->delete_all;
$self->commit;
$self->store->es->indices->delete(index => $self->index);
}

1;
Expand Down
19 changes: 10 additions & 9 deletions lib/Catmandu/Store/ElasticSearch/Searcher.pm
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ has sort => (is => 'ro');

sub generator {
my ($self) = @_;
my $store = $self->bag->store;
my $id_key = $self->bag->id_key;
my $bag = $self->bag;
my $store = $bag->store;
my $id_key = $bag->id_key;
sub {
state $total = $self->total;
if (defined $total) {
Expand All @@ -30,11 +31,11 @@ sub generator {
my $body = {query => $self->query};
$body->{sort} = $self->sort if $self->sort;
my %args = (
index => $store->index_name,
type => $self->bag->type,
index => $bag->index,
type => $bag->type,
from => $self->start,
size =>
$self->bag->buffer_size, # TODO divide by number of shards
$bag->buffer_size, # TODO divide by number of shards
body => $body,
);
if (!$self->sort && $store->is_es_1_or_2) {
Expand Down Expand Up @@ -71,10 +72,10 @@ sub slice { # TODO constrain total?

sub count {
my ($self) = @_;
my $store = $self->bag->store;
$store->es->count(
index => $store->index_name,
type => $self->bag->type,
my $bag = $self->bag;
$bag->store->es->count(
index => $bag->index,
type => $bag->type,
body => {query => $self->query,},
)->{count};
}
Expand Down
2 changes: 1 addition & 1 deletion t/00.t
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ my @pkgs = qw(

require_ok $_ for @pkgs;

done_testing 4;
done_testing;

0 comments on commit 6c4b39c

Please sign in to comment.