Understanding Azure Durable Functions - Part 8: The Fan Out/Fan In Pattern

This is the eighth part in a series of articles. If you’re not familiar with Durable Functions, you should check out the previous articles before reading this.

In the previous article, we saw the function chaining pattern, where the output from one activity function is passed as the input to the next activity function to form a processing pipeline.

If you have a workload that you can split up into discrete chunks of data, you can parallelize the processing of those chunks to reduce the time it takes to complete the total workload. The fan out/fan in pattern can be used to do this.

This pattern essentially means running multiple instances of the activity function at the same time. The “fan out” part splits the data into multiple chunks and then calls the activity function once for each chunk, passing that chunk in as the input.

When each chunk has been processed, the “fan in” takes place: the results from each activity function instance are collected and aggregated into a single final result.

This pattern is only really useful if the workload can be “chunked” in a meaningful way, i.e. split into pieces that can be processed independently and in parallel.
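Outside of Durable Functions, the same split, start-all, wait, aggregate shape can be sketched in a single process with Task.WhenAll. In this minimal sketch, ProcessChunkAsync, the workload of the numbers 1 to 100, and the chunk size of 25 are all made up for illustration, and summing stands in for whatever real processing a chunk would need. It has none of the durability or scale-out that Durable Functions provides.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical worker standing in for an activity function: it "processes"
// one chunk (here, sums it) after a short simulated delay.
async Task<int> ProcessChunkAsync(int[] chunk)
{
    await Task.Delay(100);
    return chunk.Sum();
}

int[] workload = Enumerable.Range(1, 100).ToArray();
const int chunkSize = 25;

// Fan out: split the workload into chunks and start one task per chunk,
// keeping the task references instead of awaiting each call in turn.
List<Task<int>> tasks = new List<Task<int>>();
for (int start = 0; start < workload.Length; start += chunkSize)
{
    int[] chunk = workload.Skip(start).Take(chunkSize).ToArray();
    tasks.Add(ProcessChunkAsync(chunk));
}

// Fan in: wait for all chunks, then aggregate the partial results.
int[] partialSums = await Task.WhenAll(tasks);
int total = partialSums.Sum();

Console.WriteLine($"{tasks.Count} chunks, total = {total}"); // 4 chunks, total = 5050
```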

As an example, suppose we allow the client to specify a number of greetings to generate:

using System.Collections.Generic;

public class Greeting
{
    public string CityName { get; set; }
    public string Message { get; set; }
}

public class GreetingsRequest
{
    public List<Greeting> Greetings { get; set; }
}
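For illustration, a request body matching these classes could be produced as follows. This sketch assumes the client sends JSON whose property names match GreetingsRequest and Greeting; it uses an anonymous object with the same shape and System.Text.Json rather than the article’s own classes, and the two greetings are the ones that appear in the status output later in this article.

```csharp
using System;
using System.Text.Json;

// An anonymous object with the same shape as GreetingsRequest/Greeting.
var request = new
{
    Greetings = new[]
    {
        new { CityName = "New York", Message = "Yo" },
        new { CityName = "London", Message = "Good day" }
    }
};

// Serialize to the kind of JSON body a client would POST to FanOutIn_HttpStart.
string json = JsonSerializer.Serialize(request, new JsonSerializerOptions { WriteIndented = true });
Console.WriteLine(json);
```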

Next, we create the HTTP client function, which accepts some JSON in the request body and passes it as the input when starting the orchestrator:

[FunctionName("FanOutIn_HttpStart")]
public static async Task<HttpResponseMessage> HttpStart(
    [HttpTrigger(AuthorizationLevel.Function, "post")]HttpRequestMessage req,
    [OrchestrationClient]DurableOrchestrationClient starter,
    ILogger log)
{
    var data = await req.Content.ReadAsAsync<GreetingsRequest>();

    string instanceId = await starter.StartNewAsync("FanOutInOrchestrator", data);

    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

    return starter.CreateCheckStatusResponse(req, instanceId);
}

At this point nothing is really different; the fan out/fan in itself is specified in the orchestrator function:

[FunctionName("FanOutInOrchestrator")]
public static async Task<string> RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
    log.LogInformation($"************** RunOrchestrator method executing ********************");

    GreetingsRequest greetingsRequest = context.GetInput<GreetingsRequest>();

    // Fanning out
    log.LogInformation($"************** Fanning out ********************");
    var parallelActivities = new List<Task<string>>();
    foreach (var greeting in greetingsRequest.Greetings)
    {
        // Start a new activity function and capture the task reference
        Task<string> task = context.CallActivityAsync<string>("FanOutIn_ActivityFunction", greeting);

        // Store the task reference for later
        parallelActivities.Add(task);
    }

    // Wait until all the activity functions have done their work
    log.LogInformation($"************** 'Waiting' for parallel results ********************");
    await Task.WhenAll(parallelActivities);
    log.LogInformation($"************** All activity functions complete ********************");

    // Now that all parallel activity functions have completed,
    // fan in AKA aggregate the results, in this case into a single
    // string using a StringBuilder
    log.LogInformation($"************** fanning in ********************");
    var sb = new StringBuilder();
    foreach (var completedParallelActivity in parallelActivities)
    {
        sb.AppendLine(completedParallelActivity.Result);
    }

    return sb.ToString();
}

The preceding code is the orchestrator function that handles the fan out/fan in; I’ve added comments to illustrate what’s going on. Essentially, each Greeting is treated as a “chunk” to be processed in parallel, and each chunk is passed to its own instance of the FanOutIn_ActivityFunction. However, rather than awaiting each CallActivityAsync call as it is made, the returned task is stored in the parallelActivities list, so all the activities are scheduled before any of them is awaited. Once await Task.WhenAll(parallelActivities) completes, every activity has finished and the fan in can happen, which here simply aggregates all the results into a single string containing all the greetings.
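As an aside, Task.WhenAll over Task&lt;string&gt; instances also returns the results as a string[] in the same order as the tasks passed to it, so the fan in could equally start from string[] results = await Task.WhenAll(parallelActivities); instead of reading each task’s Result. The following plain C# sketch (not Durable Functions code) shows that ordering behaviour. SayHelloAsync is a hypothetical stand-in for FanOutIn_ActivityFunction, and the different delays are an assumption added so that the London greeting finishes first.

```csharp
using System;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

// Hypothetical stand-in for FanOutIn_ActivityFunction (plain C#, not the
// Durable Functions API). delayMs simulates work of different lengths.
async Task<string> SayHelloAsync(string message, string cityName, int delayMs)
{
    await Task.Delay(delayMs);
    return $"{message} {cityName}";
}

var greetings = new[]
{
    (Message: "Yo", CityName: "New York", DelayMs: 300),
    (Message: "Good day", CityName: "London", DelayMs: 100)
};

// Fan out: start every call before awaiting any of them.
Task<string>[] parallelActivities = greetings
    .Select(g => SayHelloAsync(g.Message, g.CityName, g.DelayMs))
    .ToArray();

// Fan in: the array returned by Task.WhenAll follows the order of the
// input tasks, even though the London greeting completes first.
string[] results = await Task.WhenAll(parallelActivities);

var sb = new StringBuilder();
foreach (string result in results)
{
    sb.AppendLine(result);
}

string output = sb.ToString();
Console.Write(output);
```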

The activity function is defined as:

[FunctionName("FanOutIn_ActivityFunction")]
public static string SayHello([ActivityTrigger] Greeting greeting, ILogger log)
{            
    // simulate longer processing delay to demonstrate parallelism
    Thread.Sleep(15000); 

    return $"{greeting.Message} {greeting.CityName}";
}

If we run this, we get the following (simplified) output:

Executing 'FanOutIn_HttpStart' 
Executing 'FanOutInOrchestrator'
************** RunOrchestrator method executing ********************
************** Fanning out ********************
Function 'FanOutIn_ActivityFunction (Activity)' scheduled. 
Function 'FanOutIn_ActivityFunction (Activity)' scheduled. 
Function 'FanOutIn_ActivityFunction (Activity)' started. 
Function 'FanOutIn_ActivityFunction (Activity)' started. 
Executing 'FanOutIn_ActivityFunction' (Reason='', Id=9a33abd6-4594-4285-bbcd-0e428cf15d76)
Executing 'FanOutIn_ActivityFunction' (Reason='', Id=e3afbcb2-1f90-4f3f-a638-3983ea8db1a7)
Executed 'FanOutIn_ActivityFunction' (Succeeded, Id=9a33abd6-4594-4285-bbcd-0e428cf15d76)
Executed 'FanOutIn_ActivityFunction' (Succeeded, Id=e3afbcb2-1f90-4f3f-a638-3983ea8db1a7)
Function 'FanOutIn_ActivityFunction (Activity)' completed. 
Function 'FanOutIn_ActivityFunction (Activity)' completed. 
************** 'Waiting' for parallel results ********************
************** All activity functions complete ********************
************** fanning in ********************
Executed 'FanOutInOrchestrator' (Succeeded, Id=fba76372-758f-433c-af22-299a3b38dc5a)

Recall that the activity function contains a 15 second delay (Thread.Sleep(15000)) to simulate a longer-running piece of work.

If we look at the timings below, notice that createdTime and lastUpdatedTime are not 30 seconds apart but about 16 seconds apart (04:06:36 to 04:06:52), only slightly longer than a single 15 second delay. This is because the two activities ran in parallel:

{
    "name": "FanOutInOrchestrator",
    "instanceId": "5704559dc4d94e26998ead2f47ea9821",
    "runtimeStatus": "Completed",
    "input": {
        "$type": "DurableDemos.FanOutInPatternExample+GreetingsRequest, DurableDemos",
        "Greetings": [
            {
                "$type": "DurableDemos.FanOutInPatternExample+Greeting, DurableDemos",
                "CityName": "New York",
                "Message": "Yo"
            },
            {
                "$type": "DurableDemos.FanOutInPatternExample+Greeting, DurableDemos",
                "CityName": "London",
                "Message": "Good day"
            }
        ]
    },
    "customStatus": null,
    "output": "Yo New York\r\nGood day London\r\n",
    "createdTime": "2019-08-21T04:06:36Z",
    "lastUpdatedTime": "2019-08-21T04:06:52Z"
}

Also note that in the preceding status result the output is the aggregated, “fanned in” result: “Yo New York\r\nGood day London\r\n”.
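The timing comparison made earlier is simple arithmetic on the two timestamps; this sketch just repeats it in code. The sequential figure assumes the two 15 second sleeps would run back to back and ignores any other overhead, so it is a lower bound rather than a measurement.

```csharp
using System;
using System.Globalization;

// createdTime and lastUpdatedTime from the status response above.
DateTimeOffset created = DateTimeOffset.Parse("2019-08-21T04:06:36Z", CultureInfo.InvariantCulture);
DateTimeOffset lastUpdated = DateTimeOffset.Parse("2019-08-21T04:06:52Z", CultureInfo.InvariantCulture);
TimeSpan observed = lastUpdated - created;

// If the two 15 second activities had run one after the other,
// the sleeps alone would take at least 2 x 15 seconds.
const int activityCount = 2;
TimeSpan sequentialMinimum = TimeSpan.FromSeconds(15 * activityCount);

Console.WriteLine($"Observed: {observed.TotalSeconds} s");                    // Observed: 16 s
Console.WriteLine($"Sequential minimum: {sequentialMinimum.TotalSeconds} s"); // Sequential minimum: 30 s
```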

Just as with the function chaining pattern discussed in the previous article, you could implement the fan out/fan in pattern without Durable Functions, but you would have to manage the complexity of the process manually: for example, tracking when all the parallel activity functions have completed, and then implementing the fan in/aggregation step, which could be quite complex to do by hand.
