Skip to content

Commit

Permalink
[Broker] Support disabling non-TLS service ports (apache#11681)
Browse files Browse the repository at this point in the history
* Support disabling non-tls service ports

* Add docs for disabling non-TLS ports

* Update site2/docs/security-tls-keystore.md

Co-authored-by: Anonymitaet <[email protected]>
  • Loading branch information
lhotari and Anonymitaet authored Aug 19, 2021
1 parent 6f747f8 commit 50b6e79
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1548,12 +1548,15 @@ private void startWorkerService(AuthenticationService authenticationService,
AuthorizationService authorizationService)
throws Exception {
if (functionWorkerService.isPresent()) {
if (workerConfig.isUseTls()) {
if (workerConfig.isUseTls() || brokerServiceUrl == null) {
workerConfig.setPulsarServiceUrl(brokerServiceUrlTls);
} else {
workerConfig.setPulsarServiceUrl(brokerServiceUrl);
}
if (workerConfig.isUseTls() || webServiceAddress == null) {
workerConfig.setPulsarWebServiceUrl(webServiceAddressTls);
workerConfig.setFunctionWebServiceUrl(webServiceAddressTls);
} else {
workerConfig.setPulsarServiceUrl(brokerServiceUrl);
workerConfig.setPulsarWebServiceUrl(webServiceAddress);
workerConfig.setFunctionWebServiceUrl(webServiceAddress);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ public void initialize(PulsarService pulsar) {

@Override
public void start() throws PulsarServerException {
lookupServiceAddress = pulsar.getAdvertisedAddress() + ":"
+ pulsar.getConfiguration().getWebServicePort().get();
lookupServiceAddress = getBrokerAddress();
localResourceUnit = new SimpleResourceUnit(String.format("http://%s", lookupServiceAddress),
new PulsarResourceDescription());

Expand All @@ -71,6 +70,13 @@ public void start() throws PulsarServerException {
}
}

private String getBrokerAddress() {
return String.format("%s:%s", pulsar.getAdvertisedAddress(),
pulsar.getConfiguration().getWebServicePort().isPresent()
? pulsar.getConfiguration().getWebServicePort().get()
: pulsar.getConfiguration().getWebServicePortTls().get());
}

@Override
public boolean isCentralized() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1122,7 +1122,7 @@ private String getBrokerAddress() {
return String.format("%s:%s", pulsar.getAdvertisedAddress(),
pulsar.getConfiguration().getWebServicePort().isPresent()
? pulsar.getConfiguration().getWebServicePort().get()
: pulsar.getConfiguration().getWebServicePortTls());
: pulsar.getConfiguration().getWebServicePortTls().get());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,15 @@ protected void startBroker() throws Exception {
}
this.pulsar = startBroker(conf);

brokerUrl = new URL(pulsar.getWebServiceAddress());
brokerUrlTls = new URL(pulsar.getWebServiceAddressTls());
brokerUrl = pulsar.getWebServiceAddress() != null ? new URL(pulsar.getWebServiceAddress()) : null;
brokerUrlTls = pulsar.getWebServiceAddressTls() != null ? new URL(pulsar.getWebServiceAddressTls()) : null;

if (admin != null) {
admin.close();
}
PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString());
PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
? brokerUrl.toString()
: brokerUrlTls.toString());
customizeNewPulsarAdminBuilder(pulsarAdminBuilder);
admin = spy(pulsarAdminBuilder.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,38 @@ public void testTlsEnabled() throws Exception {
}
}

@Test
public void testTlsEnabledWithoutNonTlsServicePorts() throws Exception {
final String topicName = "persistent://prop/ns-abc/newTopic";
final String subName = "newSub";

conf.setAuthenticationEnabled(false);
conf.setBrokerServicePort(Optional.empty());
conf.setBrokerServicePortTls(Optional.of(0));
conf.setWebServicePort(Optional.empty());
conf.setWebServicePortTls(Optional.of(0));
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setNumExecutorThreadPoolSize(5);
restartBroker();

// Access with TLS (Allow insecure TLS connection)
try {
pulsarClient = PulsarClient.builder().serviceUrl(brokerUrlTls.toString()).enableTls(true)
.allowTlsInsecureConnection(true).statsInterval(0, TimeUnit.SECONDS)
.operationTimeout(1000, TimeUnit.MILLISECONDS).build();

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscribe();

} catch (Exception e) {
fail("should not fail");
} finally {
pulsarClient.close();
}
}

@SuppressWarnings("deprecation")
@Test
public void testTlsAuthAllowInsecure() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ public void testOptionalSettingPresent() throws Exception {
assertEquals(config.getLoadBalancerOverrideBrokerNicSpeedGbps(), Optional.of(5.0));
}

@Test
public void testServicePortsEmpty() throws Exception {
String confFile = "brokerServicePort=\nwebServicePort=\n";
InputStream stream = new ByteArrayInputStream(confFile.getBytes());
final ServiceConfiguration config = PulsarConfigurationLoader.create(stream, ServiceConfiguration.class);
assertEquals(config.getBrokerServicePort(), Optional.empty());
assertEquals(config.getWebServicePort(), Optional.empty());
}

/**
* test {@link ServiceConfiguration} with incorrect values.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,13 @@ public PulsarAdmin newPulsarAdmin(String pulsarServiceUrl, WorkerConfig workerCo
workerConfig.isTlsAllowInsecureConnection(),
workerConfig.isTlsEnableHostnameVerification());
} else {
return WorkerUtils.getPulsarAdminClient(pulsarServiceUrl);
return WorkerUtils.getPulsarAdminClient(
pulsarServiceUrl,
null,
null,
null,
workerConfig.isTlsAllowInsecureConnection(),
workerConfig.isTlsEnableHostnameVerification());
}
}

Expand All @@ -158,7 +164,14 @@ public PulsarClient newPulsarClient(String pulsarServiceUrl, WorkerConfig worker
workerConfig.isTlsAllowInsecureConnection(),
workerConfig.isTlsEnableHostnameVerification());
} else {
return WorkerUtils.getPulsarClient(pulsarServiceUrl);
return WorkerUtils.getPulsarClient(
pulsarServiceUrl,
null,
null,
null,
null,
workerConfig.isTlsAllowInsecureConnection(),
workerConfig.isTlsEnableHostnameVerification());
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@

import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.TimeZone;

import java.util.stream.Collectors;
import javax.servlet.Servlet;
import javax.servlet.ServletException;
import javax.websocket.DeploymentException;
Expand Down Expand Up @@ -120,7 +122,9 @@ public void addRestResources(String basePath, String javaPackages, String attrib
}

public void start() throws PulsarServerException {
log.info("Starting web socket proxy at port {}", conf.getWebServicePort().get());
log.info("Starting web socket proxy at port {}", Arrays.stream(server.getConnectors())
.map(ServerConnector.class::cast).map(ServerConnector::getPort).map(Object::toString)
.collect(Collectors.joining(",")));
RequestLogHandler requestLogHandler = new RequestLogHandler();
Slf4jRequestLog requestLog = new Slf4jRequestLog();
requestLog.setExtended(true);
Expand Down
13 changes: 13 additions & 0 deletions site2/docs/security-tls-keystore.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,19 @@ brokerClientTlsTrustStorePassword=clientpw

NOTE: it is important to restrict access to the store files via filesystem permissions.

If you have configured TLS on the broker, to disable non-TLS ports, you can set the values of the following configurations to empty as below.
```
brokerServicePort=
webServicePort=
```
In this case, you need to set the following configurations.

```conf
brokerClientTlsEnabled=true // Set this to true
brokerClientTlsEnabledWithKeyStore=true // Set this to true
brokerClientTlsTrustStore= // Set this to your desired value
brokerClientTlsTrustStorePassword= // Set this to your desired value
Optional settings that may worth consider:
1. tlsClientAuthentication=false: Enable/Disable using TLS for authentication. This config when enabled will authenticate the other end
Expand Down

0 comments on commit 50b6e79

Please sign in to comment.