Skip to content

Commit

Permalink
Updates
Browse files Browse the repository at this point in the history
  • Loading branch information
LucGenetier committed Apr 9, 2024
1 parent e65818a commit 3b3a954
Show file tree
Hide file tree
Showing 9 changed files with 369 additions and 130 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.PowerFx.Core.IR;
using Microsoft.PowerFx.Types;

namespace Microsoft.PowerFx.Connectors
{
// Implements CDP protocol for Tabular connectors
public class CdpTabularService : TabularService
{
public string DataSetName { get; }

public string TableName { get; }

protected HttpClient _httpClient;

protected string _uriPrefix;

public CdpTabularService(string dataset, string table, HttpClient httpClient, string uriPrefix = null)
{
DataSetName = dataset ?? throw new ArgumentNullException(nameof(dataset));
TableName = table ?? throw new ArgumentNullException(nameof(table));
_httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient));
_uriPrefix = uriPrefix;
}

//// TABLE METADATA SERVICE
// GET: /$metadata.json/datasets/{datasetName}/tables/{tableName}?api-version=2015-09-01
public override async Task<RecordType> GetSchemaAsync(CancellationToken cancellationToken)
{
using HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, _uriPrefix + $"/$metadata.json/datasets/{DataSetName}/tables/{TableName}?api-version=2015-09-01");
using HttpResponseMessage response = await _httpClient.SendAsync(request, cancellationToken).ConfigureAwait(false);

string text = response?.Content == null ? string.Empty : await response.Content.ReadAsStringAsync().ConfigureAwait(false);
int statusCode = (int)response.StatusCode;

return statusCode < 300 ? GetSchema(text) : null;
}

internal static RecordType GetSchema(string text)
{
ConnectorType connectorType = ConnectorFunction.GetConnectorType("Schema/Items", FormulaValue.New(text), ConnectorCompatibility.SwaggerCompatibility);
return connectorType?.FormulaType as RecordType;
}

// TABLE DATA SERVICE - CREATE
// POST: /datasets/{datasetName}/tables/{tableName}/items?api-version=2015-09-01

// TABLE DATA SERVICE - READ
// GET AN ITEM - GET: /datasets/{datasetName}/tables/{tableName}/items/{id}?api-version=2015-09-01

// LIST ITEMS - GET: /datasets/{datasetName}/tables/{tableName}/items?$filter=’CreatedBy’ eq ‘john.doe’&$top=50&$orderby=’Priority’ asc, ’CreationDate’ desc
public override async Task<ICollection<DValue<RecordValue>>> GetItemsAsync(IServiceProvider serviceProvider, CancellationToken cancellationToken)
{
using HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, _uriPrefix + $"/datasets/{DataSetName}/tables/{TableName}/items?api-version=2015-09-01");
using HttpResponseMessage response = await _httpClient.SendAsync(request, cancellationToken).ConfigureAwait(false);

string text = response?.Content == null ? string.Empty : await response.Content.ReadAsStringAsync().ConfigureAwait(false);
int statusCode = (int)response.StatusCode;

return statusCode < 300 ? GetResult(text) : Array.Empty<DValue<RecordValue>>();
}

protected ICollection<DValue<RecordValue>> GetResult(string text)
{
// $$$ Is this always this type?
RecordValue rv = FormulaValueJSON.FromJson(text, RecordType.Empty().Add("value", TableType)) as RecordValue;
TableValue tv = rv.Fields.FirstOrDefault(field => field.Name == "value").Value as TableValue;

// The call we make contains more fields and we want to remove them here ('@odata.etag')
return new InMemoryTableValue(IRContext.NotInSource(TableType), tv.Rows).Rows.ToArray();
}

// TABLE DATA SERVICE - UPDATE
// PATCH: /datasets/{datasetName}/tables/{tableName}/items/{id}

// TABLE DATA SERVICE - DELETE
// DELETE: /datasets/{datasetName}/tables/{tableName}/items/{id}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@ namespace Microsoft.PowerFx
public static class ConfigExtensions
{
/// <summary>
/// Add functions for each operation in the <see cref="OpenApiDocument"/>.
/// Add functions for each operation in the <see cref="OpenApiDocument"/>.
/// Functions names will be 'functionNamespace.operationName'.
/// Functions are invoked via rest via the httpClient. The client must handle authentication.
/// Functions are invoked via rest via the httpClient. The client must handle authentication.
/// </summary>
///
///
/// <param name="config">Config to add the functions to.</param>
/// <param name="connectorSettings">Connector settings containing Namespace and MaxRows to be returned.</param>
/// <param name="connectorSettings">Connector settings containing Namespace and MaxRows to be returned.</param>
/// <param name="openApiDocument">An API document. This can represent multiple formats, including Swagger 2.0 and OpenAPI 3.0.</param>
/// <param name="configurationLogger">Logger.</param>
/// <param name="globalValues">Global Values.</param>
/// <returns>List of connector functions.</returns>
public static IReadOnlyList<ConnectorFunction> AddActionConnector(this PowerFxConfig config, ConnectorSettings connectorSettings, OpenApiDocument openApiDocument, ConnectorLogger configurationLogger = null, IReadOnlyDictionary<string, FormulaValue> globalValues = null)
{
{
try
{
configurationLogger?.LogInformation($"Entering in ConfigExtensions.{nameof(AddActionConnector)}, with {nameof(ConnectorSettings)} {LogConnectorSettings(connectorSettings)}");
Expand Down Expand Up @@ -65,9 +65,9 @@ internal static IReadOnlyList<ConnectorFunction> AddActionConnectorInternal(this
}

/// <summary>
/// Add functions for each operation in the <see cref="OpenApiDocument"/>.
/// Add functions for each operation in the <see cref="OpenApiDocument"/>.
/// Functions names will be 'functionNamespace.operationName'.
/// Functions are invoked via rest via the httpClient. The client must handle authentication.
/// Functions are invoked via rest via the httpClient. The client must handle authentication.
/// </summary>
/// <param name="config">Config to add the functions to.</param>
/// <param name="namespace">Namespace name.</param>
Expand All @@ -76,7 +76,7 @@ internal static IReadOnlyList<ConnectorFunction> AddActionConnectorInternal(this
/// <param name="globalValues">Global Values.</param>
/// <returns>List of connector functions.</returns>
public static IReadOnlyList<ConnectorFunction> AddActionConnector(this PowerFxConfig config, string @namespace, OpenApiDocument openApiDocument, ConnectorLogger configurationLogger = null, IReadOnlyDictionary<string, FormulaValue> globalValues = null)
{
{
try
{
configurationLogger?.LogInformation($"Entering in ConfigExtensions.{nameof(AddActionConnector)}, with {nameof(ConnectorSettings)} Namespace {@namespace ?? Null(nameof(@namespace))}");
Expand All @@ -87,25 +87,14 @@ public static IReadOnlyList<ConnectorFunction> AddActionConnector(this PowerFxCo
return null;
}

configurationLogger?.LogInformation($"Exiting ConfigExtensions.{nameof(AddActionConnector)}, returning {connectorFunctions.Count()} functions");
configurationLogger?.LogInformation($"Exiting ConfigExtensions.{nameof(AddActionConnector)}, returning {connectorFunctions.Count()} functions");
return connectorFunctions;
}
catch (Exception ex)
{
configurationLogger?.LogException(ex, $"Exception in ConfigExtensions.{nameof(AddActionConnector)}, Namespace {@namespace ?? Null(nameof(@namespace))}, {LogException(ex)}");
throw;
}
}
}

public static async Task<ConnectorTableValue> AddTabularConnector(this PowerFxConfig config, string tableName, TabularService tabularService, BaseRuntimeConnectorContext runtimeConnectorContext, CancellationToken cancellationToken, ConnectorLogger configurationLogger = null)
{
cancellationToken.ThrowIfCancellationRequested();

RecordType recordType = await tabularService.InitAsync(config, tableName, runtimeConnectorContext, cancellationToken).ConfigureAwait(false);

return recordType == null
? throw new InvalidOperationException("Cannot determine table schema")
: new ConnectorTableValue(tabularService, recordType);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,55 +10,48 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.OpenApi.Models;
using Microsoft.PowerFx.Core.IR;
using Microsoft.PowerFx.Types;

namespace Microsoft.PowerFx.Connectors
{
public class SwaggerTabularService : TabularService
// Swagger based CDP tabular service
public sealed class SwaggerTabularService : CdpTabularService
{
//public string Namespace { get; private set; }
public string Namespace => $"_tbl_{ConnectionId}";

//public string TableName { get; private set; }
public string ConnectionId => _connectionId;

//// TABLE METADATA SERVICE
// GET: /$metadata.json/datasets/{datasetName}/tables/{tableName}?api-version=2015-09-01
internal ConnectorFunction MetadataService => _metadataService ??= GetMetadataService();

// TABLE DATA SERVICE - CREATE
// POST: /datasets/{datasetName}/tables/{tableName}/items?api-version=2015-09-01

// TABLE DATA SERVICE - READ
// GET AN ITEM - GET: /datasets/{datasetName}/tables/{tableName}/items/{id}?api-version=2015-09-01

// LIST ITEMS - GET: /datasets/{datasetName}/tables/{tableName}/items?$filter=’CreatedBy’ eq ‘john.doe’&$top=50&$orderby=’Priority’ asc, ’CreationDate’ desc
internal ConnectorFunction GetItems => _getItems ??= GetItemsFunction();

// TABLE DATA SERVICE - UPDATE
// PATCH: /datasets/{datasetName}/tables/{tableName}/items/{id}

// TABLE DATA SERVICE - DELETE
// DELETE: /datasets/{datasetName}/tables/{tableName}/items/{id}
private readonly string _connectionId;

private IReadOnlyList<ConnectorFunction> _tabularFunctions;

private ConnectorFunction _metadataService;

private ConnectorFunction _getItems;

private readonly OpenApiDocument _openApiDocument;

private readonly IReadOnlyDictionary<string, FormulaValue> _globalValues;
private readonly HttpClient _httpClient;

private readonly ConnectorLogger _connectorLogger;

public SwaggerTabularService(OpenApiDocument openApiDocument, IReadOnlyDictionary<string, FormulaValue> globalValues, HttpClient client, ConnectorLogger configurationLogger = null)
: base(globalValues)
private readonly PowerFxConfig _config;

public SwaggerTabularService(PowerFxConfig config, OpenApiDocument openApiDocument, IReadOnlyDictionary<string, FormulaValue> globalValues, HttpClient httpClient, ConnectorLogger configurationLogger = null)
: base(GetDataSetName(globalValues), GetTableName(globalValues), httpClient)
{
_config = config;
_openApiDocument = openApiDocument;
_globalValues = globalValues;
_httpClient = client;
_connectorLogger = configurationLogger;
_connectionId = TryGetString("connectionId", globalValues, out string connectorId) ? connectorId : throw new InvalidOperationException("Cannot determine connectionId.");
}

internal override async Task<RecordType> InitInternalAsync(PowerFxConfig config, string tableName, BaseRuntimeConnectorContext runtimeConnectorContext, CancellationToken cancellationToken)
//// TABLE METADATA SERVICE
// GET: /$metadata.json/datasets/{datasetName}/tables/{tableName}?api-version=2015-09-01
internal ConnectorFunction MetadataService => _metadataService ??= GetMetadataService();

public override async Task<RecordType> GetSchemaAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

Expand All @@ -69,44 +62,68 @@ internal override async Task<RecordType> InitInternalAsync(PowerFxConfig config,
};

// Swagger based tabular connectors
_tabularFunctions = config.AddActionConnector(connectorSettings, _openApiDocument, _connectorLogger, _globalValues);
_tabularFunctions = _config.AddActionConnector(connectorSettings, _openApiDocument, _connectorLogger, _globalValues);

return await GetSchemaAsync(new SimpleRuntimeConnectorContext(_httpClient), cancellationToken).ConfigureAwait(false);
BaseRuntimeConnectorContext runtimeConnectorContext = new RawRuntimeConnectorContext(_httpClient);
FormulaValue schema = await MetadataService.InvokeAsync(Array.Empty<FormulaValue>(), runtimeConnectorContext, cancellationToken).ConfigureAwait(false);

return schema is StringValue str ? GetSchema(str.Value) : null;
}

internal override async Task<RecordType> GetSchemaAsync(BaseRuntimeConnectorContext runtimeConnectorContext, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
// TABLE DATA SERVICE - CREATE
// POST: /datasets/{datasetName}/tables/{tableName}/items?api-version=2015-09-01

FormulaValue schema = await MetadataService.InvokeAsync(Array.Empty<FormulaValue>(), runtimeConnectorContext.WithRawResults(), cancellationToken).ConfigureAwait(false);
ConnectorType connectorType = ConnectorFunction.GetConnectorType("Schema/Items", schema as StringValue, ConnectorCompatibility.SwaggerCompatibility);
// TABLE DATA SERVICE - READ
// GET AN ITEM - GET: /datasets/{datasetName}/tables/{tableName}/items/{id}?api-version=2015-09-01

return connectorType?.FormulaType as RecordType;
}
// LIST ITEMS - GET: /datasets/{datasetName}/tables/{tableName}/items?$filter=’CreatedBy’ eq ‘john.doe’&$top=50&$orderby=’Priority’ asc, ’CreationDate’ desc
internal ConnectorFunction GetItems => _getItems ??= GetItemsFunction();

public override async Task<IEnumerable<DValue<RecordValue>>> GetItemsAsync(BaseRuntimeConnectorContext runtimeConnectorContext, CancellationToken cancellationToken)
public override async Task<ICollection<DValue<RecordValue>>> GetItemsAsync(IServiceProvider serviceProvider, CancellationToken cancellationToken)
{
BaseRuntimeConnectorContext runtimeConnectorContext = serviceProvider.GetService<BaseRuntimeConnectorContext>() ?? throw new InvalidOperationException("Cannot determine runtime connector context.");

// Notice that there is no paging here, just get 1 page
// Use WithRawResults to ignore _getItems return type which is in the form of ![value:*[dynamicProperties:![]]] (ie. without the actual type)
FormulaValue rowsRaw = await GetItems.InvokeAsync(Array.Empty<FormulaValue>(), runtimeConnectorContext.WithRawResults(), CancellationToken.None).ConfigureAwait(false);

if (rowsRaw is ErrorValue ev)
{
return Enumerable.Empty<DValue<RecordValue>>();
}
return rowsRaw is StringValue sv ? GetResult(sv.Value) : Array.Empty<DValue<RecordValue>>();
}

StringValue rowsStr = rowsRaw as StringValue;
// TABLE DATA SERVICE - UPDATE
// PATCH: /datasets/{datasetName}/tables/{tableName}/items/{id}

// $$$ Is this always this type?
RecordValue rv = FormulaValueJSON.FromJson(rowsStr.Value, RecordType.Empty().Add("value", TableType)) as RecordValue;
TableValue tv = rv.Fields.FirstOrDefault(field => field.Name == "value").Value as TableValue;
// TABLE DATA SERVICE - DELETE
// DELETE: /datasets/{datasetName}/tables/{tableName}/items/{id}

// The call we make contains more fields and we want to remove them here ('@odata.etag')
return new InMemoryTableValue(IRContext.NotInSource(TableType), tv.Rows).Rows;
private static string GetDataSetName(IReadOnlyDictionary<string, FormulaValue> globalValues) =>
TryGetString("dataset", globalValues, out string dataset)
? dataset
: TryGetString("server", globalValues, out string server) && TryGetString("database", globalValues, out string database)
? $"{server},{database}"
: throw new InvalidOperationException("Cannot determine dataset name.");

private static string GetTableName(IReadOnlyDictionary<string, FormulaValue> globalValues) =>
TryGetString("table", globalValues, out string table)
? table
: throw new InvalidOperationException("Cannot determine table name.");

private static bool TryGetString(string name, IReadOnlyDictionary<string, FormulaValue> globalValues, out string str)
{
if (globalValues.TryGetValue(name, out FormulaValue fv) && fv is StringValue sv)
{
str = sv.Value;
return true;
}

str = null;
return false;
}

private const string MetadataServiceRegex = @"/\$metadata\.json/datasets/{[^{}]+}(,{[^{}]+})?/tables/{[^{}]+}$";

private const string GetItemsRegex = @"/datasets/{[^{}]+}(,{[^{}]+})?/tables/{[^{}]+}/items$";

private const string NameVersionRegex = @"V(?<n>[0-9]{0,2})$";

private ConnectorFunction GetMetadataService()
Expand Down Expand Up @@ -144,15 +161,17 @@ private ConnectorFunction GetItemsFunction()
return functions[0];
}

private class SimpleRuntimeConnectorContext : BaseRuntimeConnectorContext
private class RawRuntimeConnectorContext : BaseRuntimeConnectorContext
{
private readonly HttpMessageInvoker _invoker;

internal SimpleRuntimeConnectorContext(HttpMessageInvoker invoker)
internal RawRuntimeConnectorContext(HttpMessageInvoker invoker)
{
_invoker = invoker;
}

internal override bool ReturnRawResults => true;

public override TimeZoneInfo TimeZoneInfo => TimeZoneInfo.Utc;

public override HttpMessageInvoker GetInvoker(string @namespace) => _invoker;
Expand Down
Loading

0 comments on commit 3b3a954

Please sign in to comment.