Skip to content

Commit

Permalink
[allow_configuring_avro_consumer] Debugging / fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
emagdel committed Mar 21, 2022
1 parent ad8aa0f commit 86cc3d5
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 9 deletions.
22 changes: 22 additions & 0 deletions src/main/java/kafdrop/config/MessageFormatConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,26 @@ public void setFormat(MessageFormat format) {
this.format = format;
}
}

@Component
@ConfigurationProperties(prefix = "key")
public static final class KeyFormatProperties {
private MessageFormat format;

@PostConstruct
public void init() {
// Set a default message format if not configured.
if (format == null) {
format = MessageFormat.DEFAULT;
}
}

public MessageFormat getFormat() {
return format;
}

public void setFormat(MessageFormat format) {
this.format = format;
}
}
}
11 changes: 6 additions & 5 deletions src/main/java/kafdrop/controller/MessageController.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

import kafdrop.config.MessageFormatConfiguration;
import kafdrop.util.*;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
Expand Down Expand Up @@ -62,19 +63,19 @@ public final class MessageController {
private final MessageInspector messageInspector;

private final MessageFormatProperties messageFormatProperties;
private final MessageFormatProperties keyFormatProperties;
private final MessageFormatConfiguration.KeyFormatProperties keyFormatProperties;

private final SchemaRegistryProperties schemaRegistryProperties;

private final ProtobufDescriptorProperties protobufProperties;

public MessageController(KafkaMonitor kafkaMonitor, MessageInspector messageInspector, MessageFormatProperties messageFormatProperties, MessageFormatProperties keyFormatProperties, SchemaRegistryProperties schemaRegistryProperties, ProtobufDescriptorProperties protobufProperties) {
public MessageController(KafkaMonitor kafkaMonitor, MessageInspector messageInspector, MessageFormatProperties messageFormatProperties, MessageFormatConfiguration.KeyFormatProperties keyFormatProperties, SchemaRegistryProperties schemaRegistryProperties, ProtobufDescriptorProperties protobufProperties) {
this.kafkaMonitor = kafkaMonitor;
this.messageInspector = messageInspector;
this.messageFormatProperties = messageFormatProperties;
this.keyFormatProperties = keyFormatProperties;
this.schemaRegistryProperties = schemaRegistryProperties;
this.protobufProperties = protobufProperties;
this.protobufProperties = protobufProperties;
}

/**
Expand Down Expand Up @@ -312,9 +313,9 @@ public static class PartitionOffsetInfo {
private MessageFormat format;

private MessageFormat keyFormat;

private String descFile;

private String msgTypeName;

public PartitionOffsetInfo(int partition, long offset, long count, MessageFormat format) {
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/kafdrop/util/AvroMessageDeserializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,18 @@ private static KafkaAvroDeserializer getDeserializer(String schemaRegistryUrl, S
}

private static void setConfigFromEnvIfAvailable(String topicName, String configPath, Map<String,Object> config){

String configPrefix = "SCHEMA_REGISTRY";
String topicScopedEnvPath = Arrays.stream(new String[]{configPrefix, configPath.replace(".", "_"), topicName.replace("-", "_") } )
String topicScopedEnvPath = Arrays.stream(new String[]{configPrefix, configPath.replaceAll("\\.", "_"), topicName.replaceAll("-", "_") } )
.map(String::toUpperCase).collect(Collectors.joining("_"));

String noTopicScopedEnvPath = Arrays.stream(new String[]{ "SCHEMA_REGISTRY", configPath.replace(".", "_") })
String noTopicScopedEnvPath = Arrays.stream(new String[]{ configPrefix, configPath.replaceAll("\\.", "_") })
.map(String::toUpperCase).collect(Collectors.joining("_"));

for(String envPath : new String[]{topicScopedEnvPath, noTopicScopedEnvPath}) {

String namingStrategyValue = System.getenv(envPath);
if (namingStrategyValue != null) {
config.put(envPath, namingStrategyValue);
config.put(configPath, namingStrategyValue);
}
}
}
Expand Down

0 comments on commit 86cc3d5

Please sign in to comment.