diff --git a/lib/Catmandu/Store/ElasticSearch.pm b/lib/Catmandu/Store/ElasticSearch.pm index e2c7fa0..75ed54d 100644 --- a/lib/Catmandu/Store/ElasticSearch.pm +++ b/lib/Catmandu/Store/ElasticSearch.pm @@ -4,49 +4,26 @@ use Catmandu::Sane; our $VERSION = '0.0512'; -use Moo; use Search::Elasticsearch; use Catmandu::Util qw(is_instance); use Catmandu::Store::ElasticSearch::Bag; +use Moo; use namespace::clean; with 'Catmandu::Store'; -with 'Catmandu::Droppable'; -has index_name => (is => 'ro', required => 1); -has index_settings => (is => 'ro', lazy => 1, default => sub {+{}}); -has index_mappings => (is => 'ro', lazy => 1, default => sub {+{}}); -has _es_args => (is => 'rw', lazy => 1, default => sub {+{}}); +has _es_args => (is => 'rw', lazy => 1, default => sub {+{}}); has es => (is => 'lazy'); - -# used internally -has is_es_1_or_2 => (is => 'lazy'); - -sub _build_es { - my ($self) = @_; - my $es = Search::Elasticsearch->new($self->_es_args); - unless ($es->indices->exists(index => $self->index_name)) { - $es->indices->create( - index => $self->index_name, - body => { - settings => $self->index_settings, - mappings => $self->index_mappings, - }, - ); - } - $es; -} +has is_es_1_or_2 => (is => 'lazy', init_arg => undef); sub BUILD { my ($self, $args) = @_; - - # TODO filter out own args $self->_es_args($args); } -sub drop { +sub _build_es { my ($self) = @_; - $self->es->indices->delete(index => $self->index_name); + Search::Elasticsearch->new($self->_es_args); } sub _build_is_es_1_or_2 { diff --git a/lib/Catmandu/Store/ElasticSearch/Bag.pm b/lib/Catmandu/Store/ElasticSearch/Bag.pm index 08c8b52..0171733 100644 --- a/lib/Catmandu/Store/ElasticSearch/Bag.pm +++ b/lib/Catmandu/Store/ElasticSearch/Bag.pm @@ -4,22 +4,41 @@ use Catmandu::Sane; our $VERSION = '0.0512'; -use Moo; use Catmandu::Hits; use Cpanel::JSON::XS qw(encode_json decode_json); use Catmandu::Store::ElasticSearch::Searcher; use Catmandu::Store::ElasticSearch::CQL; use Catmandu::Util qw(is_code_ref is_string); +use Moo; +use namespace::clean; with 'Catmandu::Bag'; with 'Catmandu::Droppable'; with 'Catmandu::CQLSearchable'; -has type => (is => 'ro', lazy => 1); -has buffer_size => (is => 'ro', lazy => 1, builder => 'default_buffer_size'); -has _bulk => (is => 'ro', lazy => 1); -has cql_mapping => (is => 'ro'); -has on_error => (is => 'ro', default => sub {'log'}); +has index => (is => 'lazy'); +has settings => (is => 'lazy'); +has mapping => (is => 'lazy'); +has type => (is => 'lazy'); +has buffer_size => (is => 'lazy', builder => 'default_buffer_size'); +has _bulk => (is => 'lazy'); +has cql_mapping => (is => 'ro'); +has on_error => (is => 'lazy'); + +sub BUILD { + my ($self) = @_; + my $es = $self->store->es; + unless ($es->indices->exists(index => $self->index)) { + $es->indices->create( + index => $self->index, + body => { + settings => $self->settings, + mappings => { $self->type => $self->mapping }, + }, + ); + } + $es; +} sub default_buffer_size {100} @@ -49,6 +68,22 @@ sub _coerce_on_error { "on_error should be code ref, 'throw', 'log', or 'ignore'"); } +sub _build_on_error { + 'log'; +} + +sub _build_settings { + +{}; +} + +sub _build_mapping { + +{}; +} + +sub _build_index { + $_[0]->name; +} + sub _build_type { $_[0]->name; } @@ -57,7 +92,7 @@ sub _build__bulk { my ($self) = @_; my $on_error = $self->_coerce_on_error($self->on_error); my %args = ( - index => $self->store->index_name, + index => $self->index, type => $self->type, max_count => $self->buffer_size, on_error => $on_error, @@ -77,7 +112,7 @@ sub generator { sub { state $scroll = do { my %args = ( - index => $self->store->index_name, + index => $self->index, type => $self->type, size => $self->buffer_size, # TODO divide by number of shards body => {query => {match_all => {}},}, @@ -100,7 +135,7 @@ sub generator { sub count { my ($self) = @_; $self->store->es->count( - index => $self->store->index_name, + index => $self->index, type => $self->type, )->{count}; } @@ -109,7 +144,7 @@ sub get { my ($self, $id) = @_; try { my $data = $self->store->es->get_source( - index => $self->store->index_name, + index => $self->index, type => $self->type, id => $id, ); @@ -136,7 +171,7 @@ sub delete_all { my $es = $self->store->es; if ($es->can('delete_by_query')) { $es->delete_by_query( - index => $self->store->index_name, + index => $self->index, type => $self->type, body => {query => {match_all => {}},}, ); @@ -145,7 +180,7 @@ sub delete_all { $es->transport->perform_request( method => 'DELETE', path => '/' - . $self->store->index_name . '/' + . $self->index . '/' . $self->type . '/_query', body => {query => {match_all => {}},} @@ -158,7 +193,7 @@ sub delete_by_query { my $es = $self->store->es; if ($es->can('delete_by_query')) { $es->delete_by_query( - index => $self->store->index_name, + index => $self->index, type => $self->type, body => {query => $args{query},}, ); @@ -167,7 +202,7 @@ sub delete_by_query { $es->transport->perform_request( method => 'DELETE', path => '/' - . $self->store->index_name . '/' + . $self->index . '/' . $self->type . '/_query', body => {query => $args{query},} @@ -180,7 +215,7 @@ sub commit { $self->_bulk->flush; $self->store->es->transport->perform_request( method => 'POST', - path => '/' . $self->store->index_name . '/_refresh', + path => '/' . $self->index . '/_refresh', ); } @@ -198,7 +233,7 @@ sub search { } my $res = $self->store->es->search( - index => $self->store->index_name, + index => $self->index, type => $self->type, body => {%args, from => $start, size => $limit,}, ); @@ -309,8 +344,7 @@ sub normalize_sort { sub drop { my ($self) = @_; - $self->delete_all; - $self->commit; + $self->store->es->indices->delete(index => $self->index); } 1; diff --git a/lib/Catmandu/Store/ElasticSearch/Searcher.pm b/lib/Catmandu/Store/ElasticSearch/Searcher.pm index 9e3aeec..f03aae4 100644 --- a/lib/Catmandu/Store/ElasticSearch/Searcher.pm +++ b/lib/Catmandu/Store/ElasticSearch/Searcher.pm @@ -18,8 +18,9 @@ has sort => (is => 'ro'); sub generator { my ($self) = @_; - my $store = $self->bag->store; - my $id_key = $self->bag->id_key; + my $bag = $self->bag; + my $store = $bag->store; + my $id_key = $bag->id_key; sub { state $total = $self->total; if (defined $total) { @@ -30,11 +31,11 @@ sub generator { my $body = {query => $self->query}; $body->{sort} = $self->sort if $self->sort; my %args = ( - index => $store->index_name, - type => $self->bag->type, + index => $bag->index, + type => $bag->type, from => $self->start, size => - $self->bag->buffer_size, # TODO divide by number of shards + $bag->buffer_size, # TODO divide by number of shards body => $body, ); if (!$self->sort && $store->is_es_1_or_2) { @@ -71,10 +72,10 @@ sub slice { # TODO constrain total? sub count { my ($self) = @_; - my $store = $self->bag->store; - $store->es->count( - index => $store->index_name, - type => $self->bag->type, + my $bag = $self->bag; + $bag->store->es->count( + index => $bag->index, + type => $bag->type, body => {query => $self->query,}, )->{count}; } diff --git a/t/00.t b/t/00.t index c0e6cce..6e98410 100644 --- a/t/00.t +++ b/t/00.t @@ -13,4 +13,4 @@ my @pkgs = qw( require_ok $_ for @pkgs; -done_testing 4; +done_testing;