-
Notifications
You must be signed in to change notification settings - Fork 857
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP - Do not Merge: Added support for AWS Glue Schema registry #488
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
package kafdrop.config; | ||
|
||
import org.apache.commons.lang3.StringUtils; | ||
import org.springframework.boot.context.properties.ConfigurationProperties; | ||
import org.springframework.context.annotation.Configuration; | ||
import org.springframework.stereotype.Component; | ||
|
||
@Configuration | ||
public class GlueSchemaRegistryConfiguration { | ||
@Component | ||
@ConfigurationProperties(prefix = "schemaregistry.glue") | ||
public static final class GlueSchemaRegistryProperties { | ||
private String registryName; | ||
private String region; | ||
|
||
private String awsEndpoint; | ||
|
||
public String getRegion() { | ||
return region; | ||
} | ||
|
||
public void setRegion(String region) { | ||
this.region = region; | ||
} | ||
|
||
public String getRegistryName() { | ||
return registryName; | ||
} | ||
|
||
public void setRegistryName(String registryName) { | ||
this.registryName = registryName; | ||
} | ||
|
||
public String getAwsEndpoint() { | ||
return awsEndpoint; | ||
} | ||
|
||
public void setAwsEndpoint(String awsEndpoint) { | ||
this.awsEndpoint = awsEndpoint; | ||
} | ||
|
||
public boolean isConfigured() { | ||
return (StringUtils.isNotEmpty(region) && StringUtils.isNotEmpty(registryName)); | ||
|
||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,7 @@ | |
import javax.validation.constraints.Min; | ||
import javax.validation.constraints.NotNull; | ||
|
||
import kafdrop.config.GlueSchemaRegistryConfiguration; | ||
import kafdrop.util.*; | ||
import org.springframework.http.MediaType; | ||
import org.springframework.stereotype.Controller; | ||
|
@@ -48,6 +49,7 @@ | |
import kafdrop.config.MessageFormatConfiguration.MessageFormatProperties; | ||
import kafdrop.config.ProtobufDescriptorConfiguration.ProtobufDescriptorProperties; | ||
import kafdrop.config.SchemaRegistryConfiguration.SchemaRegistryProperties; | ||
import kafdrop.config.GlueSchemaRegistryConfiguration.GlueSchemaRegistryProperties; | ||
import kafdrop.model.MessageVO; | ||
import kafdrop.model.TopicPartitionVO; | ||
import kafdrop.model.TopicVO; | ||
|
@@ -65,14 +67,17 @@ public final class MessageController { | |
|
||
private final SchemaRegistryProperties schemaRegistryProperties; | ||
|
||
private final GlueSchemaRegistryProperties glueSchemaRegistryProperties; | ||
|
||
private final ProtobufDescriptorProperties protobufProperties; | ||
|
||
public MessageController(KafkaMonitor kafkaMonitor, MessageInspector messageInspector, MessageFormatProperties messageFormatProperties, SchemaRegistryProperties schemaRegistryProperties, ProtobufDescriptorProperties protobufProperties) { | ||
public MessageController(KafkaMonitor kafkaMonitor, MessageInspector messageInspector, MessageFormatProperties messageFormatProperties, SchemaRegistryProperties schemaRegistryProperties, ProtobufDescriptorProperties protobufProperties, GlueSchemaRegistryProperties glueSchemaRegistryProperties) { | ||
this.kafkaMonitor = kafkaMonitor; | ||
this.messageInspector = messageInspector; | ||
this.messageFormatProperties = messageFormatProperties; | ||
this.schemaRegistryProperties = schemaRegistryProperties; | ||
this.protobufProperties = protobufProperties; | ||
this.glueSchemaRegistryProperties = glueSchemaRegistryProperties; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Move this one down, to maintain the same order as the parameter list |
||
this.protobufProperties = protobufProperties; | ||
} | ||
|
||
/** | ||
|
@@ -259,10 +264,7 @@ private MessageDeserializer getDeserializer(String topicName, MessageFormat form | |
final MessageDeserializer deserializer; | ||
|
||
if (format == MessageFormat.AVRO) { | ||
final var schemaRegistryUrl = schemaRegistryProperties.getConnect(); | ||
final var schemaRegistryAuth = schemaRegistryProperties.getAuth(); | ||
|
||
deserializer = new AvroMessageDeserializer(topicName, schemaRegistryUrl, schemaRegistryAuth); | ||
deserializer = new AvroMessageDeserializer(topicName, schemaRegistryProperties, glueSchemaRegistryProperties); | ||
} else if (format == MessageFormat.PROTOBUF && null != descFile) { | ||
// filter the input file name | ||
|
||
|
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,28 +1,59 @@ | ||||||||||||
package kafdrop.util; | ||||||||||||
|
||||||||||||
import com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer; | ||||||||||||
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; | ||||||||||||
import com.amazonaws.services.schemaregistry.utils.AvroRecordType; | ||||||||||||
import io.confluent.kafka.serializers.*; | ||||||||||||
import kafdrop.config.GlueSchemaRegistryConfiguration; | ||||||||||||
import kafdrop.config.SchemaRegistryConfiguration; | ||||||||||||
import org.apache.commons.lang3.StringUtils; | ||||||||||||
import org.apache.kafka.common.serialization.Deserializer; | ||||||||||||
import software.amazon.awssdk.services.glue.model.DataFormat; | ||||||||||||
|
||||||||||||
import java.nio.*; | ||||||||||||
import java.util.*; | ||||||||||||
|
||||||||||||
|
||||||||||||
public final class AvroMessageDeserializer implements MessageDeserializer { | ||||||||||||
private final String topicName; | ||||||||||||
private final KafkaAvroDeserializer deserializer; | ||||||||||||
|
||||||||||||
public AvroMessageDeserializer(String topicName, String schemaRegistryUrl, String schemaRegistryAuth) { | ||||||||||||
private final Deserializer avroDeserializer; | ||||||||||||
|
||||||||||||
public AvroMessageDeserializer(String topicName, | ||||||||||||
SchemaRegistryConfiguration.SchemaRegistryProperties schemaRegistryProperties, | ||||||||||||
GlueSchemaRegistryConfiguration.GlueSchemaRegistryProperties glueSchemaRegistryProperties) { | ||||||||||||
this.topicName = topicName; | ||||||||||||
this.deserializer = getDeserializer(schemaRegistryUrl, schemaRegistryAuth); | ||||||||||||
|
||||||||||||
if(glueSchemaRegistryProperties.isConfigured()){ | ||||||||||||
this.avroDeserializer = getAWSDeserializer(glueSchemaRegistryProperties.getRegion(), | ||||||||||||
glueSchemaRegistryProperties.getRegistryName(), | ||||||||||||
glueSchemaRegistryProperties.getAwsEndpoint()); | ||||||||||||
} | ||||||||||||
else{ | ||||||||||||
this.avroDeserializer = getDeserializer(schemaRegistryProperties.getConnect(), schemaRegistryProperties.getAuth()); | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
@Override | ||||||||||||
public String deserializeMessage(ByteBuffer buffer) { | ||||||||||||
// Convert byte buffer to byte array | ||||||||||||
final var bytes = ByteUtils.convertToByteArray(buffer); | ||||||||||||
return deserializer.deserialize(topicName, bytes).toString(); | ||||||||||||
return avroDeserializer.deserialize(topicName, bytes).toString(); | ||||||||||||
} | ||||||||||||
|
||||||||||||
private static Deserializer getAWSDeserializer(String region, String registryName, String awsEndpoint) { | ||||||||||||
final var config = new HashMap<String, Object>(); | ||||||||||||
config.put(AWSSchemaRegistryConstants.AWS_REGION, region); | ||||||||||||
config.put(AWSSchemaRegistryConstants.REGISTRY_NAME, registryName); | ||||||||||||
config.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); | ||||||||||||
config.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name()); | ||||||||||||
if(StringUtils.isNotEmpty(awsEndpoint)) | ||||||||||||
config.put(AWSSchemaRegistryConstants.AWS_ENDPOINT, awsEndpoint); | ||||||||||||
Comment on lines
+50
to
+51
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
We'll need to establish and publish a code style, but from what I see, curly braces are used around all blocks. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good suggestiosn - sorry slow response. Busy work wise. Will take a look shortly |
||||||||||||
final var awsKafkaAvroDeserializer = new AWSKafkaAvroDeserializer(config); | ||||||||||||
return awsKafkaAvroDeserializer; | ||||||||||||
} | ||||||||||||
|
||||||||||||
private static KafkaAvroDeserializer getDeserializer(String schemaRegistryUrl, String schemaRegistryAuth) { | ||||||||||||
private static Deserializer getDeserializer(String schemaRegistryUrl, String schemaRegistryAuth) { | ||||||||||||
final var config = new HashMap<String, Object>(); | ||||||||||||
config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); | ||||||||||||
if (schemaRegistryAuth != null) { | ||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Besides this, also document the other (apparently optional) properties.