Skip to content

Commit

Permalink
MINOR: fix the path metadata shell uses for client quotas (apache#11437)
Browse files Browse the repository at this point in the history
Client quotas should appear under /client-quotas rather than /configs, since client quotas are
not configs. Additionally we should correctly handle the case where the entity name is null
(aka "default" quotas.)

Reviewers: Jason Gustafson <[email protected]>
  • Loading branch information
cmccabe authored Oct 26, 2021
1 parent a4333be commit d36832e
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 17 deletions.
36 changes: 26 additions & 10 deletions shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
Expand Down Expand Up @@ -50,6 +51,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

Expand Down Expand Up @@ -302,21 +307,32 @@ private void handleCommitImpl(MetadataRecordType type, ApiMessage message)
}
case CLIENT_QUOTA_RECORD: {
ClientQuotaRecord record = (ClientQuotaRecord) message;
DirectoryNode configsDirectory =
data.root.mkdirs("configs");
for (ClientQuotaRecord.EntityData entityData : record.entity()) {
String entityType = entityData.entityType();
String entityName = entityData.entityName();
DirectoryNode entityDirectory = configsDirectory.mkdirs(entityType).mkdirs(entityName);
if (record.remove())
entityDirectory.rmrf(record.key());
else
entityDirectory.create(record.key()).setContents(record.value() + "");
List<String> directories = clientQuotaRecordDirectories(record.entity());
DirectoryNode node = data.root;
for (String directory : directories) {
node = node.mkdirs(directory);
}
if (record.remove())
node.rmrf(record.key());
else
node.create(record.key()).setContents(record.value() + "");
break;
}
default:
throw new RuntimeException("Unhandled metadata record type");
}
}

static List<String> clientQuotaRecordDirectories(List<EntityData> entityData) {
List<String> result = new ArrayList<>();
result.add("client-quotas");
TreeMap<String, EntityData> entries = new TreeMap<>();
entityData.forEach(e -> entries.put(e.entityType(), e));
for (Map.Entry<String, EntityData> entry : entries.entrySet()) {
result.add(entry.getKey());
result.add(entry.getValue().entityName() == null ?
"<default>" : entry.getValue().entityName());
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.kafka.shell;


import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
Expand All @@ -41,6 +40,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;


public class MetadataNodeManagerTest {

private MetadataNodeManager metadataNodeManager;
Expand Down Expand Up @@ -264,16 +264,30 @@ public void testClientQuotaRecord() {
metadataNodeManager.handleMessage(record);

assertEquals("1000.0",
metadataNodeManager.getData().root().directory("configs", "user", "kraft").file("producer_byte_rate").contents());
assertEquals("1000.0",
metadataNodeManager.getData().root().directory("configs", "client", "kstream").file("producer_byte_rate").contents());
metadataNodeManager.getData().root().directory("client-quotas",
"client", "kstream",
"user", "kraft").file("producer_byte_rate").contents());

metadataNodeManager.handleMessage(record.setRemove(true));

assertFalse(
metadataNodeManager.getData().root().directory("configs", "user", "kraft").children().containsKey("producer_byte_rate"));
assertFalse(
metadataNodeManager.getData().root().directory("configs", "client", "kstream").children().containsKey("producer_byte_rate"));
metadataNodeManager.getData().root().directory("client-quotas",
"client", "kstream",
"user", "kraft").children().containsKey("producer_byte_rate"));

record = new ClientQuotaRecord()
.setEntity(Arrays.asList(
new ClientQuotaRecord.EntityData()
.setEntityType("user")
.setEntityName(null)
))
.setKey("producer_byte_rate")
.setValue(2000.0);

metadataNodeManager.handleMessage(record);

assertEquals("2000.0",
metadataNodeManager.getData().root().directory("client-quotas",
"user", "<default>").file("producer_byte_rate").contents());
}
}

0 comments on commit d36832e

Please sign in to comment.