Skip to content

Commit

Permalink
add support for sse redirect and http proxy settings (#1718)
Browse files Browse the repository at this point in the history
* add support for sse redirect, add v2.1/track for aad auth path and v2/track for non aad auth path

* use lazyAzureHttpClient in telemetry channel

* add logging policy to httppipeline and handle 401,403 in response

* add 307 support
  • Loading branch information
kryalama authored Jun 16, 2021
1 parent 9219bbf commit 94ac07b
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,14 @@ private static void start(Instrumentation instrumentation) {
validateProcessorConfiguration(config);
config.preview.authentication.validate();
//Inject authentication configuration
Configuration.AadAuthentication authentication = config.preview.authentication;
AadAuthentication.init(authentication.type, authentication.clientId, authentication.keePassDatabasePath,
authentication.tenantId, authentication.clientSecret, authentication.authorityHost);

if(config.preview.authentication.enabled) {
Configuration.AadAuthentication authentication = config.preview.authentication;
AadAuthentication.init(authentication.type, authentication.clientId, authentication.keePassDatabasePath,
authentication.tenantId, authentication.clientSecret, authentication.authorityHost);
} else {
// TODO revisit this, not ideal to initialize when authentication is disabled
AadAuthentication.init(null, null, null, null, null, null);
}
// FIXME do something with config

// FIXME set doNotWeavePrefixes = "com.microsoft.applicationinsights.agent."
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package com.microsoft.applicationinsights;

import com.azure.core.http.*;
import com.azure.core.http.policy.HttpLogOptions;
import com.azure.core.http.policy.HttpLoggingPolicy;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.http.policy.RetryPolicy;
import com.azure.core.util.tracing.Tracer;
import com.azure.monitor.opentelemetry.exporter.implementation.models.TelemetryItem;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.microsoft.applicationinsights.internal.authentication.AadAuthentication;
import com.microsoft.applicationinsights.internal.authentication.AzureMonitorRedirectPolicy;
import com.microsoft.applicationinsights.internal.channel.common.LazyAzureHttpClient;
import io.opentelemetry.sdk.common.CompletableResultCode;
import org.apache.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -39,18 +44,23 @@ class TelemetryChannel {

TelemetryChannel(URL endpoint) {
List<HttpPipelinePolicy> policies = new ArrayList<>();
HttpClient client = HttpClient.createDefault();
HttpPipelineBuilder pipeline = new HttpPipelineBuilder()
HttpClient client = LazyAzureHttpClient.getInstance();
HttpPipelineBuilder pipelineBuilder = new HttpPipelineBuilder()
.httpClient(client);
// Add Azure monitor redirect policy to be able to handle v2.1/track redirects
policies.add(new AzureMonitorRedirectPolicy());
// Retry policy for failed requests
policies.add(new RetryPolicy());
// TODO handle authentication exceptions
HttpPipelinePolicy authenticationPolicy = AadAuthentication.getInstance().getAuthenticationPolicy();
if (authenticationPolicy != null) {
policies.add(authenticationPolicy);
}
pipeline.policies(policies.toArray(new HttpPipelinePolicy[0]));
this.pipeline = pipeline.build();
// Add Logging Policy. Can be enabled using AZURE_LOG_LEVEL.
// TODO set the logging level based on self diagnostic log level set by user
policies.add(new HttpLoggingPolicy(new HttpLogOptions()));
pipelineBuilder.policies(policies.toArray(new HttpPipelinePolicy[0]));
this.pipeline = pipelineBuilder.build();
this.endpoint = endpoint;
}

Expand Down Expand Up @@ -115,13 +125,13 @@ private CompletableResultCode internalSend(List<ByteBuffer> byteBuffers) {
.contextWrite(Context.of(Tracer.DISABLE_TRACING_KEY, true))
.subscribe(response -> {
// TODO parse response, looking for throttling, partial successes, etc
// System.out.println("on response: " + response);
if(response.getStatusCode() == HttpStatus.SC_UNAUTHORIZED || response.getStatusCode() == HttpStatus.SC_FORBIDDEN) {
logger.warn("Failed to send telemetry with status code:{} ,please check your credentials", response.getStatusCode());
}
}, error -> {
// System.out.println("on error...");
byteBufferPool.offer(byteBuffers);
result.fail();
}, () -> {
// System.out.println("on complete...");
byteBufferPool.offer(byteBuffers);
result.succeed();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.HttpPipelineBuilder;
import com.azure.core.http.policy.BearerTokenAuthenticationPolicy;
import com.azure.core.http.policy.HttpLogOptions;
import com.azure.core.http.policy.HttpLoggingPolicy;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.http.policy.RetryPolicy;
import com.azure.identity.ClientSecretCredentialBuilder;
Expand Down Expand Up @@ -121,6 +123,9 @@ public HttpPipeline newHttpPipeLineWithAuthentication() {
if(authenticationPolicy != null) {
policies.add(authenticationPolicy);
}
// Add Logging Policy. Can be enabled using AZURE_LOG_LEVEL.
// TODO set the logging level based on self diagnostic log level set by user
policies.add(new HttpLoggingPolicy(new HttpLogOptions()));
pipelineBuilder.policies(policies.toArray(new HttpPipelinePolicy[0]));
return pipelineBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* ApplicationInsights-Java
* Copyright (c) Microsoft Corporation
* All rights reserved.
*
* MIT License
* Permission is hereby granted, free of charge, to any person obtaining a copy of this
* software and associated documentation files (the ""Software""), to deal in the Software
* without restriction, including without limitation the rights to use, copy, modify, merge,
* publish, distribute, sublicense, and/or sell copies of the Software, and to permit
* persons to whom the Software is furnished to do so, subject to the following conditions:
* The above copyright notice and this permission notice shall be included in all copies or
* substantial portions of the Software.
* THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
* PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE
* FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*/

package com.microsoft.applicationinsights.internal.authentication;

import com.azure.core.http.HttpPipelineCallContext;
import com.azure.core.http.HttpPipelineNextPolicy;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.policy.HttpPipelinePolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.net.HttpURLConnection;

// This is a copy from Azure Monitor Open Telemetry Exporter SDK AzureMonitorRedirectPolicy
public final class AzureMonitorRedirectPolicy implements HttpPipelinePolicy {

private static final int PERMANENT_REDIRECT_STATUS_CODE = 308;
private static final int TEMP_REDIRECT_STATUS_CODE = 307;
// Based on Stamp specific redirects design doc
private static final int MAX_REDIRECT_RETRIES = 10;
private static final Logger logger = LoggerFactory.getLogger(AzureMonitorRedirectPolicy.class);
private volatile String redirectedEndpointUrl;

@Override
public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
return attemptRetry(context, next, context.getHttpRequest(), 0);
}

/**
* Function to process through the HTTP Response received in the pipeline
* and retry sending the request with new redirect url.
*/
private Mono<HttpResponse> attemptRetry(final HttpPipelineCallContext context,
final HttpPipelineNextPolicy next,
final HttpRequest originalHttpRequest,
final int retryCount) {
// make sure the context is not modified during retry, except for the URL
context.setHttpRequest(originalHttpRequest.copy());
if (this.redirectedEndpointUrl != null) {
context.getHttpRequest().setUrl(this.redirectedEndpointUrl);
}
return next.clone().process()
.flatMap(httpResponse -> {
if (shouldRetryWithRedirect(httpResponse.getStatusCode(), retryCount)) {
String responseLocation = httpResponse.getHeaderValue("Location");
if (responseLocation != null) {
this.redirectedEndpointUrl = responseLocation;
return attemptRetry(context, next, originalHttpRequest, retryCount + 1);
}
}
return Mono.just(httpResponse);
});
}

/**
* Determines if it's a valid retry scenario based on statusCode and tryCount.
*
* @param statusCode HTTP response status code
* @param tryCount Redirect retries so far
* @return True if statusCode corresponds to HTTP redirect response codes and redirect
* retries is less than {@code MAX_REDIRECT_RETRIES}.
*/
private boolean shouldRetryWithRedirect(int statusCode, int tryCount) {
if (tryCount >= MAX_REDIRECT_RETRIES) {
logger.warn("Max redirect retries limit reached:{}.", MAX_REDIRECT_RETRIES);
return false;
}
return statusCode == HttpURLConnection.HTTP_MOVED_TEMP
|| statusCode == HttpURLConnection.HTTP_MOVED_PERM
|| statusCode == PERMANENT_REDIRECT_STATUS_CODE
|| statusCode == TEMP_REDIRECT_STATUS_CODE;
}

}

0 comments on commit 94ac07b

Please sign in to comment.