Merge branch 'master' into producer-new-message-schema-txn
lhotari authored Feb 28, 2025
2 parents 59dd60a + 689c16a commit c9b0771
Showing 219 changed files with 7,400 additions and 2,389 deletions.
8 changes: 7 additions & 1 deletion .github/workflows/pulsar-ci.yaml
@@ -1494,15 +1494,21 @@ jobs:
restore-keys: |
owasp-dependency-check-data-
- name: Log warning when skipped
if: ${{ !steps.restore-owasp-dependency-check-data.outputs.cache-matched-key }}
run: |
echo "::warning::OWASP Dependency Check was skipped since the OWASP Dependency check data wasn't found in the cache. Run ci-owasp-dependency-check.yaml workflow to update the cache."
# Projects dependent on flume, hdfs, and hbase currently excluded from the scan.
- name: trigger dependency check
if: ${{ steps.restore-owasp-dependency-check-data.outputs.cache-matched-key }}
run: |
mvn -B -ntp verify -PskipDocker,skip-all,owasp-dependency-check -Dcheckstyle.skip=true -DskipTests \
-pl '!distribution/server,!distribution/io,!distribution/offloaders,!tiered-storage/file-system,!pulsar-io/flume,!pulsar-io/hbase,!pulsar-io/hdfs3,!pulsar-io/docs,!pulsar-io/jdbc/openmldb'
- name: Upload report
uses: actions/upload-artifact@v4
if: ${{ cancelled() || failure() }}
if: ${{ steps.restore-owasp-dependency-check-data.outputs.cache-matched-key && (cancelled() || failure()) }}
continue-on-error: true
with:
name: dependency report
2 changes: 1 addition & 1 deletion .mvn/extensions.xml
@@ -24,7 +24,7 @@
<extension>
<groupId>com.gradle</groupId>
<artifactId>develocity-maven-extension</artifactId>
<version>1.22.2</version>
<version>1.23.1</version>
</extension>
<extension>
<groupId>com.gradle</groupId>
9 changes: 8 additions & 1 deletion buildtools/pom.xml
@@ -35,6 +35,13 @@
<packaging>jar</packaging>
<name>Pulsar Build Tools</name>

<developers>
<developer>
<organization>Apache Pulsar developers</organization>
<organizationUrl>http://pulsar.apache.org/</organizationUrl>
</developer>
</developers>

<properties>
<project.build.outputTimestamp>2024-10-14T13:32:50Z</project.build.outputTimestamp>
<maven.compiler.source>1.8</maven.compiler.source>
@@ -47,7 +54,7 @@
<license-maven-plugin.version>4.1</license-maven-plugin.version>
<puppycrawl.checkstyle.version>10.14.2</puppycrawl.checkstyle.version>
<maven-checkstyle-plugin.version>3.1.2</maven-checkstyle-plugin.version>
<netty.version>4.1.117.Final</netty.version>
<netty.version>4.1.118.Final</netty.version>
<guice.version>4.2.3</guice.version>
<guava.version>32.1.2-jre</guava.version>
<ant.version>1.10.12</ant.version>
10 changes: 9 additions & 1 deletion conf/broker.conf
@@ -247,7 +247,7 @@ messageExpiryCheckIntervalInMinutes=5
# How long to delay rewinding cursor and dispatching messages when active consumer is changed
activeConsumerFailoverDelayTimeMillis=1000

# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type.
# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type.
# For non-partitioned topics, consistent hashing is used by default.
activeConsumerFailoverConsistentHashing=false

@@ -423,6 +423,11 @@ subscribeThrottlingRatePerConsumer=0
# Rate period for {subscribeThrottlingRatePerConsumer}. Default is 30s.
subscribeRatePeriodPerConsumerInSecond=30

# The class name of the factory that creates DispatchRateLimiter implementations. Current options are
# org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryAsyncTokenBucket (default, PIP-322 implementation)
# and org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryClassic (legacy implementation)
dispatchRateLimiterFactoryClassName=org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryAsyncTokenBucket
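As an illustration, switching a broker back to the pre-PIP-322 behaviour only requires pointing this setting at the legacy class named in the comment above, e.g.:

dispatchRateLimiterFactoryClassName=org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryClassic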

# Default messages per second dispatch throttling-limit for whole broker. Using a value of 0, is disabling default
# message dispatch-throttling
dispatchThrottlingRateInMsg=0
@@ -1750,6 +1755,9 @@ managedLedgerOffloadDriver=
# Maximum number of thread pool threads for ledger offloading
managedLedgerOffloadMaxThreads=2

# Maximum number of read thread pool threads for ledger offloading
managedLedgerOffloadReadThreads=2

# The extraction directory of the nar package.
# Available for Protocol Handler, Additional Servlets, Entry Filter, Offloaders, Broker Interceptor.
# Default is System.getProperty("java.io.tmpdir").
11 changes: 11 additions & 0 deletions conf/proxy.conf
@@ -59,10 +59,21 @@ bindAddress=0.0.0.0
# If not set, the value of `InetAddress.getLocalHost().getCanonicalHostName()` is used.
advertisedAddress=

# Specifies the interval (in seconds) for sending ping messages to the client. Set to 0 to disable
# ping messages. This setting applies to client connections used for topic lookups and
# partition metadata requests. When a client establishes a broker connection via the proxy,
# the client and broker will communicate directly without the proxy intercepting the messages.
# In that case, the broker's keepAliveIntervalSeconds configuration becomes relevant.
keepAliveIntervalSeconds=30

# Enable or disable the HAProxy protocol.
# If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
haProxyProtocolEnabled=false

# Default http header map to add into http-proxy for any security requirements.
# e.g. {"header1":"value"}
proxyHttpResponseHeadersJson=

# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
webServiceHaProxyProtocolEnabled=false

7 changes: 6 additions & 1 deletion conf/standalone.conf
@@ -152,7 +152,7 @@ maxMessageSizeCheckIntervalInSeconds=60
# How long to delay rewinding cursor and dispatching messages when active consumer is changed
activeConsumerFailoverDelayTimeMillis=1000

# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type.
# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type.
# For non-partitioned topics, consistent hashing is used by default.
activeConsumerFailoverConsistentHashing=false

@@ -276,6 +276,11 @@ brokerPublisherThrottlingMaxMessageRate=0
# (Disable byte rate limit with value 0)
brokerPublisherThrottlingMaxByteRate=0

# The class name of the factory that creates DispatchRateLimiter implementations. Current options are
# org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryAsyncTokenBucket (default, PIP-322 implementation)
# and org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryClassic (legacy implementation)
dispatchRateLimiterFactoryClassName=org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryAsyncTokenBucket

# Default messages per second dispatch throttling-limit for every topic. Using a value of 0, is disabling default
# message dispatch-throttling
dispatchThrottlingRatePerTopicInMsg=0
54 changes: 27 additions & 27 deletions distribution/server/src/assemble/LICENSE.bin.txt
@@ -293,33 +293,33 @@ The Apache Software License, Version 2.0
- org.apache.commons-commons-lang3-3.11.jar
- org.apache.commons-commons-text-1.10.0.jar
* Netty
- io.netty-netty-buffer-4.1.117.Final.jar
- io.netty-netty-codec-4.1.117.Final.jar
- io.netty-netty-codec-dns-4.1.117.Final.jar
- io.netty-netty-codec-http-4.1.117.Final.jar
- io.netty-netty-codec-http2-4.1.117.Final.jar
- io.netty-netty-codec-socks-4.1.117.Final.jar
- io.netty-netty-codec-haproxy-4.1.117.Final.jar
- io.netty-netty-common-4.1.117.Final.jar
- io.netty-netty-handler-4.1.117.Final.jar
- io.netty-netty-handler-proxy-4.1.117.Final.jar
- io.netty-netty-resolver-4.1.117.Final.jar
- io.netty-netty-resolver-dns-4.1.117.Final.jar
- io.netty-netty-resolver-dns-classes-macos-4.1.117.Final.jar
- io.netty-netty-resolver-dns-native-macos-4.1.117.Final-osx-aarch_64.jar
- io.netty-netty-resolver-dns-native-macos-4.1.117.Final-osx-x86_64.jar
- io.netty-netty-transport-4.1.117.Final.jar
- io.netty-netty-transport-classes-epoll-4.1.117.Final.jar
- io.netty-netty-transport-native-epoll-4.1.117.Final-linux-aarch_64.jar
- io.netty-netty-transport-native-epoll-4.1.117.Final-linux-x86_64.jar
- io.netty-netty-transport-native-unix-common-4.1.117.Final.jar
- io.netty-netty-tcnative-boringssl-static-2.0.69.Final.jar
- io.netty-netty-tcnative-boringssl-static-2.0.69.Final-linux-aarch_64.jar
- io.netty-netty-tcnative-boringssl-static-2.0.69.Final-linux-x86_64.jar
- io.netty-netty-tcnative-boringssl-static-2.0.69.Final-osx-aarch_64.jar
- io.netty-netty-tcnative-boringssl-static-2.0.69.Final-osx-x86_64.jar
- io.netty-netty-tcnative-boringssl-static-2.0.69.Final-windows-x86_64.jar
- io.netty-netty-tcnative-classes-2.0.69.Final.jar
- io.netty-netty-buffer-4.1.118.Final.jar
- io.netty-netty-codec-4.1.118.Final.jar
- io.netty-netty-codec-dns-4.1.118.Final.jar
- io.netty-netty-codec-http-4.1.118.Final.jar
- io.netty-netty-codec-http2-4.1.118.Final.jar
- io.netty-netty-codec-socks-4.1.118.Final.jar
- io.netty-netty-codec-haproxy-4.1.118.Final.jar
- io.netty-netty-common-4.1.118.Final.jar
- io.netty-netty-handler-4.1.118.Final.jar
- io.netty-netty-handler-proxy-4.1.118.Final.jar
- io.netty-netty-resolver-4.1.118.Final.jar
- io.netty-netty-resolver-dns-4.1.118.Final.jar
- io.netty-netty-resolver-dns-classes-macos-4.1.118.Final.jar
- io.netty-netty-resolver-dns-native-macos-4.1.118.Final-osx-aarch_64.jar
- io.netty-netty-resolver-dns-native-macos-4.1.118.Final-osx-x86_64.jar
- io.netty-netty-transport-4.1.118.Final.jar
- io.netty-netty-transport-classes-epoll-4.1.118.Final.jar
- io.netty-netty-transport-native-epoll-4.1.118.Final-linux-aarch_64.jar
- io.netty-netty-transport-native-epoll-4.1.118.Final-linux-x86_64.jar
- io.netty-netty-transport-native-unix-common-4.1.118.Final.jar
- io.netty-netty-tcnative-boringssl-static-2.0.70.Final.jar
- io.netty-netty-tcnative-boringssl-static-2.0.70.Final-linux-aarch_64.jar
- io.netty-netty-tcnative-boringssl-static-2.0.70.Final-linux-x86_64.jar
- io.netty-netty-tcnative-boringssl-static-2.0.70.Final-osx-aarch_64.jar
- io.netty-netty-tcnative-boringssl-static-2.0.70.Final-osx-x86_64.jar
- io.netty-netty-tcnative-boringssl-static-2.0.70.Final-windows-x86_64.jar
- io.netty-netty-tcnative-classes-2.0.70.Final.jar
- io.netty.incubator-netty-incubator-transport-classes-io_uring-0.0.26.Final.jar
- io.netty.incubator-netty-incubator-transport-native-io_uring-0.0.26.Final-linux-x86_64.jar
- io.netty.incubator-netty-incubator-transport-native-io_uring-0.0.26.Final-linux-aarch_64.jar
52 changes: 26 additions & 26 deletions distribution/shell/src/assemble/LICENSE.bin.txt
@@ -347,35 +347,35 @@ The Apache Software License, Version 2.0
- commons-text-1.10.0.jar
- commons-compress-1.26.0.jar
* Netty
- netty-buffer-4.1.117.Final.jar
- netty-codec-4.1.117.Final.jar
- netty-codec-dns-4.1.117.Final.jar
- netty-codec-http-4.1.117.Final.jar
- netty-codec-socks-4.1.117.Final.jar
- netty-codec-haproxy-4.1.117.Final.jar
- netty-common-4.1.117.Final.jar
- netty-handler-4.1.117.Final.jar
- netty-handler-proxy-4.1.117.Final.jar
- netty-resolver-4.1.117.Final.jar
- netty-resolver-dns-4.1.117.Final.jar
- netty-transport-4.1.117.Final.jar
- netty-transport-classes-epoll-4.1.117.Final.jar
- netty-transport-native-epoll-4.1.117.Final-linux-aarch_64.jar
- netty-transport-native-epoll-4.1.117.Final-linux-x86_64.jar
- netty-transport-native-unix-common-4.1.117.Final.jar
- netty-tcnative-boringssl-static-2.0.69.Final.jar
- netty-tcnative-boringssl-static-2.0.69.Final-linux-aarch_64.jar
- netty-tcnative-boringssl-static-2.0.69.Final-linux-x86_64.jar
- netty-tcnative-boringssl-static-2.0.69.Final-osx-aarch_64.jar
- netty-tcnative-boringssl-static-2.0.69.Final-osx-x86_64.jar
- netty-tcnative-boringssl-static-2.0.69.Final-windows-x86_64.jar
- netty-tcnative-classes-2.0.69.Final.jar
- netty-buffer-4.1.118.Final.jar
- netty-codec-4.1.118.Final.jar
- netty-codec-dns-4.1.118.Final.jar
- netty-codec-http-4.1.118.Final.jar
- netty-codec-socks-4.1.118.Final.jar
- netty-codec-haproxy-4.1.118.Final.jar
- netty-common-4.1.118.Final.jar
- netty-handler-4.1.118.Final.jar
- netty-handler-proxy-4.1.118.Final.jar
- netty-resolver-4.1.118.Final.jar
- netty-resolver-dns-4.1.118.Final.jar
- netty-transport-4.1.118.Final.jar
- netty-transport-classes-epoll-4.1.118.Final.jar
- netty-transport-native-epoll-4.1.118.Final-linux-aarch_64.jar
- netty-transport-native-epoll-4.1.118.Final-linux-x86_64.jar
- netty-transport-native-unix-common-4.1.118.Final.jar
- netty-tcnative-boringssl-static-2.0.70.Final.jar
- netty-tcnative-boringssl-static-2.0.70.Final-linux-aarch_64.jar
- netty-tcnative-boringssl-static-2.0.70.Final-linux-x86_64.jar
- netty-tcnative-boringssl-static-2.0.70.Final-osx-aarch_64.jar
- netty-tcnative-boringssl-static-2.0.70.Final-osx-x86_64.jar
- netty-tcnative-boringssl-static-2.0.70.Final-windows-x86_64.jar
- netty-tcnative-classes-2.0.70.Final.jar
- netty-incubator-transport-classes-io_uring-0.0.26.Final.jar
- netty-incubator-transport-native-io_uring-0.0.26.Final-linux-aarch_64.jar
- netty-incubator-transport-native-io_uring-0.0.26.Final-linux-x86_64.jar
- netty-resolver-dns-classes-macos-4.1.117.Final.jar
- netty-resolver-dns-native-macos-4.1.117.Final-osx-aarch_64.jar
- netty-resolver-dns-native-macos-4.1.117.Final-osx-x86_64.jar
- netty-resolver-dns-classes-macos-4.1.118.Final.jar
- netty-resolver-dns-native-macos-4.1.118.Final-osx-aarch_64.jar
- netty-resolver-dns-native-macos-4.1.118.Final-osx-x86_64.jar
* Prometheus client
- simpleclient-0.16.0.jar
- simpleclient_log4j2-0.16.0.jar
2 changes: 1 addition & 1 deletion docker/pulsar/Dockerfile
@@ -17,7 +17,7 @@
# under the License.
#

ARG ALPINE_VERSION=3.20
ARG ALPINE_VERSION=3.21
ARG IMAGE_JDK_MAJOR_VERSION=21

# First create a stage with just the Pulsar tarball and scripts
@@ -72,6 +72,27 @@ T create(OffloadPoliciesImpl offloadPolicies,
LedgerOffloaderStats offloaderStats)
throws IOException;

/**
* Create a ledger offloader with the provided configuration, user-metadata,
* scheduler, readExecutor and offloaderStats.
*
* @param offloadPolicies offload policies
* @param userMetadata user metadata
* @param scheduler scheduler
* @param readExecutor read executor
* @param offloaderStats offloaderStats
* @return the offloader instance
* @throws IOException if the offloader cannot be created
*/
default T create(OffloadPoliciesImpl offloadPolicies,
Map<String, String> userMetadata,
OrderedScheduler scheduler,
OrderedScheduler readExecutor,
LedgerOffloaderStats offloaderStats)
throws IOException {
return create(offloadPolicies, userMetadata, scheduler, offloaderStats);
}


/**
* Create a ledger offloader with the provided configuration, user-metadata, schema storage and scheduler.
@@ -112,6 +133,30 @@ default T create(OffloadPoliciesImpl offloadPolicies,
return create(offloadPolicies, userMetadata, scheduler, offloaderStats);
}


/**
* Create a ledger offloader with the provided configuration, user-metadata, schema storage,
* scheduler, readExecutor and offloaderStats.
*
* @param offloadPolicies offload policies
* @param userMetadata user metadata
* @param schemaStorage used for schema lookup in offloader
* @param scheduler scheduler
* @param readExecutor read executor
* @param offloaderStats offloaderStats
* @return the offloader instance
* @throws IOException if the offloader cannot be created
*/
default T create(OffloadPoliciesImpl offloadPolicies,
Map<String, String> userMetadata,
SchemaStorage schemaStorage,
OrderedScheduler scheduler,
OrderedScheduler readExecutor,
LedgerOffloaderStats offloaderStats)
throws IOException {
return create(offloadPolicies, userMetadata, scheduler, readExecutor, offloaderStats);
}
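These defaults keep existing factories source-compatible while letting new ones accept a dedicated read executor (presumably the pool sized by the managedLedgerOffloadReadThreads setting added to broker.conf above). A minimal sketch of a factory opting into the new overload — MyOffloader and MyOffloaderFactory are hypothetical names, the imports follow the usual Pulsar/BookKeeper package locations, and the other interface methods are omitted for brevity:

import java.io.IOException;
import java.util.Map;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;

public class MyOffloaderFactory implements LedgerOffloaderFactory<MyOffloader> {

    @Override
    public MyOffloader create(OffloadPoliciesImpl offloadPolicies,
                              Map<String, String> userMetadata,
                              OrderedScheduler scheduler,
                              LedgerOffloaderStats offloaderStats) throws IOException {
        // Legacy overload: fall back to using the write scheduler for reads as well.
        return create(offloadPolicies, userMetadata, scheduler, scheduler, offloaderStats);
    }

    @Override
    public MyOffloader create(OffloadPoliciesImpl offloadPolicies,
                              Map<String, String> userMetadata,
                              OrderedScheduler scheduler,
                              OrderedScheduler readExecutor,
                              LedgerOffloaderStats offloaderStats) throws IOException {
        // New overload: offload reads are issued on the dedicated readExecutor.
        return new MyOffloader(offloadPolicies, userMetadata, scheduler, readExecutor, offloaderStats);
    }
}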

@Override
default void close() throws Exception {
// no-op
@@ -24,6 +24,7 @@
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
@@ -3810,26 +3811,51 @@ public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
if (maxSizeBytes == NO_MAX_SIZE_LIMIT) {
return maxEntries;
}
int maxEntriesBasedOnSize =
Long.valueOf(estimateEntryCountBySize(maxSizeBytes, readPosition, ledger)).intValue();
return Math.min(maxEntriesBasedOnSize, maxEntries);
}

double avgEntrySize = ledger.getStats().getEntrySizeAverage();
if (!Double.isFinite(avgEntrySize)) {
// We don't have yet any stats on the topic entries. Let's try to use the cursor avg size stats
avgEntrySize = (double) entriesReadSize / (double) entriesReadCount;
}

if (!Double.isFinite(avgEntrySize)) {
// If we still don't have any information, it means this is the first time we attempt reading
// and there are no writes. Let's start with 1 to avoid any overflow and start the avg stats
return 1;
static long estimateEntryCountBySize(long bytesSize, Position readPosition, ManagedLedgerImpl ml) {
Position posToRead = readPosition;
if (!ml.isValidPosition(readPosition)) {
posToRead = ml.getNextValidPosition(readPosition);
}
long result = 0;
long remainingBytesSize = bytesSize;

int maxEntriesBasedOnSize = (int) (maxSizeBytes / avgEntrySize);
if (maxEntriesBasedOnSize < 1) {
// We need to read at least one entry
return 1;
while (remainingBytesSize > 0) {
// Last ledger.
if (posToRead.getLedgerId() == ml.getCurrentLedger().getId()) {
if (ml.getCurrentLedgerSize() == 0 || ml.getCurrentLedgerEntries() == 0) {
// Only read 1 entry if no entries to read.
return 1;
}
long avg = Math.max(1, ml.getCurrentLedgerSize() / ml.getCurrentLedgerEntries())
+ BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
result += remainingBytesSize / avg;
break;
}
// Skip empty ledger.
LedgerInfo ledgerInfo = ml.getLedgersInfo().get(posToRead.getLedgerId());
if (ledgerInfo.getSize() == 0 || ledgerInfo.getEntries() == 0) {
posToRead = ml.getNextValidPosition(PositionFactory.create(posToRead.getLedgerId(), Long.MAX_VALUE));
continue;
}
// Calculate entries by average of ledgers.
long avg = Math.max(1, ledgerInfo.getSize() / ledgerInfo.getEntries()) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
long remainEntriesOfLedger = ledgerInfo.getEntries() - posToRead.getEntryId();
if (remainEntriesOfLedger * avg >= remainingBytesSize) {
result += remainingBytesSize / avg;
break;
} else {
// Calculate for the next ledger.
result += remainEntriesOfLedger;
remainingBytesSize -= remainEntriesOfLedger * avg;
posToRead = ml.getNextValidPosition(PositionFactory.create(posToRead.getLedgerId(), Long.MAX_VALUE));
}
}

return Math.min(maxEntriesBasedOnSize, maxEntries);
return Math.max(result, 1);
}
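For a concrete sense of the estimate, assume the read position is at entry 0 of a closed ledger holding 1,000 entries totalling 1,000,000 bytes, the size budget is 512,000 bytes, and the per-entry read overhead constant is 64 bytes (the overhead value is assumed here purely for illustration):

    // avg = max(1, 1_000_000 / 1_000) + 64 (assumed overhead) = 1_064 bytes per entry
    // remaining entries in this ledger = 1_000 - 0 = 1_000
    // 1_000 * 1_064 = 1_064_000 >= 512_000, so the budget runs out inside this ledger:
    // result = 512_000 / 1_064 = 481 (integer division), and Math.max(result, 1) returns 481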

@Override