Cache segment metadata on the Overlord to speed up segment allocation and other task actions #17653

Status: Draft. Wants to merge 1 commit into base: master.
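This draft routes segment metadata reads and writes on the Overlord through a new SqlSegmentsMetadataTransactionFactory, built from a DruidLeaderSelector and a SegmentsMetadataCache, so that segment allocation and other task actions can be served from an in-memory cache while the Overlord is leader. The hunks below are the interface and test fallout: every direct construction of IndexerSQLMetadataStorageCoordinator gains the factory as a new first argument, with no-op leader selection and caching in tests. A sketch of that repeated wiring, factored into a hypothetical helper (the helper class is illustrative only, not part of the PR; the argument order matches the hunks below):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.metadata.segment.SqlSegmentsMetadataTransactionFactory;
import org.apache.druid.metadata.segment.cache.NoopSegmentsMetadataCache;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;

// Hypothetical helper, not in the PR: factors out the wiring that each test
// setup below repeats inline.
final class TestCoordinatorFactory
{
  static IndexerSQLMetadataStorageCoordinator create(
      ObjectMapper mapper,
      MetadataStorageTablesConfig tablesConfig,
      TestDerbyConnector connector,
      SegmentSchemaManager schemaManager
  )
  {
    return new IndexerSQLMetadataStorageCoordinator(
        // New in this PR: a transaction factory fronts segment metadata
        // reads and writes. Tests plug in a no-op leader selector and a
        // no-op cache, so every transaction still falls through to SQL.
        new SqlSegmentsMetadataTransactionFactory(
            mapper,
            tablesConfig,
            connector,
            new TestDruidLeaderSelector(),
            new NoopSegmentsMetadataCache()
        ),
        mapper,
        tablesConfig,
        connector,
        schemaManager,
        CentralizedDatasourceSchemaConfig.create()
    );
  }
}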
File 1

@@ -46,13 +46,16 @@
 import org.apache.druid.metadata.MetadataSupervisorManager;
 import org.apache.druid.metadata.SqlSegmentsMetadataManager;
 import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.metadata.segment.SqlSegmentsMetadataTransactionFactory;
+import org.apache.druid.metadata.segment.cache.NoopSegmentsMetadataCache;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 import org.apache.druid.segment.metadata.SegmentSchemaManager;
 import org.apache.druid.segment.realtime.ChatHandlerProvider;
+import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;

@@ -105,6 +108,13 @@ public void setUp()
         derbyConnector
     );
     indexerMetadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
+        new SqlSegmentsMetadataTransactionFactory(
+            objectMapper,
+            derbyConnectorRule.metadataTablesConfigSupplier().get(),
+            derbyConnector,
+            new TestDruidLeaderSelector(),
+            new NoopSegmentsMetadataCache()
+        ),
         objectMapper,
         derbyConnectorRule.metadataTablesConfigSupplier().get(),
         derbyConnector,
File 2

@@ -42,6 +42,8 @@
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
 import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.metadata.segment.SqlSegmentsMetadataTransactionFactory;
+import org.apache.druid.metadata.segment.cache.NoopSegmentsMetadataCache;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.QueryRunnerTestHelper;

@@ -56,6 +58,7 @@
 import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
 import org.apache.druid.server.initialization.ZkPathsConfig;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;

@@ -112,6 +115,13 @@ public void setUp() throws Exception
     );

     metadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
+        new SqlSegmentsMetadataTransactionFactory(
+            jsonMapper,
+            derbyConnectorRule.metadataTablesConfigSupplier().get(),
+            derbyConnector,
+            new TestDruidLeaderSelector(),
+            new NoopSegmentsMetadataCache()
+        ),
         jsonMapper,
         derbyConnectorRule.metadataTablesConfigSupplier().get(),
         derbyConnector,
File 3

@@ -40,6 +40,7 @@
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.metadata.segment.cache.SegmentsMetadataCache;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;

@@ -88,6 +89,7 @@ public DruidOverlord(
       final OverlordDutyExecutor overlordDutyExecutor,
       @IndexingService final DruidLeaderSelector overlordLeaderSelector,
       final SegmentAllocationQueue segmentAllocationQueue,
+      final SegmentsMetadataCache segmentsMetadataCache,
       final CompactionScheduler compactionScheduler,
       final ObjectMapper mapper,
       final TaskContextEnricher taskContextEnricher

@@ -132,6 +134,7 @@ public void becomeLeader()

         // First add "half leader" services: everything required for APIs except the supervisor manager.
         // Then, become "half leader" so those APIs light up and supervisor initialization can proceed.
+        leaderLifecycle.addManagedInstance(segmentsMetadataCache);
         leaderLifecycle.addManagedInstance(taskRunner);
         leaderLifecycle.addManagedInstance(taskQueue);
         leaderLifecycle.addHandler(
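A note on the becomeLeader() hunk above: instances managed by a Druid Lifecycle start in registration order and stop in reverse, so adding segmentsMetadataCache ahead of taskRunner and taskQueue ensures the cache starts before the task-processing services that read through it, and outlives them on shutdown.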
File 4

@@ -1170,7 +1170,7 @@ public void testSegmentIdMustNotBeReused()
     // Allocate another id and ensure that it doesn't exist in the druid_segments table
     final SegmentIdWithShardSpec theId =
         allocate(task1, DateTimes.nowUtc(), Granularities.NONE, Granularities.ALL, "seq", "3");
-    Assert.assertNull(coordinator.retrieveSegmentForId(theId.asSegmentId().toString(), true));
+    Assert.assertNull(coordinator.retrieveSegmentForId(theId.getDataSource(), theId.asSegmentId().toString()));

     lockbox.unlock(task1, Intervals.ETERNITY);
   }
File 5

@@ -37,9 +37,12 @@
 import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
 import org.apache.druid.metadata.SqlSegmentsMetadataManager;
 import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.metadata.segment.SqlSegmentsMetadataTransactionFactory;
+import org.apache.druid.metadata.segment.cache.NoopSegmentsMetadataCache;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 import org.apache.druid.segment.metadata.SegmentSchemaCache;
 import org.apache.druid.segment.metadata.SegmentSchemaManager;
+import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.easymock.EasyMock;
 import org.joda.time.Period;

@@ -96,6 +99,13 @@ public void before()
     final ObjectMapper objectMapper = new TestUtils().getTestObjectMapper();
     segmentSchemaManager = new SegmentSchemaManager(metadataStorageTablesConfig, objectMapper, testDerbyConnector);
     metadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
+        new SqlSegmentsMetadataTransactionFactory(
+            objectMapper,
+            metadataStorageTablesConfig,
+            testDerbyConnector,
+            new TestDruidLeaderSelector(),
+            new NoopSegmentsMetadataCache()
+        ),
         objectMapper,
         metadataStorageTablesConfig,
         testDerbyConnector,

@@ -113,7 +123,7 @@ public int getSqlMetadataMaxRetry()
     segmentSchemaCache = new SegmentSchemaCache(NoopServiceEmitter.instance());
     segmentsMetadataManager = new SqlSegmentsMetadataManager(
         objectMapper,
-        Suppliers.ofInstance(new SegmentsMetadataManagerConfig()),
+        Suppliers.ofInstance(new SegmentsMetadataManagerConfig(null, null)),
         Suppliers.ofInstance(metadataStorageTablesConfig),
         testDerbyConnector,
         segmentSchemaCache,
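A side effect visible in this hunk (and again in File 6): SegmentsMetadataManagerConfig evidently no longer has a usable no-arg constructor, so tests now build it as new SegmentsMetadataManagerConfig(null, null); the nulls presumably select defaults for the new parameters, which this diff does not show.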
File 6

@@ -70,6 +70,8 @@
 import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
 import org.apache.druid.metadata.SqlSegmentsMetadataManager;
 import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.metadata.segment.SqlSegmentsMetadataTransactionFactory;
+import org.apache.druid.metadata.segment.cache.NoopSegmentsMetadataCache;
 import org.apache.druid.segment.DataSegmentsWithSchemas;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMergerV9Factory;

@@ -85,6 +87,7 @@
 import org.apache.druid.segment.metadata.SegmentSchemaManager;
 import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
 import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.testing.InitializedNullHandlingTest;

@@ -124,8 +127,6 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
   private SegmentsMetadataManager segmentsMetadataManager;
   private TaskLockbox lockbox;
   private File baseDir;
-  private SegmentSchemaManager segmentSchemaManager;
-  private SegmentSchemaCache segmentSchemaCache;
   private SupervisorManager supervisorManager;
   private TestDataSegmentKiller dataSegmentKiller;
   protected File reportsFile;

@@ -142,23 +143,30 @@ public void setUpIngestionTestBase() throws IOException
     connector.createSegmentSchemasTable();
     connector.createSegmentTable();
     taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
-    segmentSchemaManager = new SegmentSchemaManager(
+    SegmentSchemaManager segmentSchemaManager = new SegmentSchemaManager(
         derbyConnectorRule.metadataTablesConfigSupplier().get(),
         objectMapper,
         derbyConnectorRule.getConnector()
     );

     storageCoordinator = new IndexerSQLMetadataStorageCoordinator(
+        new SqlSegmentsMetadataTransactionFactory(
+            objectMapper,
+            derbyConnectorRule.metadataTablesConfigSupplier().get(),
+            derbyConnectorRule.getConnector(),
+            new TestDruidLeaderSelector(),
+            new NoopSegmentsMetadataCache()
+        ),
         objectMapper,
         derbyConnectorRule.metadataTablesConfigSupplier().get(),
         derbyConnectorRule.getConnector(),
         segmentSchemaManager,
         CentralizedDatasourceSchemaConfig.create()
     );
-    segmentSchemaCache = new SegmentSchemaCache(NoopServiceEmitter.instance());
+    SegmentSchemaCache segmentSchemaCache = new SegmentSchemaCache(NoopServiceEmitter.instance());
     segmentsMetadataManager = new SqlSegmentsMetadataManager(
         objectMapper,
-        SegmentsMetadataManagerConfig::new,
+        () -> new SegmentsMetadataManagerConfig(null, null),
         derbyConnectorRule.metadataTablesConfigSupplier(),
         derbyConnectorRule.getConnector(),
         segmentSchemaCache,
File 7

@@ -33,8 +33,11 @@
 import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
 import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
 import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.metadata.segment.SqlSegmentsMetadataTransactionFactory;
+import org.apache.druid.metadata.segment.cache.NoopSegmentsMetadataCache;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 import org.apache.druid.segment.metadata.SegmentSchemaManager;
+import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
 import org.joda.time.Interval;
 import org.junit.After;
 import org.junit.Assert;

@@ -80,6 +83,13 @@ public void setup()
     lockbox = new TaskLockbox(
         taskStorage,
         new IndexerSQLMetadataStorageCoordinator(
+            new SqlSegmentsMetadataTransactionFactory(
+                objectMapper,
+                derby.metadataTablesConfigSupplier().get(),
+                derbyConnector,
+                new TestDruidLeaderSelector(),
+                new NoopSegmentsMetadataCache()
+            ),
            objectMapper,
            derby.metadataTablesConfigSupplier().get(),
            derbyConnector,
File 8

@@ -56,10 +56,13 @@
 import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.metadata.MetadataStorageTablesConfig;
 import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.metadata.segment.SqlSegmentsMetadataTransactionFactory;
+import org.apache.druid.metadata.segment.cache.NoopSegmentsMetadataCache;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 import org.apache.druid.segment.metadata.SegmentSchemaManager;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
 import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
 import org.apache.druid.timeline.partition.NumberedOverwritePartialShardSpec;

@@ -90,7 +93,6 @@ public class TaskLockboxTest
   @Rule
   public final TestDerbyConnector.DerbyConnectorRule derby = new TestDerbyConnector.DerbyConnectorRule();

-  private ObjectMapper objectMapper;
   private TaskStorage taskStorage;
   private IndexerMetadataStorageCoordinator metadataStorageCoordinator;
   private TaskLockbox lockbox;

@@ -104,7 +106,7 @@ public class TaskLockboxTest
   @Before
   public void setup()
   {
-    objectMapper = TestHelper.makeJsonMapper();
+    final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
     objectMapper.registerSubtypes(NumberedShardSpec.class, HashBasedNumberedShardSpec.class);
     final TestDerbyConnector derbyConnector = derby.getConnector();
     derbyConnector.createTaskTables();

@@ -129,6 +131,13 @@ public void setup()
     EasyMock.replay(emitter);

     metadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
+        new SqlSegmentsMetadataTransactionFactory(
+            objectMapper,
+            tablesConfig,
+            derbyConnector,
+            new TestDruidLeaderSelector(),
+            new NoopSegmentsMetadataCache()
+        ),
         objectMapper,
         tablesConfig,
         derbyConnector,

@@ -463,6 +472,13 @@ public void testSyncWithUnknownTaskTypesFromModuleNotLoaded()
     );

     IndexerMetadataStorageCoordinator loadedMetadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
+        new SqlSegmentsMetadataTransactionFactory(
+            loadedMapper,
+            derby.metadataTablesConfigSupplier().get(),
+            derbyConnector,
+            new TestDruidLeaderSelector(),
+            new NoopSegmentsMetadataCache()
+        ),
         loadedMapper,
         derby.metadataTablesConfigSupplier().get(),
         derbyConnector,
File 9

@@ -47,9 +47,12 @@
 import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
 import org.apache.druid.metadata.TaskLookup;
 import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.metadata.segment.SqlSegmentsMetadataTransactionFactory;
+import org.apache.druid.metadata.segment.cache.NoopSegmentsMetadataCache;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 import org.apache.druid.segment.metadata.SegmentSchemaManager;
+import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.joda.time.Duration;
 import org.joda.time.Period;

@@ -103,6 +106,13 @@ public void setUp()
     final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
     segmentSchemaManager = new SegmentSchemaManager(derbyConnectorRule.metadataTablesConfigSupplier().get(), jsonMapper, derbyConnectorRule.getConnector());
     final IndexerSQLMetadataStorageCoordinator storageCoordinator = new IndexerSQLMetadataStorageCoordinator(
+        new SqlSegmentsMetadataTransactionFactory(
+            jsonMapper,
+            derbyConnectorRule.metadataTablesConfigSupplier().get(),
+            derbyConnectorRule.getConnector(),
+            new TestDruidLeaderSelector(),
+            new NoopSegmentsMetadataCache()
+        ),
         jsonMapper,
         derbyConnectorRule.metadataTablesConfigSupplier().get(),
         derbyConnectorRule.getConnector(),
File 10

@@ -74,6 +74,7 @@
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.metadata.segment.cache.SegmentsMetadataCache;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
 import org.apache.druid.server.metrics.NoopServiceEmitter;

@@ -258,6 +259,7 @@ public MockTaskRunner get()
         EasyMock.createNiceMock(OverlordDutyExecutor.class),
         new TestDruidLeaderSelector(),
         EasyMock.createNiceMock(SegmentAllocationQueue.class),
+        EasyMock.createNiceMock(SegmentsMetadataCache.class),
         EasyMock.createNiceMock(CompactionScheduler.class),
         new DefaultObjectMapper(),
         new NoopTaskContextEnricher()
File 11

@@ -87,6 +87,8 @@
 import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
 import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
 import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.metadata.segment.SqlSegmentsMetadataTransactionFactory;
+import org.apache.druid.metadata.segment.cache.NoopSegmentsMetadataCache;
 import org.apache.druid.query.DirectQueryProcessingPool;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.QueryPlus;

@@ -120,6 +122,7 @@
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
 import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.utils.CompressionUtils;

@@ -587,6 +590,13 @@ protected void makeToolboxFactory(TestUtils testUtils, ServiceEmitter emitter, b
     );
     segmentSchemaManager = new SegmentSchemaManager(derby.metadataTablesConfigSupplier().get(), objectMapper, derbyConnector);
     metadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
+        new SqlSegmentsMetadataTransactionFactory(
+            objectMapper,
+            derby.metadataTablesConfigSupplier().get(),
+            derbyConnector,
+            new TestDruidLeaderSelector(),
+            new NoopSegmentsMetadataCache()
+        ),
         objectMapper,
         derby.metadataTablesConfigSupplier().get(),
         derbyConnector,
File 12

@@ -254,14 +254,6 @@ public SegmentIdWithShardSpec allocatePendingSegment(
     );
   }

-  @Override
-  public List<PendingSegmentRecord> upgradePendingSegmentsOverlappingWith(
-      Set<DataSegment> replaceSegments
-  )
-  {
-    return Collections.emptyList();
-  }
-
   @Override
   public int deletePendingSegmentsCreatedInInterval(String dataSource, Interval deleteInterval)
   {

@@ -288,7 +280,13 @@ public void updateSegmentMetadata(Set<DataSegment> segments)
   }

   @Override
-  public DataSegment retrieveSegmentForId(final String id, boolean includeUnused)
+  public DataSegment retrieveSegmentForId(String dataSource, String segmentId)
   {
     return null;
   }
+
+  @Override
+  public DataSegment retrieveUsedSegmentForId(String dataSource, String segmentId)
+  {
+    return null;
+  }
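This stub records the lookup API change: retrieveSegmentForId(String id, boolean includeUnused) is replaced by two datasource-scoped methods, retrieveSegmentForId(dataSource, segmentId) and retrieveUsedSegmentForId(dataSource, segmentId). Making the datasource explicit is presumably what lets the Overlord answer the lookup from its per-datasource cache instead of a SQL query. A hedged migration sketch (coordinator and theId are stand-ins, mirroring the File 4 test change above):

// Assumes `coordinator` is an IndexerMetadataStorageCoordinator and `theId`
// a SegmentIdWithShardSpec, as in the File 4 test change.

// Before: one method, with a flag to include unused segments.
// DataSegment s = coordinator.retrieveSegmentForId(theId.asSegmentId().toString(), true);

// After: datasource-scoped, with the used-only lookup split into its own method.
DataSegment anyState = coordinator.retrieveSegmentForId(
    theId.getDataSource(), theId.asSegmentId().toString());
DataSegment usedOnly = coordinator.retrieveUsedSegmentForId(
    theId.getDataSource(), theId.asSegmentId().toString());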
File 13

@@ -60,6 +60,16 @@ public void addSegments(Iterator<DataSegment> segments)
     );
   }

+  public void add(DataSegment segment)
+  {
+    add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
+  }
+
+  public void remove(DataSegment segment)
+  {
+    remove(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
+  }
+
   public boolean isOvershadowed(DataSegment segment)
   {
     return isOvershadowed(segment.getInterval(), segment.getVersion(), segment);
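The two helpers added above just build a partition chunk from the segment's own shard spec and delegate to the existing interval/version/chunk overloads. A usage sketch, assuming this class is Druid's SegmentTimeline (the capture does not name the file):

// Assumes org.apache.druid.timeline.SegmentTimeline; `segment` is any DataSegment.
SegmentTimeline timeline = SegmentTimeline.forSegments(Collections.emptySet());

// Before this PR, callers spelled the chunk out themselves:
//   timeline.add(segment.getInterval(), segment.getVersion(),
//                segment.getShardSpec().createChunk(segment));
timeline.add(segment);
boolean overshadowed = timeline.isOvershadowed(segment);
timeline.remove(segment);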