Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Resolved all issues mentioned in PR #44783
This commit addresses all the concerns raised in PR #44783, ensuring correctness, performance improvements, and better maintainability.
  • Loading branch information
HackPoint committed Feb 25, 2025
1 parent fb7e627 commit 5a4c58a
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 270 deletions.
37 changes: 23 additions & 14 deletions csharp/src/Apache.Arrow.Flight.Sql/Client/FlightSqlClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,13 @@ public FlightSqlClient(FlightClient client)
/// <param name="transaction">A transaction to associate this query with.</param>
/// <param name="options">RPC-layer hints for this call.</param>
/// <returns>The FlightInfo describing where to access the dataset.</returns>
public async Task<FlightInfo> ExecuteAsync(string query, Transaction? transaction = null, FlightCallOptions? options = default)
public async Task<FlightInfo> ExecuteAsync(string query, Transaction transaction = default, FlightCallOptions? options = null)
{
transaction ??= Transaction.NoTransaction;

if (transaction == default)
{
transaction = Transaction.NoTransaction;
}

if (string.IsNullOrEmpty(query))
{
throw new ArgumentException($"Query cannot be null or empty: {nameof(query)}");
Expand All @@ -66,15 +69,18 @@ public async Task<FlightInfo> ExecuteAsync(string query, Transaction? transactio
}

/// <summary>
/// Executes an update query on the server.
/// Executes an update SQL command and returns the number of affected rows.
/// </summary>
/// <param name="query">The UTF8-encoded SQL query to be executed.</param>
/// <param name="transaction">A transaction to associate this query with. Defaults to no transaction if not provided.</param>
/// <param name="options">RPC-layer hints for this call.</param>
/// <returns>The number of rows affected by the operation.</returns>
public async Task<long> ExecuteUpdateAsync(string query, Transaction? transaction = null, FlightCallOptions? options = default)
public async Task<long> ExecuteUpdateAsync(string query, Transaction transaction = default, FlightCallOptions? options = null)
{
transaction ??= Transaction.NoTransaction;
if (transaction == default)
{
transaction = Transaction.NoTransaction;
}

if (string.IsNullOrEmpty(query))
{
Expand Down Expand Up @@ -104,10 +110,7 @@ public async Task<long> ExecuteUpdateAsync(string query, Transaction? transactio

await foreach (var recordBatch in doGetResult.ConfigureAwait(false))
{
foreach (var rowCount in recordBatch.ExtractRowCount())
{
affectedRows += rowCount;
}
affectedRows += recordBatch.ExtractRowCount();
}
}

Expand Down Expand Up @@ -170,9 +173,12 @@ public async IAsyncEnumerable<FlightResult> DoActionAsync(FlightAction action, F
/// <param name="transaction">A transaction to associate this query with</param>
/// <param name="options">Per-RPC options</param>
/// <returns>The SchemaResult describing the schema of the result set</returns>
public async Task<Schema> GetExecuteSchemaAsync(string query, Transaction? transaction = null, FlightCallOptions? options = default)
public async Task<Schema> GetExecuteSchemaAsync(string query, Transaction transaction = default, FlightCallOptions? options = null)
{
transaction ??= Transaction.NoTransaction;
if (transaction == default)
{
transaction = Transaction.NoTransaction;
}

if (string.IsNullOrEmpty(query))
throw new ArgumentException($"Query cannot be null or empty: {nameof(query)}");
Expand Down Expand Up @@ -897,12 +903,15 @@ public AsyncServerStreamingCall<FlightResult> RollbackAsync(Transaction transact
/// <param name="transaction">A transaction to associate this query with.</param>
/// <param name="options">RPC-layer hints for this call.</param>
/// <returns>The created prepared statement.</returns>
public async Task<PreparedStatement> PrepareAsync(string query, Transaction? transaction = null, FlightCallOptions? options = default)
public async Task<PreparedStatement> PrepareAsync(string query, Transaction transaction = default, FlightCallOptions? options = null)
{
if (string.IsNullOrEmpty(query))
throw new ArgumentException("Query cannot be null or empty", nameof(query));

transaction ??= Transaction.NoTransaction;
if (transaction == default)
{
transaction = Transaction.NoTransaction;
}

try
{
Expand Down
11 changes: 1 addition & 10 deletions csharp/src/Apache.Arrow.Flight.Sql/FlightCallOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,13 @@ public FlightCallOptions()
{
Timeout = TimeSpan.FromSeconds(-1);
}

// Implement any necessary options for RPC calls
public Metadata Headers { get; set; } = new();

/// <summary>
/// Gets or sets a token to enable interactive user cancellation of long-running requests.
/// </summary>
public CancellationToken StopToken { get; set; }

/// <summary>
/// Gets or sets the optional timeout for this call.
/// Negative durations mean an implementation-defined default behavior will be used instead.
/// </summary>
public TimeSpan Timeout { get; set; }

/// <summary>
/// Gets or sets an optional memory manager to control where to allocate incoming data.
/// </summary>
public MemoryManager<ArrowBuffer>? MemoryManager { get; set; }
}
62 changes: 11 additions & 51 deletions csharp/src/Apache.Arrow.Flight.Sql/FlightExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
// limitations under the License.

using System;
using System.Collections.Generic;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;

Expand All @@ -23,59 +22,20 @@ namespace Apache.Arrow.Flight.Sql;
internal static class FlightExtensions
{
public static byte[] PackAndSerialize(this IMessage command) => Any.Pack(command).ToByteArray();
public static T ParseAndUnpack<T>(this ByteString source) where T : IMessage<T>, new() => Any.Parser.ParseFrom(source).Unpack<T>();

public static IEnumerable<long> ExtractRowCount(this RecordBatch batch)
{
foreach (var array in batch.Arrays)
{
var values = ExtractValues(array);
foreach (var value in values)
{
yield return value switch
{
long l => l,
int i => i != 0 ? i : 0,
_ => 0L
};
}
}
}

private static IEnumerable<object?> ExtractValues(IArrowArray array)
{
return array switch
{
Int32Array int32Array => ExtractPrimitiveValues(int32Array),
Int64Array int64Array => ExtractPrimitiveValues(int64Array),
FloatArray floatArray => ExtractPrimitiveValues(floatArray),
BooleanArray booleanArray => ExtractBooleanValues(booleanArray),
StringArray stringArray => ExtractStringValues(stringArray),
_ => throw new NotSupportedException($"Array type {array.GetType().Name} is not supported.")
};
}

private static IEnumerable<object?> ExtractPrimitiveValues<T>(PrimitiveArray<T> array) where T : struct, IEquatable<T>
{
for (int i = 0; i < array.Length; i++)
{
yield return array.IsNull(i) ? null : array.Values[i];
}
}

private static IEnumerable<object?> ExtractBooleanValues(BooleanArray array)
{
for (int i = 0; i < array.Length; i++)
{
yield return array.IsNull(i) ? null : array.Values[i];
}
}

private static IEnumerable<string?> ExtractStringValues(StringArray stringArray)
public static T ParseAndUnpack<T>(this ByteString source) where T : IMessage<T>, new() =>
Any.Parser.ParseFrom(source).Unpack<T>();

public static int ExtractRowCount(this RecordBatch batch)
{
for (int i = 0; i < stringArray.Length; i++)
if (batch.ColumnCount == 0) return 0;
int length = batch.Column(0).Length;
foreach (var column in batch.Arrays)
{
yield return stringArray.IsNull(i) ? null : stringArray.GetString(i);
if (column.Length != length)
throw new InvalidOperationException("Inconsistent column lengths in RecordBatch.");
}

return length;
}
}
1 change: 0 additions & 1 deletion csharp/src/Apache.Arrow.Flight.Sql/PreparedStatement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ public async Task<Schema> GetSchemaAsync(FlightCallOptions? options = default)
}
}


/// <summary>
/// Closes the prepared statement asynchronously.
/// </summary>
Expand Down
138 changes: 0 additions & 138 deletions csharp/src/Apache.Arrow.Flight.Sql/RecordBatchExtensions.cs

This file was deleted.

10 changes: 4 additions & 6 deletions csharp/src/Apache.Arrow.Flight.Sql/SchemaExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,13 @@ public static class SchemaExtensions
/// </summary>
/// <param name="serializedSchema">The byte array representing the serialized schema.</param>
/// <returns>The deserialized Schema object.</returns>
public static Schema DeserializeSchema(byte[] serializedSchema)
public static Schema DeserializeSchema(ReadOnlyMemory<byte> serializedSchema)
{
if (serializedSchema == null || serializedSchema.Length == 0)
if (serializedSchema.IsEmpty)
{
throw new ArgumentException("Invalid serialized schema");
throw new ArgumentException("Invalid serialized schema", nameof(serializedSchema));
}

using var stream = new MemoryStream(serializedSchema);
var reader = new ArrowStreamReader(stream);
using var reader = new ArrowStreamReader(serializedSchema);
return reader.Schema;
}

Expand Down
21 changes: 18 additions & 3 deletions csharp/src/Apache.Arrow.Flight.Sql/TableRef.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System;

namespace Apache.Arrow.Flight.Sql;

public class TableRef
{
public string? Catalog { get; set; }
public string DbSchema { get; set; } = null!;
public string Table { get; set; } = null!;
public string? Catalog { get; }
public string DbSchema { get; }
public string Table { get; }

public TableRef(string dbSchema, string table)
{
DbSchema = dbSchema ?? throw new ArgumentNullException(nameof(dbSchema));
Table = table ?? throw new ArgumentNullException(nameof(table));
}

public TableRef(string? catalog, string dbSchema, string table)
{
Catalog = catalog;
DbSchema = dbSchema ?? throw new ArgumentNullException(nameof(dbSchema));
Table = table ?? throw new ArgumentNullException(nameof(table));
}
}
Loading

0 comments on commit 5a4c58a

Please sign in to comment.