Unable to Resolve Service for FunctionsDurableClientProvider

I am trying to create the following workflow with Azure Durable Functions. I am running on .NET 6.0 in the isolated worker model.

BlobTriggerFunction -> OrchestratorFunction -> ActivityFunction(s)

I'm running into the following error:

Exception: System.InvalidOperationException: Unable to resolve service for type 'Microsoft.Azure.Functions.Worker.Extensions.DurableTask.FunctionsDurableClientProvider' while attempting to activate 'Microsoft.Azure.Functions.Worker.Extensions.DurableTask.DurableTaskClientConverter'.

BlobTriggerFunction

public static class BlobTriggerFunction
{
    [Function(nameof(BlobTriggerFunction))]
    public static async Task Run(
        [BlobTrigger("someDirectory/{fileName}", Connection = "AzureWebJobsStorage")] BlobClient blobClient,
        string fileName,
        [DurableClient] DurableTaskClient orchestrationClient,
        ILogger logger)
    {
        string myBlob = await new StreamReader(await blobClient.OpenReadAsync()).ReadToEndAsync();

        logger.LogInformation($"Function created and received input. myblob:{myBlob}, fileName:{fileName}");

        string[] inputList = myBlob.Split(new string[] { Environment.NewLine }, StringSplitOptions.None);

        string instanceId = await orchestrationClient.ScheduleNewOrchestrationInstanceAsync(nameof(OrchestrationFunction), inputList, new CancellationToken()).ConfigureAwait(true);

        logger.LogInformation($"Blob trigger function completed. Durable Orchestration started: instanceId:{instanceId}");
    }
}

OrchestrationFunction

public class OrchestrationFunction : TaskOrchestrator<List<string>, bool>
{
    public async override Task<bool> RunAsync(
        TaskOrchestrationContext context,
        List<string> inputList)
    {
        string instanceId = context.InstanceId;

        ILogger logger = context.CreateReplaySafeLogger<OrchestrationFunction>(); // orchestrations do NOT have access to DI.

        logger.LogInformation($"Orchestrator function started. instanceId:{instanceId}");

        // Some business logic

        logger.LogInformation($"Orchestrator function completed.");

        return true;
    }
}

CS Proj Dependencies:

<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <TargetFramework>net6.0</TargetFramework>
    <AzureFunctionsVersion>v4</AzureFunctionsVersion>
    <OutputType>Exe</OutputType>
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
  </PropertyGroup>
  <ItemGroup>
    <PackageReference Include="Microsoft.Azure.Functions.Worker" Version="1.21.0" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Storage" Version="6.0.0" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="1.16.4" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask" Version="1.0.2" />
  </ItemGroup>
  <ItemGroup>
    <None Update="host.json">
      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
    </None>
    <None Update="local.settings.json">
      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
      <CopyToPublishDirectory>Never</CopyToPublishDirectory>
    </None>
  </ItemGroup>
  <ItemGroup>
    <Using Include="System.Threading.ExecutionContext" Alias="ExecutionContext" />
  </ItemGroup>
</Project>

host.json

{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensions": {
    "durableTask": {
      "hubName": "MyTaskHub",
      "storageProvider": {
        "connectionStringName": "AzureWebJobsStorage",
        "controlQueueBatchSize": 32,
        "partitionCount": 4
      },
      "tracing": {
        "traceInputsAndOutputs": false,
        "traceReplayEvents": false
      }
    },
    "maxConcurrentActivityFunctions": 10,
    "maxConcurrentOrchestratorFunctions": 1
  }
}

Program.cs

public static class Program
{
    public static Task Main(string[] args)
    {
        var host = ConfigureHostBuilder();
        return host.RunAsync();
    }

    private static IHost ConfigureHostBuilder()
    {
        return Host.CreateDefaultBuilder()
            .ConfigureFunctionsWorkerDefaults()
            .UseConsoleLifetime()
            .Build();
    }
}

There is 1 answer

Answered by Pavan

I created a blob trigger function and a durable orchestration function in Visual Studio, using the .NET isolated runtime stack. I made the following changes to your code:

blob trigger function:

using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Microsoft.DurableTask.Client;
using Azure.Storage.Blobs;
using FunctionApp1300; // namespace that contains the Function1 orchestrator

namespace FunctionApp
{
    public static class BlobTriggerFunction
    {
        [Function(nameof(BlobTriggerFunction))]
        public static async Task Run(
            [BlobTrigger("pavan/{fileName}", Connection = "storageconnection")] BlobClient blobClient,
            string fileName,
            [DurableClient] DurableTaskClient orchestrationClient,
            FunctionContext context)
        {
            // Resolve the logger from the FunctionContext
            var logger = context.GetLogger(nameof(BlobTriggerFunction));

            // Read the content of the blob
            string myBlob;
            using (var streamReader = new StreamReader(await blobClient.OpenReadAsync()))
            {
                myBlob = await streamReader.ReadToEndAsync();
            }

            logger.LogInformation($"Function created and received input. myblob: {myBlob}, fileName: {fileName}");

            // Split the content into lines
            string[] inputList = myBlob.Split(new string[] { Environment.NewLine }, StringSplitOptions.None);

            // Start a new instance of the orchestration.
            // DurableTaskClient (isolated worker) exposes ScheduleNewOrchestrationInstanceAsync;
            // StartNewAsync belongs to the in-process IDurableOrchestrationClient.
            string instanceId = await orchestrationClient.ScheduleNewOrchestrationInstanceAsync(nameof(Function1), inputList);

            logger.LogInformation($"Blob trigger function completed. Durable Orchestration started: instanceId: {instanceId}");
        }
    }
}
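
A small aside on the line split used above: `Environment.NewLine` is `\r\n` on Windows and `\n` on Linux, so the result depends on where the Functions host runs rather than on how the blob was written. Below is a minimal sketch of an OS-independent split. The helper name `BlobLineSplitter` is hypothetical (it is not part of the original code), and the sketch assumes that empty lines can be dropped, which differs from the `StringSplitOptions.None` used above.

```csharp
using System;

// Hypothetical helper, not part of the original question or answer.
public static class BlobLineSplitter
{
    // Splits on both Windows (\r\n) and Unix (\n) line endings.
    // "\r\n" is listed first so a Windows line ending is consumed as one separator
    // instead of leaving a trailing '\r' on each line.
    // Assumption: empty lines carry no meaning and are removed.
    public static string[] SplitLines(string content) =>
        content.Split(new[] { "\r\n", "\n" }, StringSplitOptions.RemoveEmptyEntries);
}
```

In the trigger, `BlobLineSplitter.SplitLines(myBlob)` could then replace the `myBlob.Split(...)` call if this behavior is what you want.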

Orchestration function:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;

namespace FunctionApp1300
{
    public static class Function1
    {
        [Function(nameof(Function1))]
        public static async Task RunOrchestrator(
            [OrchestrationTrigger] TaskOrchestrationContext context)
        {
            // Isolated-worker orchestrators use TaskOrchestrationContext (Microsoft.DurableTask),
            // not the in-process IDurableOrchestrationContext.
            ILogger logger = context.CreateReplaySafeLogger(nameof(Function1));
            logger.LogInformation("Orchestration function started.");

            // Replace with your business logic
            var outputs = new List<string>();
            outputs.Add(await context.CallActivityAsync<string>(nameof(SayHello), "Tokyo"));
            outputs.Add(await context.CallActivityAsync<string>(nameof(SayHello), "Seattle"));
            outputs.Add(await context.CallActivityAsync<string>(nameof(SayHello), "London"));

            logger.LogInformation("Orchestration function completed.");
        }

        [Function(nameof(SayHello))]
        public static string SayHello([ActivityTrigger] string name, FunctionContext executionContext)
        {
            // Resolve the logger from the FunctionContext, as in the blob trigger
            ILogger logger = executionContext.GetLogger(nameof(SayHello));
            logger.LogInformation($"Saying hello to {name}.");
            return $"Hello {name}!";
        }
    }
}

Program.cs:

using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var host = new HostBuilder()
    .ConfigureFunctionsWebApplication()
    .ConfigureServices(services =>
    {
        services.AddApplicationInsightsTelemetryWorkerService();
        services.ConfigureFunctionsApplicationInsights();
    })
    .Build();
host.Run();

.CSProj file:

<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <TargetFramework>net6.0</TargetFramework>
    <AzureFunctionsVersion>v4</AzureFunctionsVersion>
    <OutputType>Exe</OutputType>
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
  </PropertyGroup>
  <ItemGroup>
    <FrameworkReference Include="Microsoft.AspNetCore.App" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker" Version="1.20.1" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask" Version="1.0.0" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.1.0" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore" Version="1.2.0" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Storage.Blobs" Version="6.2.0" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="1.16.4" />
    <PackageReference Include="Microsoft.ApplicationInsights.WorkerService" Version="2.21.0" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker.ApplicationInsights" Version="1.1.0" />
  </ItemGroup>
  <ItemGroup>
    <None Update="host.json">
      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
    </None>
    <None Update="local.settings.json">
      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
      <CopyToPublishDirectory>Never</CopyToPublishDirectory>
    </None>
  </ItemGroup>
  <ItemGroup>
    <Using Include="System.Threading.ExecutionContext" Alias="ExecutionContext" />
  </ItemGroup>
</Project>

host.json:

{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensions": {
    "durableTask": {
      "hubName": "MyTaskHub",
      "storageProvider": {
        "connectionStringName": "AzureWebJobsStorage",
        "controlQueueBatchSize": 32,
        "partitionCount": 4
      },
      "tracing": {
        "traceInputsAndOutputs": false,
        "traceReplayEvents": false
      },
      "maxConcurrentActivityFunctions": 10,
      "maxConcurrentOrchestratorFunctions": 1
    }
  }
}

local.settings.json:

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "storageconnection": "your-storage conn-string",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated"
  }
}

With these changes, the functions executed successfully.