Skip to content

Commit

Permalink
done
Browse files Browse the repository at this point in the history
  • Loading branch information
leandromoh committed Oct 16, 2023
1 parent cac21c4 commit b4f620a
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 99 deletions.
69 changes: 11 additions & 58 deletions RecordParser.Test/FileWriterTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace RecordParser.Test
public class FileWriterTest : TestSetup
{
private static readonly IEnumerable<int> _repeats = new[] { 0, 1, 3, 1_000, 10_000 };
private const int MaxParallelism = 4;

public static IEnumerable<object[]> Repeats()
{
Expand All @@ -31,6 +32,9 @@ public static IEnumerable<object[]> Repeats()
}
}

// the fixed-length file scenario is already covered in the bellow test,
// because "WriteRecords" method dont matters what parser is used,
// since it just receives a delegate
[Theory]
[MemberData(nameof(Repeats))]
public void Write_csv_file(int repeat, bool parallel, bool ordered)
Expand Down Expand Up @@ -71,6 +75,7 @@ public void Write_csv_file(int repeat, bool parallel, bool ordered)
{
Enabled = parallel,
EnsureOriginalOrdering = ordered,
MaxDegreeOfParallelism = MaxParallelism,
};

textWriter.WriteRecords(expectedItems, writer.TryFormat, writeOptions);
Expand All @@ -82,67 +87,15 @@ public void Write_csv_file(int repeat, bool parallel, bool ordered)
using var textReader = new StreamReader(memory);
var readOptions = new VariableLengthReaderOptions()
{
ParallelismOptions = new() { Enabled = parallel }
ParallelismOptions = writeOptions
};

var reads = textReader.ReadRecords(reader, readOptions);

reads.Should().BeEquivalentTo(expectedItems);
}

// the scenario is covered in the above test, because test dont matters what
// parser is used, since it just receives a delegate
//[Theory]
//[MemberData(nameof(Repeats))]
public void Write_fixed_length_file(int repeat, bool parallel, bool ordered)
{
// Arrange

var expectedItems = new Fixture()
.CreateMany<(string Name, DateTime Birthday, decimal Money, Color Color)>(repeat)
.ToList();

var writer = new FixedLengthWriterSequentialBuilder<(string Name, DateTime Birthday, decimal Money, Color Color)>()
.Map(x => x.Name, 50)
.Map(x => x.Birthday, 20, (dest, value) => (value.Ticks.TryFormat(dest, out var written), written))
.Map(x => x.Money, 15)
.Map(x => x.Color, 15)
.Build();

var reader = new FixedLengthReaderSequentialBuilder<(string Name, DateTime Birthday, decimal Money, Color Color)>()
.Map(x => x.Name, 50)
.Map(x => x.Birthday, 20, value => new DateTime(long.Parse(value)))
.Map(x => x.Money, 15)
.Map(x => x.Color, 15)
.Build();

// Act

using var memory = new MemoryStream();
using var textWriter = new StreamWriter(memory);

var writeOptions = new ParallelismOptions()
{
Enabled = parallel,
EnsureOriginalOrdering = ordered
};

textWriter.WriteRecords(expectedItems, writer.TryFormat, writeOptions);
textWriter.Flush();

// Assert

memory.Seek(0, SeekOrigin.Begin);
using var textReader = new StreamReader(memory);
var readOptions = new FixedLengthReaderOptions<(string Name, DateTime Birthday, decimal Money, Color Color)>()
{
Parser = reader.Parse,
ParallelismOptions = new() { Enabled = parallel }
};

var reads = textReader.ReadRecords(readOptions);

reads.Should().BeEquivalentTo(expectedItems);
}
if (ordered)
reads.Should().BeEquivalentTo(expectedItems, cfg => cfg.WithStrictOrdering());
else
reads.Should().BeEquivalentTo(expectedItems);
}
}
}
90 changes: 49 additions & 41 deletions RecordParser/Extensions/FileWriter/WriterExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,54 +69,62 @@ private static void WriteParallel<T>(TextWriter textWriter, IEnumerable<T> items
var parallelism = 20; // TODO remove hardcoded

Check warning on line 69 in RecordParser/Extensions/FileWriter/WriterExtensions.cs

View workflow job for this annotation

GitHub Actions / dotnet test

The variable 'parallelism' is assigned but its value is never used

Check warning on line 69 in RecordParser/Extensions/FileWriter/WriterExtensions.cs

View workflow job for this annotation

GitHub Actions / dotnet test

The variable 'parallelism' is assigned but its value is never used
var textWriterLock = new object();

var buffers = Enumerable
.Range(0, parallelism)
.Select(_ => new BufferContext
{
pow = initialPow,
buffer = ArrayPool<char>.Shared.Rent((int)Math.Pow(2, initialPow)),
lockObj = new object()
})
.ToArray();

try
//var buffers = Enumerable
// .Range(0, parallelism)
// .Select(_ => new BufferContext
// {
// pow = initialPow,
// buffer = ArrayPool<char>.Shared.Rent((int)Math.Pow(2, initialPow)),
// lockObj = new object()
// })
// .ToArray();

var pool = new Stack<char[]>(20);
char[] Pop()
{
var xs = items.AsParallel(options).Select((item, i) =>
{
var x = buffers[i % parallelism];
char[] x;
lock (pool)
pool.TryPop(out x);
return x;
}
void Push(char[] item)
{
lock (pool)
pool.Push(item);
}

lock (x.lockObj)
{
retry:

if (tryFormat(item, x.buffer, out var charsWritten))
{
lock (textWriterLock)
{
textWriter.WriteLine(x.buffer, 0, charsWritten);
}
}
else
{
ArrayPool<char>.Shared.Return(x.buffer);
x.pow++;
x.buffer = ArrayPool<char>.Shared.Rent((int)Math.Pow(2, x.pow));
goto retry;
}
}
for (var index = 0; index < 20; index++)
pool.Push(ArrayPool<char>.Shared.Rent((int)Math.Pow(2, initialPow)));

// dummy value
return (string)null;
});
var xs = items.AsParallel(options).Select((item, i) =>
{
var buffer = Pop() ?? ArrayPool<char>.Shared.Rent((int)Math.Pow(2, initialPow));
retry:

// dummy iteration to force evaluation
foreach (var _ in xs) ;
if (tryFormat(item, buffer, out var charsWritten))
{
return (buffer, charsWritten);
}
else
{
ArrayPool<char>.Shared.Return(buffer);
buffer = ArrayPool<char>.Shared.Rent(buffer.Length * 2);
goto retry;
}
});

foreach (var x in xs)
{
textWriter.WriteLine(x.buffer, 0, x.charsWritten);
Push(x.buffer);
}
finally

foreach (var x in pool)
{
foreach (var x in buffers)
ArrayPool<char>.Shared.Return(x.buffer);
ArrayPool<char>.Shared.Return(x);
}

pool.Clear();
}

private static void WriteSequential<T>(TextWriter textWriter, IEnumerable<T> items, TryFormat<T> tryFormat)
Expand Down

0 comments on commit b4f620a

Please sign in to comment.