Skip to content

Commit

Permalink
Unsafe direct buffers. (Azure#316)
Browse files Browse the repository at this point in the history
  • Loading branch information
StormHub authored and nayato committed Dec 20, 2017
1 parent 5ac7f4a commit d77bc29
Show file tree
Hide file tree
Showing 61 changed files with 2,142 additions and 609 deletions.
24 changes: 17 additions & 7 deletions src/DotNetty.Buffers/AbstractByteBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public virtual IByteBuffer EnsureWritable(int minWritableBytes)
return this;
}

protected void EnsureWritable0(int minWritableBytes)
protected internal void EnsureWritable0(int minWritableBytes)
{
this.EnsureAccessible();
if (minWritableBytes <= this.WritableBytes)
Expand Down Expand Up @@ -821,13 +821,15 @@ public virtual IByteBuffer ReadBytes(int length)

public virtual IByteBuffer ReadSlice(int length)
{
this.CheckReadableBytes(length);
IByteBuffer slice = this.Slice(this.readerIndex, length);
this.readerIndex += length;
return slice;
}

public virtual IByteBuffer ReadRetainedSlice(int length)
{
this.CheckReadableBytes(length);
IByteBuffer slice = this.RetainedSlice(this.readerIndex, length);
this.readerIndex += length;
return slice;
Expand Down Expand Up @@ -1145,19 +1147,19 @@ public virtual int BytesBefore(int index, int length, byte value)
return endIndex - index;
}

public virtual int ForEachByte(ByteProcessor processor)
public virtual int ForEachByte(IByteProcessor processor)
{
this.EnsureAccessible();
return this.ForEachByteAsc0(this.readerIndex, this.writerIndex, processor);
}

public virtual int ForEachByte(int index, int length, ByteProcessor processor)
public virtual int ForEachByte(int index, int length, IByteProcessor processor)
{
this.CheckIndex(index, length);
return this.ForEachByteAsc0(index, index + length, processor);
}

int ForEachByteAsc0(int start, int end, ByteProcessor processor)
int ForEachByteAsc0(int start, int end, IByteProcessor processor)
{
for (; start < end; ++start)
{
Expand All @@ -1170,19 +1172,19 @@ int ForEachByteAsc0(int start, int end, ByteProcessor processor)
return -1;
}

public virtual int ForEachByteDesc(ByteProcessor processor)
public virtual int ForEachByteDesc(IByteProcessor processor)
{
this.EnsureAccessible();
return this.ForEachByteDesc0(this.writerIndex - 1, this.readerIndex, processor);
}

public virtual int ForEachByteDesc(int index, int length, ByteProcessor processor)
public virtual int ForEachByteDesc(int index, int length, IByteProcessor processor)
{
this.CheckIndex(index, length);
return this.ForEachByteDesc0(index + length - 1, index, processor);
}

int ForEachByteDesc0(int rStart, int rEnd, ByteProcessor processor)
int ForEachByteDesc0(int rStart, int rEnd, IByteProcessor processor)
{
for (; rStart >= rEnd; --rStart)
{
Expand Down Expand Up @@ -1331,8 +1333,16 @@ protected void DiscardMarks()

public abstract int ArrayOffset { get; }

public abstract bool HasMemoryAddress { get; }

public abstract ref byte GetPinnableMemoryAddress();

public abstract IntPtr AddressOfPinnedMemory();

public abstract IByteBuffer Unwrap();

public abstract bool IsDirect { get; }

public abstract int ReferenceCount { get; }

public abstract IReferenceCounted Retain();
Expand Down
53 changes: 45 additions & 8 deletions src/DotNetty.Buffers/AbstractByteBufferAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace DotNetty.Buffers
{
using System;
using System.Runtime.CompilerServices;
using DotNetty.Common;

/// <inheritdoc />
Expand Down Expand Up @@ -75,18 +76,27 @@ protected static CompositeByteBuffer ToLeakAwareBuffer(CompositeByteBuffer buf)
return buf;
}

readonly bool directByDefault;
readonly IByteBuffer emptyBuffer;

protected AbstractByteBufferAllocator()
{
this.emptyBuffer = new EmptyByteBuffer(this);
}

public IByteBuffer Buffer() => this.HeapBuffer();
protected AbstractByteBufferAllocator(bool preferDirect)
{
this.directByDefault = preferDirect;
this.emptyBuffer = new EmptyByteBuffer(this);
}

public IByteBuffer Buffer(int initialCapacity) => this.HeapBuffer(initialCapacity);
public IByteBuffer Buffer() => this.directByDefault ? this.DirectBuffer() : this.HeapBuffer();

public IByteBuffer Buffer(int initialCapacity, int maxCapacity) => this.HeapBuffer(initialCapacity, maxCapacity);
public IByteBuffer Buffer(int initialCapacity) =>
this.directByDefault ? this.DirectBuffer(initialCapacity) : this.HeapBuffer(initialCapacity);

public IByteBuffer Buffer(int initialCapacity, int maxCapacity) =>
this.directByDefault ? this.DirectBuffer(initialCapacity, maxCapacity) : this.HeapBuffer(initialCapacity, maxCapacity);

public IByteBuffer HeapBuffer() => this.HeapBuffer(DefaultInitialCapacity, DefaultMaxCapacity);

Expand All @@ -103,29 +113,56 @@ public IByteBuffer HeapBuffer(int initialCapacity, int maxCapacity)
return this.NewHeapBuffer(initialCapacity, maxCapacity);
}

public CompositeByteBuffer CompositeBuffer() => this.CompositeHeapBuffer();
public IByteBuffer DirectBuffer() => this.DirectBuffer(DefaultInitialCapacity, DefaultMaxCapacity);

public IByteBuffer DirectBuffer(int initialCapacity) => this.DirectBuffer(initialCapacity, DefaultMaxCapacity);

public IByteBuffer DirectBuffer(int initialCapacity, int maxCapacity)
{
if (initialCapacity == 0 && maxCapacity == 0)
{
return this.emptyBuffer;
}
Validate(initialCapacity, maxCapacity);
return this.NewDirectBuffer(initialCapacity, maxCapacity);
}

public CompositeByteBuffer CompositeBuffer() =>
this.directByDefault ? this.CompositeDirectBuffer() : this.CompositeHeapBuffer();

public CompositeByteBuffer CompositeBuffer(int maxComponents) => this.CompositeHeapBuffer(maxComponents);
public CompositeByteBuffer CompositeBuffer(int maxComponents) =>
this.directByDefault ? this.CompositeDirectBuffer(maxComponents) : this.CompositeHeapBuffer(maxComponents);

public CompositeByteBuffer CompositeHeapBuffer() => this.CompositeHeapBuffer(DefaultMaxComponents);

public virtual CompositeByteBuffer CompositeHeapBuffer(int maxNumComponents) => ToLeakAwareBuffer(new CompositeByteBuffer(this, maxNumComponents));
public virtual CompositeByteBuffer CompositeHeapBuffer(int maxNumComponents) =>
ToLeakAwareBuffer(new CompositeByteBuffer(this, false, maxNumComponents));

public CompositeByteBuffer CompositeDirectBuffer() => this.CompositeDirectBuffer(DefaultMaxComponents);

public virtual CompositeByteBuffer CompositeDirectBuffer(int maxNumComponents) =>
ToLeakAwareBuffer(new CompositeByteBuffer(this, true, maxNumComponents));

[MethodImpl(MethodImplOptions.AggressiveInlining)]
static void Validate(int initialCapacity, int maxCapacity)
{
if (initialCapacity < 0)
{
throw new ArgumentOutOfRangeException(nameof(initialCapacity), "initialCapacity must be greater than zero");
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(initialCapacity), "initialCapacity must be greater than zero");
}

if (initialCapacity > maxCapacity)
{
throw new ArgumentOutOfRangeException(nameof(initialCapacity), $"initialCapacity ({initialCapacity}) must be greater than maxCapacity ({maxCapacity})");
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(initialCapacity), $"initialCapacity ({initialCapacity}) must be greater than maxCapacity ({maxCapacity})");
}
}

protected abstract IByteBuffer NewHeapBuffer(int initialCapacity, int maxCapacity);

protected abstract IByteBuffer NewDirectBuffer(int initialCapacity, int maxCapacity);

public abstract bool IsDirectBufferPooled { get; }

public int CalculateNewCapacity(int minNewCapacity, int maxCapacity)
{
if (minNewCapacity < 0)
Expand Down
4 changes: 4 additions & 0 deletions src/DotNetty.Buffers/AbstractPooledDerivedByteBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,14 @@ protected internal sealed override void Deallocate()

public sealed override IByteBufferAllocator Allocator => this.Unwrap().Allocator;

public sealed override bool IsDirect => this.Unwrap().IsDirect;

public override bool HasArray => this.Unwrap().HasArray;

public override byte[] Array => this.Unwrap().Array;

public override bool HasMemoryAddress => this.Unwrap().HasMemoryAddress;

public sealed override int IoBufferCount => this.Unwrap().IoBufferCount;

public sealed override IByteBuffer RetainedSlice()
Expand Down
21 changes: 19 additions & 2 deletions src/DotNetty.Buffers/AbstractUnpooledSlicedByteBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace DotNetty.Buffers
{
using System;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Common.Internal;
Expand Down Expand Up @@ -45,6 +46,8 @@ protected AbstractUnpooledSlicedByteBuffer(IByteBuffer buffer, int index, int le

public override IByteBufferAllocator Allocator => this.Unwrap().Allocator;

public override bool IsDirect => this.Unwrap().IsDirect;

public override IByteBuffer AdjustCapacity(int newCapacity) => throw new NotSupportedException("sliced buffer");

public override bool HasArray => this.Unwrap().HasArray;
Expand All @@ -53,6 +56,20 @@ protected AbstractUnpooledSlicedByteBuffer(IByteBuffer buffer, int index, int le

public override int ArrayOffset => this.Idx(this.Unwrap().ArrayOffset);

public override bool HasMemoryAddress => this.Unwrap().HasMemoryAddress;

public override ref byte GetPinnableMemoryAddress() => ref Unsafe.Add(ref this.Unwrap().GetPinnableMemoryAddress(), this.adjustment);

public override IntPtr AddressOfPinnedMemory()
{
IntPtr ptr = this.Unwrap().AddressOfPinnedMemory();
if (ptr == IntPtr.Zero)
{
return ptr;
}
return ptr + this.adjustment;
}

public override byte GetByte(int index)
{
this.CheckIndex0(index, 1);
Expand Down Expand Up @@ -275,7 +292,7 @@ public override ArraySegment<byte>[] GetIoBuffers(int index, int length)
return this.Unwrap().GetIoBuffers(index + this.adjustment, length);
}

public override int ForEachByte(int index, int length, ByteProcessor processor)
public override int ForEachByte(int index, int length, IByteProcessor processor)
{
this.CheckIndex0(index, length);
int ret = this.Unwrap().ForEachByte(this.Idx(index), length, processor);
Expand All @@ -289,7 +306,7 @@ public override int ForEachByte(int index, int length, ByteProcessor processor)
}
}

public override int ForEachByteDesc(int index, int length, ByteProcessor processor)
public override int ForEachByteDesc(int index, int length, IByteProcessor processor)
{
this.CheckIndex0(index, length);
int ret = this.Unwrap().ForEachByteDesc(this.Idx(index), length, processor);
Expand Down
8 changes: 4 additions & 4 deletions src/DotNetty.Buffers/AdvancedLeakAwareByteBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -534,25 +534,25 @@ public override int BytesBefore(int index, int length, byte value)
return base.BytesBefore(index, length, value);
}

public override int ForEachByte(ByteProcessor processor)
public override int ForEachByte(IByteProcessor processor)
{
RecordLeakNonRefCountingOperation(this.Leak);
return base.ForEachByte(processor);
}

public override int ForEachByte(int index, int length, ByteProcessor processor)
public override int ForEachByte(int index, int length, IByteProcessor processor)
{
RecordLeakNonRefCountingOperation(this.Leak);
return base.ForEachByte(index, length, processor);
}

public override int ForEachByteDesc(ByteProcessor processor)
public override int ForEachByteDesc(IByteProcessor processor)
{
RecordLeakNonRefCountingOperation(this.Leak);
return base.ForEachByteDesc(processor);
}

public override int ForEachByteDesc(int index, int length, ByteProcessor processor)
public override int ForEachByteDesc(int index, int length, IByteProcessor processor)
{
RecordLeakNonRefCountingOperation(this.Leak);
return base.ForEachByteDesc(index, length, processor);
Expand Down
8 changes: 4 additions & 4 deletions src/DotNetty.Buffers/AdvancedLeakAwareCompositeByteBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -416,25 +416,25 @@ public override int BytesBefore(int index, int length, byte value)
return base.BytesBefore(index, length, value);
}

public override int ForEachByte(ByteProcessor processor)
public override int ForEachByte(IByteProcessor processor)
{
RecordLeakNonRefCountingOperation(this.Leak);
return base.ForEachByte(processor);
}

public override int ForEachByte(int index, int length, ByteProcessor processor)
public override int ForEachByte(int index, int length, IByteProcessor processor)
{
RecordLeakNonRefCountingOperation(this.Leak);
return base.ForEachByte(index, length, processor);
}

public override int ForEachByteDesc(ByteProcessor processor)
public override int ForEachByteDesc(IByteProcessor processor)
{
RecordLeakNonRefCountingOperation(this.Leak);
return base.ForEachByteDesc(processor);
}

public override int ForEachByteDesc(int index, int length, ByteProcessor processor)
public override int ForEachByteDesc(int index, int length, IByteProcessor processor)
{
RecordLeakNonRefCountingOperation(this.Leak);
return base.ForEachByteDesc(index, length, processor);
Expand Down
8 changes: 4 additions & 4 deletions src/DotNetty.Buffers/ByteBufferUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ static int FirstIndexOf(IByteBuffer buffer, int fromIndex, int toIndex, byte val
return -1;
}

return buffer.ForEachByte(fromIndex, toIndex - fromIndex, new ByteProcessor.IndexOfProcessor(value));
return buffer.ForEachByte(fromIndex, toIndex - fromIndex, new IndexOfProcessor(value));
}

static int LastIndexOf(IByteBuffer buffer, int fromIndex, int toIndex, byte value)
Expand All @@ -292,7 +292,7 @@ static int LastIndexOf(IByteBuffer buffer, int fromIndex, int toIndex, byte valu
return -1;
}

return buffer.ForEachByteDesc(toIndex, fromIndex - toIndex, new ByteProcessor.IndexOfProcessor(value));
return buffer.ForEachByteDesc(toIndex, fromIndex - toIndex, new IndexOfProcessor(value));
}

/// <summary>
Expand Down Expand Up @@ -667,9 +667,9 @@ public static bool IsText(IByteBuffer buf, int index, int length, Encoding encod

static readonly FindNonAscii AsciiByteProcessor = new FindNonAscii();

sealed class FindNonAscii : ByteProcessor
sealed class FindNonAscii : IByteProcessor
{
public override bool Process(byte value) => value < 0x80;
public bool Process(byte value) => value < 0x80;
}

static bool IsAscii(IByteBuffer buf, int index, int length) => buf.ForEachByte(index, length, AsciiByteProcessor) == -1;
Expand Down
Loading

0 comments on commit d77bc29

Please sign in to comment.