Skip to content

Commit

Permalink
support es 7
Browse files Browse the repository at this point in the history
  • Loading branch information
nics committed Aug 31, 2021
1 parent a47360f commit 1aea9d3
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 26 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ Catmandu::Store::ElasticSearch - A searchable store backed by Elasticsearch
## new(%params, bags => { mybag => { index => 'myindex', mapping => \\%map cql\_mapping => \\%map } })

Create a new Catmandu::Store::ElasticSearch store. ElasticSearch connection
parameters will be passed on to the underlying client.
parameters will be passed on to the underlying [Search::Elasticsearch](https://metacpan.org/pod/Search%3A%3AElasticsearch) client.

Optionally provide for each bag a `index` to indicate which index to use.
This defaults to the bag's name.
Expand Down Expand Up @@ -276,7 +276,7 @@ import you data again.

# SEE ALSO

[Catmandu::Store](https://metacpan.org/pod/Catmandu%3A%3AStore)
[Catmandu::Store](https://metacpan.org/pod/Catmandu%3A%3AStore) , [Search::Elasticsearch](https://metacpan.org/pod/Search%3A%3AElasticsearch)

# AUTHOR

Expand Down
24 changes: 18 additions & 6 deletions lib/Catmandu/Store/ElasticSearch.pm
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ use namespace::clean;

with 'Catmandu::Store';

has _es_args => (is => 'rw', lazy => 1, default => sub {+{}});
has es => (is => 'lazy');
has is_es_1_or_2 => (is => 'lazy', init_arg => undef);
has _es_args => (is => 'rw', lazy => 1, default => sub {+{}});
has es => (is => 'lazy');
has es_client_version => (is => 'lazy');
has is_es_1_or_2 => (is => 'lazy', init_arg => undef);
has is_es_7_or_higher => (is => 'lazy', init_arg => undef);

sub BUILD {
my ($self, $args) = @_;
Expand All @@ -26,11 +28,21 @@ sub _build_es {
Search::Elasticsearch->new($self->_es_args);
}

sub _build_es_client_version {
my ($self) = @_;
my ($version)
= ref($self->es) =~ 'Search::Elasticsearch::Client::(\d+)_0::Direct';
$version + 0;
}

sub _build_is_es_1_or_2 {
my ($self) = @_;
is_instance($self->es, 'Search::Elasticsearch::Client::1_0::Direct')
|| is_instance($self->es,
'Search::Elasticsearch::Client::2_0::Direct');
$self->es_client_version == 1 || $self->es_client_version == 2;
}

sub _build_is_es_7_or_higher {
my ($self) = @_;
$self->es_client_version >= 7;
}

1;
Expand Down
44 changes: 26 additions & 18 deletions lib/Catmandu/Store/ElasticSearch/Bag.pm
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ sub _build_index {
}

sub _build_type {
my ($self) = @_;
if ($self->store->is_es_7_or_higher) {
return '_doc';
}
$_[0]->name;
}

Expand All @@ -98,10 +102,12 @@ sub _build__bulk {
my $on_error = $self->_coerce_on_error($self->on_error);
my %args = (
index => $self->index,
type => $self->type,
max_count => $self->buffer_size,
on_error => $on_error,
);
if (!$self->store->is_es_7_or_higher) {
$args{type} = $self->type;
}
if ($self->log->is_debug) {
$args{on_success} = sub {
my ($action, $res, $i) = @_;
Expand Down Expand Up @@ -150,11 +156,11 @@ sub count {
sub get {
my ($self, $id) = @_;
try {
my $data = $self->store->es->get_source(
index => $self->index,
type => $self->type,
id => $id,
);
my %args = (index => $self->index, id => $id,);
if (!$self->store->is_es_7_or_higher) {
$args{type} = $self->type;
}
my $data = $self->store->es->get_source(%args);
$data->{$self->id_key} = $id;
$data;
}
Expand All @@ -177,12 +183,14 @@ sub delete_all {
my ($self) = @_;
my $es = $self->store->es;
if ($es->can('delete_by_query')) {
$es->delete_by_query(
index => $self->index,
type => $self->type,
body => {query => {match_all => {}},},
);
my %args
= (index => $self->index, body => {query => {match_all => {}},},);
if (!$self->store->is_es_7_or_higher) {
$args{type} = $self->type;
}
$es->delete_by_query(%args);
}

else { # TODO document plugin needed for es 2.x
$es->transport->perform_request(
method => 'DELETE',
Expand All @@ -193,20 +201,20 @@ sub delete_all {
}

sub delete_by_query {
my ($self, %args) = @_;
my ($self, %opts) = @_;
my $es = $self->store->es;
if ($es->can('delete_by_query')) {
$es->delete_by_query(
index => $self->index,
type => $self->type,
body => {query => $args{query},},
);
my %args = (index => $self->index, body => {query => $opts{query},},);
if (!$self->store->is_es_7_or_higher) {
$args{type} = $self->type;
}
$es->delete_by_query(%args);
}
else { # TODO document plugin needed for es 2.x
$es->transport->perform_request(
method => 'DELETE',
path => '/' . $self->index . '/' . $self->type . '/_query',
body => {query => $args{query},}
body => {query => $opts{query},}
);
}
}
Expand Down

0 comments on commit 1aea9d3

Please sign in to comment.