From 3b3a954877c2216dd66e434e6ca291057c3f9285 Mon Sep 17 00:00:00 2001 From: Luc Genetier Date: Tue, 9 Apr 2024 16:35:39 +0200 Subject: [PATCH] Updates --- .../Environment/CdpTabularService.cs | 87 +++++++++++++ .../Environment/PowerFxConfigExtensions.cs | 31 ++--- .../Environment/SwaggerTabularService.cs | 117 ++++++++++-------- .../Environment/TabularService.cs | 37 ++---- .../Public/ConnectorTableValue.cs | 20 +-- .../ConnectorTableValueWithServiceProvider.cs | 8 +- .../FileTabularConnector.cs | 89 +++++++++++++ .../PowerPlatformConnectorTests.cs | 109 +++++++++++++--- .../PublicSurfaceTests.cs | 1 + 9 files changed, 369 insertions(+), 130 deletions(-) create mode 100644 src/libraries/Microsoft.PowerFx.Connectors/Environment/CdpTabularService.cs create mode 100644 src/tests/Microsoft.PowerFx.Connectors.Tests/FileTabularConnector.cs diff --git a/src/libraries/Microsoft.PowerFx.Connectors/Environment/CdpTabularService.cs b/src/libraries/Microsoft.PowerFx.Connectors/Environment/CdpTabularService.cs new file mode 100644 index 0000000000..e0a1ebc64a --- /dev/null +++ b/src/libraries/Microsoft.PowerFx.Connectors/Environment/CdpTabularService.cs @@ -0,0 +1,87 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.PowerFx.Core.IR; +using Microsoft.PowerFx.Types; + +namespace Microsoft.PowerFx.Connectors +{ + // Implements CDP protocol for Tabular connectors + public class CdpTabularService : TabularService + { + public string DataSetName { get; } + + public string TableName { get; } + + protected HttpClient _httpClient; + + protected string _uriPrefix; + + public CdpTabularService(string dataset, string table, HttpClient httpClient, string uriPrefix = null) + { + DataSetName = dataset ?? throw new ArgumentNullException(nameof(dataset)); + TableName = table ?? throw new ArgumentNullException(nameof(table)); + _httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient)); + _uriPrefix = uriPrefix; + } + + //// TABLE METADATA SERVICE + // GET: /$metadata.json/datasets/{datasetName}/tables/{tableName}?api-version=2015-09-01 + public override async Task GetSchemaAsync(CancellationToken cancellationToken) + { + using HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, _uriPrefix + $"/$metadata.json/datasets/{DataSetName}/tables/{TableName}?api-version=2015-09-01"); + using HttpResponseMessage response = await _httpClient.SendAsync(request, cancellationToken).ConfigureAwait(false); + + string text = response?.Content == null ? string.Empty : await response.Content.ReadAsStringAsync().ConfigureAwait(false); + int statusCode = (int)response.StatusCode; + + return statusCode < 300 ? GetSchema(text) : null; + } + + internal static RecordType GetSchema(string text) + { + ConnectorType connectorType = ConnectorFunction.GetConnectorType("Schema/Items", FormulaValue.New(text), ConnectorCompatibility.SwaggerCompatibility); + return connectorType?.FormulaType as RecordType; + } + + // TABLE DATA SERVICE - CREATE + // POST: /datasets/{datasetName}/tables/{tableName}/items?api-version=2015-09-01 + + // TABLE DATA SERVICE - READ + // GET AN ITEM - GET: /datasets/{datasetName}/tables/{tableName}/items/{id}?api-version=2015-09-01 + + // LIST ITEMS - GET: /datasets/{datasetName}/tables/{tableName}/items?$filter=’CreatedBy’ eq ‘john.doe’&$top=50&$orderby=’Priority’ asc, ’CreationDate’ desc + public override async Task>> GetItemsAsync(IServiceProvider serviceProvider, CancellationToken cancellationToken) + { + using HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, _uriPrefix + $"/datasets/{DataSetName}/tables/{TableName}/items?api-version=2015-09-01"); + using HttpResponseMessage response = await _httpClient.SendAsync(request, cancellationToken).ConfigureAwait(false); + + string text = response?.Content == null ? string.Empty : await response.Content.ReadAsStringAsync().ConfigureAwait(false); + int statusCode = (int)response.StatusCode; + + return statusCode < 300 ? GetResult(text) : Array.Empty>(); + } + + protected ICollection> GetResult(string text) + { + // $$$ Is this always this type? + RecordValue rv = FormulaValueJSON.FromJson(text, RecordType.Empty().Add("value", TableType)) as RecordValue; + TableValue tv = rv.Fields.FirstOrDefault(field => field.Name == "value").Value as TableValue; + + // The call we make contains more fields and we want to remove them here ('@odata.etag') + return new InMemoryTableValue(IRContext.NotInSource(TableType), tv.Rows).Rows.ToArray(); + } + + // TABLE DATA SERVICE - UPDATE + // PATCH: /datasets/{datasetName}/tables/{tableName}/items/{id} + + // TABLE DATA SERVICE - DELETE + // DELETE: /datasets/{datasetName}/tables/{tableName}/items/{id} + } +} diff --git a/src/libraries/Microsoft.PowerFx.Connectors/Environment/PowerFxConfigExtensions.cs b/src/libraries/Microsoft.PowerFx.Connectors/Environment/PowerFxConfigExtensions.cs index 072e9cb1d3..09e8244e2d 100644 --- a/src/libraries/Microsoft.PowerFx.Connectors/Environment/PowerFxConfigExtensions.cs +++ b/src/libraries/Microsoft.PowerFx.Connectors/Environment/PowerFxConfigExtensions.cs @@ -19,19 +19,19 @@ namespace Microsoft.PowerFx public static class ConfigExtensions { /// - /// Add functions for each operation in the . + /// Add functions for each operation in the . /// Functions names will be 'functionNamespace.operationName'. - /// Functions are invoked via rest via the httpClient. The client must handle authentication. + /// Functions are invoked via rest via the httpClient. The client must handle authentication. /// - /// + /// /// Config to add the functions to. - /// Connector settings containing Namespace and MaxRows to be returned. + /// Connector settings containing Namespace and MaxRows to be returned. /// An API document. This can represent multiple formats, including Swagger 2.0 and OpenAPI 3.0. /// Logger. /// Global Values. /// List of connector functions. public static IReadOnlyList AddActionConnector(this PowerFxConfig config, ConnectorSettings connectorSettings, OpenApiDocument openApiDocument, ConnectorLogger configurationLogger = null, IReadOnlyDictionary globalValues = null) - { + { try { configurationLogger?.LogInformation($"Entering in ConfigExtensions.{nameof(AddActionConnector)}, with {nameof(ConnectorSettings)} {LogConnectorSettings(connectorSettings)}"); @@ -65,9 +65,9 @@ internal static IReadOnlyList AddActionConnectorInternal(this } /// - /// Add functions for each operation in the . + /// Add functions for each operation in the . /// Functions names will be 'functionNamespace.operationName'. - /// Functions are invoked via rest via the httpClient. The client must handle authentication. + /// Functions are invoked via rest via the httpClient. The client must handle authentication. /// /// Config to add the functions to. /// Namespace name. @@ -76,7 +76,7 @@ internal static IReadOnlyList AddActionConnectorInternal(this /// Global Values. /// List of connector functions. public static IReadOnlyList AddActionConnector(this PowerFxConfig config, string @namespace, OpenApiDocument openApiDocument, ConnectorLogger configurationLogger = null, IReadOnlyDictionary globalValues = null) - { + { try { configurationLogger?.LogInformation($"Entering in ConfigExtensions.{nameof(AddActionConnector)}, with {nameof(ConnectorSettings)} Namespace {@namespace ?? Null(nameof(@namespace))}"); @@ -87,25 +87,14 @@ public static IReadOnlyList AddActionConnector(this PowerFxCo return null; } - configurationLogger?.LogInformation($"Exiting ConfigExtensions.{nameof(AddActionConnector)}, returning {connectorFunctions.Count()} functions"); + configurationLogger?.LogInformation($"Exiting ConfigExtensions.{nameof(AddActionConnector)}, returning {connectorFunctions.Count()} functions"); return connectorFunctions; } catch (Exception ex) { configurationLogger?.LogException(ex, $"Exception in ConfigExtensions.{nameof(AddActionConnector)}, Namespace {@namespace ?? Null(nameof(@namespace))}, {LogException(ex)}"); throw; - } + } } - - public static async Task AddTabularConnector(this PowerFxConfig config, string tableName, TabularService tabularService, BaseRuntimeConnectorContext runtimeConnectorContext, CancellationToken cancellationToken, ConnectorLogger configurationLogger = null) - { - cancellationToken.ThrowIfCancellationRequested(); - - RecordType recordType = await tabularService.InitAsync(config, tableName, runtimeConnectorContext, cancellationToken).ConfigureAwait(false); - - return recordType == null - ? throw new InvalidOperationException("Cannot determine table schema") - : new ConnectorTableValue(tabularService, recordType); - } } } diff --git a/src/libraries/Microsoft.PowerFx.Connectors/Environment/SwaggerTabularService.cs b/src/libraries/Microsoft.PowerFx.Connectors/Environment/SwaggerTabularService.cs index 2b7608096d..79586d5a5e 100644 --- a/src/libraries/Microsoft.PowerFx.Connectors/Environment/SwaggerTabularService.cs +++ b/src/libraries/Microsoft.PowerFx.Connectors/Environment/SwaggerTabularService.cs @@ -10,55 +10,48 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.OpenApi.Models; -using Microsoft.PowerFx.Core.IR; using Microsoft.PowerFx.Types; namespace Microsoft.PowerFx.Connectors { - public class SwaggerTabularService : TabularService + // Swagger based CDP tabular service + public sealed class SwaggerTabularService : CdpTabularService { - //public string Namespace { get; private set; } + public string Namespace => $"_tbl_{ConnectionId}"; - //public string TableName { get; private set; } + public string ConnectionId => _connectionId; - //// TABLE METADATA SERVICE - // GET: /$metadata.json/datasets/{datasetName}/tables/{tableName}?api-version=2015-09-01 - internal ConnectorFunction MetadataService => _metadataService ??= GetMetadataService(); - - // TABLE DATA SERVICE - CREATE - // POST: /datasets/{datasetName}/tables/{tableName}/items?api-version=2015-09-01 - - // TABLE DATA SERVICE - READ - // GET AN ITEM - GET: /datasets/{datasetName}/tables/{tableName}/items/{id}?api-version=2015-09-01 - - // LIST ITEMS - GET: /datasets/{datasetName}/tables/{tableName}/items?$filter=’CreatedBy’ eq ‘john.doe’&$top=50&$orderby=’Priority’ asc, ’CreationDate’ desc - internal ConnectorFunction GetItems => _getItems ??= GetItemsFunction(); - - // TABLE DATA SERVICE - UPDATE - // PATCH: /datasets/{datasetName}/tables/{tableName}/items/{id} - - // TABLE DATA SERVICE - DELETE - // DELETE: /datasets/{datasetName}/tables/{tableName}/items/{id} + private readonly string _connectionId; private IReadOnlyList _tabularFunctions; + private ConnectorFunction _metadataService; + private ConnectorFunction _getItems; private readonly OpenApiDocument _openApiDocument; + private readonly IReadOnlyDictionary _globalValues; - private readonly HttpClient _httpClient; + private readonly ConnectorLogger _connectorLogger; - public SwaggerTabularService(OpenApiDocument openApiDocument, IReadOnlyDictionary globalValues, HttpClient client, ConnectorLogger configurationLogger = null) - : base(globalValues) + private readonly PowerFxConfig _config; + + public SwaggerTabularService(PowerFxConfig config, OpenApiDocument openApiDocument, IReadOnlyDictionary globalValues, HttpClient httpClient, ConnectorLogger configurationLogger = null) + : base(GetDataSetName(globalValues), GetTableName(globalValues), httpClient) { + _config = config; _openApiDocument = openApiDocument; _globalValues = globalValues; - _httpClient = client; _connectorLogger = configurationLogger; + _connectionId = TryGetString("connectionId", globalValues, out string connectorId) ? connectorId : throw new InvalidOperationException("Cannot determine connectionId."); } - internal override async Task InitInternalAsync(PowerFxConfig config, string tableName, BaseRuntimeConnectorContext runtimeConnectorContext, CancellationToken cancellationToken) + //// TABLE METADATA SERVICE + // GET: /$metadata.json/datasets/{datasetName}/tables/{tableName}?api-version=2015-09-01 + internal ConnectorFunction MetadataService => _metadataService ??= GetMetadataService(); + + public override async Task GetSchemaAsync(CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); @@ -69,44 +62,68 @@ internal override async Task InitInternalAsync(PowerFxConfig config, }; // Swagger based tabular connectors - _tabularFunctions = config.AddActionConnector(connectorSettings, _openApiDocument, _connectorLogger, _globalValues); + _tabularFunctions = _config.AddActionConnector(connectorSettings, _openApiDocument, _connectorLogger, _globalValues); - return await GetSchemaAsync(new SimpleRuntimeConnectorContext(_httpClient), cancellationToken).ConfigureAwait(false); + BaseRuntimeConnectorContext runtimeConnectorContext = new RawRuntimeConnectorContext(_httpClient); + FormulaValue schema = await MetadataService.InvokeAsync(Array.Empty(), runtimeConnectorContext, cancellationToken).ConfigureAwait(false); + + return schema is StringValue str ? GetSchema(str.Value) : null; } - internal override async Task GetSchemaAsync(BaseRuntimeConnectorContext runtimeConnectorContext, CancellationToken cancellationToken) - { - cancellationToken.ThrowIfCancellationRequested(); + // TABLE DATA SERVICE - CREATE + // POST: /datasets/{datasetName}/tables/{tableName}/items?api-version=2015-09-01 - FormulaValue schema = await MetadataService.InvokeAsync(Array.Empty(), runtimeConnectorContext.WithRawResults(), cancellationToken).ConfigureAwait(false); - ConnectorType connectorType = ConnectorFunction.GetConnectorType("Schema/Items", schema as StringValue, ConnectorCompatibility.SwaggerCompatibility); + // TABLE DATA SERVICE - READ + // GET AN ITEM - GET: /datasets/{datasetName}/tables/{tableName}/items/{id}?api-version=2015-09-01 - return connectorType?.FormulaType as RecordType; - } + // LIST ITEMS - GET: /datasets/{datasetName}/tables/{tableName}/items?$filter=’CreatedBy’ eq ‘john.doe’&$top=50&$orderby=’Priority’ asc, ’CreationDate’ desc + internal ConnectorFunction GetItems => _getItems ??= GetItemsFunction(); - public override async Task>> GetItemsAsync(BaseRuntimeConnectorContext runtimeConnectorContext, CancellationToken cancellationToken) + public override async Task>> GetItemsAsync(IServiceProvider serviceProvider, CancellationToken cancellationToken) { + BaseRuntimeConnectorContext runtimeConnectorContext = serviceProvider.GetService() ?? throw new InvalidOperationException("Cannot determine runtime connector context."); + // Notice that there is no paging here, just get 1 page // Use WithRawResults to ignore _getItems return type which is in the form of ![value:*[dynamicProperties:![]]] (ie. without the actual type) FormulaValue rowsRaw = await GetItems.InvokeAsync(Array.Empty(), runtimeConnectorContext.WithRawResults(), CancellationToken.None).ConfigureAwait(false); - if (rowsRaw is ErrorValue ev) - { - return Enumerable.Empty>(); - } + return rowsRaw is StringValue sv ? GetResult(sv.Value) : Array.Empty>(); + } - StringValue rowsStr = rowsRaw as StringValue; + // TABLE DATA SERVICE - UPDATE + // PATCH: /datasets/{datasetName}/tables/{tableName}/items/{id} - // $$$ Is this always this type? - RecordValue rv = FormulaValueJSON.FromJson(rowsStr.Value, RecordType.Empty().Add("value", TableType)) as RecordValue; - TableValue tv = rv.Fields.FirstOrDefault(field => field.Name == "value").Value as TableValue; + // TABLE DATA SERVICE - DELETE + // DELETE: /datasets/{datasetName}/tables/{tableName}/items/{id} - // The call we make contains more fields and we want to remove them here ('@odata.etag') - return new InMemoryTableValue(IRContext.NotInSource(TableType), tv.Rows).Rows; + private static string GetDataSetName(IReadOnlyDictionary globalValues) => + TryGetString("dataset", globalValues, out string dataset) + ? dataset + : TryGetString("server", globalValues, out string server) && TryGetString("database", globalValues, out string database) + ? $"{server},{database}" + : throw new InvalidOperationException("Cannot determine dataset name."); + + private static string GetTableName(IReadOnlyDictionary globalValues) => + TryGetString("table", globalValues, out string table) + ? table + : throw new InvalidOperationException("Cannot determine table name."); + + private static bool TryGetString(string name, IReadOnlyDictionary globalValues, out string str) + { + if (globalValues.TryGetValue(name, out FormulaValue fv) && fv is StringValue sv) + { + str = sv.Value; + return true; + } + + str = null; + return false; } private const string MetadataServiceRegex = @"/\$metadata\.json/datasets/{[^{}]+}(,{[^{}]+})?/tables/{[^{}]+}$"; + private const string GetItemsRegex = @"/datasets/{[^{}]+}(,{[^{}]+})?/tables/{[^{}]+}/items$"; + private const string NameVersionRegex = @"V(?[0-9]{0,2})$"; private ConnectorFunction GetMetadataService() @@ -144,15 +161,17 @@ private ConnectorFunction GetItemsFunction() return functions[0]; } - private class SimpleRuntimeConnectorContext : BaseRuntimeConnectorContext + private class RawRuntimeConnectorContext : BaseRuntimeConnectorContext { private readonly HttpMessageInvoker _invoker; - internal SimpleRuntimeConnectorContext(HttpMessageInvoker invoker) + internal RawRuntimeConnectorContext(HttpMessageInvoker invoker) { _invoker = invoker; } + internal override bool ReturnRawResults => true; + public override TimeZoneInfo TimeZoneInfo => TimeZoneInfo.Utc; public override HttpMessageInvoker GetInvoker(string @namespace) => _invoker; diff --git a/src/libraries/Microsoft.PowerFx.Connectors/Environment/TabularService.cs b/src/libraries/Microsoft.PowerFx.Connectors/Environment/TabularService.cs index cb99776dd5..d8bbc1597e 100644 --- a/src/libraries/Microsoft.PowerFx.Connectors/Environment/TabularService.cs +++ b/src/libraries/Microsoft.PowerFx.Connectors/Environment/TabularService.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. +using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -8,50 +9,32 @@ namespace Microsoft.PowerFx.Connectors { - // Implements CDP protocol for Tabular connectors public abstract class TabularService { - public string TableName { get; } - public RecordType RecordType { get; private set; } = null; public TableType TableType => _tableType ??= RecordType.ToTable(); - public virtual string Namespace => $"_tbl_{TableName}"; - public bool IsInitialized => RecordType != null; private TableType _tableType; - protected TabularService(IReadOnlyDictionary globalValues) + public virtual ConnectorTableValue GetTableValue() { - if (!globalValues.TryGetValue("table", out FormulaValue tableName)) - { - throw new PowerFxConnectorException("Missing 'table' parameter in the global values"); - } - - if (tableName is not StringValue sv) - { - throw new PowerFxConnectorException("'table' global value is not of type StringValue"); - } - - TableName = sv.Value; + return RecordType == null + ? throw new InvalidOperationException("Tabular service is not initialized.") + : new ConnectorTableValue(this, RecordType); } - public async Task InitAsync(PowerFxConfig config, string tableName, BaseRuntimeConnectorContext runtimeConnectorContext, CancellationToken cancellationToken) + public async Task InitAsync(CancellationToken cancellationToken) { - cancellationToken.ThrowIfCancellationRequested(); - - RecordType = await InitInternalAsync(config, tableName, runtimeConnectorContext, cancellationToken).ConfigureAwait(false); - - return RecordType; + cancellationToken.ThrowIfCancellationRequested(); + RecordType = await GetSchemaAsync(cancellationToken).ConfigureAwait(false); } - internal abstract Task InitInternalAsync(PowerFxConfig config, string tableName, BaseRuntimeConnectorContext runtimeConnectorContext, CancellationToken cancellationToken); - // TABLE METADATA SERVICE // GET: /$metadata.json/datasets/{datasetName}/tables/{tableName}?api-version=2015-09-01 - internal abstract Task GetSchemaAsync(BaseRuntimeConnectorContext runtimeConnectorContext, CancellationToken cancellationToken); + public abstract Task GetSchemaAsync(CancellationToken cancellationToken); // TABLE DATA SERVICE - CREATE // POST: /datasets/{datasetName}/tables/{tableName}/items?api-version=2015-09-01 @@ -60,7 +43,7 @@ public async Task InitAsync(PowerFxConfig config, string tableName, // GET AN ITEM - GET: /datasets/{datasetName}/tables/{tableName}/items/{id}?api-version=2015-09-01 // LIST ITEMS - GET: /datasets/{datasetName}/tables/{tableName}/items?$filter=’CreatedBy’ eq ‘john.doe’&$top=50&$orderby=’Priority’ asc, ’CreationDate’ desc - public abstract Task>> GetItemsAsync(BaseRuntimeConnectorContext runtimeConnectorContext, CancellationToken cancellationToken); + public abstract Task>> GetItemsAsync(IServiceProvider serviceProvider, CancellationToken cancellationToken); // TABLE DATA SERVICE - UPDATE // PATCH: /datasets/{datasetName}/tables/{tableName}/items/{id} diff --git a/src/libraries/Microsoft.PowerFx.Connectors/Public/ConnectorTableValue.cs b/src/libraries/Microsoft.PowerFx.Connectors/Public/ConnectorTableValue.cs index 1889c979c6..1ddc0ffe44 100644 --- a/src/libraries/Microsoft.PowerFx.Connectors/Public/ConnectorTableValue.cs +++ b/src/libraries/Microsoft.PowerFx.Connectors/Public/ConnectorTableValue.cs @@ -3,7 +3,6 @@ using System; using System.Collections.Generic; -using System.Threading.Tasks; using Microsoft.PowerFx.Core.Entities; using Microsoft.PowerFx.Core.IR; using Microsoft.PowerFx.Types; @@ -14,18 +13,14 @@ namespace Microsoft.PowerFx.Connectors // Doesn't contain any ServiceProvider which is runtime only public class ConnectorTableValue : TableValue, IRefreshable { - public string Name => _tabularService.TableName; - - public string Namespace => _tabularService.Namespace; - public new TableType Type => _tabularService.TableType; - protected internal readonly TabularService _tabularService; - + protected internal readonly TabularService _tabularService; + public ConnectorTableValue(TabularService tabularService, RecordType recordType) : base(IRContext.NotInSource(new ConnectorTableType(recordType))) - { - _tabularService = tabularService; + { + _tabularService = tabularService; } public ConnectorTableValue(RecordType recordType) @@ -45,12 +40,7 @@ internal ConnectorTableValue(IRContext irContext) { } - public override IEnumerable> Rows => GetRowsInternal().ConfigureAwait(false).GetAwaiter().GetResult(); - - protected virtual Task>> GetRowsInternal() - { - throw new Exception("No HttpClient context"); - } + public override IEnumerable> Rows => throw new InvalidOperationException("No service context. Make sure to call engine.EnableTabularConnectors()."); public virtual void Refresh() { diff --git a/src/libraries/Microsoft.PowerFx.Connectors/Public/ConnectorTableValueWithServiceProvider.cs b/src/libraries/Microsoft.PowerFx.Connectors/Public/ConnectorTableValueWithServiceProvider.cs index 4b42703b46..307a69e5b4 100644 --- a/src/libraries/Microsoft.PowerFx.Connectors/Public/ConnectorTableValueWithServiceProvider.cs +++ b/src/libraries/Microsoft.PowerFx.Connectors/Public/ConnectorTableValueWithServiceProvider.cs @@ -19,16 +19,18 @@ public ConnectorTableValueWithServiceProvider(ConnectorTableValue tableValue, IS ServiceProvider = serviceProvider; } - private IEnumerable> _cachedRows; + private ICollection> _cachedRows; - protected override async Task>> GetRowsInternal() + public override IEnumerable> Rows => GetRowsInternal().ConfigureAwait(false).GetAwaiter().GetResult(); + + private async Task>> GetRowsInternal() { if (_cachedRows != null) { return _cachedRows; } - _cachedRows = await _tabularService.GetItemsAsync(ServiceProvider.GetService(), CancellationToken.None).ConfigureAwait(false); + _cachedRows = await _tabularService.GetItemsAsync(ServiceProvider, CancellationToken.None).ConfigureAwait(false); return _cachedRows; } diff --git a/src/tests/Microsoft.PowerFx.Connectors.Tests/FileTabularConnector.cs b/src/tests/Microsoft.PowerFx.Connectors.Tests/FileTabularConnector.cs new file mode 100644 index 0000000000..e6720154fd --- /dev/null +++ b/src/tests/Microsoft.PowerFx.Connectors.Tests/FileTabularConnector.cs @@ -0,0 +1,89 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Net.Http; +using System.Text.RegularExpressions; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.PowerFx.Types; +using Xunit; +using Xunit.Abstractions; + +namespace Microsoft.PowerFx.Connectors.Tests +{ + public class FileTabularTests + { + private readonly ITestOutputHelper _output; + + public FileTabularTests(ITestOutputHelper output) + { + _output = output; + } + + [Fact] + public async Task FileTabularTest() + { + string curDir = Directory.GetCurrentDirectory(); + string fileName = Path.Combine(curDir, "TestFile.txt"); + File.WriteAllLines(fileName, new string[] { "a", "b", "c" }); + + FileTabularService tabularService = new FileTabularService(fileName); + Assert.False(tabularService.IsInitialized); + + await tabularService.InitAsync(CancellationToken.None).ConfigureAwait(false); + Assert.True(tabularService.IsInitialized); + + ConnectorTableValue fileTable = tabularService.GetTableValue(); + Assert.True(fileTable._tabularService.IsInitialized); + Assert.Equal("*[line:s]", fileTable.Type._type.ToString()); + + PowerFxConfig config = new PowerFxConfig(Features.PowerFxV1); + RecalcEngine engine = new RecalcEngine(config); + engine.EnableTabularConnectors(); + + SymbolValues symbolValues = new SymbolValues().Add("File", fileTable); + + // Expression with tabular connector + string expr = @"Last(FirstN(File, 2)).line"; + + CheckResult check = engine.Check(expr, options: new ParserOptions() { AllowsSideEffects = true }, symbolTable: symbolValues.SymbolTable); + Assert.True(check.IsSuccess); + + // Confirm that InjectServiceProviderFunction has properly been added + string ir = new Regex("RuntimeValues_[0-9]+").Replace(check.PrintIR(), "RuntimeValues_XXX"); + Assert.Equal("FieldAccess(Last:![line:s](FirstN:*[line:s](InjectServiceProviderFunction:![line:s](ResolvedObject('File:RuntimeValues_XXX')), Float:n(2:w))), line)", ir); + + // Use tabular connector. Internally we'll call ConnectorTableValueWithServiceProvider.GetRowsInternal to get the data + FormulaValue result = await check.GetEvaluator().EvalAsync(CancellationToken.None, symbolValues).ConfigureAwait(false); + StringValue str = Assert.IsType(result); + Assert.Equal("b", str.Value); + } + } + + internal class FileTabularService : TabularService + { + private readonly string _fileName; + + public FileTabularService(string fileName) + { + _fileName = File.Exists(fileName) ? fileName : throw new FileNotFoundException($"File not found: {_fileName}"); + } + + public override Task GetSchemaAsync(CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + return Task.FromResult(RecordType.Empty().Add("line", FormulaType.String)); + } + + public override async Task>> GetItemsAsync(IServiceProvider serviceProvider, CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + string[] lines = await File.ReadAllLinesAsync(_fileName, cancellationToken).ConfigureAwait(false); + return lines.Select(line => DValue.Of(FormulaValue.NewRecordFromFields(new NamedValue("line", FormulaValue.New(line))))).ToArray(); + } + } +} diff --git a/src/tests/Microsoft.PowerFx.Connectors.Tests/PowerPlatformConnectorTests.cs b/src/tests/Microsoft.PowerFx.Connectors.Tests/PowerPlatformConnectorTests.cs index aacf2cf67d..fd5935bb2c 100644 --- a/src/tests/Microsoft.PowerFx.Connectors.Tests/PowerPlatformConnectorTests.cs +++ b/src/tests/Microsoft.PowerFx.Connectors.Tests/PowerPlatformConnectorTests.cs @@ -1925,15 +1925,15 @@ public async Task SQL_Tabular() var engine = new RecalcEngine(config); using var httpClient = new HttpClient(testConnector); - - using var client = new PowerPlatformConnectorClient("tip2-001.azure-apihub.net", "Default-f0900a13-bc1c-4df3-87a2-b7a403db8dd0", "5a6d0eb8c4dd44178c46d85caa5593dc", () => "eyJ0eXAiOiJKVZ...", httpClient) + string jwt = "eyJ0eXAiOi..."; + using var client = new PowerPlatformConnectorClient("firstrelease-003.azure-apihub.net", "49970107-0806-e5a7-be5e-7c60e2750f01", "e74bd8913489439e886426eba8dec1c8", () => jwt, httpClient) { SessionId = "8e67ebdc-d402-455a-b33a-304820832383" }; IReadOnlyDictionary globals = new ReadOnlyDictionary(new Dictionary() { - { "connectionId", FormulaValue.New("5a6d0eb8c4dd44178c46d85caa5593dc") }, + { "connectionId", FormulaValue.New("e74bd8913489439e886426eba8dec1c8") }, { "server", FormulaValue.New("pfxdev-sql.database.windows.net") }, { "database", FormulaValue.New("connectortest") }, { "table", FormulaValue.New("Customers") } @@ -1942,18 +1942,23 @@ public async Task SQL_Tabular() // Use of tabular connector // There is a network call here to retrieve the table's schema testConnector.SetResponseFromFile(@"Responses\SQL Server Load Customers DB.json"); - TabularService tabularService = new SwaggerTabularService(apiDoc, globals, client, new ConsoleLogger(_output)); - BaseRuntimeConnectorContext runtimeContext = new TestConnectorRuntimeContext(tabularService.Namespace, client, console: _output); - ConnectorTableValue sqlTable = await config.AddTabularConnector("Customers", tabularService, runtimeContext, CancellationToken.None).ConfigureAwait(false); + SwaggerTabularService tabularService = new SwaggerTabularService(config, apiDoc, globals, client, new ConsoleLogger(_output)); + Assert.False(tabularService.IsInitialized); + Assert.Equal("Customers", tabularService.TableName); + Assert.Equal("_tbl_e74bd8913489439e886426eba8dec1c8", tabularService.Namespace); + + await tabularService.InitAsync(CancellationToken.None).ConfigureAwait(false); + Assert.True(tabularService.IsInitialized); - Assert.Equal("Customers", sqlTable.Name); - Assert.Equal("_tbl_Customers", sqlTable.Namespace); // internal connector namespace + ConnectorTableValue sqlTable = tabularService.GetTableValue(); + Assert.True(sqlTable._tabularService.IsInitialized); Assert.Equal("*[Address:s, Country:s, CustomerId:w, Name:s, Phone:s]", sqlTable.Type._type.ToString()); // Enable IR rewritter to auto-inject ServiceProvider where needed engine.EnableTabularConnectors(); - SymbolValues symbolValues = new SymbolValues().Add(sqlTable.Name, sqlTable); + SymbolValues symbolValues = new SymbolValues().Add("Customers", sqlTable); + BaseRuntimeConnectorContext runtimeContext = new TestConnectorRuntimeContext(tabularService.Namespace, client, console: _output); RuntimeConfig rc = new RuntimeConfig(symbolValues).AddRuntimeContext(runtimeContext); // Expression with tabular connector @@ -1979,6 +1984,74 @@ public async Task SQL_Tabular() Assert.Equal("+1-425-705-0000", phone.Value); } + [Fact] + public async Task SQL_CdpTabular() + { + using var testConnector = new LoggingTestServer(@"Swagger\SQL Server.json", _output); + var apiDoc = testConnector._apiDocument; + var config = new PowerFxConfig(Features.PowerFxV1); + var engine = new RecalcEngine(config); + + using var httpClient = new HttpClient(testConnector); + string jwt = "eyJ0eXAiOi..."; + using var client = new PowerPlatformConnectorClient("firstrelease-003.azure-apihub.net", "49970107-0806-e5a7-be5e-7c60e2750f01", "e74bd8913489439e886426eba8dec1c8", () => jwt, httpClient) + { + SessionId = "8e67ebdc-d402-455a-b33a-304820832383" + }; + + IReadOnlyDictionary globals = new ReadOnlyDictionary(new Dictionary() + { + { "connectionId", FormulaValue.New("e74bd8913489439e886426eba8dec1c8") }, + { "server", FormulaValue.New("pfxdev-sql.database.windows.net") }, + { "database", FormulaValue.New("connectortest") }, + { "table", FormulaValue.New("Customers") } + }); + + // Use of tabular connector + // There is a network call here to retrieve the table's schema + testConnector.SetResponseFromFile(@"Responses\SQL Server Load Customers DB.json"); + + // IMPORTANT NOTE: This is NOT what PowerApps is doing as they use /v2 version and do NOT use "default" dataset. + CdpTabularService tabularService = new CdpTabularService("default", "Customers", client, "/apim/sql/e74bd8913489439e886426eba8dec1c8"); + Assert.False(tabularService.IsInitialized); + Assert.Equal("Customers", tabularService.TableName); + + await tabularService.InitAsync(CancellationToken.None).ConfigureAwait(false); + Assert.True(tabularService.IsInitialized); + + ConnectorTableValue sqlTable = tabularService.GetTableValue(); + Assert.True(sqlTable._tabularService.IsInitialized); + Assert.Equal("*[Address:s, Country:s, CustomerId:w, Name:s, Phone:s]", sqlTable.Type._type.ToString()); + + // Enable IR rewritter to auto-inject ServiceProvider where needed + engine.EnableTabularConnectors(); + + SymbolValues symbolValues = new SymbolValues().Add("Customers", sqlTable); + RuntimeConfig rc = new RuntimeConfig(symbolValues); + + // Expression with tabular connector + string expr = @"First(Customers).Address"; + CheckResult check = engine.Check(expr, options: new ParserOptions() { AllowsSideEffects = true }, symbolTable: symbolValues.SymbolTable); + Assert.True(check.IsSuccess); + + // Confirm that InjectServiceProviderFunction has properly been added + string ir = new Regex("RuntimeValues_[0-9]+").Replace(check.PrintIR(), "RuntimeValues_XXX"); + Assert.Equal("FieldAccess(First:![Address:s, Country:s, CustomerId:w, Name:s, Phone:s](InjectServiceProviderFunction:![Address:s, Country:s, CustomerId:w, Name:s, Phone:s](ResolvedObject('Customers:RuntimeValues_XXX'))), Address)", ir); + + // Use tabular connector. Internally we'll call ConnectorTableValueWithServiceProvider.GetRowsInternal to get the data + testConnector.SetResponseFromFile(@"Responses\SQL Server Get First Customers.json"); + FormulaValue result = await check.GetEvaluator().EvalAsync(CancellationToken.None, rc).ConfigureAwait(false); + + StringValue address = Assert.IsType(result); + Assert.Equal("Juigné", address.Value); + + // Rows are not cached here as the cache is stored in ConnectorTableValueWithServiceProvider which is created by InjectServiceProviderFunction, itself added during Engine.Check + testConnector.SetResponseFromFile(@"Responses\SQL Server Get First Customers.json"); + result = await engine.EvalAsync("Last(Customers).Phone", CancellationToken.None, runtimeConfig: rc).ConfigureAwait(false); + StringValue phone = Assert.IsType(result); + Assert.Equal("+1-425-705-0000", phone.Value); + } + [Fact] public async Task SP_Tabular() { @@ -2005,12 +2078,17 @@ public async Task SP_Tabular() // There is a network call here to retrieve the table's schema testConnector.SetResponseFromFile(@"Responses\SP GetTable.json"); - TabularService tabularService = new SwaggerTabularService(apiDoc, globals, client, new ConsoleLogger(_output)); - BaseRuntimeConnectorContext runtimeContext = new TestConnectorRuntimeContext(tabularService.Namespace, client, console: _output); - ConnectorTableValue spTable = await config.AddTabularConnector("Documents", tabularService, runtimeContext, CancellationToken.None).ConfigureAwait(false); + SwaggerTabularService tabularService = new SwaggerTabularService(config, apiDoc, globals, client, new ConsoleLogger(_output)); + Assert.False(tabularService.IsInitialized); + Assert.Equal("Documents", tabularService.TableName); + Assert.Equal("_tbl_cc276c328f62456bb944f0736b3cb3b1", tabularService.Namespace); + + await tabularService.InitAsync(CancellationToken.None).ConfigureAwait(false); + Assert.True(tabularService.IsInitialized); + + ConnectorTableValue spTable = tabularService.GetTableValue(); + Assert.True(spTable._tabularService.IsInitialized); - Assert.Equal("Documents", spTable.Name); - Assert.Equal("_tbl_Documents", spTable.Namespace); // internal connector namespace Assert.Equal( "*[Author:![Claims:s, Department:s, DisplayName:s, Email:s, JobTitle:s, Picture:s], CheckoutUser:![Claims:s, Department:s, DisplayName:s, Email:s, JobTitle:s, Picture:s], " + "ComplianceAssetId:s, Created:d, Editor:![Claims:s, Department:s, DisplayName:s, Email:s, JobTitle:s, Picture:s], ID:w, Modified:d, OData__ColorTag:s, OData__DisplayName:s, " + @@ -2022,7 +2100,8 @@ public async Task SP_Tabular() // Enable IR rewritter to auto-inject ServiceProvider where needed engine.EnableTabularConnectors(); - SymbolValues symbolValues = new SymbolValues().Add(spTable.Name, spTable); + SymbolValues symbolValues = new SymbolValues().Add("Documents", spTable); + BaseRuntimeConnectorContext runtimeContext = new TestConnectorRuntimeContext(tabularService.Namespace, client, console: _output); RuntimeConfig rc = new RuntimeConfig(symbolValues).AddRuntimeContext(runtimeContext); // Expression with tabular connector diff --git a/src/tests/Microsoft.PowerFx.Connectors.Tests/PublicSurfaceTests.cs b/src/tests/Microsoft.PowerFx.Connectors.Tests/PublicSurfaceTests.cs index 974515c5e5..5ab1e9ea82 100644 --- a/src/tests/Microsoft.PowerFx.Connectors.Tests/PublicSurfaceTests.cs +++ b/src/tests/Microsoft.PowerFx.Connectors.Tests/PublicSurfaceTests.cs @@ -27,6 +27,7 @@ public void PublicSurfaceTest_Connectors() { "Microsoft.PowerFx.ConfigExtensions", "Microsoft.PowerFx.Connectors.BaseRuntimeConnectorContext", + "Microsoft.PowerFx.Connectors.CdpTabularService", "Microsoft.PowerFx.Connectors.ConnectorCompatibility", "Microsoft.PowerFx.Connectors.ConnectorEnhancedSuggestions", "Microsoft.PowerFx.Connectors.ConnectorFunction",