Skip to content

Commit

Permalink
[Broker Interceptor] Fix Pulsar didn't respond error messages when th…
Browse files Browse the repository at this point in the history
…row InterceptException (apache#11650)
  • Loading branch information
Technoboy- authored Aug 16, 2021
1 parent e505018 commit 4793ff7
Show file tree
Hide file tree
Showing 13 changed files with 173 additions and 32 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.web;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response;
import org.apache.pulsar.common.intercept.InterceptException;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;

/**
* Exception handler for handle exception.
*/
public class ExceptionHandler {

public void handle(ServletResponse response, Exception ex) throws IOException {
if (ex instanceof InterceptException) {
String reason = ex.getMessage();
byte[] content = reason.getBytes(StandardCharsets.UTF_8);
MetaData.Response info = new MetaData.Response();
info.setHttpVersion(HttpVersion.HTTP_1_1);
info.setReason(reason);
info.setStatus(((InterceptException) ex).getErrorCode());
info.setContentLength(content.length);
if (response instanceof org.eclipse.jetty.server.Response) {
((org.eclipse.jetty.server.Response) response).getHttpChannel().sendResponse(info,
ByteBuffer.wrap(content), true);
} else {
((HttpServletResponse) response).sendError(((InterceptException) ex).getErrorCode(),
ex.getMessage());
}
} else {
((HttpServletResponse) response).sendError(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
ex.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.MediaType;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -38,8 +37,11 @@ public class PreInterceptFilter implements Filter {

private final BrokerInterceptor interceptor;

public PreInterceptFilter(BrokerInterceptor interceptor) {
private final ExceptionHandler exceptionHandler;

public PreInterceptFilter(BrokerInterceptor interceptor, ExceptionHandler exceptionHandler) {
this.interceptor = interceptor;
this.exceptionHandler = exceptionHandler;
}

@Override
Expand Down Expand Up @@ -67,7 +69,7 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo
interceptor.onWebserviceRequest(requestWrapper);
filterChain.doFilter(requestWrapper, servletResponse);
} catch (InterceptException e) {
((HttpServletResponse) servletResponse).sendError(e.getErrorCode(), e.getMessage());
exceptionHandler.handle(servletResponse, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,9 @@ public void addServlet(String path, ServletHolder servletHolder, boolean require

if (!pulsar.getConfig().getBrokerInterceptors().isEmpty()
|| !pulsar.getConfig().isDisableBrokerInterceptors()) {
ExceptionHandler handler = new ExceptionHandler();
// Enable PreInterceptFilter only when interceptors are enabled
context.addFilter(new FilterHolder(new PreInterceptFilter(pulsar.getBrokerInterceptor())),
context.addFilter(new FilterHolder(new PreInterceptFilter(pulsar.getBrokerInterceptor(), handler)),
MATCH_ALL, EnumSet.allOf(DispatcherType.class));
context.addFilter(new FilterHolder(new ProcessHandlerFilter(pulsar)),
MATCH_ALL, EnumSet.allOf(DispatcherType.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public void testPostSchemaCompatibilityStrategy(ApiVersion version) throws Pulsa
admin.schemas().createSchema(topicName, foo1SchemaInfo);
fail("Should have failed");
} catch (PulsarAdminException.ConflictException e) {
assertTrue(e.getMessage().contains("HTTP 409 Conflict"));
assertTrue(e.getMessage().contains("HTTP 409"));
}

namespace = "schematest/testnotfound";
Expand All @@ -203,7 +203,7 @@ public void testPostSchemaCompatibilityStrategy(ApiVersion version) throws Pulsa
admin.schemas().createSchema(topicName, fooSchemaInfo);
fail("Should have failed");
} catch (PulsarAdminException.NotFoundException e) {
assertTrue(e.getMessage().contains("HTTP 404 Not Found"));
assertTrue(e.getMessage().contains("HTTP 404"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.web.ExceptionHandler;
import org.apache.pulsar.broker.web.PreInterceptFilter;
import org.apache.pulsar.broker.web.ProcessHandlerFilter;
import org.apache.pulsar.broker.web.ResponseHandlerFilter;
Expand Down Expand Up @@ -61,7 +62,8 @@ public class InterceptFilterOutTest {
@Test
public void testFilterOutForPreInterceptFilter() throws Exception {
CounterBrokerInterceptor interceptor = new CounterBrokerInterceptor();
PreInterceptFilter filter = new PreInterceptFilter(interceptor);
ExceptionHandler handler = new ExceptionHandler();
PreInterceptFilter filter = new PreInterceptFilter(interceptor, handler);

HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.web;

import lombok.SneakyThrows;
import org.apache.pulsar.common.intercept.InterceptException;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.Response;
import org.mockito.Mockito;
import org.testng.annotations.Test;

import javax.servlet.http.HttpServletResponse;

import static org.eclipse.jetty.http.HttpStatus.INTERNAL_SERVER_ERROR_500;
import static org.eclipse.jetty.http.HttpStatus.PRECONDITION_FAILED_412;

/**
* Unit test for ExceptionHandler.
*/
@Test(groups = "broker")
public class ExceptionHandlerTest {

@Test
@SneakyThrows
public void testHandle() {
String restriction = "Reach the max tenants [5] restriction";
String internal = "internal exception";
String illegal = "illegal argument exception ";
ExceptionHandler handler = new ExceptionHandler();
HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
handler.handle(response, new InterceptException(PRECONDITION_FAILED_412, restriction));
Mockito.verify(response).sendError(PRECONDITION_FAILED_412, restriction);

handler.handle(response, new InterceptException(INTERNAL_SERVER_ERROR_500, internal));
Mockito.verify(response).sendError(INTERNAL_SERVER_ERROR_500, internal);

handler.handle(response, new IllegalArgumentException(illegal));
Mockito.verify(response).sendError(INTERNAL_SERVER_ERROR_500, illegal);

Response response2 = Mockito.mock(Response.class);
HttpChannel httpChannel = Mockito.mock(HttpChannel.class);
Mockito.when(response2.getHttpChannel()).thenReturn(httpChannel);
handler.handle(response2, new InterceptException(PRECONDITION_FAILED_412, restriction));
Mockito.verify(httpChannel).sendResponse(Mockito.any(), Mockito.any(), Mockito.anyBoolean());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ public PulsarAdminException getApiException(Throwable e) {
}

public PulsarAdminException getApiException(Response response) {
if (response.getStatusInfo().equals(Response.Status.OK)) {
if (response.getStatus() == Response.Status.OK.getStatusCode()) {
return null;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public CompletableFuture<List<String>> getFunctionsAsync(String tenant, String n
new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
if (!response.getStatusInfo().equals(Response.Status.OK)) {
if (response.getStatus() != Response.Status.OK.getStatusCode()) {
future.completeExceptionally(getApiException(response));
} else {
List<String> functions = response.readEntity(new GenericType<List<String>>() {});
Expand Down Expand Up @@ -142,7 +142,7 @@ public CompletableFuture<FunctionConfig> getFunctionAsync(String tenant, String
new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
if (!response.getStatusInfo().equals(Response.Status.OK)) {
if (response.getStatus() != Response.Status.OK.getStatusCode()) {
future.completeExceptionally(getApiException(response));
} else {
future.complete(response.readEntity(FunctionConfig.class));
Expand Down Expand Up @@ -181,7 +181,7 @@ public CompletableFuture<FunctionStatus> getFunctionStatusAsync(String tenant, S
new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
if (!response.getStatusInfo().equals(Response.Status.OK)) {
if (response.getStatus() != Response.Status.OK.getStatusCode()) {
future.completeExceptionally(getApiException(response));
} else {
future.complete(response.readEntity(FunctionStatus.class));
Expand Down Expand Up @@ -224,7 +224,7 @@ public CompletableFuture<FunctionStatus.FunctionInstanceStatus.FunctionInstanceS
new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
if (!response.getStatusInfo().equals(Response.Status.OK)) {
if (response.getStatus() != Response.Status.OK.getStatusCode()) {
future.completeExceptionally(getApiException(response));
} else {
future.complete(response.readEntity(
Expand Down Expand Up @@ -267,7 +267,7 @@ public CompletableFuture<FunctionInstanceStatsData> getFunctionStatsAsync(
new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
if (!response.getStatusInfo().equals(Response.Status.OK)) {
if (response.getStatus() != Response.Status.OK.getStatusCode()) {
future.completeExceptionally(getApiException(response));
} else {
future.complete(response.readEntity(FunctionInstanceStatsDataImpl.class));
Expand Down Expand Up @@ -307,7 +307,7 @@ public CompletableFuture<FunctionStats> getFunctionStatsAsync(String tenant,
new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
if (!response.getStatusInfo().equals(Response.Status.OK)) {
if (response.getStatus() != Response.Status.OK.getStatusCode()) {
future.completeExceptionally(getApiException(response));
} else {
future.complete(response.readEntity(FunctionStatsImpl.class));
Expand Down Expand Up @@ -920,7 +920,7 @@ public void onThrowable(Throwable t) {
public List<ConnectorDefinition> getConnectorsList() throws PulsarAdminException {
try {
Response response = request(functions.path("connectors")).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
if (response.getStatus() != Response.Status.OK.getStatusCode()) {
throw getApiException(response);
}
return response.readEntity(new GenericType<List<ConnectorDefinition>>() {
Expand Down Expand Up @@ -977,7 +977,7 @@ public CompletableFuture<FunctionState> getFunctionStateAsync(
new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
if (!response.getStatusInfo().equals(Response.Status.OK)) {
if (response.getStatus() != Response.Status.OK.getStatusCode()) {
future.completeExceptionally(getApiException(response));
} else {
future.complete(response.readEntity(FunctionState.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public CompletableFuture<Void> downloadAsync(String packageName, String path) {
asyncGetRequest(webTarget, new InvocationCallback<Response>(){
@Override
public void completed(Response response) {
if (response.getStatusInfo().equals(Response.Status.OK)) {
if (response.getStatus() == Response.Status.OK.getStatusCode()) {
try (InputStream inputStream = response.readEntity(InputStream.class)) {
Path destinyPath = Paths.get(path);
if (destinyPath.getParent() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public CompletableFuture<List<String>> listSinksAsync(String tenant, String name
new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
if (!response.getStatusInfo().equals(Response.Status.OK)) {
if (response.getStatus() != Response.Status.OK.getStatusCode()) {
future.completeExceptionally(getApiException(response));
} else {
future.complete(response.readEntity(new GenericType<List<String>>() {}));
Expand Down Expand Up @@ -129,7 +129,7 @@ public CompletableFuture<SinkConfig> getSinkAsync(String tenant, String namespac
new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
if (!response.getStatusInfo().equals(Response.Status.OK)) {
if (response.getStatus() != Response.Status.OK.getStatusCode()) {
future.completeExceptionally(getApiException(response));
} else {
future.complete(response.readEntity(SinkConfig.class));
Expand Down Expand Up @@ -170,7 +170,7 @@ public CompletableFuture<SinkStatus> getSinkStatusAsync(String tenant, String na
new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
if (!response.getStatusInfo().equals(Response.Status.OK)) {
if (response.getStatus() != Response.Status.OK.getStatusCode()) {
future.completeExceptionally(getApiException(response));
} else {
future.complete(response.readEntity(SinkStatus.class));
Expand Down Expand Up @@ -213,7 +213,7 @@ public CompletableFuture<SinkStatus.SinkInstanceStatus.SinkInstanceStatusData> g
new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
if (!response.getStatusInfo().equals(Response.Status.OK)) {
if (response.getStatus() != Response.Status.OK.getStatusCode()) {
future.completeExceptionally(getApiException(response));
} else {
future.complete(response.readEntity(
Expand Down Expand Up @@ -637,7 +637,7 @@ public CompletableFuture<List<ConnectorDefinition>> getBuiltInSinksAsync() {
new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
if (!response.getStatusInfo().equals(Response.Status.OK)) {
if (response.getStatus() != Response.Status.OK.getStatusCode()) {
future.completeExceptionally(getApiException(response));
} else {
future.complete(response.readEntity(
Expand Down
Loading

0 comments on commit 4793ff7

Please sign in to comment.