Skip to content

Commit

Permalink
Merge branch 'master' into fix_pattern_error_in_PulsarLedgerManager
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari authored Feb 18, 2025
2 parents 79fe158 + 4bfdcd8 commit 8959e2e
Show file tree
Hide file tree
Showing 94 changed files with 1,992 additions and 1,197 deletions.
7 changes: 7 additions & 0 deletions buildtools/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@
<packaging>jar</packaging>
<name>Pulsar Build Tools</name>

<developers>
<developer>
<organization>Apache Pulsar developers</organization>
<organizationUrl>http://pulsar.apache.org/</organizationUrl>
</developer>
</developers>

<properties>
<project.build.outputTimestamp>2024-10-14T13:32:50Z</project.build.outputTimestamp>
<maven.compiler.source>1.8</maven.compiler.source>
Expand Down
11 changes: 11 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,21 @@ bindAddress=0.0.0.0
# If not set, the value of `InetAddress.getLocalHost().getCanonicalHostName()` is used.
advertisedAddress=

# Specifies the interval (in seconds) for sending ping messages to the client. Set to 0 to disable
# ping messages. This setting applies to client connections used for topic lookups and
# partition metadata requests. When a client establishes a broker connection via the proxy,
# the client and broker will communicate directly without the proxy intercepting the messages.
# In that case, the broker's keepAliveIntervalSeconds configuration becomes relevant.
keepAliveIntervalSeconds=30

# Enable or disable the HAProxy protocol.
# If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
haProxyProtocolEnabled=false

# Default http header map to add into http-proxy for the any security requirements.
# eg: {"header1":"value"}
proxyHttpResponseHeadersJson=

# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
webServiceHaProxyProtocolEnabled=false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -802,41 +802,33 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback
buffer.retain();

// Jump to specific thread to avoid contention from writers writing from different threads
final var addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx,
currentLedgerTimeoutTriggered);
var added = false;
try {
// Use synchronized to ensure if `addOperation` is added to queue and fails later, it will be the first
// element in `pendingAddEntries`.
synchronized (this) {
if (managedLedgerInterceptor != null) {
managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getNumberOfMessages());
}
final var state = STATE_UPDATER.get(this);
beforeAddEntryToQueue(state);
pendingAddEntries.add(addOperation);
added = true;
afterAddEntryToQueue(state, addOperation);
}
} catch (Throwable throwable) {
if (!added) {
addOperation.failed(ManagedLedgerException.getManagedLedgerException(throwable));
} // else: all elements of `pendingAddEntries` will fail in another thread
}
executor.execute(() -> {
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx,
currentLedgerTimeoutTriggered);
internalAsyncAddEntry(addOperation);
});
}

protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException {
if (state.isFenced()) {
throw new ManagedLedgerFencedException();
protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
if (!beforeAddEntry(addOperation)) {
return;
}
switch (state) {
case Terminated -> throw new ManagedLedgerTerminatedException("Managed ledger was already terminated");
case Closed -> throw new ManagedLedgerAlreadyClosedException("Managed ledger was already closed");
case WriteFailed -> throw new ManagedLedgerAlreadyClosedException("Waiting to recover from failure");
final State state = STATE_UPDATER.get(this);
if (state.isFenced()) {
addOperation.failed(new ManagedLedgerFencedException());
return;
} else if (state == State.Terminated) {
addOperation.failed(new ManagedLedgerTerminatedException("Managed ledger was already terminated"));
return;
} else if (state == State.Closed) {
addOperation.failed(new ManagedLedgerAlreadyClosedException("Managed ledger was already closed"));
return;
} else if (state == State.WriteFailed) {
addOperation.failed(new ManagedLedgerAlreadyClosedException("Waiting to recover from failure"));
return;
}
}
pendingAddEntries.add(addOperation);

protected void afterAddEntryToQueue(State state, OpAddEntry addOperation) throws ManagedLedgerException {
if (state == State.ClosingLedger || state == State.CreatingLedger) {
// We don't have a ready ledger to write into
// We are waiting for a new ledger to be created
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,23 +223,25 @@ private void initLastConfirmedEntry() {
}

@Override
protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException {
protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
if (!beforeAddEntry(addOperation)) {
return;
}
if (state != State.LedgerOpened) {
throw new ManagedLedgerException("Managed ledger is not opened");
addOperation.failed(new ManagedLedgerException("Managed ledger is not opened"));
return;
}
}

@Override
protected void afterAddEntryToQueue(State state, OpAddEntry addOperation) throws ManagedLedgerException {
if (addOperation.getCtx() == null || !(addOperation.getCtx() instanceof Position position)) {
pendingAddEntries.poll();
throw new ManagedLedgerException("Illegal addOperation context object.");
addOperation.failed(new ManagedLedgerException("Illegal addOperation context object."));
return;
}

if (log.isDebugEnabled()) {
log.debug("[{}] Add entry into shadow ledger lh={} entries={}, pos=({},{})",
name, currentLedger.getId(), currentLedgerEntries, position.getLedgerId(), position.getEntryId());
}
pendingAddEntries.add(addOperation);
if (position.getLedgerId() <= currentLedger.getId()) {
// Write into lastLedger
if (position.getLedgerId() == currentLedger.getId()) {
Expand Down
81 changes: 81 additions & 0 deletions pip/pip-395.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# PIP-395: Add Proxy configuration to support configurable response headers for http reverse-proxy


# Motivation

Pulsar Proxy has a support to use it as HTTP reverse proxy to access Broker’s admin API and can also be extended to act as a reverse proxy other HTTP endpoints. Sometimes, it is very crucial to add customizable headers into the HTTP response returned by Proxy to enhance the security experience while using Proxy over HTTP.

Response headers in a reverse HTTP proxy are critical for maintaining and enhancing the security of the web applications and services behind the proxy. These headers act as a first line of defense, hardening the web server infrastructure and protecting clients from common web vulnerabilities.

For example, when implementing a reverse HTTP proxy, security headers such as `Referrer-Policy`, `X-Content-Type-Options`, `Strict-Transport-Security`, `X-Content-Type-Options`, etc., are useful to prevent security attacks like clickjacking, MIME-sniffing, data leakage, and more. So, such headers play a crucial role in enhancing the security posture of proxy infrastructure.

Therefore, we would like to add support into Pulsar Proxy where users can add custom response headers by passing them into the configuration. This PIP will add this support by adding a new configuration called `proxyHttpResponseHeadersJson` where user can pass multiple headers with key-value map into the json format. Proxy server will retrieve headers from this configuration and pass it as response headers for every http request when user wants to use Pulsar Proxy as an HTTP reverse proxy.


# Goals

## In Scope

Add a new configuration `proxyHttpResponseHeadersJson` to the Proxy configuration.
eg:
```
proxyHttpResponseHeadersJson=`{"header1":"value1","header2":"value2"}`
```

## Out of Scope

# High Level Design

# Detailed Design

## Design & Implementation Details

Add a new configuration `proxyHttpResponseHeadersJson` to the Proxy configuration.
This configuration will allow the user to set default headers which proxy will return into the response headers for every http request which proxy will receive as a reverse proxy.


### Public API
NA
### Binary protocol

### Configuration

### CLI

### Metrics

NA

# Monitoring

NA

# Security Considerations

NA

# Backward & Forward Compatibility

## Upgrade

This is a new feature, and it does not affect the existing configuration.

## Downgrade / Rollback

Rollback will not impact the existing functionality.

## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations

<!--
Describe what needs to be considered in Pulsar Geo-Replication in the upgrade and possible downgrade/rollback of this feature.
-->

# Alternatives

<!--
If there are alternatives that were already considered by the authors or, after the discussion, by the community, and were rejected, please list them here along with the reason why they were rejected.
-->

# General Notes

# Links
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public boolean authenticateHttpRequest(HttpServletRequest request, HttpServletRe
} else {
checkState(request.getHeader(SASL_HEADER_STATE).equalsIgnoreCase(SASL_STATE_SERVER_CHECK_TOKEN));
setResponseHeaderState(response, SASL_STATE_COMPLETE);
response.setHeader(SASL_STATE_SERVER, request.getHeader(SASL_STATE_SERVER));
response.setHeader(SASL_STATE_SERVER, sanitizeHeaderValue(request.getHeader(SASL_STATE_SERVER)));
response.setStatus(HttpServletResponse.SC_OK);
if (log.isDebugEnabled()) {
log.debug("[{}] Server side role token verified success: {}", request.getRequestURI(),
Expand Down Expand Up @@ -325,4 +325,12 @@ public boolean authenticateHttpRequest(HttpServletRequest request, HttpServletRe
}
}
}

private String sanitizeHeaderValue(String value) {
if (value == null) {
return null;
}
// Remove CRLF and other special characters
return value.replaceAll("[\\r\\n]", "").replaceAll("[^\\x20-\\x7E]", "");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -285,15 +285,15 @@ public static CompletableFuture<Set<String>> applyNamespacePoliciesAsync(
public static String getBundleRangeFromBundleName(String bundleName) {
// the bundle format is property/cluster/namespace/0x00000000_0xFFFFFFFF
int pos = bundleName.lastIndexOf("/");
checkArgument(pos != -1);
checkArgument(pos != -1, "Invalid bundle name format: %s", bundleName);
return bundleName.substring(pos + 1);
}

// From a full bundle name, extract the namespace name.
public static String getNamespaceNameFromBundleName(String bundleName) {
// the bundle format is property/cluster/namespace/0x00000000_0xFFFFFFFF
int pos = bundleName.lastIndexOf('/');
checkArgument(pos != -1);
checkArgument(pos != -1, "Invalid bundle name format: %s", bundleName);
return bundleName.substring(0, pos);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ private void updateBundleData() {
for (String bundle : bundleData.keySet()) {
if (!activeBundles.contains(bundle)){
bundleData.remove(bundle);
if (pulsar.getLeaderElectionService().isLeader()){
if (pulsar.getLeaderElectionService() != null && pulsar.getLeaderElectionService().isLeader()){
deleteBundleDataFromMetadataStore(bundle);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,7 @@ private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String,
ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position, ackOwnerConsumer);
if (checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, position, msgId)) {
addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer);
}
}

Expand Down Expand Up @@ -1081,6 +1082,11 @@ private boolean removePendingAcks(Consumer ackOwnedConsumer, Position position)
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, position);
}
updateBlockedConsumerOnUnackedMsgs(ackOwnedConsumer);
return true;
}

public void updateBlockedConsumerOnUnackedMsgs(Consumer ackOwnedConsumer) {
// unblock consumer-throttling when limit check is disabled or receives half of maxUnackedMessages =>
// consumer can start again consuming messages
int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer);
Expand All @@ -1090,7 +1096,6 @@ private boolean removePendingAcks(Consumer ackOwnedConsumer, Position position)
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
flowConsumerBlockedPermits(ackOwnedConsumer);
}
return true;
}

public PendingAcksMap getPendingAcks() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1012,7 +1012,7 @@ protected static boolean isLeaderBroker(PulsarService pulsar) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return true;
}
return pulsar.getLeaderElectionService().isLeader();
return pulsar.getLeaderElectionService() != null && pulsar.getLeaderElectionService().isLeader();
}

public void validateTenantOperation(String tenant, TenantOperation operation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public ByteBuf toByteBuf() {
}
}

ByteBuf encryptedPayload = encrypt(getCompressedBatchMetadataAndPayload());
ByteBuf encryptedPayload = encrypt(getCompressedBatchMetadataAndPayload(false));
updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
ByteBuf metadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
messageMetadata, encryptedPayload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,11 @@ public int size() {

public void validateBundle(NamespaceBundle nsBundle) throws Exception {
int idx = Arrays.binarySearch(partitions, nsBundle.getLowerEndpoint());
checkArgument(idx >= 0, "Cannot find bundle in the bundles list");
checkArgument(nsBundle.getUpperEndpoint().equals(bundles.get(idx).getUpperEndpoint()),
"Invalid upper boundary for bundle");
checkArgument(idx >= 0, "Cannot find bundle %s in the bundles list", nsBundle);
NamespaceBundle foundBundle = bundles.get(idx);
Long upperEndpoint = foundBundle.getUpperEndpoint();
checkArgument(nsBundle.getUpperEndpoint().equals(upperEndpoint),
"Invalid upper boundary for bundle %s. Expected upper boundary of %s", nsBundle, foundBundle);
}

public NamespaceBundle getFullBundle() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1257,11 +1257,12 @@ public void testExamineMessageMetadata() throws Exception {

admin.topics().createPartitionedTopic(topicName, 2);
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING)
.producerName("testExamineMessageMetadataProducer")
.compressionType(CompressionType.LZ4)
.topic(topicName + "-partition-0")
.create();
producer.getConfiguration().setCompressMinMsgBodySize(1);

producer.newMessage()
.keyBytes("partition123".getBytes())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ public static String getTlsFileForClient(String name) {

private final List<AutoCloseable> closeables = new ArrayList<>();

// Set to true in test's constructor to use a real Zookeeper (TestZKServer)
protected boolean useTestZookeeper;

public MockedPulsarServiceBaseTest() {
resetConfig();
}
Expand Down Expand Up @@ -363,7 +366,14 @@ protected void afterPulsarStart(PulsarService pulsar) throws Exception {
* @throws Exception if an error occurs
*/
protected void restartBroker() throws Exception {
restartBroker(null);
}

protected void restartBroker(Consumer<ServiceConfiguration> configurationChanger) throws Exception {
stopBroker();
if (configurationChanger != null) {
configurationChanger.accept(conf);
}
startBroker();
if (pulsarClient == null) {
pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
Expand Down Expand Up @@ -461,7 +471,6 @@ protected PulsarTestContext.Builder createPulsarTestContextBuilder(ServiceConfig
PulsarTestContext.Builder builder = PulsarTestContext.builder()
.spyByDefault()
.config(conf)
.withMockZookeeper(true)
.pulsarServiceCustomizer(pulsarService -> {
try {
beforePulsarStart(pulsarService);
Expand All @@ -470,9 +479,25 @@ protected PulsarTestContext.Builder createPulsarTestContextBuilder(ServiceConfig
}
})
.brokerServiceCustomizer(this::customizeNewBrokerService);
configureMetadataStores(builder);
return builder;
}

/**
* Configures the metadata stores for the PulsarTestContext.Builder instance.
* Set useTestZookeeper to true in the test's constructor to use TestZKServer which is a real ZooKeeper
* implementation.
*
* @param builder the PulsarTestContext.Builder instance to configure
*/
protected void configureMetadataStores(PulsarTestContext.Builder builder) {
if (useTestZookeeper) {
builder.withTestZookeeper();
} else {
builder.withMockZookeeper(true);
}
}

protected PulsarTestContext createAdditionalPulsarTestContext(ServiceConfiguration conf) throws Exception {
return createAdditionalPulsarTestContext(conf, null);
}
Expand Down
Loading

0 comments on commit 8959e2e

Please sign in to comment.