diff --git a/conf/broker.conf b/conf/broker.conf index 782b3755754b3..a543fa2db71c6 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1755,6 +1755,9 @@ managedLedgerOffloadDriver= # Maximum number of thread pool threads for ledger offloading managedLedgerOffloadMaxThreads=2 +# Maximum number of read thread pool threads for ledger offloading +managedLedgerOffloadReadThreads=2 + # The extraction directory of the nar package. # Available for Protocol Handler, Additional Servlets, Entry Filter, Offloaders, Broker Interceptor. # Default is System.getProperty("java.io.tmpdir"). diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java index 9fbf9b73c057e..80135925913d2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java @@ -72,6 +72,27 @@ T create(OffloadPoliciesImpl offloadPolicies, LedgerOffloaderStats offloaderStats) throws IOException; + /** + * Create a ledger offloader with the provided configuration, user-metadata, + * scheduler, readExecutor and offloaderStats. + * + * @param offloadPolicies offload policies + * @param userMetadata user metadata + * @param scheduler scheduler + * @param readExecutor read executor + * @param offloaderStats offloaderStats + * @return the offloader instance + * @throws IOException when fail to create an offloader + */ + default T create(OffloadPoliciesImpl offloadPolicies, + Map userMetadata, + OrderedScheduler scheduler, + OrderedScheduler readExecutor, + LedgerOffloaderStats offloaderStats) + throws IOException { + return create(offloadPolicies, userMetadata, scheduler, offloaderStats); + } + /** * Create a ledger offloader with the provided configuration, user-metadata, schema storage and scheduler. @@ -112,6 +133,30 @@ default T create(OffloadPoliciesImpl offloadPolicies, return create(offloadPolicies, userMetadata, scheduler, offloaderStats); } + + /** + * Create a ledger offloader with the provided configuration, user-metadata, schema storage, + * scheduler, readExecutor and offloaderStats. + * + * @param offloadPolicies offload policies + * @param userMetadata user metadata + * @param schemaStorage used for schema lookup in offloader + * @param scheduler scheduler + * @param readExecutor read executor + * @param offloaderStats offloaderStats + * @return the offloader instance + * @throws IOException when fail to create an offloader + */ + default T create(OffloadPoliciesImpl offloadPolicies, + Map userMetadata, + SchemaStorage schemaStorage, + OrderedScheduler scheduler, + OrderedScheduler readExecutor, + LedgerOffloaderStats offloaderStats) + throws IOException { + return create(offloadPolicies, userMetadata, scheduler, readExecutor, offloaderStats); + } + @Override default void close() throws Exception { // no-op diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 96973fe0052dd..ca4408aa4e7c0 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -3395,6 +3395,12 @@ public double getLoadBalancerBandwidthOutResourceWeight() { ) private int managedLedgerOffloadMaxThreads = 2; + @FieldContext( + category = CATEGORY_STORAGE_OFFLOADING, + doc = "Maximum number of thread pool threads for offloaded ledger reading" + ) + private int managedLedgerOffloadReadThreads = 2; + @FieldContext( category = CATEGORY_STORAGE_OFFLOADING, doc = "The directory where nar Extraction of offloaders happens" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index a09ed5142b8ae..c63f17c031e49 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -231,6 +231,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private final ScheduledExecutorService loadManagerExecutor; private ScheduledExecutorService compactorExecutor; private OrderedScheduler offloaderScheduler; + private OrderedScheduler offloaderReadExecutor; private OffloadersCache offloadersCache = new OffloadersCache(); private LedgerOffloader defaultOffloader; private LedgerOffloaderStats offloaderStats; @@ -657,6 +658,7 @@ public CompletableFuture closeAsync() { executorServicesShutdown.shutdown(compactorExecutor); executorServicesShutdown.shutdown(offloaderScheduler); + executorServicesShutdown.shutdown(offloaderReadExecutor); executorServicesShutdown.shutdown(executor); executorServicesShutdown.shutdown(orderedExecutor); @@ -1615,7 +1617,8 @@ public LedgerOffloader createManagedLedgerOffloader(OffloadPoliciesImpl offloadP LedgerOffloader.METADATA_PULSAR_CLUSTER_NAME.toLowerCase(), config.getClusterName() ), - schemaStorage, getOffloaderScheduler(offloadPolicies), this.offloaderStats); + schemaStorage, getOffloaderScheduler(offloadPolicies), + getOffloaderReadScheduler(offloadPolicies), this.offloaderStats); } catch (IOException ioe) { throw new PulsarServerException(ioe.getMessage(), ioe.getCause()); } @@ -1687,6 +1690,15 @@ protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImp return this.offloaderScheduler; } + protected synchronized OrderedScheduler getOffloaderReadScheduler(OffloadPoliciesImpl offloadPolicies) { + if (this.offloaderReadExecutor == null) { + this.offloaderReadExecutor = OrderedScheduler.newSchedulerBuilder() + .numThreads(offloadPolicies.getManagedLedgerOffloadReadThreads()) + .name("offloader-read").build(); + } + return this.offloaderReadExecutor; + } + public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf) throws PulsarClientException { return PulsarClientImpl.builder() diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java index 9073de658d405..924153e71ffe2 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java @@ -87,6 +87,8 @@ interface Builder { Builder managedLedgerOffloadMaxThreads(Integer managedLedgerOffloadMaxThreads); + Builder managedLedgerOffloadReadThreads(Integer managedLedgerOffloadReadThreads); + Builder managedLedgerOffloadPrefetchRounds(Integer managedLedgerOffloadPrefetchRounds); Builder managedLedgerOffloadThresholdInBytes(Long managedLedgerOffloadThresholdInBytes); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java index 6c40aa3f2edd0..910075e387059 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java @@ -83,6 +83,7 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies { public static final int DEFAULT_GCS_MAX_BLOCK_SIZE_IN_BYTES = 128 * 1024 * 1024; // 128MiB public static final int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024; // 1MiB public static final int DEFAULT_OFFLOAD_MAX_THREADS = 2; + public static final int DEFAULT_OFFLOAD_READ_THREADS = 2; public static final int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1; public static final String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders"; public static final Long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = null; @@ -109,6 +110,9 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies { private Integer managedLedgerOffloadMaxThreads = DEFAULT_OFFLOAD_MAX_THREADS; @Configuration @JsonProperty(access = JsonProperty.Access.READ_WRITE) + private Integer managedLedgerOffloadReadThreads = DEFAULT_OFFLOAD_READ_THREADS; + @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private Integer managedLedgerOffloadPrefetchRounds = DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS; @Configuration @JsonProperty(access = JsonProperty.Access.READ_WRITE) @@ -500,6 +504,11 @@ public OffloadPoliciesImplBuilder managedLedgerOffloadMaxThreads(Integer managed return this; } + public OffloadPoliciesImplBuilder managedLedgerOffloadReadThreads(Integer managedLedgerOffloadReadThreads) { + impl.managedLedgerOffloadReadThreads = managedLedgerOffloadReadThreads; + return this; + } + public OffloadPoliciesImplBuilder managedLedgerOffloadPrefetchRounds( Integer managedLedgerOffloadPrefetchRounds) { impl.managedLedgerOffloadPrefetchRounds = managedLedgerOffloadPrefetchRounds; diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java index 60363cf8406db..ef2db046e3ceb 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java @@ -54,7 +54,19 @@ public BlobStoreManagedLedgerOffloader create(OffloadPoliciesImpl offloadPolicie TieredStorageConfiguration config = TieredStorageConfiguration.create(offloadPolicies.toProperties()); - return BlobStoreManagedLedgerOffloader.create(config, userMetadata, scheduler, offloaderStats, + return BlobStoreManagedLedgerOffloader.create(config, userMetadata, scheduler, scheduler, offloaderStats, + entryOffsetsCache); + } + + @Override + public BlobStoreManagedLedgerOffloader create(OffloadPoliciesImpl offloadPolicies, Map userMetadata, + OrderedScheduler scheduler, + OrderedScheduler readExecutor, + LedgerOffloaderStats offloaderStats) throws IOException { + + TieredStorageConfiguration config = + TieredStorageConfiguration.create(offloadPolicies.toProperties()); + return BlobStoreManagedLedgerOffloader.create(config, userMetadata, scheduler, readExecutor, offloaderStats, entryOffsetsCache); } diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java index d6d298159ea22..5ee62591b42fe 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java @@ -97,6 +97,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader { private static final String MANAGED_LEDGER_NAME = "ManagedLedgerName"; private final OrderedScheduler scheduler; + private final OrderedScheduler readExecutor; private final TieredStorageConfiguration config; private final OffloadPolicies policies; private final Location writeLocation; @@ -125,18 +126,21 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader { public static BlobStoreManagedLedgerOffloader create(TieredStorageConfiguration config, Map userMetadata, OrderedScheduler scheduler, + OrderedScheduler readExecutor, LedgerOffloaderStats offloaderStats, OffsetsCache entryOffsetsCache) throws IOException { - return new BlobStoreManagedLedgerOffloader(config, scheduler, userMetadata, offloaderStats, entryOffsetsCache); + return new BlobStoreManagedLedgerOffloader(config, scheduler, readExecutor, + userMetadata, offloaderStats, entryOffsetsCache); } BlobStoreManagedLedgerOffloader(TieredStorageConfiguration config, OrderedScheduler scheduler, + OrderedScheduler readExecutor, Map userMetadata, LedgerOffloaderStats offloaderStats, OffsetsCache entryOffsetsCache) { - this.scheduler = scheduler; + this.readExecutor = readExecutor; this.userMetadata = userMetadata; this.config = config; Properties properties = new Properties(); @@ -556,10 +560,10 @@ public CompletableFuture readOffloaded(long ledgerId, UUID uid, CompletableFuture promise = new CompletableFuture<>(); String key = DataBlockUtils.dataBlockOffloadKey(ledgerId, uid); String indexKey = DataBlockUtils.indexBlockOffloadKey(ledgerId, uid); - scheduler.chooseThread(ledgerId).execute(() -> { + readExecutor.chooseThread(ledgerId).execute(() -> { try { BlobStore readBlobstore = getBlobStore(config.getBlobStoreLocation()); - promise.complete(BlobStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId), + promise.complete(BlobStoreBackedReadHandleImpl.open(readExecutor.chooseThread(ledgerId), readBlobstore, readBucket, key, indexKey, DataBlockUtils.VERSION_CHECK, @@ -591,10 +595,10 @@ public CompletableFuture readOffloaded(long ledgerId, MLDataFormats. indexKeys.add(indexKey); }); - scheduler.chooseThread(ledgerId).execute(() -> { + readExecutor.chooseThread(ledgerId).execute(() -> { try { BlobStore readBlobstore = getBlobStore(config.getBlobStoreLocation()); - promise.complete(BlobStoreBackedReadHandleImplV2.open(scheduler.chooseThread(ledgerId), + promise.complete(BlobStoreBackedReadHandleImplV2.open(readExecutor.chooseThread(ledgerId), readBlobstore, readBucket, keys, indexKeys, DataBlockUtils.VERSION_CHECK, diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java index e706e4254cb11..67dcb73b9e58a 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java @@ -82,7 +82,7 @@ private BlobStoreManagedLedgerOffloader getOffloader(String bucket, Map(), scheduler, this.offloaderStats, entryOffsetsCache); + .create(mockedConfig, new HashMap(), scheduler, scheduler, this.offloaderStats, entryOffsetsCache); return offloader; } @@ -91,7 +91,7 @@ private BlobStoreManagedLedgerOffloader getOffloader(String bucket, BlobStore mo mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket, additionalConfig))); Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore(); BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader - .create(mockedConfig, new HashMap(), scheduler, this.offloaderStats, entryOffsetsCache); + .create(mockedConfig, new HashMap(), scheduler, scheduler, this.offloaderStats, entryOffsetsCache); return offloader; } diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java index bf6ede896ab28..c8bcb67aa2cdd 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java @@ -98,7 +98,7 @@ private BlobStoreManagedLedgerOffloader getOffloader(BlobStore mockedBlobStore) private BlobStoreManagedLedgerOffloader getOffloader(String bucket) throws IOException { mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket))); Mockito.doReturn(blobStore).when(mockedConfig).getBlobStore(); // Use the REAL blobStore - BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap(), scheduler, this.offloaderStats, + BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap(), scheduler, scheduler, this.offloaderStats, entryOffsetsCache); return offloader; } @@ -106,7 +106,7 @@ private BlobStoreManagedLedgerOffloader getOffloader(String bucket) throws IOExc private BlobStoreManagedLedgerOffloader getOffloader(String bucket, BlobStore mockedBlobStore) throws IOException { mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket))); Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore(); - BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap(), scheduler, this.offloaderStats, + BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap(), scheduler, scheduler, this.offloaderStats, entryOffsetsCache); return offloader; }