From 109c91d0bbe5d081a62e05717bdba0d57ba3495d Mon Sep 17 00:00:00 2001 From: Anne Thompson Date: Wed, 27 Mar 2024 14:15:29 -0700 Subject: [PATCH 1/4] investigatino --- .../src/Custom/Chat/StreamingChatUpdate.cs | 4 +- .dotnet/src/Utility/SseReader.cs | 2 - .../src/Utility/StreamingClientResultOfT.cs | 59 ++++++++++++ .dotnet/src/Utility/StreamingResult.cs | 95 ------------------- 4 files changed, 61 insertions(+), 99 deletions(-) create mode 100644 .dotnet/src/Utility/StreamingClientResultOfT.cs delete mode 100644 .dotnet/src/Utility/StreamingResult.cs diff --git a/.dotnet/src/Custom/Chat/StreamingChatUpdate.cs b/.dotnet/src/Custom/Chat/StreamingChatUpdate.cs index c1540897b..8cbe73aef 100644 --- a/.dotnet/src/Custom/Chat/StreamingChatUpdate.cs +++ b/.dotnet/src/Custom/Chat/StreamingChatUpdate.cs @@ -1,9 +1,9 @@ -namespace OpenAI.Chat; - using System; using System.Collections.Generic; using System.Text.Json; +namespace OpenAI.Chat; + /// /// Represents an incremental item of new data in a streaming response to a chat completion request. /// diff --git a/.dotnet/src/Utility/SseReader.cs b/.dotnet/src/Utility/SseReader.cs index cf0301408..f517f8d99 100644 --- a/.dotnet/src/Utility/SseReader.cs +++ b/.dotnet/src/Utility/SseReader.cs @@ -1,6 +1,4 @@ using System; -using System.ClientModel; -using System.ClientModel.Internal; using System.IO; using System.Threading.Tasks; diff --git a/.dotnet/src/Utility/StreamingClientResultOfT.cs b/.dotnet/src/Utility/StreamingClientResultOfT.cs new file mode 100644 index 000000000..5b744e7cd --- /dev/null +++ b/.dotnet/src/Utility/StreamingClientResultOfT.cs @@ -0,0 +1,59 @@ +using System; +using System.ClientModel; +using System.ClientModel.Primitives; +using System.Collections.Generic; +using System.Threading; + +namespace OpenAI; + +/// +/// Represents an operation response with streaming content that can be deserialized and enumerated while the response +/// is still being received. +/// +/// The data type representative of distinct, streamable items. +public class StreamingClientResult + : IDisposable + , IAsyncEnumerable +{ + private ClientResult _rawResult { get; } + private bool _disposedValue { get; set; } + + private StreamingClientResult() { } + + private StreamingClientResult(ClientResult rawResult) + { + _rawResult = rawResult; + } + + /// + /// Gets the underlying instance that this may enumerate + /// over. + /// + /// The instance attached to this . + public PipelineResponse GetRawResponse() => _rawResult.GetRawResponse(); + + /// + public void Dispose() + { + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + + /// + protected virtual void Dispose(bool disposing) + { + if (!_disposedValue) + { + if (disposing) + { + _rawResult?.GetRawResponse()?.Dispose(); + } + _disposedValue = true; + } + } + + IAsyncEnumerator IAsyncEnumerable.GetAsyncEnumerator(CancellationToken cancellationToken) + { + + } +} \ No newline at end of file diff --git a/.dotnet/src/Utility/StreamingResult.cs b/.dotnet/src/Utility/StreamingResult.cs deleted file mode 100644 index a1b6ff538..000000000 --- a/.dotnet/src/Utility/StreamingResult.cs +++ /dev/null @@ -1,95 +0,0 @@ -using System.ClientModel; -using System.ClientModel; -using System.ClientModel.Primitives; -using System.Threading; -using System.Collections.Generic; -using System; - -namespace OpenAI; - -/// -/// Represents an operation response with streaming content that can be deserialized and enumerated while the response -/// is still being received. -/// -/// The data type representative of distinct, streamable items. -public class StreamingClientResult - : IDisposable - , IAsyncEnumerable -{ - private ClientResult _rawResult { get; } - private IAsyncEnumerable _asyncEnumerableSource { get; } - private bool _disposedValue { get; set; } - - private StreamingClientResult() { } - - private StreamingClientResult( - ClientResult rawResult, - Func> asyncEnumerableProcessor) - { - _rawResult = rawResult; - _asyncEnumerableSource = asyncEnumerableProcessor.Invoke(rawResult); - } - - /// - /// Creates a new instance of using the provided underlying HTTP response. The - /// provided function will be used to resolve the response into an asynchronous enumeration of streamed response - /// items. - /// - /// The HTTP response. - /// - /// The function that will resolve the provided response into an IAsyncEnumerable. - /// - /// - /// A new instance of that will be capable of asynchronous enumeration of - /// items from the HTTP response. - /// - internal static StreamingClientResult CreateFromResponse( - ClientResult result, - Func> asyncEnumerableProcessor) - { - return new(result, asyncEnumerableProcessor); - } - - /// - /// Gets the underlying instance that this may enumerate - /// over. - /// - /// The instance attached to this . - public PipelineResponse GetRawResponse() => _rawResult.GetRawResponse(); - - /// - /// Gets the asynchronously enumerable collection of distinct, streamable items in the response. - /// - /// - /// The return value of this method may be used with the "await foreach" statement. - /// - /// As explicitly implements , callers may - /// enumerate a instance directly instead of calling this method. - /// - /// - /// - public IAsyncEnumerable EnumerateValues() => this; - - /// - public void Dispose() - { - Dispose(disposing: true); - GC.SuppressFinalize(this); - } - - /// - protected virtual void Dispose(bool disposing) - { - if (!_disposedValue) - { - if (disposing) - { - _rawResult?.GetRawResponse()?.Dispose(); - } - _disposedValue = true; - } - } - - IAsyncEnumerator IAsyncEnumerable.GetAsyncEnumerator(CancellationToken cancellationToken) - => _asyncEnumerableSource.GetAsyncEnumerator(cancellationToken); -} \ No newline at end of file From 49f903f713704792518b005f574cb9268c9a99e2 Mon Sep 17 00:00:00 2001 From: Anne Thompson Date: Wed, 27 Mar 2024 15:04:37 -0700 Subject: [PATCH 2/4] initial enumerator implementation --- .../src/Custom/Chat/StreamingChatResult.cs | 87 +++++++++++++++++++ .../src/Custom/Chat/StreamingChatUpdate.cs | 5 ++ .dotnet/src/Utility/SseAsyncEnumerator.cs | 31 +------ .dotnet/src/Utility/SseReader.cs | 10 ++- .../src/Utility/StreamingClientResultOfT.cs | 19 ++-- 5 files changed, 113 insertions(+), 39 deletions(-) create mode 100644 .dotnet/src/Custom/Chat/StreamingChatResult.cs diff --git a/.dotnet/src/Custom/Chat/StreamingChatResult.cs b/.dotnet/src/Custom/Chat/StreamingChatResult.cs new file mode 100644 index 000000000..7a03fc4e0 --- /dev/null +++ b/.dotnet/src/Custom/Chat/StreamingChatResult.cs @@ -0,0 +1,87 @@ +using System; +using System.ClientModel; +using System.Collections.Generic; +using System.IO; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; + +namespace OpenAI.Chat; + +#nullable enable + +internal class StreamingChatResult : StreamingClientResult +{ + public StreamingChatResult(ClientResult result) : base(result) + { + + } + + public override IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + { + throw new NotImplementedException(); + } + + + private class StreamingClientResultEnumerator : IAsyncEnumerator + { + private readonly SseReader _sseReader; + + private List? _currentUpdates; + private int _currentUpdateIndex; + + public StreamingClientResultEnumerator(Stream stream) + { + _sseReader = new(stream); + } + + public StreamingChatUpdate Current => throw new NotImplementedException(); + + public async ValueTask MoveNextAsync() + { + // TODO: How to handle the CancellationToken? + + if (_currentUpdates is not null && _currentUpdateIndex < _currentUpdates.Count) + { + _currentUpdateIndex++; + return true; + } + + // We either don't have any stored updates, or we've exceeded the + // count of the ones we have. Get the next set. + + // TODO: Call different configure await variant in this context, or no? + SseLine? sseEvent = await _sseReader.TryReadSingleFieldEventAsync().ConfigureAwait(false); + if (sseEvent is null) + { + // TODO: does this mean we're done or not? + return false; + } + + ReadOnlyMemory name = sseEvent.Value.FieldName; + if (!name.Span.SequenceEqual("data".AsSpan())) + { + throw new InvalidDataException(); + } + + ReadOnlyMemory value = sseEvent.Value.FieldValue; + if (value.Span.SequenceEqual("[DONE]".AsSpan())) + { + // enumerator semantics are that MoveNextAsync returns false when done. + return false; + } + + // TODO:optimize performance using Utf8JsonReader? + using JsonDocument sseMessageJson = JsonDocument.Parse(value); + _currentUpdates = StreamingChatUpdate.DeserializeStreamingChatUpdates(sseMessageJson.RootElement); + return true; + } + + public ValueTask DisposeAsync() + { + // TODO: revisit per platforms where async dispose is available. + _sseReader?.Dispose(); + return new ValueTask(); + } + } +} diff --git a/.dotnet/src/Custom/Chat/StreamingChatUpdate.cs b/.dotnet/src/Custom/Chat/StreamingChatUpdate.cs index 8cbe73aef..c4bb71bdc 100644 --- a/.dotnet/src/Custom/Chat/StreamingChatUpdate.cs +++ b/.dotnet/src/Custom/Chat/StreamingChatUpdate.cs @@ -184,11 +184,16 @@ internal StreamingChatUpdate( internal static List DeserializeStreamingChatUpdates(JsonElement element) { + // TODO: Do we need to validate that we didn't get null or empty? + // What's the contract for the JSON updates? + List results = []; + if (element.ValueKind == JsonValueKind.Null) { return results; } + string id = default; DateTimeOffset created = default; string systemFingerprint = null; diff --git a/.dotnet/src/Utility/SseAsyncEnumerator.cs b/.dotnet/src/Utility/SseAsyncEnumerator.cs index 743a1bedd..d5094cc41 100644 --- a/.dotnet/src/Utility/SseAsyncEnumerator.cs +++ b/.dotnet/src/Utility/SseAsyncEnumerator.cs @@ -19,26 +19,8 @@ internal static async IAsyncEnumerable EnumerateFromSseStream( using SseReader sseReader = new(stream); while (!cancellationToken.IsCancellationRequested) { - SseLine? sseEvent = await sseReader.TryReadSingleFieldEventAsync().ConfigureAwait(false); - if (sseEvent is not null) - { - ReadOnlyMemory name = sseEvent.Value.FieldName; - if (!name.Span.SequenceEqual("data".AsSpan())) - { - throw new InvalidDataException(); - } - ReadOnlyMemory value = sseEvent.Value.FieldValue; - if (value.Span.SequenceEqual("[DONE]".AsSpan())) - { - break; - } - using JsonDocument sseMessageJson = JsonDocument.Parse(value); - IEnumerable newItems = multiElementDeserializer.Invoke(sseMessageJson.RootElement); - foreach (T item in newItems) - { - yield return item; - } - } + + } } finally @@ -47,13 +29,4 @@ internal static async IAsyncEnumerable EnumerateFromSseStream( stream.Dispose(); } } - - internal static IAsyncEnumerable EnumerateFromSseStream( - Stream stream, - Func elementDeserializer, - CancellationToken cancellationToken = default) - => EnumerateFromSseStream( - stream, - (element) => new T[] { elementDeserializer.Invoke(element) }, - cancellationToken); } \ No newline at end of file diff --git a/.dotnet/src/Utility/SseReader.cs b/.dotnet/src/Utility/SseReader.cs index f517f8d99..88a125a81 100644 --- a/.dotnet/src/Utility/SseReader.cs +++ b/.dotnet/src/Utility/SseReader.cs @@ -4,7 +4,7 @@ namespace OpenAI; -internal sealed class SseReader : IDisposable +internal sealed class SseReader : IDisposable, IAsyncDisposable { private readonly Stream _stream; private readonly StreamReader _reader; @@ -113,4 +113,10 @@ public void Dispose() Dispose(disposing: true); GC.SuppressFinalize(this); } - } \ No newline at end of file + + ValueTask IAsyncDisposable.DisposeAsync() + { + // TODO: revisit per platforms where async dispose is available. + return new ValueTask(); + } +} \ No newline at end of file diff --git a/.dotnet/src/Utility/StreamingClientResultOfT.cs b/.dotnet/src/Utility/StreamingClientResultOfT.cs index 5b744e7cd..9d6653800 100644 --- a/.dotnet/src/Utility/StreamingClientResultOfT.cs +++ b/.dotnet/src/Utility/StreamingClientResultOfT.cs @@ -2,25 +2,28 @@ using System.ClientModel; using System.ClientModel.Primitives; using System.Collections.Generic; +using System.IO; using System.Threading; +using System.Threading.Tasks; namespace OpenAI; +#pragma warning disable CS1591 // public XML comments + /// /// Represents an operation response with streaming content that can be deserialized and enumerated while the response /// is still being received. /// /// The data type representative of distinct, streamable items. -public class StreamingClientResult - : IDisposable - , IAsyncEnumerable +public abstract class StreamingClientResult : IDisposable, IAsyncEnumerable { private ClientResult _rawResult { get; } private bool _disposedValue { get; set; } private StreamingClientResult() { } - private StreamingClientResult(ClientResult rawResult) + // TODO: Should constructor take PipelineResponse instead? + protected StreamingClientResult(ClientResult rawResult) { _rawResult = rawResult; } @@ -52,8 +55,8 @@ protected virtual void Dispose(bool disposing) } } - IAsyncEnumerator IAsyncEnumerable.GetAsyncEnumerator(CancellationToken cancellationToken) - { + public abstract IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default); - } -} \ No newline at end of file +} + +#pragma warning restore CS1591 // public XML comments \ No newline at end of file From 920edfeb3a9976df97bdbef48bb799cf80f46d5f Mon Sep 17 00:00:00 2001 From: Anne Thompson Date: Wed, 27 Mar 2024 15:21:28 -0700 Subject: [PATCH 3/4] reworked enumerator and convenience methods --- .dotnet/src/Custom/Chat/ChatClient.cs | 33 +++++--------- .../src/Custom/Chat/StreamingChatResult.cs | 25 ++++++++--- .dotnet/src/Utility/SseAsyncEnumerator.cs | 32 ------------- .../src/Utility/StreamingClientResultOfT.cs | 45 +++---------------- 4 files changed, 33 insertions(+), 102 deletions(-) delete mode 100644 .dotnet/src/Utility/SseAsyncEnumerator.cs diff --git a/.dotnet/src/Custom/Chat/ChatClient.cs b/.dotnet/src/Custom/Chat/ChatClient.cs index 16a875c5a..a580ab374 100644 --- a/.dotnet/src/Custom/Chat/ChatClient.cs +++ b/.dotnet/src/Custom/Chat/ChatClient.cs @@ -93,7 +93,6 @@ public virtual async Task> CompleteChatAsync( /// The number of independent, alternative response choices that should be generated. /// /// Additional options for the chat completion request. - /// The cancellation token for the operation. /// A result for a single chat completion. public virtual ClientResult CompleteChat( IEnumerable messages, @@ -191,29 +190,23 @@ public virtual Task> CompleteChatStre /// The number of independent, alternative choices that the chat completion request should generate. /// /// Additional options for the chat completion request. - /// The cancellation token for the operation. /// A streaming result with incremental chat completion updates. public virtual StreamingClientResult CompleteChatStreaming( IEnumerable messages, int? choiceCount = null, ChatCompletionOptions options = null) { - PipelineMessage requestMessage = CreateCustomRequestMessage(messages, choiceCount, options); - requestMessage.BufferResponse = false; - Shim.Pipeline.Send(requestMessage); - PipelineResponse response = requestMessage.ExtractResponse(); + PipelineMessage message = CreateCustomRequestMessage(messages, choiceCount, options); + message.BufferResponse = false; + Shim.Pipeline.Send(message); + PipelineResponse response = message.Response; if (response.IsError) { throw new ClientResultException(response); } - ClientResult genericResult = ClientResult.FromResponse(response); - return StreamingClientResult.CreateFromResponse( - genericResult, - (responseForEnumeration) => SseAsyncEnumerator.EnumerateFromSseStream( - responseForEnumeration.GetRawResponse().ContentStream, - e => StreamingChatUpdate.DeserializeStreamingChatUpdates(e))); + return new StreamingChatResult(response); } /// @@ -229,29 +222,23 @@ public virtual StreamingClientResult CompleteChatStreaming( /// The number of independent, alternative choices that the chat completion request should generate. /// /// Additional options for the chat completion request. - /// The cancellation token for the operation. /// A streaming result with incremental chat completion updates. public virtual async Task> CompleteChatStreamingAsync( IEnumerable messages, int? choiceCount = null, ChatCompletionOptions options = null) { - PipelineMessage requestMessage = CreateCustomRequestMessage(messages, choiceCount, options); - requestMessage.BufferResponse = false; - await Shim.Pipeline.SendAsync(requestMessage).ConfigureAwait(false); - PipelineResponse response = requestMessage.ExtractResponse(); + PipelineMessage message = CreateCustomRequestMessage(messages, choiceCount, options); + message.BufferResponse = false; + await Shim.Pipeline.SendAsync(message).ConfigureAwait(false); + PipelineResponse response = message.Response; if (response.IsError) { throw new ClientResultException(response); } - ClientResult genericResult = ClientResult.FromResponse(response); - return StreamingClientResult.CreateFromResponse( - genericResult, - (responseForEnumeration) => SseAsyncEnumerator.EnumerateFromSseStream( - responseForEnumeration.GetRawResponse().ContentStream, - e => StreamingChatUpdate.DeserializeStreamingChatUpdates(e))); + return new StreamingChatResult(response); } private Internal.Models.CreateChatCompletionRequest CreateInternalRequest( diff --git a/.dotnet/src/Custom/Chat/StreamingChatResult.cs b/.dotnet/src/Custom/Chat/StreamingChatResult.cs index 7a03fc4e0..850287465 100644 --- a/.dotnet/src/Custom/Chat/StreamingChatResult.cs +++ b/.dotnet/src/Custom/Chat/StreamingChatResult.cs @@ -1,5 +1,5 @@ using System; -using System.ClientModel; +using System.ClientModel.Primitives; using System.Collections.Generic; using System.IO; using System.Text.Json; @@ -12,25 +12,36 @@ namespace OpenAI.Chat; internal class StreamingChatResult : StreamingClientResult { - public StreamingChatResult(ClientResult result) : base(result) + public StreamingChatResult(PipelineResponse response) : base(response) { - } public override IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) { - throw new NotImplementedException(); + // Note: this implementation disposes the stream after the caller has + // enumerated the elements obtained from the stream. That is to say, + // the `await foreach` loop can only happen once -- if it is tried a + // second time, the caller will get an ObjectDisposedException trying + // to access a disposed Stream. + using PipelineResponse response = GetRawResponse(); + + // Extract the content stream from the response to obtain dispose + // ownership of it. This means the content stream will not be disposed + // when the response is disposed. + Stream contentStream = response.ContentStream ?? throw new InvalidOperationException("Cannot enumerate null response ContentStream."); + response.ContentStream = null; + + return new ChatUpdateEnumerator(contentStream); } - - private class StreamingClientResultEnumerator : IAsyncEnumerator + private class ChatUpdateEnumerator : IAsyncEnumerator { private readonly SseReader _sseReader; private List? _currentUpdates; private int _currentUpdateIndex; - public StreamingClientResultEnumerator(Stream stream) + public ChatUpdateEnumerator(Stream stream) { _sseReader = new(stream); } diff --git a/.dotnet/src/Utility/SseAsyncEnumerator.cs b/.dotnet/src/Utility/SseAsyncEnumerator.cs deleted file mode 100644 index d5094cc41..000000000 --- a/.dotnet/src/Utility/SseAsyncEnumerator.cs +++ /dev/null @@ -1,32 +0,0 @@ -using System; -using System.Collections.Generic; -using System.IO; -using System.Runtime.CompilerServices; -using System.Text.Json; -using System.Threading; - -namespace OpenAI; - -internal static class SseAsyncEnumerator -{ - internal static async IAsyncEnumerable EnumerateFromSseStream( - Stream stream, - Func> multiElementDeserializer, - [EnumeratorCancellation] CancellationToken cancellationToken = default) - { - try - { - using SseReader sseReader = new(stream); - while (!cancellationToken.IsCancellationRequested) - { - - - } - } - finally - { - // Always dispose the stream immediately once enumeration is complete for any reason - stream.Dispose(); - } - } -} \ No newline at end of file diff --git a/.dotnet/src/Utility/StreamingClientResultOfT.cs b/.dotnet/src/Utility/StreamingClientResultOfT.cs index 9d6653800..8fbece850 100644 --- a/.dotnet/src/Utility/StreamingClientResultOfT.cs +++ b/.dotnet/src/Utility/StreamingClientResultOfT.cs @@ -1,10 +1,7 @@ -using System; using System.ClientModel; using System.ClientModel.Primitives; using System.Collections.Generic; -using System.IO; using System.Threading; -using System.Threading.Tasks; namespace OpenAI; @@ -15,48 +12,16 @@ namespace OpenAI; /// is still being received. /// /// The data type representative of distinct, streamable items. -public abstract class StreamingClientResult : IDisposable, IAsyncEnumerable +public abstract class StreamingClientResult : ClientResult, IAsyncEnumerable { - private ClientResult _rawResult { get; } - private bool _disposedValue { get; set; } - - private StreamingClientResult() { } - - // TODO: Should constructor take PipelineResponse instead? - protected StreamingClientResult(ClientResult rawResult) - { - _rawResult = rawResult; - } - - /// - /// Gets the underlying instance that this may enumerate - /// over. - /// - /// The instance attached to this . - public PipelineResponse GetRawResponse() => _rawResult.GetRawResponse(); - - /// - public void Dispose() + protected StreamingClientResult(PipelineResponse response) : base(response) { - Dispose(disposing: true); - GC.SuppressFinalize(this); - } - - /// - protected virtual void Dispose(bool disposing) - { - if (!_disposedValue) - { - if (disposing) - { - _rawResult?.GetRawResponse()?.Dispose(); - } - _disposedValue = true; - } } + // Note that if the implementation disposes the stream, the caller can only + // enumerate the results once. I think this makes sense, but we should + // make sure architects agree. public abstract IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default); - } #pragma warning restore CS1591 // public XML comments \ No newline at end of file From 0e4cbedacc186f6cdb1fd4441579e63d2e9b239c Mon Sep 17 00:00:00 2001 From: Anne Thompson Date: Wed, 27 Mar 2024 15:25:20 -0700 Subject: [PATCH 4/4] nits --- .dotnet/src/Custom/Chat/ChatClient.cs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.dotnet/src/Custom/Chat/ChatClient.cs b/.dotnet/src/Custom/Chat/ChatClient.cs index a580ab374..a594ae5eb 100644 --- a/.dotnet/src/Custom/Chat/ChatClient.cs +++ b/.dotnet/src/Custom/Chat/ChatClient.cs @@ -198,7 +198,9 @@ public virtual StreamingClientResult CompleteChatStreaming( { PipelineMessage message = CreateCustomRequestMessage(messages, choiceCount, options); message.BufferResponse = false; + Shim.Pipeline.Send(message); + PipelineResponse response = message.Response; if (response.IsError) @@ -230,7 +232,9 @@ public virtual async Task> CompleteCh { PipelineMessage message = CreateCustomRequestMessage(messages, choiceCount, options); message.BufferResponse = false; + await Shim.Pipeline.SendAsync(message).ConfigureAwait(false); + PipelineResponse response = message.Response; if (response.IsError)