Skip to content

Commit

Permalink
Merge feature/core-entities to dev (#2574)
Browse files Browse the repository at this point in the history
* udpate readme.

* update durability provider class for new core-entities support. (#2570)

* update durability provider class for new core-entities support.

* add configuration setting for max entity concurrency to DurableTaskOptions

* minor fixes.

* update DurableClient to take advantage of native entity queries (#2571)

* update DurableClient to take advantage of native entity queries if available

* fix minor errors.

* address PR feedback

* implement passthrough middleware for entities (#2572)

* implement passthrough middleware for entities.

* propagate changes to protocol

* update/simplify protobuf format

* address PR feedback

* implement entity queries for grpc listener (#2573)

* implement entity queries for grpc listener

* propagate changes to protocol

* update/simplify protobuf format

* Various fixes (#2585)

* durability provider must implement and pass-through IEntityOrchestrationService since it wraps the orchestration service

* simple mistake

* fix misunderstanding of initializer syntax (produced null, not empty list)

* fix missing failure details

* fix missing compile-time switch for trigger value type

* fix missing optional arguments

* fix  missing override

* simplify how entities are excluded from instance queries (#2586)

* add an entity example to the DotNetIsolated smoke test project. (#2584)

* add an entity example to the DotNetIsolated smoke test project.

* remove superfluous argument.

* address PR feedback

* Entities: Add worker side entity trigger and logic (#2576)

* Add worker side entity trigger and logic

* update comments

* Address PR comments

* another small fix that got lost somewhere. (#2596)

* Update packages and version for entities preview (#2599)

* Switch to Microsoft.DurableTask.Grpc (#2605)

* Fix grpc core (#2616)

* pass entity parameters for task orchestration. (#2611)

* Core entities/various fixes and updates (#2619)

* assign the necessary AzureStorageOrchestrationServiceSettings

* propagate changes to query name and metadata parameters

* add missing override for TaskOrchestrationEntityFeature

* Update to entities preview 2 (#2620)

* Add callback handler for entity dispatching (#2624)

* Core entities/propagate changes (#2625)

* add configuration for EnableEntitySupport

* rename includeStateless to includeTransient

* Rev dependencies to entities-preview.2 (#2627)

* Call EnsureLegalAccess from EntityFeature in dotnet-isolated  (#2633)

* create a better error message in situations where client entity functions are called on a backend that does not support entities (#2630)

* Rev package versions, update release notes (#2638)

* Address smoke test build issue (#2647)

* fix translation of legacy query to new entity query support (#2648)

* fix translation of legacy query to new entity query support

* comment out CleanEntityStorage_Many

* try to enable CI on feature branch

* Revert "comment out CleanEntityStorage_Many"

This reverts commit aeaa4b8.

* update to preview.2 packages

---------

Co-authored-by: Varshitha Bachu <[email protected]>
Co-authored-by: Jacob Viau <[email protected]>
  • Loading branch information
3 people authored Oct 19, 2023
1 parent 4388987 commit bbd7ad6
Show file tree
Hide file tree
Showing 31 changed files with 1,189 additions and 59 deletions.
5 changes: 5 additions & 0 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
pr:
- main
- dev
- feature/*

jobs:

- job: FunctionsV1Tests
Expand Down
23 changes: 21 additions & 2 deletions release_notes.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,31 @@
# Release Notes

## Microsoft.Azure.Functions.Worker.Extensions.DurableTask v1.1.0-preview.1

### New Features

- Support entities for .NET isolated

### Bug Fixes

- Fix support for distributed tracing v2 in dotnet-isolated and Java (https://github.com/Azure/azure-functions-durable-extension/pull/2634)
- Update Microsoft.DurableTask.\* dependencies to v1.0.5
### Breaking Changes

### Dependency Updates

`Microsoft.DurableTask.*` to `1.1.0-preview.1`

## Microsoft.Azure.WebJobs.Extensions.DurableTask v2.12.0-preview.1

### New Features

- Updates to take advantage of new core-entity support

### Bug Fixes

### Breaking Changes

### Dependency Updates

`Microsoft.Azure.DurableTask.Core` to `2.16.0-preview.2`
`Microsoft.Azure.DurableTask.AzureStorage` to `1.16.0-preview.2`

Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using DurableTask.AzureStorage;
using DurableTask.AzureStorage.Tracking;
using DurableTask.Core;
using DurableTask.Core.Entities;
using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage;
using Newtonsoft.Json;
Expand All @@ -19,6 +20,7 @@
using Microsoft.Azure.WebJobs.Host.Scale;
#endif
using AzureStorage = DurableTask.AzureStorage;
using DTCore = DurableTask.Core;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
Expand Down Expand Up @@ -54,8 +56,6 @@ public AzureStorageDurabilityProvider(
this.logger = logger;
}

public override bool SupportsEntities => true;

public override bool CheckStatusBeforeRaiseEvent => true;

/// <summary>
Expand Down Expand Up @@ -98,6 +98,29 @@ public async override Task<IList<OrchestrationState>> GetAllOrchestrationStatesW

/// <inheritdoc/>
public async override Task<string> RetrieveSerializedEntityState(EntityId entityId, JsonSerializerSettings serializerSettings)
{
EntityBackendQueries entityBackendQueries = (this.serviceClient as IEntityOrchestrationService)?.EntityBackendQueries;

if (entityBackendQueries != null) // entity queries are natively supported
{
var entity = await entityBackendQueries.GetEntityAsync(new DTCore.Entities.EntityId(entityId.EntityName, entityId.EntityKey), cancellation: default);

if (entity == null)
{
return null;
}
else
{
return entity.Value.SerializedState;
}
}
else // fall back to old implementation
{
return await this.LegacyImplementationOfRetrieveSerializedEntityState(entityId, serializerSettings);
}
}

private async Task<string> LegacyImplementationOfRetrieveSerializedEntityState(EntityId entityId, JsonSerializerSettings serializerSettings)
{
var instanceId = EntityId.GetSchedulerIdFromEntityId(entityId);
IList<OrchestrationState> stateList = await this.serviceClient.GetOrchestrationStateAsync(instanceId, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ internal class AzureStorageDurabilityProviderFactory : IDurabilityProviderFactor
private readonly AzureStorageOptions azureStorageOptions;
private readonly INameResolver nameResolver;
private readonly ILoggerFactory loggerFactory;
private readonly bool useSeparateQueueForEntityWorkItems;
private readonly bool inConsumption; // If true, optimize defaults for consumption
private AzureStorageDurabilityProvider defaultStorageProvider;

Expand Down Expand Up @@ -56,6 +57,7 @@ public AzureStorageDurabilityProviderFactory(
// different defaults for key configuration values.
int maxConcurrentOrchestratorsDefault = this.inConsumption ? 5 : 10 * Environment.ProcessorCount;
int maxConcurrentActivitiesDefault = this.inConsumption ? 10 : 10 * Environment.ProcessorCount;
int maxConcurrentEntitiesDefault = this.inConsumption ? 10 : 10 * Environment.ProcessorCount;
int maxEntityOperationBatchSizeDefault = this.inConsumption ? 50 : 5000;

if (this.inConsumption)
Expand All @@ -71,9 +73,18 @@ public AzureStorageDurabilityProviderFactory(
}
}

WorkerRuntimeType runtimeType = platformInfo.GetWorkerRuntimeType();
if (runtimeType == WorkerRuntimeType.DotNetIsolated ||
runtimeType == WorkerRuntimeType.Java ||
runtimeType == WorkerRuntimeType.Custom)
{
this.useSeparateQueueForEntityWorkItems = true;
}

// The following defaults are only applied if the customer did not explicitely set them on `host.json`
this.options.MaxConcurrentOrchestratorFunctions = this.options.MaxConcurrentOrchestratorFunctions ?? maxConcurrentOrchestratorsDefault;
this.options.MaxConcurrentActivityFunctions = this.options.MaxConcurrentActivityFunctions ?? maxConcurrentActivitiesDefault;
this.options.MaxConcurrentEntityFunctions = this.options.MaxConcurrentEntityFunctions ?? maxConcurrentEntitiesDefault;
this.options.MaxEntityOperationBatchSize = this.options.MaxEntityOperationBatchSize ?? maxEntityOperationBatchSizeDefault;

// Override the configuration defaults with user-provided values in host.json, if any.
Expand Down Expand Up @@ -188,6 +199,7 @@ internal AzureStorageOrchestrationServiceSettings GetAzureStorageOrchestrationSe
WorkItemQueueVisibilityTimeout = this.azureStorageOptions.WorkItemQueueVisibilityTimeout,
MaxConcurrentTaskOrchestrationWorkItems = this.options.MaxConcurrentOrchestratorFunctions ?? throw new InvalidOperationException($"{nameof(this.options.MaxConcurrentOrchestratorFunctions)} needs a default value"),
MaxConcurrentTaskActivityWorkItems = this.options.MaxConcurrentActivityFunctions ?? throw new InvalidOperationException($"{nameof(this.options.MaxConcurrentOrchestratorFunctions)} needs a default value"),
MaxConcurrentTaskEntityWorkItems = this.options.MaxConcurrentEntityFunctions ?? throw new InvalidOperationException($"{nameof(this.options.MaxConcurrentEntityFunctions)} needs a default value"),
ExtendedSessionsEnabled = this.options.ExtendedSessionsEnabled,
ExtendedSessionIdleTimeout = extendedSessionTimeout,
MaxQueuePollingInterval = this.azureStorageOptions.MaxQueuePollingInterval,
Expand All @@ -202,6 +214,9 @@ internal AzureStorageOrchestrationServiceSettings GetAzureStorageOrchestrationSe
LoggerFactory = this.loggerFactory,
UseLegacyPartitionManagement = this.azureStorageOptions.UseLegacyPartitionManagement,
UseTablePartitionManagement = this.azureStorageOptions.UseTablePartitionManagement,
UseSeparateQueueForEntityWorkItems = this.useSeparateQueueForEntityWorkItems,
EntityMessageReorderWindowInMinutes = this.options.EntityMessageReorderWindowInMinutes,
MaxEntityOperationBatchSize = this.options.MaxEntityOperationBatchSize,
};

if (this.inConsumption)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Protocols;
using Microsoft.Azure.WebJobs.Host.Triggers;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
Expand Down Expand Up @@ -57,6 +58,8 @@ public EntityTriggerAttributeBindingProvider(

private class EntityTriggerBinding : ITriggerBinding
{
private static readonly IReadOnlyDictionary<string, object?> EmptyBindingData = new Dictionary<string, object?>(capacity: 0);

private readonly DurableTaskExtension config;
private readonly ParameterInfo parameterInfo;
private readonly FunctionName entityName;
Expand All @@ -75,7 +78,10 @@ public EntityTriggerBinding(
this.BindingDataContract = GetBindingDataContract(parameterInfo);
}

public Type TriggerValueType => typeof(IDurableEntityContext);
// Out-of-proc V2 uses a different trigger value type
public Type TriggerValueType => this.config.OutOfProcProtocol == OutOfProcOrchestrationProtocol.MiddlewarePassthrough ?
typeof(RemoteEntityContext) :
typeof(IDurableEntityContext);

public IReadOnlyDictionary<string, Type> BindingDataContract { get; }

Expand All @@ -95,31 +101,52 @@ private static IReadOnlyDictionary<string, Type> GetBindingDataContract(Paramete

public Task<ITriggerData> BindAsync(object value, ValueBindingContext context)
{
var entityContext = (DurableEntityContext)value;
Type destinationType = this.parameterInfo.ParameterType;

object? convertedValue = null;
if (destinationType == typeof(IDurableEntityContext))
if (value is DurableEntityContext entityContext)
{
convertedValue = entityContext;
Type destinationType = this.parameterInfo.ParameterType;

object? convertedValue = null;
if (destinationType == typeof(IDurableEntityContext))
{
convertedValue = entityContext;
#if !FUNCTIONS_V1
((IDurableEntityContext)value).FunctionBindingContext = context.FunctionContext;
((IDurableEntityContext)value).FunctionBindingContext = context.FunctionContext;
#endif
}
else if (destinationType == typeof(string))
{
convertedValue = EntityContextToString(entityContext);
}

var inputValueProvider = new ObjectValueProvider(
convertedValue ?? value,
this.parameterInfo.ParameterType);
var inputValueProvider = new ObjectValueProvider(
convertedValue ?? value,
this.parameterInfo.ParameterType);

var bindingData = new Dictionary<string, object?>(StringComparer.OrdinalIgnoreCase);
bindingData[this.parameterInfo.Name!] = convertedValue;
var bindingData = new Dictionary<string, object?>(StringComparer.OrdinalIgnoreCase);
bindingData[this.parameterInfo.Name!] = convertedValue;

var triggerData = new TriggerData(inputValueProvider, bindingData);
return Task.FromResult<ITriggerData>(triggerData);
var triggerData = new TriggerData(inputValueProvider, bindingData);
return Task.FromResult<ITriggerData>(triggerData);
}
#if FUNCTIONS_V3_OR_GREATER
else if (value is RemoteEntityContext remoteContext)
{
// Generate a byte array which is the serialized protobuf payload
// https://developers.google.com/protocol-buffers/docs/csharptutorial#parsing_and_serialization
var entityBatchRequest = remoteContext.Request.ToEntityBatchRequest();

// We convert the binary payload into a base64 string because that seems to be the most commonly supported
// format for Azure Functions language workers. Attempts to send unencoded byte[] payloads were unsuccessful.
string encodedRequest = ProtobufUtils.Base64Encode(entityBatchRequest);
var contextValueProvider = new ObjectValueProvider(encodedRequest, typeof(string));
var triggerData = new TriggerData(contextValueProvider, EmptyBindingData);
return Task.FromResult<ITriggerData>(triggerData);
}
#endif
else
{
throw new ArgumentException($"Don't know how to bind to {value?.GetType().Name ?? "null"}.", nameof(value));
}
}

public ParameterDescriptor ToParameterDescriptor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ public Task<ITriggerData> BindAsync(object? value, ValueBindingContext context)
InstanceId = remoteContext.InstanceId,
PastEvents = { remoteContext.PastEvents.Select(ProtobufUtils.ToHistoryEventProto) },
NewEvents = { remoteContext.NewEvents.Select(ProtobufUtils.ToHistoryEventProto) },
EntityParameters = remoteContext.EntityParameters.ToProtobuf(),
};

// We convert the binary payload into a base64 string because that seems to be the most commonly supported
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.Entities;
using DurableTask.Core.History;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Mvc.WebApiCompatShim;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using DTCore = DurableTask.Core;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
Expand Down Expand Up @@ -548,6 +550,28 @@ Task<EntityStateResponse<T>> IDurableEntityClient.ReadEntityStateAsync<T>(Entity
}

private async Task<EntityStateResponse<T>> ReadEntityStateAsync<T>(DurabilityProvider provider, EntityId entityId)
{
if (this.HasNativeEntityQuerySupport(this.durabilityProvider, out var entityBackendQueries))
{
EntityBackendQueries.EntityMetadata? metaData = await entityBackendQueries.GetEntityAsync(
new DTCore.Entities.EntityId(entityId.EntityName, entityId.EntityKey),
includeState: true,
includeStateless: false,
cancellation: default);

return new EntityStateResponse<T>()
{
EntityExists = metaData.HasValue,
EntityState = metaData.HasValue ? this.messageDataConverter.Deserialize<T>(metaData.Value.SerializedState) : default,
};
}
else
{
return await this.ReadEntityStateLegacyAsync<T>(provider, entityId);
}
}

private async Task<EntityStateResponse<T>> ReadEntityStateLegacyAsync<T>(DurabilityProvider provider, EntityId entityId)
{
string entityState = await provider.RetrieveSerializedEntityState(entityId, this.messageDataConverter.JsonSettings);

Expand Down Expand Up @@ -611,6 +635,40 @@ private static EntityQueryResult ConvertToEntityQueryResult(IEnumerable<DurableE

/// <inheritdoc />
async Task<EntityQueryResult> IDurableEntityClient.ListEntitiesAsync(EntityQuery query, CancellationToken cancellationToken)
{
if (this.HasNativeEntityQuerySupport(this.durabilityProvider, out var entityBackendQueries))
{
var result = await entityBackendQueries.QueryEntitiesAsync(
new EntityBackendQueries.EntityQuery()
{
InstanceIdStartsWith = query.EntityName != null ? $"@{query.EntityName.ToLowerInvariant()}@" : null,
IncludeTransient = query.IncludeDeleted,
IncludeState = query.FetchState,
LastModifiedFrom = query.LastOperationFrom == DateTime.MinValue ? (DateTime?)null : (DateTime?)query.LastOperationFrom,
LastModifiedTo = query.LastOperationTo,
PageSize = query.PageSize,
ContinuationToken = query.ContinuationToken,
},
cancellationToken);

return new EntityQueryResult()
{
Entities = result.Results.Select(ConvertEntityMetadata).ToList(),
ContinuationToken = result.ContinuationToken,
};

DurableEntityStatus ConvertEntityMetadata(EntityBackendQueries.EntityMetadata metadata)
{
return new DurableEntityStatus(metadata);
}
}
else
{
return await this.ListEntitiesLegacyAsync(query, cancellationToken);
}
}

private async Task<EntityQueryResult> ListEntitiesLegacyAsync(EntityQuery query, CancellationToken cancellationToken)
{
var condition = new OrchestrationStatusQueryCondition(query);
EntityQueryResult entityResult;
Expand All @@ -633,6 +691,30 @@ async Task<EntityQueryResult> IDurableEntityClient.ListEntitiesAsync(EntityQuery

/// <inheritdoc />
async Task<CleanEntityStorageResult> IDurableEntityClient.CleanEntityStorageAsync(bool removeEmptyEntities, bool releaseOrphanedLocks, CancellationToken cancellationToken)
{
if (this.HasNativeEntityQuerySupport(this.durabilityProvider, out var entityBackendQueries))
{
var result = await entityBackendQueries.CleanEntityStorageAsync(
new EntityBackendQueries.CleanEntityStorageRequest()
{
RemoveEmptyEntities = removeEmptyEntities,
ReleaseOrphanedLocks = releaseOrphanedLocks,
},
cancellationToken);

return new CleanEntityStorageResult()
{
NumberOfEmptyEntitiesRemoved = result.EmptyEntitiesRemoved,
NumberOfOrphanedLocksRemoved = result.OrphanedLocksReleased,
};
}
else
{
return await this.CleanEntityStorageLegacyAsync(removeEmptyEntities, releaseOrphanedLocks, cancellationToken);
}
}

private async Task<CleanEntityStorageResult> CleanEntityStorageLegacyAsync(bool removeEmptyEntities, bool releaseOrphanedLocks, CancellationToken cancellationToken)
{
DateTime now = DateTime.UtcNow;
CleanEntityStorageResult finalResult = default;
Expand Down Expand Up @@ -706,6 +788,12 @@ async Task CheckForOrphanedLockAndFixIt(DurableOrchestrationStatus status, strin
return finalResult;
}

private bool HasNativeEntityQuerySupport(DurabilityProvider provider, out EntityBackendQueries entityBackendQueries)
{
entityBackendQueries = (provider as IEntityOrchestrationService)?.EntityBackendQueries;
return entityBackendQueries != null;
}

private async Task<OrchestrationState> GetOrchestrationInstanceStateAsync(string instanceId)
{
return await GetOrchestrationInstanceStateAsync(this.client, instanceId);
Expand Down
Loading

0 comments on commit bbd7ad6

Please sign in to comment.