From 4f9e8322f7ba4e7a193496cb5c58798a81467068 Mon Sep 17 00:00:00 2001 From: Anne Thompson Date: Thu, 28 Mar 2024 14:26:59 -0700 Subject: [PATCH] first steps --- .dotnet/src/Custom/Chat/ChatClient.cs | 67 ++++++------- .../src/Custom/Chat/StreamingChatUpdate.cs | 12 ++- .dotnet/src/Utility/SseReader.cs | 8 +- .../src/Utility/StreamingClientResultOfT.cs | 28 ++++++ .dotnet/src/Utility/StreamingResult.cs | 95 ------------------- 5 files changed, 73 insertions(+), 137 deletions(-) create mode 100644 .dotnet/src/Utility/StreamingClientResultOfT.cs delete mode 100644 .dotnet/src/Utility/StreamingResult.cs diff --git a/.dotnet/src/Custom/Chat/ChatClient.cs b/.dotnet/src/Custom/Chat/ChatClient.cs index eeeeeacf9..a4b7b1352 100644 --- a/.dotnet/src/Custom/Chat/ChatClient.cs +++ b/.dotnet/src/Custom/Chat/ChatClient.cs @@ -40,8 +40,8 @@ public ChatClient(string model, ApiKeyCredential credential = default, OpenAICli /// The user message to provide as a prompt for chat completion. /// Additional options for the chat completion request. /// A result for a single chat completion. - public virtual ClientResult CompleteChat(string message, ChatCompletionOptions options = null) - => CompleteChat(new List() { new ChatRequestUserMessage(message) }, options); + public virtual ClientResult CompleteChat(string message, ChatCompletionOptions options = null) + => CompleteChat(new List() { new ChatRequestUserMessage(message) }, options); /// /// Generates a single chat completion result for a single, simple user message. @@ -49,9 +49,9 @@ public virtual ClientResult CompleteChat(string message, ChatCom /// The user message to provide as a prompt for chat completion. /// Additional options for the chat completion request. /// A result for a single chat completion. - public virtual Task> CompleteChatAsync(string message, ChatCompletionOptions options = null) - => CompleteChatAsync( - new List() { new ChatRequestUserMessage(message) }, options); + public virtual Task> CompleteChatAsync(string message, ChatCompletionOptions options = null) + => CompleteChatAsync( + new List() { new ChatRequestUserMessage(message) }, options); /// /// Generates a single chat completion result for a provided set of input chat messages. @@ -63,7 +63,7 @@ public virtual ClientResult CompleteChat( IEnumerable messages, ChatCompletionOptions options = null) { - Internal.Models.CreateChatCompletionRequest request = CreateInternalRequest(messages, options); + Internal.Models.CreateChatCompletionRequest request = CreateInternalRequest(messages, options); ClientResult response = Shim.CreateChatCompletion(request); ChatCompletion chatCompletion = new(response.Value, internalChoiceIndex: 0); return ClientResult.FromValue(chatCompletion, response.GetRawResponse()); @@ -93,7 +93,6 @@ public virtual async Task> CompleteChatAsync( /// The number of independent, alternative response choices that should be generated. /// /// Additional options for the chat completion request. - /// The cancellation token for the operation. /// A result for a single chat completion. public virtual ClientResult CompleteChat( IEnumerable messages, @@ -147,14 +146,14 @@ public virtual async Task> CompleteChatAs /// /// Additional options for the chat completion request. /// A streaming result with incremental chat completion updates. - public virtual StreamingClientResult CompleteChatStreaming( - string message, - int? choiceCount = null, - ChatCompletionOptions options = null) - => CompleteChatStreaming( - new List { new ChatRequestUserMessage(message) }, - choiceCount, - options); + public virtual StreamingClientResult CompleteChatStreaming( + string message, + int? choiceCount = null, + ChatCompletionOptions options = null) + => CompleteChatStreaming( + new List { new ChatRequestUserMessage(message) }, + choiceCount, + options); /// /// Begins a streaming response for a chat completion request using a single, simple user message as input. @@ -191,29 +190,25 @@ public virtual Task> CompleteChatStre /// The number of independent, alternative choices that the chat completion request should generate. /// /// Additional options for the chat completion request. - /// The cancellation token for the operation. /// A streaming result with incremental chat completion updates. public virtual StreamingClientResult CompleteChatStreaming( IEnumerable messages, int? choiceCount = null, ChatCompletionOptions options = null) { - PipelineMessage requestMessage = CreateCustomRequestMessage(messages, choiceCount, options); - requestMessage.BufferResponse = false; - Shim.Pipeline.Send(requestMessage); - PipelineResponse response = requestMessage.ExtractResponse(); + PipelineMessage message = CreateCustomRequestMessage(messages, choiceCount, options); + message.BufferResponse = false; + + Shim.Pipeline.Send(message); + + PipelineResponse response = message.Response; if (response.IsError) { throw new ClientResultException(response); } - ClientResult genericResult = ClientResult.FromResponse(response); - return StreamingClientResult.CreateFromResponse( - genericResult, - (responseForEnumeration) => SseAsyncEnumerator.EnumerateFromSseJsonStream( - responseForEnumeration.GetRawResponse().ContentStream, - StreamingChatUpdate.DeserializeSseChatUpdates)); + return new StreamingChatResult(response); } /// @@ -229,29 +224,25 @@ public virtual StreamingClientResult CompleteChatStreaming( /// The number of independent, alternative choices that the chat completion request should generate. /// /// Additional options for the chat completion request. - /// The cancellation token for the operation. /// A streaming result with incremental chat completion updates. public virtual async Task> CompleteChatStreamingAsync( IEnumerable messages, int? choiceCount = null, ChatCompletionOptions options = null) { - PipelineMessage requestMessage = CreateCustomRequestMessage(messages, choiceCount, options); - requestMessage.BufferResponse = false; - await Shim.Pipeline.SendAsync(requestMessage).ConfigureAwait(false); - PipelineResponse response = requestMessage.ExtractResponse(); + PipelineMessage message = CreateCustomRequestMessage(messages, choiceCount, options); + message.BufferResponse = false; + + await Shim.Pipeline.SendAsync(message).ConfigureAwait(false); + + PipelineResponse response = message.Response; if (response.IsError) { throw new ClientResultException(response); } - ClientResult genericResult = ClientResult.FromResponse(response); - return StreamingClientResult.CreateFromResponse( - genericResult, - (responseForEnumeration) => SseAsyncEnumerator.EnumerateFromSseJsonStream( - responseForEnumeration.GetRawResponse().ContentStream, - StreamingChatUpdate.DeserializeSseChatUpdates)); + return new StreamingChatResult(response); } private Internal.Models.CreateChatCompletionRequest CreateInternalRequest( @@ -326,4 +317,4 @@ private PipelineMessage CreateCustomRequestMessage(IEnumerable _responseErrorClassifier200 ??= PipelineMessageClassifier.Create(stackalloc ushort[] { 200 }); -} +} \ No newline at end of file diff --git a/.dotnet/src/Custom/Chat/StreamingChatUpdate.cs b/.dotnet/src/Custom/Chat/StreamingChatUpdate.cs index f7e9e9ec4..11e50eabe 100644 --- a/.dotnet/src/Custom/Chat/StreamingChatUpdate.cs +++ b/.dotnet/src/Custom/Chat/StreamingChatUpdate.cs @@ -1,9 +1,9 @@ -namespace OpenAI.Chat; - using System; using System.Collections.Generic; using System.Text.Json; +namespace OpenAI.Chat; + /// /// Represents an incremental item of new data in a streaming response to a chat completion request. /// @@ -184,11 +184,17 @@ internal StreamingChatUpdate( internal static IEnumerable DeserializeSseChatUpdates(ReadOnlyMemory _, JsonElement sseDataJson) { + // TODO: would another enumerable implementation be more performant than list? List results = []; + + // TODO: Do we need to validate that we didn't get null or empty? + // What's the contract for the JSON updates? + if (sseDataJson.ValueKind == JsonValueKind.Null) { return results; } + string id = default; DateTimeOffset created = default; string systemFingerprint = null; @@ -333,4 +339,4 @@ Internal.Models.CreateChatCompletionResponseChoiceLogprobs internalLogprobs } return results; } -} +} \ No newline at end of file diff --git a/.dotnet/src/Utility/SseReader.cs b/.dotnet/src/Utility/SseReader.cs index cab725caf..fef0d6d7b 100644 --- a/.dotnet/src/Utility/SseReader.cs +++ b/.dotnet/src/Utility/SseReader.cs @@ -6,7 +6,7 @@ namespace OpenAI; -internal sealed class SseReader : IDisposable +internal sealed class SseReader : IDisposable, IAsyncDisposable { private readonly Stream _stream; private readonly StreamReader _reader; @@ -121,4 +121,10 @@ private void Dispose(bool disposing) _disposedValue = true; } } + + ValueTask IAsyncDisposable.DisposeAsync() + { + // TODO: revisit per platforms where async dispose is available. + return new ValueTask(); + } } \ No newline at end of file diff --git a/.dotnet/src/Utility/StreamingClientResultOfT.cs b/.dotnet/src/Utility/StreamingClientResultOfT.cs new file mode 100644 index 000000000..6d2d68225 --- /dev/null +++ b/.dotnet/src/Utility/StreamingClientResultOfT.cs @@ -0,0 +1,28 @@ +using System.ClientModel; +using System.ClientModel.Primitives; +using System.Collections.Generic; +using System.Threading; + +namespace OpenAI; + +#pragma warning disable CS1591 // public XML comments + +/// +/// Represents an operation response with streaming content that can be deserialized and enumerated while the response +/// is still being received. +/// +/// The data type representative of distinct, streamable items. +// TODO: Revisit the IDisposable question +public abstract class StreamingClientResult : ClientResult, IAsyncEnumerable +{ + protected StreamingClientResult(PipelineResponse response) : base(response) + { + } + + // Note that if the implementation disposes the stream, the caller can only + // enumerate the results once. I think this makes sense, but we should + // make sure architects agree. + public abstract IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default); +} + +#pragma warning restore CS1591 // public XML comments \ No newline at end of file diff --git a/.dotnet/src/Utility/StreamingResult.cs b/.dotnet/src/Utility/StreamingResult.cs deleted file mode 100644 index a1b6ff538..000000000 --- a/.dotnet/src/Utility/StreamingResult.cs +++ /dev/null @@ -1,95 +0,0 @@ -using System.ClientModel; -using System.ClientModel; -using System.ClientModel.Primitives; -using System.Threading; -using System.Collections.Generic; -using System; - -namespace OpenAI; - -/// -/// Represents an operation response with streaming content that can be deserialized and enumerated while the response -/// is still being received. -/// -/// The data type representative of distinct, streamable items. -public class StreamingClientResult - : IDisposable - , IAsyncEnumerable -{ - private ClientResult _rawResult { get; } - private IAsyncEnumerable _asyncEnumerableSource { get; } - private bool _disposedValue { get; set; } - - private StreamingClientResult() { } - - private StreamingClientResult( - ClientResult rawResult, - Func> asyncEnumerableProcessor) - { - _rawResult = rawResult; - _asyncEnumerableSource = asyncEnumerableProcessor.Invoke(rawResult); - } - - /// - /// Creates a new instance of using the provided underlying HTTP response. The - /// provided function will be used to resolve the response into an asynchronous enumeration of streamed response - /// items. - /// - /// The HTTP response. - /// - /// The function that will resolve the provided response into an IAsyncEnumerable. - /// - /// - /// A new instance of that will be capable of asynchronous enumeration of - /// items from the HTTP response. - /// - internal static StreamingClientResult CreateFromResponse( - ClientResult result, - Func> asyncEnumerableProcessor) - { - return new(result, asyncEnumerableProcessor); - } - - /// - /// Gets the underlying instance that this may enumerate - /// over. - /// - /// The instance attached to this . - public PipelineResponse GetRawResponse() => _rawResult.GetRawResponse(); - - /// - /// Gets the asynchronously enumerable collection of distinct, streamable items in the response. - /// - /// - /// The return value of this method may be used with the "await foreach" statement. - /// - /// As explicitly implements , callers may - /// enumerate a instance directly instead of calling this method. - /// - /// - /// - public IAsyncEnumerable EnumerateValues() => this; - - /// - public void Dispose() - { - Dispose(disposing: true); - GC.SuppressFinalize(this); - } - - /// - protected virtual void Dispose(bool disposing) - { - if (!_disposedValue) - { - if (disposing) - { - _rawResult?.GetRawResponse()?.Dispose(); - } - _disposedValue = true; - } - } - - IAsyncEnumerator IAsyncEnumerable.GetAsyncEnumerator(CancellationToken cancellationToken) - => _asyncEnumerableSource.GetAsyncEnumerator(cancellationToken); -} \ No newline at end of file