Skip to content

Commit

Permalink
show proc tablet_mapping, expand show compute nodes, fix bug with replaying adding cn
Browse files Browse the repository at this point in the history
  • Loading branch information
ctbrennan committed Nov 14, 2024
1 parent d3d6631 commit 06982ba
Show file tree
Hide file tree
Showing 7 changed files with 428 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

Expand All @@ -59,6 +60,7 @@ public class ComputeNodeProcDir implements ProcDirInterface {
.add("WorkerId")
.add("WarehouseName")
.add("TabletNum")
.add("UsedForTabletScanCount")
.add("ResourceIsolationGroup");
TITLE_NAMES_SHARED_DATA = builder.build();
}
Expand Down Expand Up @@ -105,6 +107,14 @@ public static List<List<String>> getClusterComputeNodesInfos() {
return computeNodesInfos;
}

boolean usingInternalMapper = clusterInfoService.shouldUseInternalTabletToCnMapper();
Map<Long, Long> computeNodeToOwnedTabletCount = null;
if (usingInternalMapper) {
// Note that this will only return information about tablets which this FE has needed to scan.
computeNodeToOwnedTabletCount = clusterInfoService.internalTabletMapper().computeNodeToOwnedTabletCount();
}


long start = System.currentTimeMillis();
Stopwatch watch = Stopwatch.createUnstarted();
List<List<Comparable>> comparableComputeNodeInfos = new LinkedList<>();
Expand Down Expand Up @@ -176,9 +186,21 @@ public static List<List<String>> getClusterComputeNodesInfos() {
Warehouse wh = GlobalStateMgr.getCurrentState().getWarehouseMgr().getWarehouse(computeNode.getWarehouseId());
computeNodeInfo.add(wh.getName());

String workerAddr = computeNode.getHost() + ":" + computeNode.getStarletPort();
long tabletNum = GlobalStateMgr.getCurrentState().getStarOSAgent().getWorkerTabletNum(workerAddr);
computeNodeInfo.add(tabletNum);
// If we're using resource isolation groups, we bypass the call to StarOS/StarMgr
if (usingInternalMapper) {
Long tabletNum = computeNodeToOwnedTabletCount.getOrDefault(computeNodeId, 0L);
computeNodeInfo.add(tabletNum);

Long tabletsScannedCount = clusterInfoService.internalTabletMapper().getComputeNodeReturnCount(computeNodeId);
computeNodeInfo.add(tabletsScannedCount);
} else {
String workerAddr = computeNode.getHost() + ":" + computeNode.getStarletPort();
long tabletNum = GlobalStateMgr.getCurrentState().getStarOSAgent().getWorkerTabletNum(workerAddr);
computeNodeInfo.add(tabletNum);
// We haven't tracked the "used for tablet scan count" in this case
computeNodeInfo.add("unknown");
}

computeNodeInfo.add(computeNode.getResourceIsolationGroup());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ private ProcService() {
root.register("compactions", new CompactionsProcNode());
root.register("meta_recovery", new MetaRecoveryProdDir());
root.register("replications", new ReplicationsProcNode());
root.register("tablet_mapping", new TabletMappingProcNode());
}

// Get the corresponding PROC Node by the specified path
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.common.proc;

import com.google.common.collect.ImmutableList;
import com.starrocks.common.AnalysisException;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.system.SystemInfoService;
import com.starrocks.system.TabletComputeNodeMapper;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
 * PROC node that lists, for every tablet the internal tablet-to-CN mapper has been asked about, how many times
 * it was requested and which compute nodes are its primary and secondary owners.
 *
 * It is limited in that it will only report information about tablets which this FE has needed to scan to
 * fulfill a query which it received since it came up.
 *
 * NOTE(review): each row is produced through TabletComputeNodeMapper#computeNodesForTablet, which also updates
 * the mapper's usage counters, so rendering this node may itself inflate the reported counts — confirm whether
 * a non-tracking lookup should be used here.
 */
public class TabletMappingProcNode implements ProcDirInterface {

    private static final Logger LOG = LogManager.getLogger(TabletMappingProcNode.class);

    // Column headers, in the same order the row values are appended in fetchResult().
    public static final ImmutableList<String> TITLE_NAMES =
            ImmutableList.of("TabletId", "RequestCount", "PrimaryCnOwner", "SecondaryCnOwner");

    /**
     * Builds one row per tracked tablet: id, request count, then up to two owning compute node ids.
     * When the internal mapper is not in use, a single-column explanatory result is returned instead.
     */
    @Override
    public ProcResult fetchResult() throws AnalysisException {
        final SystemInfoService clusterInfo = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo();
        final BaseProcResult procResult = new BaseProcResult();

        if (!clusterInfo.shouldUseInternalTabletToCnMapper()) {
            LOG.warn("Requesting tablet mapping information but it's not internally maintained.");
            procResult.setNames(List.of("N/A"));
            procResult.addRow(List.of("Tablet mapping information not internally maintained"));
            return procResult;
        }

        procResult.setNames(TITLE_NAMES);
        final TabletComputeNodeMapper mapper = clusterInfo.internalTabletMapper();
        final int columnCount = TITLE_NAMES.size();

        for (Map.Entry<Long, AtomicLong> countEntry : mapper.getTabletMappingCount().entrySet()) {
            final Long tabletId = countEntry.getKey();
            final List<String> row = new ArrayList<>(columnCount);

            row.add(tabletId.toString());
            // Read the request count before asking the mapper for the owners.
            row.add(countEntry.getValue().toString());

            // Ask for at most two owners: the primary and the first backup.
            mapper.computeNodesForTablet(tabletId, 2).forEach(cnId -> row.add(cnId.toString()));

            // If there's only 1 CN in this resource isolation group, there can be no SecondaryCnOwner,
            // so pad the remaining columns.
            for (int filled = row.size(); filled < columnCount; filled++) {
                row.add("N/A");
            }
            procResult.addRow(row);
        }
        return procResult;
    }

    /** This node has no children; the request is accepted but nothing is stored. */
    @Override
    public boolean register(String name, ProcNodeInterface node) {
        return true;
    }

    /** This node has no children, so every lookup yields null. */
    @Override
    public ProcNodeInterface lookup(String name) throws AnalysisException {
        return null;
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -1106,7 +1106,9 @@ public void replayAddComputeNode(ComputeNode newComputeNode) {
// update idToComputeNode
newComputeNode.setBackendState(BackendState.using);
idToComputeNodeRef.put(newComputeNode.getId(), newComputeNode);
tabletComputeNodeMapper.addComputeNode(newComputeNode.getId(), DEFAULT_RESOURCE_ISOLATION_GROUP_ID);
// TODO(cbrennan) Should this be using the given newComputeNode resource isolation group?
// Technically the modify should get replayed right?
tabletComputeNodeMapper.addComputeNode(newComputeNode.getId(), newComputeNode.getResourceIsolationGroup());
}

public void replayAddBackend(Backend newBackend) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
Expand All @@ -63,7 +66,6 @@
* If using multiple replicas, consider the earlier-index CN for a given tablet to be more preferred
* If some CN is removed, and it was the primary CN for some tablet, the first backup CN will become the primary
* Thread-safe after initialization.
*
*/

public class TabletComputeNodeMapper {
Expand All @@ -76,9 +78,9 @@ public class TabletComputeNodeMapper {

private class TabletMap {
private final HashRing<Long, Long> tabletToComputeNodeId;

TabletMap() {
tabletToComputeNodeId = new ConsistentHashRing<>(
Hashing.murmur3_128(), new LongIdFunnel(), new LongIdFunnel(),
tabletToComputeNodeId = new ConsistentHashRing<>(Hashing.murmur3_128(), new LongIdFunnel(), new LongIdFunnel(),
Collections.emptyList(), CONSISTENT_HASH_RING_VIRTUAL_NUMBER);
}

Expand All @@ -93,13 +95,19 @@ private boolean tracksSomeComputeNode() {
private final Lock readLock = stateLock.readLock();
private final Lock writeLock = stateLock.writeLock();

// Tracking usage since the instantiation of this Mapper.
private final ConcurrentMap<Long, AtomicLong> tabletMappingCount = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, AtomicLong> computeNodeReturnCount = new ConcurrentHashMap<>();

// Creates a mapper that starts with an empty resource-isolation-group mapping.
public TabletComputeNodeMapper() {
resourceIsolationGroupToTabletMapping = new HashMap<>();
}

// Test helper: drops every tracked resource isolation group and resets both usage counters.
// NOTE(review): this does not take the write lock — presumably fine for single-threaded tests; confirm.
@TestOnly
public void clear() {
resourceIsolationGroupToTabletMapping.clear();
tabletMappingCount.clear();
computeNodeReturnCount.clear();
}

public int numResourceIsolationGroups() {
Expand All @@ -120,12 +128,12 @@ private String getResourceIsolationGroupName(String resourceIsolationGroup) {

public String debugString() {
return resourceIsolationGroupToTabletMapping.entrySet().stream()
.map(entry -> String.format("%-15s : %s", entry.getKey(), entry.getValue()))
.collect(Collectors.joining("\n"));
.map(entry -> String.format("%-15s : %s", entry.getKey(), entry.getValue())).collect(Collectors.joining("\n"));
}

public void addComputeNode(Long computeNodeId, String resourceIsolationGroup) {
resourceIsolationGroup = getResourceIsolationGroupName(resourceIsolationGroup);
LOG.info("Adding the cn {} to resource isolation group {}", computeNodeId, resourceIsolationGroup);
writeLock.lock();
try {
addComputeNodeUnsynchronized(computeNodeId, resourceIsolationGroup);
Expand All @@ -149,6 +157,7 @@ private void addComputeNodeUnsynchronized(long computeNodeId, String resourceIso
// This will succeed even if the resource isolation group is not being tracked.
public void removeComputeNode(Long computeNodeId, String resourceIsolationGroup) {
resourceIsolationGroup = getResourceIsolationGroupName(resourceIsolationGroup);
LOG.info("Removing the cn {} from resource isolation group {}", computeNodeId, resourceIsolationGroup);
writeLock.lock();
try {
removeComputeNodeUnsynchronized(computeNodeId, resourceIsolationGroup);
Expand All @@ -168,10 +177,11 @@ private void removeComputeNodeUnsynchronized(Long computeNodeId, String resource
}
}

public void modifyComputeNode(Long computeNodeId,
String oldResourceIsolationGroup, String newResourceIsolationGroup) {
public void modifyComputeNode(Long computeNodeId, String oldResourceIsolationGroup, String newResourceIsolationGroup) {
oldResourceIsolationGroup = getResourceIsolationGroupName(oldResourceIsolationGroup);
newResourceIsolationGroup = getResourceIsolationGroupName(newResourceIsolationGroup);
LOG.info("Modifying the resource isolation group for cn {} from {} to {}", computeNodeId, oldResourceIsolationGroup,
newResourceIsolationGroup);
// We run the following even if oldResourceIsolationGroup.equals(newResourceIsolationGroup)
// because we want to cleanly handle edge cases where the compute node hasn't already been
// added to the TabletComputeNode mapper. This can happen in at least one situation, which
Expand All @@ -192,26 +202,61 @@ public List<Long> computeNodesForTablet(Long tabletId) {
}

/**
 * Returns up to {@code count} compute nodes for the tablet within the resource isolation group of the FE
 * this code is running on.
 *
 * @param tabletId tablet to look up
 * @param count    maximum number of compute nodes to return
 * @return the compute node ids as produced by the three-argument overload
 */
public List<Long> computeNodesForTablet(Long tabletId, int count) {
    // Single declaration: the group is resolved once from this FE's own node entry.
    String thisResourceIsolationGroup =
            GlobalStateMgr.getCurrentState().getNodeMgr().getMySelf().getResourceIsolationGroup();
    return computeNodesForTablet(tabletId, count, thisResourceIsolationGroup);
}

/**
 * Returns up to {@code count} compute nodes for the tablet from the hash ring of the given resource
 * isolation group, and records the lookup in the usage counters.
 *
 * Every successful lookup increments the per-tablet request counter once and the per-compute-node return
 * counter once for each returned node. Lookups against an unknown group are not counted.
 *
 * @param tabletId               tablet to look up
 * @param count                  maximum number of compute nodes to return
 * @param resourceIsolationGroup group whose hash ring is consulted; used as given (not normalized here)
 * @return the compute node ids from the ring, or an empty list when the group is not tracked
 */
public List<Long> computeNodesForTablet(Long tabletId, int count, String resourceIsolationGroup) {
    readLock.lock();
    try {
        // One lookup instead of containsKey + get.
        TabletMap m = this.resourceIsolationGroupToTabletMapping.get(resourceIsolationGroup);
        if (m == null) {
            LOG.warn("Requesting node for resource isolation group {}, to which there is not a known CN assigned.",
                    resourceIsolationGroup);
            return Collections.emptyList();
        }
        List<Long> computeNodes = m.tabletToComputeNodeId.get(tabletId, count);
        // Update tracking information. The counters are concurrent structures, so the read lock suffices.
        tabletMappingCount.computeIfAbsent(tabletId, k -> new AtomicLong()).incrementAndGet();
        for (Long nodeId : computeNodes) {
            computeNodeReturnCount.computeIfAbsent(nodeId, k -> new AtomicLong()).incrementAndGet();
        }
        return computeNodes;
    } finally {
        readLock.unlock();
    }
}

// How many times `computeNodesForTablet` has included the given compute node in its result since this mapper
// was instantiated (or last cleared). Compute nodes that were never returned report 0.
public Long getComputeNodeReturnCount(Long computeNodeId) {
    AtomicLong counter = computeNodeReturnCount.get(computeNodeId);
    return counter == null ? 0L : counter.get();
}

// Returns a map from tablet id to the number of times some caller has requested the compute node(s) for that
// tablet. Only counts requests made since this particular FE came up.
// Note: this hands out the live internal map rather than a copy, so callers see later updates and should treat
// it as read-only.
public ConcurrentMap<Long, AtomicLong> getTabletMappingCount() {
return tabletMappingCount;
}

/**
 * Counts, per compute node, the distinct tracked tablets whose primary owner is that node within this FE's
 * resource isolation group.
 *
 * This is a best-effort count: only tablets this mapper has been asked about since it was instantiated are
 * considered, so a compute node may "own" many more tablets than reported.
 *
 * The hash ring is read directly rather than through {@code computeNodesForTablet}, because that method
 * records every lookup in the usage counters and computing this statistic must not inflate them.
 *
 * Note: this function is kind of expensive (~1 ms for largish databases), don't call it often.
 *
 * @return map from compute node id to the number of tracked tablets it is primary owner of; empty when no
 *         tablet has been tracked yet
 * @throws IllegalStateException if a tracked tablet has no owner, i.e. this FE's resource isolation group
 *                               has no compute node
 */
public Map<Long, Long> computeNodeToOwnedTabletCount() throws IllegalStateException {
    long startTime = System.currentTimeMillis();
    Map<Long, Long> ownedTabletCounts = new HashMap<>();
    // Nothing tracked yet: skip resolving the group and taking the lock.
    if (!tabletMappingCount.isEmpty()) {
        String thisResourceIsolationGroup =
                GlobalStateMgr.getCurrentState().getNodeMgr().getMySelf().getResourceIsolationGroup();
        // Hold the read lock across the whole scan so all tablets are resolved against one ring state.
        readLock.lock();
        try {
            TabletMap tabletMap = resourceIsolationGroupToTabletMapping.get(thisResourceIsolationGroup);
            if (tabletMap == null) {
                throw new IllegalStateException("No owner for tablet, seemingly no cn for resource isolation group");
            }
            for (Long tabletId : tabletMappingCount.keySet()) {
                List<Long> primaryCnForTablet = tabletMap.tabletToComputeNodeId.get(tabletId, 1);
                if (primaryCnForTablet.isEmpty()) {
                    throw new IllegalStateException(
                            "No owner for tablet, seemingly no cn for resource isolation group");
                }
                // Instantiate to 1 or increment.
                ownedTabletCounts.merge(primaryCnForTablet.get(0), 1L, Long::sum);
            }
        } finally {
            readLock.unlock();
        }
    }
    LOG.info("millis passed calculating computeNodeToOwnedTabletCount: {}", System.currentTimeMillis() - startTime);
    return ownedTabletCounts;
}

class LongIdFunnel implements Funnel<Long> {
@Override
public void funnel(Long id, PrimitiveSink primitiveSink) {
Expand Down
Loading

0 comments on commit 06982ba

Please sign in to comment.