diff --git a/RecordParser/Extensions/FileWriter/WriterExtensions.cs b/RecordParser/Extensions/FileWriter/WriterExtensions.cs index ae7dd06..9854375 100644 --- a/RecordParser/Extensions/FileWriter/WriterExtensions.cs +++ b/RecordParser/Extensions/FileWriter/WriterExtensions.cs @@ -7,6 +7,7 @@ namespace RecordParser.Extensions.FileWriter { using RecordParser.Extensions.FileReader; + using System.Threading.Tasks; /// /// Delegate representing object to text convert method. @@ -49,7 +50,14 @@ public static void WriteRecords(this TextWriter textWriter, IEnumerable it { if (options.Enabled) { - WriteParallel(textWriter, items, tryFormat, options); + if (options.EnsureOriginalOrdering) + { + WriteParallelOrdered(textWriter, items, tryFormat, options); + } + else + { + WriteParallelUnordered(textWriter, items, tryFormat, options); + } } else { @@ -64,40 +72,98 @@ private class BufferContext public object lockObj; } - private static void WriteParallel(TextWriter textWriter, IEnumerable items, TryFormat tryFormat, ParallelismOptions options) + private static void WriteParallelUnordered(TextWriter textWriter, IEnumerable items, TryFormat tryFormat, ParallelismOptions options) + { + var parallelism = 20; // TODO remove hardcoded + + var buffers = Enumerable + .Range(0, parallelism) + .Select(_ => new BufferContext + { + pow = 10, + buffer = ArrayPool.Shared.Rent((int)Math.Pow(2, 10)), + lockObj = new object() + }) + .ToArray(); + + var textWriterLock = new object(); + var opts = new ParallelOptions(); + if (options.MaxDegreeOfParallelism is { } max) + opts.MaxDegreeOfParallelism = max; + + try + { + Parallel.ForEach(items, opts, (item, _, i) => + { + var x = buffers[i % parallelism]; + + lock (x.lockObj) + { + retry: + + if (tryFormat(item, x.buffer, out var charsWritten)) + { + lock (textWriterLock) + { + textWriter.WriteLine(x.buffer, 0, charsWritten); + } + } + else + { + ArrayPool.Shared.Return(x.buffer); + x.pow++; + x.buffer = ArrayPool.Shared.Rent((int)Math.Pow(2, x.pow)); + goto retry; + } + } + }); + } + finally + { + foreach (var x in buffers) + ArrayPool.Shared.Return(x.buffer); + } + } + + private static void WriteParallelOrdered(TextWriter textWriter, IEnumerable items, TryFormat tryFormat, ParallelismOptions options) { var initialPool = 20; var pool = new Stack(initialPool); - for (var index = 0; index < initialPool; index++) - pool.Push(ArrayPool.Shared.Rent((int)Math.Pow(2, initialPow))); - - var xs = items.AsParallel(options).Select((item, i) => + try { - var buffer = Pop() ?? ArrayPool.Shared.Rent((int)Math.Pow(2, initialPow)); - retry: + for (var index = 0; index < initialPool; index++) + pool.Push(ArrayPool.Shared.Rent((int)Math.Pow(2, initialPow))); - if (tryFormat(item, buffer, out var charsWritten)) + var xs = items.AsParallel(options).Select((item, i) => { - return (buffer, charsWritten); - } - else + var buffer = Pop(); + retry: + + if (tryFormat(item, buffer, out var charsWritten)) + { + return (buffer, charsWritten); + } + else + { + ArrayPool.Shared.Return(buffer); + buffer = ArrayPool.Shared.Rent(buffer.Length * 2); + goto retry; + } + }); + + foreach (var x in xs) { - ArrayPool.Shared.Return(buffer); - buffer = ArrayPool.Shared.Rent(buffer.Length * 2); - goto retry; + textWriter.WriteLine(x.buffer, 0, x.charsWritten); + Push(x.buffer); } - }); - - foreach (var x in xs) - { - textWriter.WriteLine(x.buffer, 0, x.charsWritten); - Push(x.buffer); } - - foreach (var x in pool) + finally { - ArrayPool.Shared.Return(x); + foreach (var x in pool) + { + ArrayPool.Shared.Return(x); + } } pool.Clear(); @@ -107,7 +173,7 @@ char[] Pop() char[] x; lock (pool) pool.TryPop(out x); - return x; + return x ?? ArrayPool.Shared.Rent((int)Math.Pow(2, initialPow)); } void Push(char[] item)