Merge pull request #203 from clowder-framework/release/1.16.0
Release/1.16.0
lmarini authored Apr 1, 2021
2 parents 7d03641 + 8893116 commit 1e41093
Showing 16 changed files with 617 additions and 78 deletions.
16 changes: 16 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,22 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## 1.16.0 - 2021-03-31

### Fixed
- Removed the RabbitMQ plugin from the Docker version of Clowder.

### Added
- Added `sort` and `order` parameters to the `/api/search` endpoint, supporting sorting on date and numeric fields (see the sketch below this entry).
  If only `order` is specified, the created date is used. String fields are not currently supported.
- Added a new `/api/deleteindex` admin endpoint that queues an action to delete an Elasticsearch index (usually prior to a reindex).
- JMeter testing suite.

### Changed
- Consolidated field names sent by the EventSinkService to maximize reuse.
- Added a status column to the files report to indicate whether files are ARCHIVED, etc.
- Reworked the auto-archival configuration options to make their meanings clearer.
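
The `sort`/`order` addition is easiest to see in a client call. A minimal sketch in Scala using `java.net.http` (Java 11+): the host and API key are placeholders, and the `key` query parameter is assumed to be the usual Clowder API-key mechanism.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object SearchSortExample extends App {
  val base = "https://clowder.example.com" // hypothetical Clowder instance
  val key  = "YOUR_API_KEY"                // placeholder credential

  // Sort matching files by the "created" date field, newest first.
  // Per the changelog: passing only `order` falls back to the created
  // date, and string fields (e.g. "name") are not sortable yet.
  val url = s"$base/api/search?query=temperature&resource_type=file" +
            s"&sort=created&order=desc&key=$key"

  val response = HttpClient.newHttpClient().send(
    HttpRequest.newBuilder(URI.create(url)).GET().build(),
    HttpResponse.BodyHandlers.ofString())
  println(response.body()) // JSON search results, sorted server-side
}
```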

## 1.15.1 - 2021-03-12

### Fixed
27 changes: 12 additions & 15 deletions app/Global.scala
@@ -64,21 +64,18 @@ object Global extends WithFilters(new GzipFilter(), new Jsonp(), CORSFilter()) w

val archiveEnabled = Play.application.configuration.getBoolean("archiveEnabled", false)
if (archiveEnabled && archivalTimer == null) {
val archiveDebug = Play.application.configuration.getBoolean("archiveDebug", false)
val interval = if (archiveDebug) { 5 minutes } else { 1 day }

// Determine time until next midnight
val now = ZonedDateTime.now
val midnight = now.truncatedTo(ChronoUnit.DAYS)
val sinceLastMidnight = Duration.between(midnight, now).getSeconds
val delay = if (archiveDebug) { 10 seconds } else {
(Duration.ofDays(1).getSeconds - sinceLastMidnight) seconds
}

Logger.info("Starting archival loop - first iteration in " + delay + ", next iteration after " + interval)
archivalTimer = Akka.system.scheduler.schedule(delay, interval) {
Logger.info("Starting auto archive process...")
files.autoArchiveCandidateFiles()
// Set archiveAutoInterval == 0 to disable auto archiving
val archiveAutoInterval = Play.application.configuration.getLong("archiveAutoInterval", 0)
if (archiveAutoInterval > 0) {
val interval = FiniteDuration(archiveAutoInterval, SECONDS)
val archiveAutoDelay = Play.application.configuration.getLong("archiveAutoDelay", 0)
val delay = FiniteDuration(archiveAutoDelay, SECONDS)

Logger.info("Starting archival loop - first iteration in " + delay + ", next iteration after " + interval)
archivalTimer = Akka.system.scheduler.schedule(delay, interval) {
Logger.info("Starting auto archive process...")
files.autoArchiveCandidateFiles()
}
}
}

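The new loop is driven entirely by configuration. A minimal `application.conf` sketch, assuming illustrative values (the key names and second-based units come from the diff; nothing else is prescribed):

```
# Run auto-archival once a day, starting 10 minutes after boot.
# Both durations are in seconds; archiveAutoInterval = 0 (the default)
# disables the loop entirely.
archiveEnabled = true
archiveAutoDelay = 600
archiveAutoInterval = 86400
```

Compared with the removed midnight-based schedule, the first-run delay and the repeat interval are now independent and explicit.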
6 changes: 6 additions & 0 deletions app/api/Admin.scala
@@ -180,4 +180,10 @@ class Admin @Inject() (userService: UserService,
if (success) Ok(toJson(Map("status" -> "reindex successfully queued")))
else BadRequest(toJson(Map("status" -> "reindex queuing failed, Elasticsearch may be disabled")))
}

def deleteIndex = ServerAdminAction { implicit request =>
val success = esqueue.queue("delete_index")
if (success) Ok(toJson(Map("status" -> "deindex successfully queued")))
else BadRequest(toJson(Map("status" -> "deindex queuing failed, Elasticsearch may be disabled")))
}
}
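
A sketch of how an administrator might drive the new endpoint, pairing it with the existing reindex action. Only `/api/deleteindex` is confirmed by the changelog; the host, API key, HTTP method, and `/api/reindex` route are assumptions, and both actions require a server-admin account.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object RebuildSearchIndex extends App {
  val base   = "https://clowder.example.com" // hypothetical instance
  val key    = "ADMIN_API_KEY"               // placeholder admin credential
  val client = HttpClient.newHttpClient()

  // POST an admin action and return the HTTP status (200 = queued,
  // 400 = queuing failed, e.g. Elasticsearch is disabled).
  def post(path: String): Int = {
    val req = HttpRequest.newBuilder(URI.create(s"$base$path?key=$key"))
      .POST(HttpRequest.BodyPublishers.noBody())
      .build()
    client.send(req, HttpResponse.BodyHandlers.ofString()).statusCode()
  }

  println(s"deleteindex -> ${post("/api/deleteindex")}")
  println(s"reindex     -> ${post("/api/reindex")}")
}
```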
9 changes: 6 additions & 3 deletions app/api/Reporting.scala
@@ -39,7 +39,7 @@ class Reporting @Inject()(selections: SelectionService,
var headerRow = true
val enum = Enumerator.generateM({
val chunk = if (headerRow) {
val header = "type,id,name,owner,owner_id,size_kb,uploaded,views,downloads,last_viewed,last_downloaded,location,parent_datasets,parent_collections,parent_spaces\n"
val header = "type,id,name,owner,owner_id,size_kb,uploaded,views,downloads,last_viewed,last_downloaded,location,parent_datasets,parent_collections,parent_spaces,status\n"
headerRow = false
Some(header.getBytes("UTF-8"))
} else {
@@ -137,7 +137,7 @@ class Reporting @Inject()(selections: SelectionService,

// TODO: This will still fail on excessively large instances without Enumerator refactor - should we maintain this endpoint or remove?

var contents: String = "type,id,name,owner,owner_id,size_kb,uploaded/created,views,downloads,last_viewed,last_downloaded,location,parent_datasets,parent_collections,parent_spaces\n"
var contents: String = "type,id,name,owner,owner_id,size_kb,uploaded/created,views,downloads,last_viewed,last_downloaded,location,parent_datasets,parent_collections,parent_spaces,status\n"

collections.getMetrics().foreach(coll => {
contents += _buildCollectionRow(coll, true)
@@ -288,7 +288,8 @@ class Reporting @Inject()(selections: SelectionService,
contents += "\""+f.loader_id+"\","
contents += "\""+ds_list+"\","
contents += "\""+coll_list+"\","
contents += "\""+space_list+"\""
contents += "\""+space_list+"\","
contents += "\""+f.status+"\""
contents += "\n"

return contents
@@ -343,6 +344,7 @@ class Reporting @Inject()(selections: SelectionService,
if (returnAllColums) contents += "," // datasets do not have parent_datasets
contents += "\""+coll_list+"\","
contents += "\""+space_list+"\""
if (returnAllColums) contents += "," // datasets do not have status
contents += "\n"

return contents
@@ -391,6 +393,7 @@ class Reporting @Inject()(selections: SelectionService,
if (returnAllColums) contents += "," // collections do not have parent_datasets
contents += "\""+coll_list+"\","
contents += "\""+space_list+"\""
if (returnAllColums) contents += "," // collections do not have status
contents += "\n"

return contents
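Illustrative rows after this change (all values invented): file rows end with a status value such as ARCHIVED, while dataset and collection rows emit an empty trailing field so every row keeps the same column count.

```csv
type,id,name,owner,owner_id,size_kb,uploaded,views,downloads,last_viewed,last_downloaded,location,parent_datasets,parent_collections,parent_spaces,status
"file","f-101","readings.csv","Jane Doe","u-7","42","2021-03-02","3","1","2021-03-30","2021-03-28","","d-55","c-9","s-2","ARCHIVED"
"dataset","d-55","Field trial 2021","Jane Doe","u-7","1024","2021-03-01","7","2","2021-03-30","2021-03-29","",,"c-9","s-2",
```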
8 changes: 5 additions & 3 deletions app/api/Search.scala
@@ -22,7 +22,7 @@ class Search @Inject() (
/** Search using a simple text string with filters */
def search(query: String, resource_type: Option[String], datasetid: Option[String], collectionid: Option[String],
spaceid: Option[String], folderid: Option[String], field: Option[String], tag: Option[String],
from: Option[Int], size: Option[Int], page: Option[Int]) = PermissionAction(Permission.ViewDataset) { implicit request =>
from: Option[Int], size: Option[Int], page: Option[Int], sort: Option[String], order: Option[String]) = PermissionAction(Permission.ViewDataset) { implicit request =>
current.plugin[ElasticsearchPlugin] match {
case Some(plugin) => {
// If from is specified, use it. Otherwise use page * size when both are given, otherwise use 0.
@@ -42,7 +42,9 @@ class Search @Inject() (
(spaceid match {case Some(x) => s"&spaceid=$x" case None => ""}) +
(folderid match {case Some(x) => s"&folderid=$x" case None => ""}) +
(field match {case Some(x) => s"&field=$x" case None => ""}) +
(tag match {case Some(x) => s"&tag=$x" case None => ""})
(tag match {case Some(x) => s"&tag=$x" case None => ""}) +
(sort match {case Some(x) => s"&sort=$x" case None => ""}) +
(order match {case Some(x) => s"&order=$x" case None => ""})

// Add space filter to search here as a simple permissions check
val superAdmin = request.user match {
@@ -54,7 +56,7 @@ class Search @Inject() (
else
spaces.listAccess(0, Set[Permission](Permission.ViewSpace), request.user, true, true, false, false).map(sp => sp.id)

val response = plugin.search(query, resource_type, datasetid, collectionid, spaceid, folderid, field, tag, from_index, size, permitted, request.user)
val response = plugin.search(query, resource_type, datasetid, collectionid, spaceid, folderid, field, tag, from_index, size, sort, order, permitted, request.user)
val result = SearchUtils.prepareSearchResponse(response, source_url, request.user)
Ok(toJson(result))
}
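The folded hunk above contains the offset computation that the comment at the top of `search` describes; the actual line is hidden by the fold, so this is only a plausible reconstruction of the fallback rule:

```scala
// If `from` is given, use it; otherwise derive an offset from page * size
// when both are present; otherwise start at 0.
def fromIndex(from: Option[Int], page: Option[Int], size: Option[Int]): Int =
  from.orElse(for (p <- page; s <- size) yield p * s).getOrElse(0)

// fromIndex(Some(30), Some(2), Some(25)) == 30  // explicit from wins
// fromIndex(None, Some(2), Some(25))     == 50  // page * size
// fromIndex(None, None, Some(25))        == 0   // nothing to derive
```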
53 changes: 40 additions & 13 deletions app/services/ElasticsearchPlugin.scala
@@ -29,6 +29,7 @@ import play.api.libs.json._
import _root_.util.SearchUtils
import org.apache.commons.lang.StringUtils
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest
import org.elasticsearch.search.sort.SortOrder


/**
@@ -130,7 +131,8 @@ class ElasticsearchPlugin(application: Application) extends Plugin {
* "field_leaf_key": name of immediate field only, e.g. 'lines'
*/
val queryObj = prepareElasticJsonQuery(query, grouping, permitted, user)
accumulatePageResult(queryObj, user, from.getOrElse(0), size.getOrElse(maxResults))
// TODO: Support sorting in GUI search
accumulatePageResult(queryObj, user, from.getOrElse(0), size.getOrElse(maxResults), None, None)
}

/**
@@ -152,8 +154,8 @@
*/
def search(query: String, resource_type: Option[String], datasetid: Option[String], collectionid: Option[String],
spaceid: Option[String], folderid: Option[String], field: Option[String], tag: Option[String],
from: Option[Int], size: Option[Int], permitted: List[UUID], user: Option[User],
index: String = nameOfIndex): ElasticsearchResult = {
from: Option[Int], size: Option[Int], sort: Option[String], order: Option[String], permitted: List[UUID],
user: Option[User], index: String = nameOfIndex): ElasticsearchResult = {

// Convert any parameters from API into the query syntax equivalent so we can parse it all together later
var expanded_query = query
Expand All @@ -166,16 +168,16 @@ class ElasticsearchPlugin(application: Application) extends Plugin {
folderid.foreach(fid => expanded_query += s" in:$fid")

val queryObj = prepareElasticJsonQuery(expanded_query.stripPrefix(" "), permitted, user)
accumulatePageResult(queryObj, user, from.getOrElse(0), size.getOrElse(maxResults))
accumulatePageResult(queryObj, user, from.getOrElse(0), size.getOrElse(maxResults), sort, order)
}

/** Perform search, check permissions, and keep searching again if page isn't filled with permitted resources */
def accumulatePageResult(queryObj: XContentBuilder, user: Option[User], from: Int, size: Int,
index: String = nameOfIndex): ElasticsearchResult = {
sort: Option[String], order: Option[String], index: String = nameOfIndex): ElasticsearchResult = {
var total_results = ListBuffer.empty[ResourceRef]

// Fetch initial page & filter by permissions
val (results, total_size) = _search(queryObj, index, Some(from), Some(size))
val (results, total_size) = _search(queryObj, index, Some(from), Some(size), sort, order)
Logger.debug(s"Found ${results.length} results with ${total_size} total")
val filtered = checkResultPermissions(results, user)
Logger.debug(s"Permission to see ${filtered.length} results")
Expand All @@ -187,7 +189,7 @@ class ElasticsearchPlugin(application: Application) extends Plugin {
var exhausted = false
while (total_results.length < size && !exhausted) {
Logger.debug(s"Only have ${total_results.length} total results; searching for ${size*2} more from ${new_from}")
val (results, total_size) = _search(queryObj, index, Some(new_from), Some(size*2))
val (results, total_size) = _search(queryObj, index, Some(new_from), Some(size*2), sort, order)
Logger.debug(s"Found ${results.length} results with ${total_size} total")
if (results.length == 0 || new_from+results.length == total_size) exhausted = true // No more results to find
val filtered = checkResultPermissions(results, user)
@@ -251,17 +253,39 @@

/*** Execute query and return list of results and total result count as tuple */
def _search(queryObj: XContentBuilder, index: String = nameOfIndex,
from: Option[Int] = Some(0), size: Option[Int] = Some(maxResults)): (List[ResourceRef], Long) = {
from: Option[Int] = Some(0), size: Option[Int] = Some(maxResults),
sort: Option[String], order: Option[String]): (List[ResourceRef], Long) = {
connect()
val response = client match {
case Some(x) => {
Logger.info("Searching Elasticsearch: "+queryObj.string())
Logger.debug("Searching Elasticsearch: " + queryObj.string())

// Exclude _sort fields in response object
var sortFilter = jsonBuilder().startObject().startArray("exclude").value("*._sort").endArray().endObject()

var responsePrep = x.prepareSearch(index)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.setSource(sortFilter)
.setQuery(queryObj)

responsePrep = responsePrep.setFrom(from.getOrElse(0))
responsePrep = responsePrep.setSize(size.getOrElse(maxResults))
// Default to ascending if no order provided but a field is
val searchOrder = order match {
case Some("asc") => SortOrder.ASC
case Some("desc") => SortOrder.DESC
case Some("DESC") => SortOrder.DESC
case _ => SortOrder.ASC
}
// Default to created field if order is provided but no field is
sort match {
// case Some("name") => responsePrep = responsePrep.addSort("name._sort", searchOrder) TODO: Not yet supported
case Some(x) => responsePrep = responsePrep.addSort(x, searchOrder)
case None => order match {
case Some(o) => responsePrep = responsePrep.addSort("created", searchOrder)
case None => {}
}
}

val response = responsePrep.setExplain(true).execute().actionGet()
Logger.debug("Search hits: " + response.getHits().getTotalHits())
@@ -291,8 +315,7 @@
.field("type", "custom")
.field("tokenizer", "uax_url_email")
.endObject()
.endObject()
.endObject()
.endObject().endObject()
.startObject("index")
.startObject("mapping")
.field("ignore_malformed", true)
@@ -697,10 +720,14 @@ class ElasticsearchPlugin(application: Application) extends Plugin {
* as strings for datatypes besides Objects. In the future, this could
* be removed, but only once the Search API better supports those data types (e.g. Date).
*/

// TODO: With Elastic 6.8+ we can use "normalizer": "case_insensitive" for _sort fields

"""{"clowder_object": {
|"numeric_detection": true,
|"properties": {
|"name": {"type": "string"},
|"name": {"type": "string", "fields": {
| "_sort": {"type":"string", "index": "not_analyzed"}}},
|"description": {"type": "string"},
|"resource_type": {"type": "string", "include_in_all": false},
|"child_of": {"type": "string", "include_in_all": false},
@@ -925,7 +952,7 @@ class ElasticsearchPlugin(application: Application) extends Plugin {
}
}

// If a term is specified that isn't in this list, it's assumed to be a metadata field
// If a term is specified that isn't in this list, it's assumed to be a metadata field (for sorting and filtering)
val official_terms = List("name", "creator", "created", "email", "resource_type", "in", "contains", "tag", "exists", "missing")

// Create list of (key, operator, value) for passing to builder
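The groundwork for string sorting is already visible above: the mapping indexes `name` with a `not_analyzed` `_sort` subfield, `_search` excludes `*._sort` from responses, and a commented-out branch marks where the sort would attach. A sketch of what enabling it might look like against the same transport-client API (hypothetical; this release leaves it disabled, and without the Elasticsearch 6.8+ `case_insensitive` normalizer the ordering would be case-sensitive):

```scala
import org.elasticsearch.action.search.{SearchResponse, SearchType}
import org.elasticsearch.client.Client
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.search.sort.SortOrder

// Hypothetical helper: sorting on the raw "name._sort" subfield orders
// whole names lexicographically instead of by analyzed tokens.
def searchSortedByName(client: Client, index: String): SearchResponse =
  client.prepareSearch(index)
    .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
    .setQuery(QueryBuilders.matchAllQuery())
    .addSort("name._sort", SortOrder.ASC)
    .execute()
    .actionGet()
```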
8 changes: 8 additions & 0 deletions app/services/mongodb/ElasticsearchQueue.scala
@@ -53,6 +53,7 @@ class ElasticsearchQueue @Inject() (
}
}
case "index_all" => _indexAll()
case "delete_index" => _deleteIndex()
case "index_swap" => _swapIndex()
case _ => throw new IllegalArgumentException(s"Unrecognized action: ${action.action}")
}
@@ -63,6 +64,7 @@
case "index_dataset" => throw new IllegalArgumentException(s"No target specified for action ${action.action}")
case "index_collection" => throw new IllegalArgumentException(s"No target specified for action ${action.action}")
case "index_all" => _indexAll()
case "delete_index" => _deleteIndex()
case "index_swap" => _swapIndex()
case _ => throw new IllegalArgumentException(s"Unrecognized action: ${action.action}")
}
@@ -97,6 +99,12 @@
})
}

def _deleteIndex() = {
current.plugin[ElasticsearchPlugin].foreach(p => {
p.deleteAll()
})
}

// Replace the main index with the newly reindexed temp file
def _swapIndex() = {
Logger.debug("Swapping temporary reindex for main index")