Skip to content

Commit

Permalink
[close #699] Batch pick from release-3.1.1.11 (#708)
Browse files Browse the repository at this point in the history
* [close #626] remove nix-shell (#627)

Signed-off-by: shiyuhang <[email protected]>

* fix wrong backoffer for pd client

Signed-off-by: iosmanthus <[email protected]>
Signed-off-by: shiyuhang <[email protected]>

* add TsoBatchUsedUp region error handler

Signed-off-by: iosmanthus <[email protected]>
Signed-off-by: shiyuhang <[email protected]>

* fix license header

Signed-off-by: iosmanthus <[email protected]>
Signed-off-by: shiyuhang <[email protected]>

* upgrade grpc-netty to 1.48.0

Signed-off-by: iosmanthus <[email protected]>
Signed-off-by: shiyuhang <[email protected]>

* Update pom.xml

Upgrade commons-codec as well

Signed-off-by: shiyuhang <[email protected]>

* upgrade protobuf

Signed-off-by: iosmanthus <[email protected]>
Signed-off-by: shiyuhang <[email protected]>

* upgrade protobuf to 3.19.6

Signed-off-by: iosmanthus <[email protected]>
Signed-off-by: shiyuhang <[email protected]>

Signed-off-by: shiyuhang <[email protected]>
Signed-off-by: iosmanthus <[email protected]>
Co-authored-by: iosmanthus <[email protected]>
Co-authored-by: iosmanthus <[email protected]>
Co-authored-by: Xiaoguang Sun <[email protected]>
  • Loading branch information
4 people authored Dec 30, 2022
1 parent 30930e2 commit 6b21aa6
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 74 deletions.
11 changes: 8 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
<protobuf.version>3.5.1</protobuf.version>
<log4j.version>1.2.17</log4j.version>
<slf4j.version>1.7.16</slf4j.version>
<grpc.version>1.38.0</grpc.version>
<grpc.version>1.48.0</grpc.version>
<netty.tcnative.version>2.0.34.Final</netty.tcnative.version>
<gson.version>2.8.9</gson.version>
<powermock.version>1.6.6</powermock.version>
Expand All @@ -75,12 +75,12 @@
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.16.1</version>
<version>3.19.6</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.16.1</version>
<version>3.19.6</version>
</dependency>
<dependency>
<groupId>io.perfmark</groupId>
Expand Down Expand Up @@ -232,6 +232,11 @@
<version>3.9</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.15</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
Expand Down
14 changes: 0 additions & 14 deletions shell.nix

This file was deleted.

22 changes: 14 additions & 8 deletions src/main/java/io/grpc/internal/ClientCallImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static io.grpc.Status.DEADLINE_EXCEEDED;
import static io.grpc.internal.GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
import static java.lang.Math.max;
Expand All @@ -33,6 +34,7 @@
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.ClientStreamTracer;
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.CompressorRegistry;
Expand Down Expand Up @@ -166,6 +168,7 @@ static void prepareHeaders(
DecompressorRegistry decompressorRegistry,
Compressor compressor,
boolean fullStreamDecompression) {
headers.discardAll(CONTENT_LENGTH_KEY);
headers.discardAll(MESSAGE_ENCODING_KEY);
if (compressor != Codec.Identity.NONE) {
headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
Expand Down Expand Up @@ -260,10 +263,13 @@ public void runInContext() {
effectiveDeadline, context.getDeadline(), callOptions.getDeadline());
stream = clientStreamProvider.newStream(method, callOptions, headers, context);
} else {
ClientStreamTracer[] tracers =
GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false);
stream =
new FailingClientStream(
DEADLINE_EXCEEDED.withDescription(
"ClientCall started after deadline exceeded: " + effectiveDeadline));
"ClientCall started after deadline exceeded: " + effectiveDeadline),
tracers);
}

if (callExecutorIsDirect) {
Expand Down Expand Up @@ -363,12 +369,14 @@ private static void logIfContextNarrowedTimeout(
StringBuilder builder =
new StringBuilder(
String.format(
"Call timeout set to '%d' ns, due to context deadline.", effectiveTimeout));
Locale.US,
"Call timeout set to '%d' ns, due to context deadline.",
effectiveTimeout));
if (callDeadline == null) {
builder.append(" Explicit call timeout was not set.");
} else {
long callTimeout = callDeadline.timeRemaining(TimeUnit.NANOSECONDS);
builder.append(String.format(" Explicit call timeout was '%d' ns.", callTimeout));
builder.append(String.format(Locale.US, " Explicit call timeout was '%d' ns.", callTimeout));
}

log.fine(builder.toString());
Expand Down Expand Up @@ -562,6 +570,9 @@ public void setMessageCompression(boolean enabled) {

@Override
public boolean isReady() {
if (halfCloseCalled) {
return false;
}
return stream.isReady();
}

Expand Down Expand Up @@ -711,11 +722,6 @@ private void runInternal() {
}
}

@Override
public void closed(Status status, Metadata trailers) {
closed(status, RpcProgress.PROCESSED, trailers);
}

@Override
public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
PerfMark.startTask("ClientStreamListener.closed", tag);
Expand Down
91 changes: 51 additions & 40 deletions src/main/java/io/netty/buffer/PoolArena.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ enum SizeClass {

final int numSmallSubpagePools;
final int directMemoryCacheAlignment;
final int directMemoryCacheAlignmentMask;
private final PoolSubpage<T>[] smallSubpagePools;

private final PoolChunkList<T> q050;
Expand Down Expand Up @@ -97,7 +96,6 @@ protected PoolArena(
super(pageSize, pageShifts, chunkSize, cacheAlignment);
this.parent = parent;
directMemoryCacheAlignment = cacheAlignment;
directMemoryCacheAlignmentMask = cacheAlignment - 1;

numSmallSubpagePools = nSubpages;
smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
Expand Down Expand Up @@ -183,17 +181,23 @@ private void tcacheAllocateSmall(
return;
}

/**
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and {@link
* PoolChunk#free(long)} may modify the doubly linked list as well.
/*
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
final PoolSubpage<T> head = smallSubpagePools[sizeIdx];
final boolean needsNormalAllocation;
synchronized (head) {
final PoolSubpage<T> s = head.next;
needsNormalAllocation = s == head;
if (!needsNormalAllocation) {
assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx);
assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx)
: "doNotDestroy="
+ s.doNotDestroy
+ ", elemSize="
+ s.elemSize
+ ", sizeIdx="
+ sizeIdx;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache);
Expand Down Expand Up @@ -221,7 +225,7 @@ private void tcacheAllocateNormal(
}
}

// Method must be called inside synchronized(this) { ... } block
// Method must be called inside synchronized(this) { ... } block
private void allocateNormal(
PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) {
if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache)
Expand Down Expand Up @@ -272,7 +276,7 @@ void free(
}
}

private SizeClass sizeClass(long handle) {
private static SizeClass sizeClass(long handle) {
return isSubpage(handle) ? SizeClass.Small : SizeClass.Normal;
}

Expand Down Expand Up @@ -499,6 +503,25 @@ public long numActiveBytes() {
return max(0, val);
}

/**
* Return the number of bytes that are currently pinned to buffer instances, by the arena. The
* pinned memory is not accessible for use by any other allocation, until the buffers using have
* all been released.
*/
public long numPinnedBytes() {
long val =
activeBytesHuge
.value(); // Huge chunks are exact-sized for the buffers they were allocated to.
synchronized (this) {
for (int i = 0; i < chunkListMetrics.size(); i++) {
for (PoolChunkMetric m : chunkListMetrics.get(i)) {
val += ((PoolChunk<?>) m).pinnedBytes();
}
}
}
return max(0, val);
}

protected abstract PoolChunk<T> newChunk(
int pageSize, int maxPageIdx, int pageShifts, int chunkSize);

Expand Down Expand Up @@ -588,13 +611,8 @@ private void destroyPoolChunkLists(PoolChunkList<T>... chunkLists) {

static final class HeapArena extends PoolArena<byte[]> {

HeapArena(
PooledByteBufAllocator parent,
int pageSize,
int pageShifts,
int chunkSize,
int directMemoryCacheAlignment) {
super(parent, pageSize, pageShifts, chunkSize, directMemoryCacheAlignment);
HeapArena(PooledByteBufAllocator parent, int pageSize, int pageShifts, int chunkSize) {
super(parent, pageSize, pageShifts, chunkSize, 0);
}

private static byte[] newByteArray(int size) {
Expand All @@ -610,12 +628,12 @@ boolean isDirect() {
protected PoolChunk<byte[]> newChunk(
int pageSize, int maxPageIdx, int pageShifts, int chunkSize) {
return new PoolChunk<byte[]>(
this, newByteArray(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx, 0);
this, null, newByteArray(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx);
}

@Override
protected PoolChunk<byte[]> newUnpooledChunk(int capacity) {
return new PoolChunk<byte[]>(this, newByteArray(capacity), capacity, 0);
return new PoolChunk<byte[]>(this, null, newByteArray(capacity), capacity);
}

@Override
Expand Down Expand Up @@ -656,40 +674,33 @@ boolean isDirect() {
return true;
}

// mark as package-private, only for unit test
int offsetCacheLine(ByteBuffer memory) {
// We can only calculate the offset if Unsafe is present as otherwise directBufferAddress(...)
// will
// throw an NPE.
int remainder =
HAS_UNSAFE
? (int)
(PlatformDependent.directBufferAddress(memory) & directMemoryCacheAlignmentMask)
: 0;

// offset = alignment - address & (alignment - 1)
return directMemoryCacheAlignment - remainder;
}

@Override
protected PoolChunk<ByteBuffer> newChunk(
int pageSize, int maxPageIdx, int pageShifts, int chunkSize) {
if (directMemoryCacheAlignment == 0) {
ByteBuffer memory = allocateDirect(chunkSize);
return new PoolChunk<ByteBuffer>(
this, allocateDirect(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx, 0);
this, memory, memory, pageSize, pageShifts, chunkSize, maxPageIdx);
}
final ByteBuffer memory = allocateDirect(chunkSize + directMemoryCacheAlignment);

final ByteBuffer base = allocateDirect(chunkSize + directMemoryCacheAlignment);
final ByteBuffer memory =
PlatformDependent.alignDirectBuffer(base, directMemoryCacheAlignment);
return new PoolChunk<ByteBuffer>(
this, memory, pageSize, pageShifts, chunkSize, maxPageIdx, offsetCacheLine(memory));
this, base, memory, pageSize, pageShifts, chunkSize, maxPageIdx);
}

@Override
protected PoolChunk<ByteBuffer> newUnpooledChunk(int capacity) {
if (directMemoryCacheAlignment == 0) {
return new PoolChunk<ByteBuffer>(this, allocateDirect(capacity), capacity, 0);
ByteBuffer memory = allocateDirect(capacity);
return new PoolChunk<ByteBuffer>(this, memory, memory, capacity);
}
final ByteBuffer memory = allocateDirect(capacity + directMemoryCacheAlignment);
return new PoolChunk<ByteBuffer>(this, memory, capacity, offsetCacheLine(memory));

final ByteBuffer base = allocateDirect(capacity + directMemoryCacheAlignment);
final ByteBuffer memory =
PlatformDependent.alignDirectBuffer(base, directMemoryCacheAlignment);
return new PoolChunk<ByteBuffer>(this, base, memory, capacity);
}

private static ByteBuffer allocateDirect(int capacity) {
Expand All @@ -701,9 +712,9 @@ private static ByteBuffer allocateDirect(int capacity) {
@Override
protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
if (PlatformDependent.useDirectBufferNoCleaner()) {
PlatformDependent.freeDirectNoCleaner(chunk.memory);
PlatformDependent.freeDirectNoCleaner((ByteBuffer) chunk.base);
} else {
PlatformDependent.freeDirectBuffer(chunk.memory);
PlatformDependent.freeDirectBuffer((ByteBuffer) chunk.base);
}
}

Expand Down
8 changes: 6 additions & 2 deletions src/main/java/org/tikv/common/AbstractGRPCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,17 @@ private boolean doCheckHealth(BackOffer backOffer, String addressStr, HostMappin
HealthCheckResponse resp = stub.check(req);
return resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING;
} catch (Exception e) {
logger.warn("check health failed.", e);
logger.warn("check health failed, addr: {}, caused by: {}", addressStr, e.getMessage());
backOffer.doBackOff(BackOffFuncType.BoCheckHealth, e);
}
}
}

protected boolean checkHealth(BackOffer backOffer, String addressStr, HostMapping hostMapping) {
return doCheckHealth(backOffer, addressStr, hostMapping);
try {
return doCheckHealth(backOffer, addressStr, hostMapping);
} catch (Exception e) {
return false;
}
}
}
Loading

0 comments on commit 6b21aa6

Please sign in to comment.