Skip to content

Commit

Permalink
start updating to Search::Elasticsearch
Browse files Browse the repository at this point in the history
  • Loading branch information
nics committed May 9, 2014
1 parent be22740 commit 420447c
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 173 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ Makefile.PL
MANIFEST*
META.*
MYMETA.*
Catmandu-Store-ElasticSearch-*
Catmandu-Store-Elasticsearch-*
6 changes: 3 additions & 3 deletions Build.PL
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use warnings;
use Module::Build;

my $builder = Module::Build->new(
module_name => 'Catmandu::Store::ElasticSearch',
module_name => 'Catmandu::Store::Elasticsearch',
license => 'perl',
dist_author => [
'Nicolas Steenlant <[email protected]>',
Expand All @@ -18,10 +18,10 @@ my $builder = Module::Build->new(
'Catmandu' => '0.8',
'CQL::Parser' => '1.12',
'Moo' => '1.00',
'Search::Elasticsearch::Compat' => '0.10',
'Search::Elasticsearch' => '1.11',
},
add_to_cleanup => [qw(
Catmandu-Store-ElasticSearch-*
Catmandu-Store-Elasticsearch-*
)],
create_makefile_pl => 'traditional',
create_license => 1,
Expand Down
24 changes: 4 additions & 20 deletions Changes
Original file line number Diff line number Diff line change
@@ -1,22 +1,6 @@
Revision history for Catmandu-Store-ElasticSearch
Revision history for Catmandu-Store-Elasticsearch


0.0205 2014-03-10
- switch to Search::Elasticsearch::Compat
- store drop method

0.0204 2014-01-02
- switch to Elasticsearch::Compat

0.0203 2013-06-24
- remove given statements

0.0202 2013-03-26
- import confess
- fix pod formatting

0.0201 2012-12-03
- fix reference to old CQL::ElasticSearch

0.02 2012-12-03
- initial release
0.01 2014-04-29
- Continue development from Catmandu::Store::ElasticSearch with the
official Search::Elasticsearch client
Original file line number Diff line number Diff line change
@@ -1,35 +1,29 @@
package Catmandu::Store::ElasticSearch;
package Catmandu::Store::Elasticsearch;

use Catmandu::Sane;
use Moo;
use Search::Elasticsearch::Compat;
use Catmandu::Store::ElasticSearch::Bag;
use Search::Elasticsearch;
use Catmandu::Store::Elasticsearch::Bag;

with 'Catmandu::Store';

=head1 NAME
Catmandu::Store::ElasticSearch - A searchable store backed by Elasticsearch
=head1 DEPRECIATION NOTICE
This is the last version of L<Catmandu::Store::ElasticSearch>. Development will
continue as L<Catmandu::Store::Elasticsearch> using the official
L<Search::Elasticsearch> client.
Catmandu::Store::Elasticsearch - A searchable store backed by Elasticsearch
=head1 VERSION
Version 0.0205
Version 0.01
=cut

our $VERSION = '0.0205';
our $VERSION = '0.01';

=head1 SYNOPSIS
use Catmandu::Store::ElasticSearch;
use Catmandu::Store::Elasticsearch;
my $store = Catmandu::Store::ElasticSearch->new(index_name => 'catmandu');
my $store = Catmandu::Store::Elasticsearch->new(index_name => 'catmandu');
my $obj1 = $store->bag->add({ name => 'Patrick' });
Expand All @@ -54,63 +48,47 @@ our $VERSION = '0.0205';
# Some stores can be searched
my $hits = $store->bag->search(query => 'name:Patrick');
# Catmandu::Store::ElasticSearch supports CQL...
# Catmandu::Store::Elasticsearch supports CQL...
my $hits = $store->bag->search(cql_query => 'name any "Patrick"');
=cut

my $ELASTIC_SEARCH_ARGS = [qw(
transport
servers
trace_calls
timeout
max_requests
no_refresh
)];

has index_name => (is => 'ro', required => 1);
has index_settings => (is => 'ro', lazy => 1, default => sub { +{} });
has index_mappings => (is => 'ro', lazy => 1, default => sub { +{} });
has _es_args => (is => 'rw', lazy => 1, default => sub { +{} });
has es => (is => 'lazy');

has elastic_search => (
is => 'ro',
lazy => 1,
builder => '_build_elastic_search',
);

sub _build_elastic_search {
my $self = $_[0];
my $args = delete $self->{_args};
my $es = Search::Elasticsearch::Compat->new($args);
unless ($es->index_exists(index => $self->index_name)) {
$es->create_index(
sub _build_es {
my ($self) = @_;
my $es = Search::Elasticsearch->new($self->_es_args);
unless ($es->indices->exists(index => $self->index_name)) {
$es->indices->create(
index => $self->index_name,
settings => $self->index_settings,
mappings => $self->index_mappings,
body => {
settings => $self->index_settings,
mappings => $self->index_mappings,
},
);
}
$es->use_index($self->index_name);
$es;
}

sub BUILD {
my ($self, $args) = @_;
$self->{_args} = {};
for my $key (@$ELASTIC_SEARCH_ARGS) {
$self->{_args}{$key} = $args->{$key} if exists $args->{$key};
}
$self->_es_args($args);
}

sub drop {
my ($self) = @_;
$self->elastic_search->delete_index;
$self->es->indices->delete(index => $self->index_name);
}

=head1 METHODS
=head2 new(index_name => $name, cql_mapping => \%mapping)
Create a new Catmandu::Store::ElasticSearch store connected to index $name. The
Create a new Catmandu::Store::Elasticsearch store connected to index $name. The
store supports CQL searches when a cql_mapping is provided. This hash
contains a translation of CQL fields into Elasticsearch searchable fields.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,101 +1,114 @@
package Catmandu::Store::ElasticSearch::Bag;
package Catmandu::Store::Elasticsearch::Bag;

use Catmandu::Sane;
use Moo;
use Catmandu::Hits;
use Catmandu::Store::ElasticSearch::Searcher;
use Catmandu::Store::ElasticSearch::CQL;
use Catmandu::Store::Elasticsearch::Searcher;
use Catmandu::Store::Elasticsearch::CQL;

with 'Catmandu::Bag';
with 'Catmandu::Searchable';
with 'Catmandu::Buffer';

has cql_mapping => (is => 'ro'); # TODO move to Searchable
has buffer_size => (is => 'ro', lazy => 1, builder => 'default_buffer_size');
has _bulk => (is => 'ro', lazy => 1, builder => '_build_bulk');
has cql_mapping => (is => 'ro');

sub default_buffer_size { 100 }

sub _build_bulk {
my ($self) = @_;
my %args = (
index => $self->store->index_name,
type => $self->name,
max_count => $self->buffer_size,
on_error => sub {
my ($action, $res, $i) = @_;
$self->log->error($res);
},
);
if ($self->log->is_debug) {
$args{on_success} = sub {
my ($action, $res, $i) = @_;
$self->log->debug($res);
};
}
$self->store->es->bulk_helper(%args);
}

sub generator {
my ($self) = @_;
my $limit = $self->buffer_size;
sub {
state $scroller = $self->store->elastic_search->scrolled_search({
state $scroll = $self->store->es->scroll_helper(
index => $self->store->index_name,
type => $self->name,
search_type => 'scan',
query => {match_all => {}},
type => $self->name,
});
state @hits;
@hits = $scroller->next($limit) unless @hits;
(shift(@hits) || return)->{_source};
size => $self->buffer_size, # TODO divide by number of shards
body => {
query => {match_all => {}},
},
);
my $data = $scroll->next // return;
$data->{_source};
};
}

sub count {
my ($self) = @_;
$self->store->elastic_search->count(type => $self->name)->{count};
$self->store->es->count(
index => $self->store->index_name,
type => $self->name,
)->{count};
}

sub get {
sub get { # TODO ignore missing
my ($self, $id) = @_;
my $res = $self->store->elastic_search->get(
type => $self->name,
ignore_missing => 1,
id => $id,
$self->store->es->get_source(
index => $self->store->index_name,
type => $self->name,
id => $id,
);
return $res->{_source} if $res;
return;
}

sub add {
my ($self, $data) = @_;

$self->buffer_add({index => {
type => $self->name,
id => $data->{_id},
data => $data,
}});

if ($self->buffer_is_full) {
$self->commit;
}
$self->_bulk->index({
id => $data->{_id},
source => $data,
});
}

sub delete {
my ($self, $id) = @_;

$self->buffer_add({delete => {
type => $self->name,
id => $id,
}});

if ($self->buffer_is_full) {
$self->commit;
}
$self->_bulk->delete({id => $id});
}

sub delete_all {
sub delete_all { # TODO refresh
my ($self) = @_;
my $es = $self->store->elastic_search;
my $es = $self->store->es;
$es->delete_by_query(
query => {match_all => {}},
index => $self->store->index_name,
type => $self->name,
body => {
query => {match_all => {}},
},
);
$es->refresh_index;
}

sub delete_by_query {
sub delete_by_query { # TODO refresh
my ($self, %args) = @_;
my $es = $self->store->elastic_search;
my $es = $self->store->es;
$es->delete_by_query(
query => $args{query},
index => $self->store->index_name,
type => $self->name,
body => {
query => $args{query},
},
);
$es->refresh_index;
}

sub commit { # TODO optimize, better error handling
sub commit {
my ($self) = @_;
return 1 unless $self->buffer_used;
my $err = $self->store->elastic_search->bulk(actions => $self->buffer, refresh => 1)->{errors};
$self->clear_buffer;
return !defined $err, $err;
$self->_bulk->flush;
}

sub search {
Expand All @@ -109,12 +122,15 @@ sub search {
$args{fields} = [];
}

my $res = $self->store->elastic_search->search({
%args,
my $res = $self->store->es->search(
index => $self->store->index_name,
type => $self->name,
from => $start,
size => $limit,
});
body => {
%args,
from => $start,
size => $limit,
},
);

my $docs = $res->{hits}{hits};

Expand All @@ -134,8 +150,8 @@ sub search {

$hits = Catmandu::Hits->new($hits);

if ($args{facets}) {
$hits->{facets} = $res->{facets};
for my $key (qw(facets suggest)) {
$hits->{$key} = $res->{$key} if exists $args{$key};
}

if ($args{highlight}) {
Expand All @@ -151,7 +167,7 @@ sub search {

sub searcher {
my ($self, %args) = @_;
Catmandu::Store::ElasticSearch::Searcher->new(%args, bag => $self);
Catmandu::Store::Elasticsearch::Searcher->new(%args, bag => $self);
}

sub translate_sru_sortkeys {
Expand Down Expand Up @@ -183,7 +199,7 @@ sub _translate_sru_sortkey {

sub translate_cql_query {
my ($self, $query) = @_;
Catmandu::Store::ElasticSearch::CQL->new(mapping => $self->cql_mapping)->parse($query);
Catmandu::Store::Elasticsearch::CQL->new(mapping => $self->cql_mapping)->parse($query);
}

sub normalize_query {
Expand Down
Loading

0 comments on commit 420447c

Please sign in to comment.