Skip to content

Commit

Permalink
[Pull-based Ingestion] Add basic NodeStats metrics (#17444)
Browse files Browse the repository at this point in the history
Signed-off-by: xuxiong1 <[email protected]>
  • Loading branch information
xuxiong1 authored Mar 3, 2025
1 parent bfdf019 commit 21f69ca
Show file tree
Hide file tree
Showing 22 changed files with 340 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.indices.pollingingest.PollingIngestStats;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Assert;
Expand Down Expand Up @@ -75,6 +76,11 @@ public void testKafkaIngestion() {
refresh("test");
SearchResponse response = client().prepareSearch("test").setQuery(query).get();
assertThat(response.getHits().getTotalHits().value(), is(1L));
PollingIngestStats stats = client().admin().indices().prepareStats("test").get().getIndex("test").getShards()[0]
.getPollingIngestStats();
assertNotNull(stats);
assertThat(stats.getMessageProcessorStats().getTotalProcessedCount(), is(2L));
assertThat(stats.getConsumerStats().getTotalPolledCount(), is(2L));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.pollingingest.PollingIngestStats;
import org.opensearch.node.NodeService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportRequest;
Expand Down Expand Up @@ -210,15 +211,18 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
CommitStats commitStats;
SeqNoStats seqNoStats;
RetentionLeaseStats retentionLeaseStats;
PollingIngestStats pollingIngestStats;
try {
commitStats = indexShard.commitStats();
seqNoStats = indexShard.seqNoStats();
retentionLeaseStats = indexShard.getRetentionLeaseStats();
pollingIngestStats = indexShard.pollingIngestStats();
} catch (final AlreadyClosedException e) {
// shard is closed - no stats is fine
commitStats = null;
seqNoStats = null;
retentionLeaseStats = null;
pollingIngestStats = null;
}
shardsStats.add(
new ShardStats(
Expand All @@ -227,7 +231,8 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
new CommonStats(indicesService.getIndicesQueryCache(), indexShard, commonStatsFlags),
commitStats,
seqNoStats,
retentionLeaseStats
retentionLeaseStats,
pollingIngestStats
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.action.admin.indices.stats;

import org.opensearch.Version;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.PublicApi;
Expand All @@ -44,6 +45,7 @@
import org.opensearch.index.seqno.RetentionLeaseStats;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.indices.pollingingest.PollingIngestStats;

import java.io.IOException;

Expand All @@ -65,6 +67,9 @@ public class ShardStats implements Writeable, ToXContentFragment {
@Nullable
private RetentionLeaseStats retentionLeaseStats;

@Nullable
private PollingIngestStats pollingIngestStats;

/**
* Gets the current retention lease stats.
*
Expand All @@ -87,6 +92,9 @@ public ShardStats(StreamInput in) throws IOException {
isCustomDataPath = in.readBoolean();
seqNoStats = in.readOptionalWriteable(SeqNoStats::new);
retentionLeaseStats = in.readOptionalWriteable(RetentionLeaseStats::new);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
pollingIngestStats = in.readOptionalWriteable(PollingIngestStats::new);
}
}

public ShardStats(
Expand All @@ -95,7 +103,8 @@ public ShardStats(
final CommonStats commonStats,
final CommitStats commitStats,
final SeqNoStats seqNoStats,
final RetentionLeaseStats retentionLeaseStats
final RetentionLeaseStats retentionLeaseStats,
final PollingIngestStats pollingIngestStats
) {
this.shardRouting = routing;
this.dataPath = shardPath.getRootDataPath().toString();
Expand All @@ -105,6 +114,7 @@ public ShardStats(
this.commonStats = commonStats;
this.seqNoStats = seqNoStats;
this.retentionLeaseStats = retentionLeaseStats;
this.pollingIngestStats = pollingIngestStats;
}

/**
Expand All @@ -128,6 +138,11 @@ public SeqNoStats getSeqNoStats() {
return this.seqNoStats;
}

@Nullable
public PollingIngestStats getPollingIngestStats() {
return this.pollingIngestStats;
}

public String getDataPath() {
return dataPath;
}
Expand All @@ -150,6 +165,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(isCustomDataPath);
out.writeOptionalWriteable(seqNoStats);
out.writeOptionalWriteable(retentionLeaseStats);
if (out.getVersion().onOrAfter((Version.V_3_0_0))) {
out.writeOptionalWriteable(pollingIngestStats);
}
}

@Override
Expand All @@ -171,6 +189,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (retentionLeaseStats != null) {
retentionLeaseStats.toXContent(builder, params);
}
if (pollingIngestStats != null) {
pollingIngestStats.toXContent(builder, params);
}
builder.startObject(Fields.SHARD_PATH);
builder.field(Fields.STATE_PATH, statePath);
builder.field(Fields.DATA_PATH, dataPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardNotFoundException;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.pollingingest.PollingIngestStats;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand Down Expand Up @@ -141,16 +142,27 @@ protected ShardStats shardOperation(IndicesStatsRequest request, ShardRouting sh
CommitStats commitStats;
SeqNoStats seqNoStats;
RetentionLeaseStats retentionLeaseStats;
PollingIngestStats pollingIngestStats;
try {
commitStats = indexShard.commitStats();
seqNoStats = indexShard.seqNoStats();
retentionLeaseStats = indexShard.getRetentionLeaseStats();
pollingIngestStats = indexShard.pollingIngestStats();
} catch (final AlreadyClosedException e) {
// shard is closed - no stats is fine
commitStats = null;
seqNoStats = null;
retentionLeaseStats = null;
pollingIngestStats = null;
}
return new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), commonStats, commitStats, seqNoStats, retentionLeaseStats);
return new ShardStats(
indexShard.routingEntry(),
indexShard.shardPath(),
commonStats,
commitStats,
seqNoStats,
retentionLeaseStats,
pollingIngestStats
);
}
}
8 changes: 8 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogManager;
import org.opensearch.indices.pollingingest.PollingIngestStats;
import org.opensearch.search.suggest.completion.CompletionStats;

import java.io.Closeable;
Expand Down Expand Up @@ -946,6 +947,13 @@ public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean incl
return stats;
}

/**
* @return Stats for pull-based ingestion.
*/
public PollingIngestStats pollingIngestStats() {
return null;
}

protected TranslogDeletionPolicy getTranslogDeletionPolicy(EngineConfig engineConfig) {
TranslogDeletionPolicy customTranslogDeletionPolicy = null;
if (engineConfig.getCustomTranslogDeletionPolicyFactory() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.index.translog.listener.CompositeTranslogEventListener;
import org.opensearch.indices.pollingingest.DefaultStreamPoller;
import org.opensearch.indices.pollingingest.PollingIngestStats;
import org.opensearch.indices.pollingingest.StreamPoller;

import java.io.IOException;
Expand Down Expand Up @@ -288,4 +289,9 @@ protected TranslogManager createTranslogManager(
protected Map<String, String> commitDataAsMap() {
return commitDataAsMap(indexWriter);
}

@Override
public PollingIngestStats pollingIngestStats() {
return streamPoller.getStats();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.pollingingest.PollingIngestStats;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryFailedException;
import org.opensearch.indices.recovery.RecoveryListener;
Expand Down Expand Up @@ -1533,6 +1534,10 @@ public CompletionStats completionStats(String... fields) {
return getEngine().completionStats(fields);
}

public PollingIngestStats pollingIngestStats() {
return getEngine().pollingIngestStats();
}

/**
* Executes the given flush request against the engine.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.pollingingest.IngestionEngineFactory;
import org.opensearch.indices.pollingingest.PollingIngestStats;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoverySettings;
Expand Down Expand Up @@ -758,15 +759,18 @@ IndexShardStats indexShardStats(final IndicesService indicesService, final Index
CommitStats commitStats;
SeqNoStats seqNoStats;
RetentionLeaseStats retentionLeaseStats;
PollingIngestStats pollingIngestStats;
try {
commitStats = indexShard.commitStats();
seqNoStats = indexShard.seqNoStats();
retentionLeaseStats = indexShard.getRetentionLeaseStats();
pollingIngestStats = indexShard.pollingIngestStats();
} catch (AlreadyClosedException e) {
// shard is closed - no stats is fine
commitStats = null;
seqNoStats = null;
retentionLeaseStats = null;
pollingIngestStats = null;
}

return new IndexShardStats(
Expand All @@ -778,7 +782,8 @@ IndexShardStats indexShardStats(final IndicesService indicesService, final Index
new CommonStats(indicesService.getIndicesQueryCache(), indexShard, flags),
commitStats,
seqNoStats,
retentionLeaseStats
retentionLeaseStats,
pollingIngestStats
) }
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.Nullable;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.index.IngestionShardConsumer;
import org.opensearch.index.IngestionShardPointer;
import org.opensearch.index.Message;
Expand Down Expand Up @@ -60,6 +61,8 @@ public class DefaultStreamPoller implements StreamPoller {

private MessageProcessorRunnable processorRunnable;

private final CounterMetric totalPolledCount = new CounterMetric();

// A pointer to the max persisted pointer for optimizing the check
@Nullable
private IngestionShardPointer maxPersistedPointer;
Expand Down Expand Up @@ -204,6 +207,7 @@ protected void startPoll() {
logger.info("Skipping message with pointer {} as it is already processed", result.getPointer().asString());
continue;
}
totalPolledCount.inc();
blockingQueue.put(result);
logger.debug(
"Put message {} with pointer {} to the blocking queue",
Expand Down Expand Up @@ -297,6 +301,14 @@ public IngestionShardPointer getBatchStartPointer() {
return batchStartPointer;
}

@Override
public PollingIngestStats getStats() {
PollingIngestStats.Builder builder = new PollingIngestStats.Builder();
builder.setTotalPolledCount(totalPolledCount.count());
builder.setTotalProcessedCount(processorRunnable.getStats().count());
return builder.build();
}

public State getState() {
return state;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.lucene.index.Term;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.common.lucene.uid.Versions;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.common.bytes.BytesArray;
Expand Down Expand Up @@ -48,6 +49,7 @@ public class MessageProcessorRunnable implements Runnable {

private final BlockingQueue<IngestionShardConsumer.ReadResult<? extends IngestionShardPointer, ? extends Message>> blockingQueue;
private final MessageProcessor messageProcessor;
private final CounterMetric stats = new CounterMetric();

private static final String ID = "_id";
private static final String OP_TYPE = "_op_type";
Expand Down Expand Up @@ -229,8 +231,13 @@ public void run() {
Thread.currentThread().interrupt(); // Restore interrupt status
}
if (result != null) {
stats.inc();
messageProcessor.process(result.getMessage(), result.getPointer());
}
}
}

public CounterMetric getStats() {
return stats;
}
}
Loading

0 comments on commit 21f69ca

Please sign in to comment.