From 420447c124d4a369c07e4840560cabe9b780a84d Mon Sep 17 00:00:00 2001 From: Nicolas Steenlant Date: Fri, 9 May 2014 09:13:25 +0200 Subject: [PATCH] start updating to Search::Elasticsearch --- .gitignore | 2 +- Build.PL | 6 +- Changes | 24 +-- .../{ElasticSearch.pm => Elasticsearch.pm} | 68 +++------ .../{ElasticSearch => Elasticsearch}/Bag.pm | 140 ++++++++++-------- .../{ElasticSearch => Elasticsearch}/CQL.pm | 26 ++-- .../Searcher.pm | 47 +++--- t/00.t | 8 +- 8 files changed, 148 insertions(+), 173 deletions(-) rename lib/Catmandu/Store/{ElasticSearch.pm => Elasticsearch.pm} (70%) rename lib/Catmandu/Store/{ElasticSearch => Elasticsearch}/Bag.pm (53%) rename lib/Catmandu/Store/{ElasticSearch => Elasticsearch}/CQL.pm (92%) rename lib/Catmandu/Store/{ElasticSearch => Elasticsearch}/Searcher.pm (50%) diff --git a/.gitignore b/.gitignore index 36df5c1..46d7f08 100644 --- a/.gitignore +++ b/.gitignore @@ -10,4 +10,4 @@ Makefile.PL MANIFEST* META.* MYMETA.* -Catmandu-Store-ElasticSearch-* +Catmandu-Store-Elasticsearch-* diff --git a/Build.PL b/Build.PL index 3db6ebf..7d56f01 100644 --- a/Build.PL +++ b/Build.PL @@ -3,7 +3,7 @@ use warnings; use Module::Build; my $builder = Module::Build->new( - module_name => 'Catmandu::Store::ElasticSearch', + module_name => 'Catmandu::Store::Elasticsearch', license => 'perl', dist_author => [ 'Nicolas Steenlant ', @@ -18,10 +18,10 @@ my $builder = Module::Build->new( 'Catmandu' => '0.8', 'CQL::Parser' => '1.12', 'Moo' => '1.00', - 'Search::Elasticsearch::Compat' => '0.10', + 'Search::Elasticsearch' => '1.11', }, add_to_cleanup => [qw( - Catmandu-Store-ElasticSearch-* + Catmandu-Store-Elasticsearch-* )], create_makefile_pl => 'traditional', create_license => 1, diff --git a/Changes b/Changes index feab05a..016bc3c 100644 --- a/Changes +++ b/Changes @@ -1,22 +1,6 @@ -Revision history for Catmandu-Store-ElasticSearch +Revision history for Catmandu-Store-Elasticsearch -0.0205 2014-03-10 - - switch to Search::Elasticsearch::Compat - - store drop method - -0.0204 2014-01-02 - - switch to Elasticsearch::Compat - -0.0203 2013-06-24 - - remove given statements - -0.0202 2013-03-26 - - import confess - - fix pod formatting - -0.0201 2012-12-03 - - fix reference to old CQL::ElasticSearch - -0.02 2012-12-03 - - initial release +0.01 2014-04-29 + - Continue development from Catmandu::Store::ElasticSearch with the + official Search::Elasticsearch client diff --git a/lib/Catmandu/Store/ElasticSearch.pm b/lib/Catmandu/Store/Elasticsearch.pm similarity index 70% rename from lib/Catmandu/Store/ElasticSearch.pm rename to lib/Catmandu/Store/Elasticsearch.pm index bf39667..bfabf7d 100644 --- a/lib/Catmandu/Store/ElasticSearch.pm +++ b/lib/Catmandu/Store/Elasticsearch.pm @@ -1,35 +1,29 @@ -package Catmandu::Store::ElasticSearch; +package Catmandu::Store::Elasticsearch; use Catmandu::Sane; use Moo; -use Search::Elasticsearch::Compat; -use Catmandu::Store::ElasticSearch::Bag; +use Search::Elasticsearch; +use Catmandu::Store::Elasticsearch::Bag; with 'Catmandu::Store'; =head1 NAME -Catmandu::Store::ElasticSearch - A searchable store backed by Elasticsearch - -=head1 DEPRECIATION NOTICE - -This is the last version of L. Development will -continue as L using the official -L client. +Catmandu::Store::Elasticsearch - A searchable store backed by Elasticsearch =head1 VERSION -Version 0.0205 +Version 0.01 =cut -our $VERSION = '0.0205'; +our $VERSION = '0.01'; =head1 SYNOPSIS - use Catmandu::Store::ElasticSearch; + use Catmandu::Store::Elasticsearch; - my $store = Catmandu::Store::ElasticSearch->new(index_name => 'catmandu'); + my $store = Catmandu::Store::Elasticsearch->new(index_name => 'catmandu'); my $obj1 = $store->bag->add({ name => 'Patrick' }); @@ -54,63 +48,47 @@ our $VERSION = '0.0205'; # Some stores can be searched my $hits = $store->bag->search(query => 'name:Patrick'); - # Catmandu::Store::ElasticSearch supports CQL... + # Catmandu::Store::Elasticsearch supports CQL... my $hits = $store->bag->search(cql_query => 'name any "Patrick"'); =cut -my $ELASTIC_SEARCH_ARGS = [qw( - transport - servers - trace_calls - timeout - max_requests - no_refresh -)]; - has index_name => (is => 'ro', required => 1); has index_settings => (is => 'ro', lazy => 1, default => sub { +{} }); has index_mappings => (is => 'ro', lazy => 1, default => sub { +{} }); +has _es_args => (is => 'rw', lazy => 1, default => sub { +{} }); +has es => (is => 'lazy'); -has elastic_search => ( - is => 'ro', - lazy => 1, - builder => '_build_elastic_search', -); - -sub _build_elastic_search { - my $self = $_[0]; - my $args = delete $self->{_args}; - my $es = Search::Elasticsearch::Compat->new($args); - unless ($es->index_exists(index => $self->index_name)) { - $es->create_index( +sub _build_es { + my ($self) = @_; + my $es = Search::Elasticsearch->new($self->_es_args); + unless ($es->indices->exists(index => $self->index_name)) { + $es->indices->create( index => $self->index_name, - settings => $self->index_settings, - mappings => $self->index_mappings, + body => { + settings => $self->index_settings, + mappings => $self->index_mappings, + }, ); } - $es->use_index($self->index_name); $es; } sub BUILD { my ($self, $args) = @_; - $self->{_args} = {}; - for my $key (@$ELASTIC_SEARCH_ARGS) { - $self->{_args}{$key} = $args->{$key} if exists $args->{$key}; - } + $self->_es_args($args); } sub drop { my ($self) = @_; - $self->elastic_search->delete_index; + $self->es->indices->delete(index => $self->index_name); } =head1 METHODS =head2 new(index_name => $name, cql_mapping => \%mapping) -Create a new Catmandu::Store::ElasticSearch store connected to index $name. The +Create a new Catmandu::Store::Elasticsearch store connected to index $name. The store supports CQL searches when a cql_mapping is provided. This hash contains a translation of CQL fields into Elasticsearch searchable fields. diff --git a/lib/Catmandu/Store/ElasticSearch/Bag.pm b/lib/Catmandu/Store/Elasticsearch/Bag.pm similarity index 53% rename from lib/Catmandu/Store/ElasticSearch/Bag.pm rename to lib/Catmandu/Store/Elasticsearch/Bag.pm index 8af3417..9bfd26d 100644 --- a/lib/Catmandu/Store/ElasticSearch/Bag.pm +++ b/lib/Catmandu/Store/Elasticsearch/Bag.pm @@ -1,101 +1,114 @@ -package Catmandu::Store::ElasticSearch::Bag; +package Catmandu::Store::Elasticsearch::Bag; use Catmandu::Sane; use Moo; use Catmandu::Hits; -use Catmandu::Store::ElasticSearch::Searcher; -use Catmandu::Store::ElasticSearch::CQL; +use Catmandu::Store::Elasticsearch::Searcher; +use Catmandu::Store::Elasticsearch::CQL; with 'Catmandu::Bag'; with 'Catmandu::Searchable'; -with 'Catmandu::Buffer'; -has cql_mapping => (is => 'ro'); # TODO move to Searchable +has buffer_size => (is => 'ro', lazy => 1, builder => 'default_buffer_size'); +has _bulk => (is => 'ro', lazy => 1, builder => '_build_bulk'); +has cql_mapping => (is => 'ro'); + +sub default_buffer_size { 100 } + +sub _build_bulk { + my ($self) = @_; + my %args = ( + index => $self->store->index_name, + type => $self->name, + max_count => $self->buffer_size, + on_error => sub { + my ($action, $res, $i) = @_; + $self->log->error($res); + }, + ); + if ($self->log->is_debug) { + $args{on_success} = sub { + my ($action, $res, $i) = @_; + $self->log->debug($res); + }; + } + $self->store->es->bulk_helper(%args); +} sub generator { my ($self) = @_; - my $limit = $self->buffer_size; sub { - state $scroller = $self->store->elastic_search->scrolled_search({ + state $scroll = $self->store->es->scroll_helper( + index => $self->store->index_name, + type => $self->name, search_type => 'scan', - query => {match_all => {}}, - type => $self->name, - }); - state @hits; - @hits = $scroller->next($limit) unless @hits; - (shift(@hits) || return)->{_source}; + size => $self->buffer_size, # TODO divide by number of shards + body => { + query => {match_all => {}}, + }, + ); + my $data = $scroll->next // return; + $data->{_source}; }; } sub count { my ($self) = @_; - $self->store->elastic_search->count(type => $self->name)->{count}; + $self->store->es->count( + index => $self->store->index_name, + type => $self->name, + )->{count}; } -sub get { +sub get { # TODO ignore missing my ($self, $id) = @_; - my $res = $self->store->elastic_search->get( - type => $self->name, - ignore_missing => 1, - id => $id, + $self->store->es->get_source( + index => $self->store->index_name, + type => $self->name, + id => $id, ); - return $res->{_source} if $res; - return; } sub add { my ($self, $data) = @_; - - $self->buffer_add({index => { - type => $self->name, - id => $data->{_id}, - data => $data, - }}); - - if ($self->buffer_is_full) { - $self->commit; - } + $self->_bulk->index({ + id => $data->{_id}, + source => $data, + }); } sub delete { my ($self, $id) = @_; - - $self->buffer_add({delete => { - type => $self->name, - id => $id, - }}); - - if ($self->buffer_is_full) { - $self->commit; - } + $self->_bulk->delete({id => $id}); } -sub delete_all { +sub delete_all { # TODO refresh my ($self) = @_; - my $es = $self->store->elastic_search; + my $es = $self->store->es; $es->delete_by_query( - query => {match_all => {}}, + index => $self->store->index_name, type => $self->name, + body => { + query => {match_all => {}}, + }, ); - $es->refresh_index; } -sub delete_by_query { +sub delete_by_query { # TODO refresh my ($self, %args) = @_; - my $es = $self->store->elastic_search; + my $es = $self->store->es; $es->delete_by_query( - query => $args{query}, + index => $self->store->index_name, type => $self->name, + body => { + query => $args{query}, + }, ); - $es->refresh_index; } -sub commit { # TODO optimize, better error handling +sub commit { my ($self) = @_; - return 1 unless $self->buffer_used; - my $err = $self->store->elastic_search->bulk(actions => $self->buffer, refresh => 1)->{errors}; - $self->clear_buffer; - return !defined $err, $err; + $self->_bulk->flush; } sub search { @@ -109,12 +122,15 @@ sub search { $args{fields} = []; } - my $res = $self->store->elastic_search->search({ - %args, + my $res = $self->store->es->search( + index => $self->store->index_name, type => $self->name, - from => $start, - size => $limit, - }); + body => { + %args, + from => $start, + size => $limit, + }, + ); my $docs = $res->{hits}{hits}; @@ -134,8 +150,8 @@ sub search { $hits = Catmandu::Hits->new($hits); - if ($args{facets}) { - $hits->{facets} = $res->{facets}; + for my $key (qw(facets suggest)) { + $hits->{$key} = $res->{$key} if exists $args{$key}; } if ($args{highlight}) { @@ -151,7 +167,7 @@ sub search { sub searcher { my ($self, %args) = @_; - Catmandu::Store::ElasticSearch::Searcher->new(%args, bag => $self); + Catmandu::Store::Elasticsearch::Searcher->new(%args, bag => $self); } sub translate_sru_sortkeys { @@ -183,7 +199,7 @@ sub _translate_sru_sortkey { sub translate_cql_query { my ($self, $query) = @_; - Catmandu::Store::ElasticSearch::CQL->new(mapping => $self->cql_mapping)->parse($query); + Catmandu::Store::Elasticsearch::CQL->new(mapping => $self->cql_mapping)->parse($query); } sub normalize_query { diff --git a/lib/Catmandu/Store/ElasticSearch/CQL.pm b/lib/Catmandu/Store/Elasticsearch/CQL.pm similarity index 92% rename from lib/Catmandu/Store/ElasticSearch/CQL.pm rename to lib/Catmandu/Store/Elasticsearch/CQL.pm index 1483096..c0027a1 100644 --- a/lib/Catmandu/Store/ElasticSearch/CQL.pm +++ b/lib/Catmandu/Store/Elasticsearch/CQL.pm @@ -1,4 +1,4 @@ -package Catmandu::Store::ElasticSearch::CQL; +package Catmandu::Store::Elasticsearch::CQL; use Catmandu::Sane; use Catmandu::Util qw(require_package trim); @@ -161,7 +161,7 @@ sub _parse_prox_node { my $slop = 0; my $qualifier = $node->left->getQualifier; - my $term = join ' ', $node->left->getTerm, $node->right->getTerm; + my $term = join(' ', $node->left->getTerm, $node->right->getTerm); if (my ($n) = $node->op =~ $RE_DISTANCE_MODIFIER) { $slop = $n - 1 if $n > 1; } @@ -169,7 +169,7 @@ sub _parse_prox_node { $qualifier = '_all'; } - $query->{text_phrase} = { $qualifier => { query => $term, slop => $slop } }; + $query->{match_phrase} = { $qualifier => { query => $term, slop => $slop } }; } sub _term_node { @@ -220,14 +220,14 @@ sub _term_node { if ($_ eq '_id') { { ids => { values => [$term] } }; } else { - { text_phrase => { $_ => { query => $term } } }; + { match_phrase => { $_ => { query => $term } } }; } } @$qualifier ] } }; } else { if ($qualifier eq '_id') { return { bool => { must_not => [ { ids => { values => [$term] } } ] } }; } - return { bool => { must_not => [ { text_phrase => { $qualifier => { query => $term } } } ] } }; + return { bool => { must_not => [ { match_phrase => { $qualifier => { query => $term } } } ] } }; } } elsif ($base eq 'exact') { if (ref $qualifier) { @@ -235,14 +235,14 @@ sub _term_node { if ($_ eq '_id') { { ids => { values => [$term] } }; } else { - { text_phrase => { $_ => { query => $term } } }; + { match_phrase => { $_ => { query => $term } } }; } } @$qualifier ] } }; } else { if ($qualifier eq '_id') { return { ids => { values => [$term] } }; } - return { text_phrase => { $qualifier => { query => $term } } }; + return {match_phrase => { $qualifier => { query => $term } } }; } } elsif ($base eq 'any') { $term = [split /\s+/, trim($term)]; @@ -270,7 +270,7 @@ sub _term_node { if (ref $qualifier) { return { bool => { should => [ map { { text => { $_ => { query => $term } } } } @$qualifier ] } }; } else { - return { text => { $qualifier => { query => $term } } }; + return { match => { $qualifier => { query => $term } } }; } } if (ref $qualifier) { @@ -296,24 +296,24 @@ sub _text_node { } for my $m (@modifiers) { if ($m->[1] eq 'fuzzy') { # TODO only works for single terms, mapping fuzzy_factor - return { fuzzy => { $qualifier => { value => $term, max_expansions => 10, min_similarity => 0.75 } } }; + return { fuzzy => { $qualifier => { value => $term, max_expansions => 10 } } }; } } if ($term =~ /\s/) { - return { text_phrase => { $qualifier => { query => $term } } }; + return { match_phrase => { $qualifier => { query => $term } } }; } - { text => { $qualifier => { query => $term } } }; + { match => { $qualifier => { query => $term } } }; } 1; =head1 NAME -Catmandu::Store::ElasticSearch::CQL - Converts a CQL query string to a Elasticsearch query hashref +Catmandu::Store::Elasticsearch::CQL - Converts a CQL query string to a Elasticsearch query hashref =head1 SYNOPSIS - $es_query = Catmandu::Store::ElasticSearch::CQL + $es_query = Catmandu::Store::Elasticsearch::CQL ->new(mapping => $cql_mapping) ->parse($cql_query_string); diff --git a/lib/Catmandu/Store/ElasticSearch/Searcher.pm b/lib/Catmandu/Store/Elasticsearch/Searcher.pm similarity index 50% rename from lib/Catmandu/Store/ElasticSearch/Searcher.pm rename to lib/Catmandu/Store/Elasticsearch/Searcher.pm index 0dd8ecd..852ae88 100644 --- a/lib/Catmandu/Store/ElasticSearch/Searcher.pm +++ b/lib/Catmandu/Store/Elasticsearch/Searcher.pm @@ -1,4 +1,4 @@ -package Catmandu::Store::ElasticSearch::Searcher; +package Catmandu::Store::Elasticsearch::Searcher; use Catmandu::Sane; use Moo; @@ -14,37 +14,30 @@ has sort => (is => 'ro'); sub generator { my ($self) = @_; - my $limit = $self->limit; sub { state $total = $self->total; if (defined $total) { return unless $total; } - state $scroller = do { - my $args = { - query => $self->query, - type => $self->bag->name, - from => $self->start, - }; - if ($self->sort) { - $args->{search_type} = 'query_then_fetch'; - $args->{sort} = $self->sort; - } else { - $args->{search_type} = 'scan'; - } - $self->bag->store->elastic_search->scrolled_search($args); + + state $scroll = do { + my $body = {query => $self->query}; + $body->{sort} = $self->sort if $self->sort; + $self->store->es->scroll_helper( + index => $self->store->index_name, + type => $self->name, + search_type => $self->sort ? 'query_then_fetch' : 'scan', + from => $self->start, + size => $self->bag->buffer_size, # TODO divide by number of shards + body => $body, + ); }; - state @hits; - unless (@hits) { - if ($total && $limit > $total) { - $limit = $total; - } - @hits = $scroller->next($limit); - } + + my $data = $scroll->next // return; if ($total) { $total--; } - (shift(@hits) || return)->{_source}; + $data->{_source}; }; } @@ -63,9 +56,13 @@ sub slice { # TODO constrain total? sub count { my ($self) = @_; - $self->bag->store->elastic_search->count( - query => $self->query, + my $store = $self->bag->store; + $store->es->count( + index => $store->index_name, type => $self->bag->name, + body => { + query => $self->query, + }, )->{count}; } diff --git a/t/00.t b/t/00.t index c0e6cce..d8f021a 100644 --- a/t/00.t +++ b/t/00.t @@ -5,10 +5,10 @@ use warnings; use Test::More; my @pkgs = qw( - Catmandu::Store::ElasticSearch - Catmandu::Store::ElasticSearch::Bag - Catmandu::Store::ElasticSearch::Searcher - Catmandu::Store::ElasticSearch::CQL + Catmandu::Store::Elasticsearch + Catmandu::Store::Elasticsearch::Bag + Catmandu::Store::Elasticsearch::Searcher + Catmandu::Store::Elasticsearch::CQL ); require_ok $_ for @pkgs;