Upgrade to Elasticsearch 5 (#3789)
* Upgrade to elasticsearch 5

* Fix broken percolate queries

* Lint

* Handle updates during recreate_index from legacy indexes
George Schneeloch authored Feb 8, 2018
1 parent dbcb907 commit b037a94
Showing 17 changed files with 1,048 additions and 626 deletions.
4 changes: 2 additions & 2 deletions docker-compose.yml
@@ -11,8 +11,8 @@ services:
       - "6379"
 
   elastic:
-    image: elasticsearch:2.4.1
-    command: elasticsearch -Des.network.host=0.0.0.0 -Des.http.cors.enabled=true -Des.http.cors.allow-origin=*
+    image: elasticsearch:5.5
+    command: elasticsearch -E network.host=0.0.0.0 -E http.cors.enabled=true -E http.cors.allow-origin=* -E rest.action.multi.allow_explicit_index=false
     ports:
       - "9100:9200"

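Two things changed in the container invocation: Elasticsearch 5 replaces the `-Des.*` system-property flags with `-E name=value` settings, and `rest.action.multi.allow_explicit_index=false` stops clients from naming an explicit index inside bulk/msearch request bodies, so a request can only touch the index in its URL. Since the compose file maps container port 9200 to host port 9100, a quick sanity check from Python might look like this (a hedged sketch, not part of the commit; host and port assume the mapping above):

```python
# Minimal sketch: confirm the upgraded cluster is reachable and is a 5.x node.
# Assumes the "9100:9200" port mapping from docker-compose.yml above.
from elasticsearch import Elasticsearch

es = Elasticsearch(hosts=["http://localhost:9100"])
version = es.info()["version"]["number"]
assert version.startswith("5."), "expected an Elasticsearch 5.x cluster, got {}".format(version)
```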
4 changes: 2 additions & 2 deletions requirements.in
@@ -13,8 +13,8 @@ django-webpack-loader==0.4.1
 djangorestframework==3.5.2
 edx-api-client==0.4.0
 edx-opaque-keys==0.4
-elasticsearch-dsl==2.1.0
-elasticsearch==2.3.0
+elasticsearch-dsl==5.4.0
+elasticsearch==5.5.1
 factory_boy
 faker
 gnupg==2.2.0
6 changes: 3 additions & 3 deletions requirements.txt
@@ -32,8 +32,8 @@ django==1.10.5
 djangorestframework==3.5.2
 edx-api-client==0.4.0
 edx-opaque-keys==0.4
-elasticsearch-dsl==2.1.0
-elasticsearch==2.3.0
+elasticsearch-dsl==5.4.0
+elasticsearch==5.5.1
 factory-boy==2.8.1
 faker==0.7.18
 gnupg==2.2.0
@@ -84,7 +84,7 @@ stevedore==1.25.0 # via edx-opaque-keys
 tornado==4.5.1 # via robohash
 traitlets==4.3.2 # via ipython
 unidecode==0.4.21 # via wagtail
-urllib3==1.22 # via elasticsearch
+urllib3==1.21.1 # via elasticsearch
 uwsgi==2.0.15
 vine==1.1.4 # via amqp
 wagtail==1.10.1
34 changes: 15 additions & 19 deletions search/api.py
@@ -18,10 +18,11 @@
 from profiles.models import Profile
 from roles.api import get_advance_searchable_program_ids
 from search.connection import (
-    get_default_alias,
+    get_default_alias_and_doc_type,
     get_conn,
-    USER_DOC_TYPE,
-    PUBLIC_USER_DOC_TYPE,
+    PRIVATE_ENROLLMENT_INDEX_TYPE,
+    PUBLIC_ENROLLMENT_INDEX_TYPE,
+    PERCOLATE_INDEX_TYPE,
 )
 from search.models import (
     PercolateQuery,
@@ -68,7 +69,7 @@ def get_searchable_programs(user, staff_program_ids):
         staff_program_ids (list of int): the list of program ids the user is staff for if any
 
     Returns:
-        set(courses.models.Program): set of programs the user can search in
+        set of courses.models.Program: set of programs the user can search in
     """
 
     # filter only to the staff programs or enrolled programs
@@ -119,16 +120,6 @@ def create_program_limit_query(user, staff_program_ids, filter_on_email_optin=Fa
     )
 
 
-def _get_search_doc_types(is_advance_search_capable):
-    """
-    Determines searchable doc types based on search capabilities
-
-    Args:
-        is_advance_search_capable (bool): If true, allows user to perform staff search
-    """
-    return (USER_DOC_TYPE,) if is_advance_search_capable else (PUBLIC_USER_DOC_TYPE,)
-
-
 def create_search_obj(user, search_param_dict=None, filter_on_email_optin=False):
     """
     Creates a search object and prepares it with metadata and query parameters that
@@ -144,7 +135,9 @@ def create_search_obj(user, search_param_dict=None, filter_on_email_optin=False)
"""
staff_program_ids = get_advance_searchable_program_ids(user)
is_advance_search_capable = bool(staff_program_ids)
search_obj = Search(index=get_default_alias(), doc_type=_get_search_doc_types(is_advance_search_capable))
index_type = PRIVATE_ENROLLMENT_INDEX_TYPE if is_advance_search_capable else PUBLIC_ENROLLMENT_INDEX_TYPE
index, doc_type = get_default_alias_and_doc_type(index_type)
search_obj = Search(index=index, doc_type=doc_type)
# Update from search params first so our server-side filtering will overwrite it if necessary
if search_param_dict is not None:
search_obj.update_from_dict(search_param_dict)
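The old doc-type switch from `_get_search_doc_types` collapses into an index-type switch: staff-capable users search the private enrollment index, everyone else the public one. A condensed sketch of the new flow (the helper name `build_enrollment_search` is hypothetical; the imports and constants are the ones this commit adds to search/connection.py):

```python
# Condensed sketch of the selection logic above, under the new schema.
from elasticsearch_dsl import Search

from search.connection import (
    get_default_alias_and_doc_type,
    PRIVATE_ENROLLMENT_INDEX_TYPE,
    PUBLIC_ENROLLMENT_INDEX_TYPE,
)


def build_enrollment_search(is_advance_search_capable):
    """Bind a Search to the right alias and doc type for this user."""
    index_type = (
        PRIVATE_ENROLLMENT_INDEX_TYPE if is_advance_search_capable
        else PUBLIC_ENROLLMENT_INDEX_TYPE
    )
    index, doc_type = get_default_alias_and_doc_type(index_type)
    return Search(index=index, doc_type=doc_type)
```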
@@ -212,7 +205,7 @@ def search_for_field(search_obj, field_name, page_size=DEFAULT_ES_LOOP_PAGE_SIZE
     # Maintaining a consistent sort on '_doc' will help prevent bugs where the
     # index is altered during the loop.
     # This also limits the query to only return the field value.
-    search_obj = search_obj.sort('_doc').fields(field_name)
+    search_obj = search_obj.sort('_doc').source(include=[field_name])
     loop = 0
     all_results_returned = False
     while not all_results_returned:
@@ -221,7 +214,7 @@ def search_for_field(search_obj, field_name, page_size=DEFAULT_ES_LOOP_PAGE_SIZE
         search_results = execute_search(search_obj[from_index: to_index])
         # add the field value for every search result hit to the set
         for hit in search_results.hits:
-            results.add(getattr(hit, field_name)[0])
+            results.add(getattr(hit, field_name))
         all_results_returned = to_index >= search_results.hits.total
         loop += 1
     return results
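The `.fields()` call is gone in elasticsearch-dsl 5.x; its replacement, `.source()`, filters `_source` instead of fetching stored fields, which is why the `[0]` indexing also disappears: stored-field values came back wrapped in lists, while `_source` values come back as-is. A hedged before/after sketch (the host, alias, and `'email'` field are all illustrative):

```python
# Before/after sketch of the API change above.
from elasticsearch_dsl import Search
from elasticsearch_dsl.connections import connections

connections.create_connection(hosts=["localhost:9100"])  # illustrative host
search = Search(index="some_alias").sort('_doc')

# elasticsearch-dsl 2.x (removed in 5.x): stored fields, list-wrapped values
#   search = search.fields('email')
#   value = hit.email[0]

# elasticsearch-dsl 5.x: filter _source instead; values come back unwrapped
search = search.source(include=['email'])
for hit in search.execute().hits:
    value = hit.email  # plain value, no [0] unwrapping
```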
@@ -268,11 +261,12 @@ def _search_percolate_queries(program_enrollment):
         list of int: A list of PercolateQuery ids
     """
     conn = get_conn()
+    index, doc_type = get_default_alias_and_doc_type(PERCOLATE_INDEX_TYPE)
     doc = serialize_program_enrolled_user(program_enrollment)
     # We don't need this to search for percolator queries and
     # it causes a dynamic mapping failure so we need to remove it
     del doc['_id']
-    result = conn.percolate(get_default_alias(), USER_DOC_TYPE, body={"doc": doc})
+    result = conn.percolate(index, doc_type, body={"doc": doc})
     failures = result.get('_shards', {}).get('failures', [])
     if len(failures) > 0:
         raise PercolateException("Failed to percolate: {}".format(failures))
@@ -318,9 +312,11 @@ def document_needs_updating(enrollment):
     Returns:
         bool: True if the document needs to be updated via reindex
     """
+    index, doc_type = get_default_alias_and_doc_type(PRIVATE_ENROLLMENT_INDEX_TYPE)
+
     conn = get_conn()
     try:
-        document = conn.get(index=get_default_alias(), doc_type=USER_DOC_TYPE, id=enrollment.id)
+        document = conn.get(index=index, doc_type=doc_type, id=enrollment.id)
     except NotFoundError:
         return True
     serialized_enrollment = serialize_program_enrolled_user(enrollment)
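The diff view cuts off after the serialization line, but the natural completion is a comparison of the fetched `_source` against the fresh serialization. A hedged sketch of the full shape, since the tail of the function is not shown here:

```python
# Hedged sketch: a document needs reindexing if it is missing or if its
# indexed _source has drifted from a fresh serialization.
from elasticsearch.exceptions import NotFoundError


def document_needs_updating_sketch(conn, index, doc_type, enrollment, serialize):
    try:
        document = conn.get(index=index, doc_type=doc_type, id=enrollment.id)
    except NotFoundError:
        return True
    return document['_source'] != serialize(enrollment)
```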
17 changes: 9 additions & 8 deletions search/api_test.py
@@ -30,9 +30,9 @@
 )
 from search.base import ESTestCase
 from search.connection import (
-    USER_DOC_TYPE,
-    PUBLIC_USER_DOC_TYPE,
-    get_default_alias,
+    get_default_alias_and_doc_type,
+    PRIVATE_ENROLLMENT_INDEX_TYPE,
+    PUBLIC_ENROLLMENT_INDEX_TYPE,
 )
 from search.exceptions import (
     NoProgramAccessException,
@@ -126,11 +126,11 @@ def test_size_param_in_query(self):
         assert search_query_dict['size'] == 5
 
     @ddt.data(
-        (True, [USER_DOC_TYPE]),
-        (False, [PUBLIC_USER_DOC_TYPE]),
+        (True, PRIVATE_ENROLLMENT_INDEX_TYPE),
+        (False, PUBLIC_ENROLLMENT_INDEX_TYPE),
     )
     @ddt.unpack
-    def test_create_search_obj_metadata(self, is_advance_search_capable, expected_doc_type):
+    def test_create_search_obj_metadata(self, is_advance_search_capable, expected_index_type):
         """
         Test that Search objects are created with proper metadata
         """
@@ -141,8 +141,9 @@ def test_create_search_obj_metadata(self, is_advance_search_capable, expected_do
             user,
             search_param_dict=search_param_dict,
         )
-        assert search_obj._doc_type == expected_doc_type  # pylint: disable=protected-access
-        assert search_obj._index == [get_default_alias()]  # pylint: disable=protected-access
+        expected_alias, doc_type = get_default_alias_and_doc_type(expected_index_type)
+        assert search_obj._doc_type == [doc_type]  # pylint: disable=protected-access
+        assert search_obj._index == [expected_alias]  # pylint: disable=protected-access
         assert mock_update_from_dict.call_count == 2
         assert isinstance(mock_update_from_dict.call_args[0][0], Search)
         assert mock_update_from_dict.call_args[0][1] == search_param_dict
4 changes: 2 additions & 2 deletions search/base.py
@@ -8,7 +8,7 @@
 )
 
 from search import tasks
-from search.indexing_api import recreate_index, delete_index
+from search.indexing_api import recreate_index, delete_indices
 
 
 class ESTestCase(TestCase):
@@ -33,7 +33,7 @@ def setUp(self):

     @classmethod
     def tearDownClass(cls):
-        delete_index()
+        delete_indices()
         super().tearDownClass()
 
 
159 changes: 124 additions & 35 deletions search/connection.py
@@ -1,4 +1,6 @@
"""Manages the Elasticsearch connection"""
import uuid

from django.conf import settings
from elasticsearch_dsl.connections import connections

@@ -8,22 +10,36 @@
 _CONN = None
 # When we create the connection, check to make sure all appropriate mappings exist
 _CONN_VERIFIED = False
-# This is a builtin type
-PERCOLATE_DOC_TYPE = '.percolator'
-
-USER_DOC_TYPE = 'program_user'
-PUBLIC_USER_DOC_TYPE = 'public_program_user'
-VALIDATABLE_DOC_TYPES = (
-    USER_DOC_TYPE,
-    # need to run recreate_index once in each env first, otherwise this will fail
-    # uncomment in the next release
-    # PUBLIC_USER_DOC_TYPE,
-)
+# This is a builtin type in Elasticsearch 2
+LEGACY_PERCOLATE_DOC_TYPE = '.percolator'
+
+LEGACY_USER_DOC_TYPE = 'program_user'
+LEGACY_PUBLIC_USER_DOC_TYPE = 'public_program_user'
+
+PUBLIC_ENROLLMENT_INDEX_TYPE = 'public_enrollment'
+PRIVATE_ENROLLMENT_INDEX_TYPE = 'private_enrollment'
+PERCOLATE_INDEX_TYPE = 'percolate'
+
+GLOBAL_DOC_TYPE = 'doc'
+
+ALL_INDEX_TYPES = [
+    PUBLIC_ENROLLMENT_INDEX_TYPE,
+    PRIVATE_ENROLLMENT_INDEX_TYPE,
+    PERCOLATE_INDEX_TYPE,
+]
 
 
-def get_conn(verify=True, verify_index=None):
+def get_conn(*, verify=True, verify_indices=None):
     """
     Lazily create the connection.
 
     Args:
         verify (bool): If true, check the presence of indices and mappings
+        verify_indices (list of str): If set, check the presence of these indices. Else use the defaults.
 
     Returns:
         elasticsearch.client.Elasticsearch: An Elasticsearch client
     """
     # pylint: disable=global-statement
     global _CONN
@@ -55,45 +71,118 @@ def get_conn(verify=True, verify_index=None):
         return _CONN
 
     # Make sure everything exists.
-    if verify_index is None:
-        verify_index = get_default_alias()
-    if not _CONN.indices.exists(verify_index):
-        raise ReindexException("Unable to find index {index_name}".format(
-            index_name=verify_index
-        ))
-
-    for doc_type in VALIDATABLE_DOC_TYPES:
-        mapping = _CONN.indices.get_mapping(index=verify_index, doc_type=doc_type)
-        if not mapping:
-            raise ReindexException("Mapping {doc_type} not found".format(
-                doc_type=doc_type
-            ))
+    if verify_indices is None:
+        verify_indices = set()
+        for index_type in ALL_INDEX_TYPES:
+            verify_indices = verify_indices.union(
+                (tup[0] for tup in get_aliases_and_doc_types(index_type))
+            )
+    for verify_index in verify_indices:
+        if not _CONN.indices.exists(verify_index):
+            raise ReindexException("Unable to find index {index_name}".format(
+                index_name=verify_index
+            ))
 
     _CONN_VERIFIED = True
     return _CONN
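`get_conn` is now keyword-only and, by default, verifies every alias that `get_aliases_and_doc_types` reports for each index type, rather than a single hard-coded alias. A usage sketch under the new signature (the alias construction uses the helpers defined below):

```python
# Usage sketch for the reworked get_conn signature above.
from search.connection import get_conn, make_new_alias_name, PERCOLATE_INDEX_TYPE

# Default: verify every active alias known for each index type.
conn = get_conn()

# Narrower: verify a single alias (raises ReindexException if it is missing).
alias = make_new_alias_name(PERCOLATE_INDEX_TYPE, is_reindexing=False)
conn = get_conn(verify=True, verify_indices=[alias])
```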


-def get_temp_alias():
+def make_new_backing_index_name():
     """
-    Get name for alias to a the temporary index
+    Make a unique name for use for a backing index
+
+    Returns:
+        str: A new name for a backing index
     """
-    return "{}_temp".format(settings.ELASTICSEARCH_INDEX)
+    return "{prefix}_{hash}".format(
+        prefix=settings.ELASTICSEARCH_INDEX,
+        hash=uuid.uuid4().hex,
+    )


-def get_default_alias():
+def make_new_alias_name(index_type, *, is_reindexing):
     """
-    Get name for the alias to the default index
+    Make the name used for the Elasticsearch alias
+
+    Args:
+        index_type (str): The type of index
+        is_reindexing (bool): If true, use the alias name meant for reindexing
+
+    Returns:
+        str: The name of the alias
     """
+    return "{prefix}_{index_type}_{suffix}".format(
+        prefix=settings.ELASTICSEARCH_INDEX,
+        index_type=index_type,
+        suffix='reindexing' if is_reindexing else 'default'
+    )
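Under this scheme each index type gets a stable pair of alias names while backing indexes get throwaway unique names, so a reindex can build a fresh backing index and flip the aliases atomically. An illustration, assuming `settings.ELASTICSEARCH_INDEX` is `'micromasters'` (illustrative only; the real value is per-environment):

```python
# Illustration of the naming scheme produced by the helpers above.
from search.connection import (
    make_new_alias_name,
    make_new_backing_index_name,
    PRIVATE_ENROLLMENT_INDEX_TYPE,
)

make_new_alias_name(PRIVATE_ENROLLMENT_INDEX_TYPE, is_reindexing=False)
# -> 'micromasters_private_enrollment_default'
make_new_alias_name(PRIVATE_ENROLLMENT_INDEX_TYPE, is_reindexing=True)
# -> 'micromasters_private_enrollment_reindexing'
make_new_backing_index_name()
# -> 'micromasters_<32-char uuid4 hex>', unique on every call
```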


+def get_legacy_default_alias():
+    """
+    Get name for the alias to the legacy index
+
+    Returns:
+        str: The name of the legacy alias
+    """
+    return "{}_alias".format(settings.ELASTICSEARCH_INDEX)


-def get_active_aliases():
+def get_aliases_and_doc_types(index_type):
     """
-    Get aliases for active indexes.
+    Depending on whether or not we upgraded to the new schema for Elasticsearch 5,
+    return a list of active indices and associated doc types to use for indexing.
+    There is always at least one item in the returned list and the first tuple is always for the default alias.
+
+    Args:
+        index_type (str): The index type
+
+    Returns:
+        list of tuple:
+            (a tuple of the alias, the doc type to use for the indexing of that alias)
+            The list will always have at least one tuple, and the first is always the default, newest alias
     """
     conn = get_conn(verify=False)
-    aliases = []
-    for alias in (get_default_alias(), get_temp_alias()):
-        if conn.indices.exists(alias):
-            aliases.append(alias)
-    return aliases

+    mapping = {
+        PRIVATE_ENROLLMENT_INDEX_TYPE: LEGACY_USER_DOC_TYPE,
+        PUBLIC_ENROLLMENT_INDEX_TYPE: LEGACY_PUBLIC_USER_DOC_TYPE,
+        PERCOLATE_INDEX_TYPE: LEGACY_PERCOLATE_DOC_TYPE,
+    }
+    legacy_doc_type = mapping[index_type]
+
+    default_alias = make_new_alias_name(index_type, is_reindexing=False)
+    reindexing_alias = make_new_alias_name(index_type, is_reindexing=True)
+    legacy_alias = get_legacy_default_alias()
+
+    if conn.indices.exists(default_alias):
+        # Elasticsearch 5
+        tuples = [
+            (default_alias, GLOBAL_DOC_TYPE),
+            (reindexing_alias, GLOBAL_DOC_TYPE),
+            (legacy_alias, legacy_doc_type),
+        ]
+        return [
+            tup for tup in tuples if conn.indices.exists(tup[0])
+        ]
+    else:
+        # Elasticsearch 2
+        return [
+            (legacy_alias, legacy_doc_type),
+        ]


+def get_default_alias_and_doc_type(index_type):
+    """
+    Depending on whether or not we upgraded to the new schema for Elasticsearch 5,
+    return the default index alias and the doc type to use
+
+    Args:
+        index_type (str): The index type
+
+    Returns:
+        tuple: (the default alias to update, the doc type to use for the indexing)
+    """
+    return get_aliases_and_doc_types(index_type)[0]
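This list-of-aliases shape is what backs the commit's "handle updates during recreate_index" bullet: while a reindex is running, writes should go to every active alias (default, reindexing, and, pre-migration, legacy) so nothing is lost when the aliases flip. A hedged sketch of how indexing code can consume it (`index_enrollment_everywhere` is hypothetical, not part of this commit):

```python
# Hedged sketch: fan one document out to every active alias so a concurrent
# recreate_index does not lose the update.
from search.connection import (
    get_aliases_and_doc_types,
    get_conn,
    PRIVATE_ENROLLMENT_INDEX_TYPE,
)


def index_enrollment_everywhere(doc_id, doc):
    conn = get_conn(verify=False)
    for alias, doc_type in get_aliases_and_doc_types(PRIVATE_ENROLLMENT_INDEX_TYPE):
        conn.index(index=alias, doc_type=doc_type, id=doc_id, body=doc)
```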