Understanding Azure Durable Functions - Part 11: The Asynchronous Human Interaction Pattern

This is the eleventh part in a series of articles. If you’re not familiar with Durable Functions you should check out the previous articles before reading this.

The Asynchronous Human Interaction Pattern allows a Durable Functions orchestration to pause at some point during its lifecycle and wait for an external event, such as a human performing some action or making some decision.

[Image: Azure Durable Functions and Twilio working together]

As an example, suppose that a comment can be submitted on a website but before actually appearing it must be moderated by a human. The human moderator can see the comment and then decide whether to approve the comment so it appears on the website or decline the comment in which case it is deleted.

In this scenario, if the human moderator does not approve or decline the comment within a set amount of time then the comment will be escalated to a manager to review.

Azure Durable Functions makes this possible because the orchestration can wait for an external event during its execution.

The code in this article implements the following workflow:

  1. New comment submitted via HTTP
  2. Review/moderation orchestration started
  3. Orchestration sends SMS notification to moderator
  4. Moderator receives SMS that contains 2 links: one to approve and one to decline the comment
  5. Orchestration waits for human moderator to click on one of the 2 links
  6. When human clicks a link, the orchestration resumes and comment is approved or declined
  7. If human does not click link within a set deadline, the comment is escalated to a human manager

Let’s start by defining a class to be HTTP POSTed to the client function:

public class AddCommentRequest
{
    public string UserName { get; set; }
    public string Comment { get; set; }
}

And the client function:

[FunctionName("HumanPatternExample_HttpStart")]
public static async Task<HttpResponseMessage> HttpStart(
    [HttpTrigger(AuthorizationLevel.Function, "post")]HttpRequestMessage req,
    [OrchestrationClient]DurableOrchestrationClient starter,
    ILogger log)
{
    var commentRequest = await req.Content.ReadAsAsync<AddCommentRequest>();

    string instanceId = await starter.StartNewAsync("HumanPatternExample_Orchestrator", (commentRequest: commentRequest, requestUri: req.RequestUri));

    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

    return new HttpResponseMessage(System.Net.HttpStatusCode.Accepted)
    {
        Content = new StringContent("Your comment has been submitted and is awaiting moderator approval.")
    };
}

The preceding client function initiates the HumanPatternExample_Orchestrator function and passes in a tuple containing the comment request and the request URI which will be used later to construct the approve/decline URL links.
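
To make the link construction a little more concrete, here is a minimal sketch of how the approve/decline links could be built from the request URI. ModerationLinks and its Build method are hypothetical names that are not part of the original sample (the orchestrator shown later uses string interpolation instead), and the function names passed in are assumed to match the approve/decline HTTP functions defined later in this article.

```csharp
using System;

public static class ModerationLinks
{
    // Hypothetical helper: builds a link to an HTTP-triggered function on the
    // same scheme/host/port as the request that started the orchestration.
    public static string Build(Uri requestUri, string functionName, string instanceId)
    {
        var builder = new UriBuilder(requestUri.Scheme, requestUri.Host, requestUri.Port)
        {
            Path = $"/api/{functionName}",
            // Escape the ID so that unexpected characters cannot break the query string
            Query = $"id={Uri.EscapeDataString(instanceId)}"
        };

        return builder.Uri.ToString();
    }

    public static void Main()
    {
        var requestUri = new Uri("http://localhost:7071/api/HumanPatternExample_HttpStart");

        Console.WriteLine(Build(requestUri, "HumanPatternExample_Approve", "abc123"));
        Console.WriteLine(Build(requestUri, "HumanPatternExample_Decline", "abc123"));
    }
}
```

Using UriBuilder rather than string interpolation keeps the escaping of the instance ID in one place, which is a design preference rather than a requirement.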

We’ll have a look at this orchestrator function in a moment, but first let’s take a look at the activity function that sends the SMS to the moderator:

[FunctionName("HumanPatternExample_RequestApproval")]
[return: TwilioSms(AccountSidSetting = "TwilioAccountSid", AuthTokenSetting = "TwilioAuthToken", From = "%FromPhoneNumber%")]
public static CreateMessageOptions RequestApproval([ActivityTrigger] ModerationRequest moderationRequest, ILogger log)
{            
    log.LogInformation($"Requesting approval for comment: {moderationRequest.CommentRequest.Comment}.");

    // Here we provide some way for a human to know that there is a new comment pending approval.
    // This could be writing to a database representing requests not yet approved for a human
    // to work through, or an SMS message for them to reply to with either APPROVED or NOTAPPROVED
    // or an email for them to reply to etc etc.

    var approversPhoneNumber = new PhoneNumber(Environment.GetEnvironmentVariable("ApproverPhoneNumber", EnvironmentVariableTarget.Process));                        

    var message = new CreateMessageOptions(approversPhoneNumber)
    {
        Body = $"'{moderationRequest.CommentRequest.Comment}' \r\nApprove: {moderationRequest.ApproveRequestUrl} \r\nDecline: {moderationRequest.DeclineRequestUrl}"
    };

    log.LogInformation($"Sending SMS: {message.Body}");

    return message;
}

In the preceding code, the TwilioSms output binding is being used to send an SMS – the SMS will contain links to either approve or decline the comment as the following screenshot shows:

[Screenshot: Azure Functions and Twilio integration]

Also notice that the ActivityTrigger is bound to a ModerationRequest: public static CreateMessageOptions RequestApproval([ActivityTrigger] ModerationRequest moderationRequest, ILogger log) – this class is defined as follows:

public class ModerationRequest
{
    public AddCommentRequest CommentRequest { get; set; }
    public string ApproveRequestUrl { get; set; }
    public string DeclineRequestUrl { get; set; }
}

The orchestrator function is where the main workflow is defined:

[FunctionName("HumanPatternExample_Orchestrator")]
public static async Task RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
    log.LogInformation($"************** RunOrchestrator method executing ********************");

    // Using tuples but could also define a class for this data
    var (commentRequest, requestUri) = context.GetInput<Tuple<AddCommentRequest,Uri>>();
    var moderationRequest = new ModerationRequest
    {
        CommentRequest = commentRequest,
        ApproveRequestUrl = $"{requestUri.Scheme}://{requestUri.Host}:{requestUri.Port}/api/HumanPatternExample_Approve?id={context.InstanceId}",
        DeclineRequestUrl = $"{requestUri.Scheme}://{requestUri.Host}:{requestUri.Port}/api/HumanPatternExample_Decline?id={context.InstanceId}",
    };
    await context.CallActivityAsync("HumanPatternExample_RequestApproval", moderationRequest);

    // Define a timeout: if the moderator hasn't approved/declined in time, escalate to someone else, e.g. a manager
    using (var timeout = new CancellationTokenSource())
    {
        DateTime moderationDeadline = context.CurrentUtcDateTime.AddMinutes(5); // probably would be longer in real life

        Task durableTimeout = context.CreateTimer(moderationDeadline, timeout.Token);

        Task<bool> moderatedEvent = context.WaitForExternalEvent<bool>("Moderation");

        if (moderatedEvent == await Task.WhenAny(moderatedEvent, durableTimeout))
        {
            timeout.Cancel();

            bool isApproved = moderatedEvent.Result;

            if (isApproved)
            {
                log.LogInformation($"************** Comment '{commentRequest.Comment}' was approved by a moderator ********************");
                // call an activity to make the comment live on the website, etc.
            }
            else
            {
                log.LogInformation($"************** Comment '{commentRequest.Comment}' was declined by a moderator ********************");
                // call an activity to delete the comment and don't make it live on website, etc.
            }
        }
        else
        {
            log.LogInformation($"************** Comment '{commentRequest.Comment}' was not reviewed by a moderator in time, escalating...  ********************");
            // await context.CallActivityAsync("Escalate"); call an activity to escalate etc.
        }
    }

    log.LogInformation($"************** Orchestration complete ********************");
}

The code may look a little complex at first, so let’s break it down into the more important parts as they relate to the Asynchronous Human Interaction Pattern:

The line await context.CallActivityAsync("HumanPatternExample_RequestApproval", moderationRequest); calls the activity that actually notifies the human in some way that the orchestration is waiting for them.

Two tasks are created: Task durableTimeout = context.CreateTimer(moderationDeadline, timeout.Token); and Task<bool> moderatedEvent = context.WaitForExternalEvent<bool>("Moderation"); The first task represents the deadline that the moderator has to respond. The second uses the DurableOrchestrationContext.WaitForExternalEvent method to pause the orchestration until an event arrives from outside the orchestration (i.e. the human interaction). Once these 2 tasks are defined, the line if (moderatedEvent == await Task.WhenAny(moderatedEvent, durableTimeout)) checks whether the orchestration is continuing because of the external event or because of the timeout. If the event won, timeout.Cancel() is called to cancel the durable timer, which is no longer needed.
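
The same race between a deadline and an event can be sketched outside of Durable Functions using only the .NET base class library. In the following sketch, ModerationRace and WaitForDecisionAsync are hypothetical names, a TaskCompletionSource stands in for the external event, and Task.Delay stands in for the durable timer. This is for illustration only: inside a real orchestrator function you should keep using context.CreateTimer, because orchestrator code must be deterministic.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ModerationRace
{
    // Returns true/false for an approve/decline decision,
    // or null if the deadline passed before any decision arrived.
    public static async Task<bool?> WaitForDecisionAsync(Task<bool> moderatedEvent, TimeSpan deadline)
    {
        using (var timeout = new CancellationTokenSource())
        {
            // Stand-in for context.CreateTimer(...); do not use Task.Delay in an orchestrator
            Task timer = Task.Delay(deadline, timeout.Token);

            Task winner = await Task.WhenAny(moderatedEvent, timer);

            if (winner == moderatedEvent)
            {
                timeout.Cancel(); // the timer is no longer needed
                return await moderatedEvent;
            }

            return null; // deadline reached first: the caller should escalate
        }
    }

    public static async Task Main()
    {
        // Simulate a moderator who approves after 100 ms
        var approval = new TaskCompletionSource<bool>();
        var click = Task.Delay(100).ContinueWith(t => approval.TrySetResult(true));
        bool? decision = await WaitForDecisionAsync(approval.Task, TimeSpan.FromSeconds(2));
        Console.WriteLine(decision.HasValue ? $"Decision: {decision}" : "Escalated");

        // Simulate a moderator who never responds
        var silence = new TaskCompletionSource<bool>();
        bool? noDecision = await WaitForDecisionAsync(silence.Task, TimeSpan.FromMilliseconds(200));
        Console.WriteLine(noDecision.HasValue ? $"Decision: {noDecision}" : "Escalated");
    }
}
```

The shape is the same as in the orchestrator: start both tasks, await whichever finishes first, and cancel the timer if the event arrived in time.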

So if the orchestration is waiting for an external event, how is that event sent to the orchestration? This is done via the DurableOrchestrationClient.RaiseEventAsync method as the following code shows:

[FunctionName("HumanPatternExample_Approve")]
public static async Task<IActionResult> HumanPatternExample_Approve(
    [HttpTrigger(AuthorizationLevel.Function, "get")]HttpRequest req,
    [OrchestrationClient]DurableOrchestrationClient client,
    ILogger log)
{
    // additional validation/null check code omitted for brevity

    var id = req.Query["id"];

    var status = await client.GetStatusAsync(id);

    if (status.RuntimeStatus == OrchestrationRuntimeStatus.Running)
    {
        await client.RaiseEventAsync(id, "Moderation", true);
        return new OkObjectResult("Comment was approved.");
    }

    return new NotFoundResult();
}

[FunctionName("HumanPatternExample_Decline")]
public static async Task<IActionResult> HumanPatternExample_Decline(
    [HttpTrigger(AuthorizationLevel.Function, "get")]HttpRequest req,
    [OrchestrationClient]DurableOrchestrationClient client,
    ILogger log)
{
    // additional validation/null check code omitted for brevity

    var id = req.Query["id"];

    var status = await client.GetStatusAsync(id);
    if (status.RuntimeStatus == OrchestrationRuntimeStatus.Running)
    {
        await client.RaiseEventAsync(id, "Moderation", false);
        return new OkObjectResult("Comment was declined.");
    }

    return new NotFoundResult();
}

The preceding 2 functions are triggered via HTTP (the links that are sent in the SMS) and raise the “Moderation” event on the orchestration instance identified by id, passing either true (approved) or false (declined).
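
Because the two functions differ only in the value they send, the shared logic could be pulled into a helper. The following is a sketch rather than part of the original sample: ModerationEvents and RaiseModerationAsync are hypothetical names, it assumes the same Durable Functions 1.x DurableOrchestrationClient used above, and it assumes GetStatusAsync returns null for an unknown instance ID. It also includes the null checks that were omitted for brevity.

```csharp
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;

public static class ModerationEvents
{
    // Hypothetical helper shared by the approve and decline HTTP functions
    public static async Task<IActionResult> RaiseModerationAsync(
        DurableOrchestrationClient client, string instanceId, bool isApproved)
    {
        if (string.IsNullOrWhiteSpace(instanceId))
        {
            return new BadRequestObjectResult("Missing 'id' query parameter.");
        }

        var status = await client.GetStatusAsync(instanceId);

        // Assumption: status is null when no instance with this ID exists
        if (status == null || status.RuntimeStatus != OrchestrationRuntimeStatus.Running)
        {
            return new NotFoundResult();
        }

        // Same event name the orchestrator waits for
        await client.RaiseEventAsync(instanceId, "Moderation", isApproved);

        return new OkObjectResult(isApproved ? "Comment was approved." : "Comment was declined.");
    }
}
```

With a helper like this, the body of each HTTP function would reduce to a single call such as return await ModerationEvents.RaiseModerationAsync(client, req.Query["id"], true);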

The orchestrator is waiting for this event: Task<bool> moderatedEvent = context.WaitForExternalEvent<bool>("Moderation"); If the event is received in the orchestrator, the approval decision is read from the completed task: bool isApproved = moderatedEvent.Result; The sample code then uses an if statement to either publish the comment to the website or delete it (the actual publish/delete activities are omitted for brevity).

Let’s take a look at some (simplified) output – first if the human moderator approves the comment:

Executing HTTP request: {
  "requestId": "c300dfdb-553a-4d1a-8685-7eec6e9fc375",
  "method": "POST",
  "uri": "/api/HumanPatternExample_HttpStart"
}
Executing 'HumanPatternExample_HttpStart' (Reason='This function was programmatically called via the host APIs.', Id=2ff016f6-d54e-4489-9497-393322165d24)
Started orchestration with ID = 'c5ea86d46e524641a49ca10d1c04efc5'.
Executing 'HumanPatternExample_Orchestrator' (Reason='', Id=43196ec8-83cd-4a82-9c63-fcc9f13bd114)
************** RunOrchestrator method executing ********************
Executing 'HumanPatternExample_RequestApproval' (Reason='', Id=c98b259c-100a-4730-a1a9-116f5ea11aa1)
Requesting approval for comment: I hate cheese.
Sending SMS: 'I hate cheese'
Approve: http://localhost:7071/api/HumanPatternExample_Approve?id=c5ea86d46e524641a49ca10d1c04efc5
Decline: http://localhost:7071/api/HumanPatternExample_Decline?id=c5ea86d46e524641a49ca10d1c04efc5
Executed 'HumanPatternExample_RequestApproval' (Succeeded, Id=c98b259c-100a-4730-a1a9-116f5ea11aa1)
'HumanPatternExample_Orchestrator (Orchestrator)' is waiting for input. Reason: CreateTimer
'HumanPatternExample_Orchestrator (Orchestrator)' is waiting for input. Reason: WaitForExternalEvent:Moderation

<I 'clicked' the approve link here>

Executing 'HumanPatternExample_Approve' 
Function 'HumanPatternExample_Orchestrator (Orchestrator)' scheduled. Reason: RaiseEvent:Moderation
Function 'HumanPatternExample_Orchestrator (Orchestrator)' received a 'Moderation' event
************** Comment 'I hate cheese' was approved by a moderator ********************
************** Orchestration complete ********************

If we submit another request and wait for 5 minutes (DateTime moderationDeadline = context.CurrentUtcDateTime.AddMinutes(5);) we get the following:

Executing HTTP request: {
  "requestId": "08354276-34b2-4183-8884-9fc92fbac13d",
  "method": "POST",
  "uri": "/api/HumanPatternExample_HttpStart"
}
Executing 'HumanPatternExample_HttpStart' 
Started orchestration with ID = 'e660e4ee29044d8ea9bbbcff0e7e001f'.
Executed 'HumanPatternExample_HttpStart' (Succeeded, Id=6b33326d-b0e5-4fd1-9671-43f176b12928)
Executing 'HumanPatternExample_Orchestrator' (Reason='', Id=e2845e74-9818-4a92-ab29-e85233c208f3)
************** RunOrchestrator method executing ********************
Function 'HumanPatternExample_RequestApproval (Activity)' started.
Executing 'HumanPatternExample_RequestApproval' (Reason='', Id=b25f71e4-810f-4fc4-bb71-0c8f819eccfc)
Requesting approval for comment: I LOOOOVEEEE cheese.
Sending SMS: 'I LOOOOVEEEE cheese'
Approve: http://localhost:7071/api/HumanPatternExample_Approve?id=e660e4ee29044d8ea9bbbcff0e7e001f
Decline: http://localhost:7071/api/HumanPatternExample_Decline?id=e660e4ee29044d8ea9bbbcff0e7e001f
Executed 'HumanPatternExample_RequestApproval' (Succeeded, Id=b25f71e4-810f-4fc4-bb71-0c8f819eccfc)
Function 'HumanPatternExample_Orchestrator (Orchestrator)' is waiting for input. Reason: CreateTimer
Function 'HumanPatternExample_Orchestrator (Orchestrator)' is waiting for input. Reason: WaitForExternalEvent:Moderation
Function 'HumanPatternExample_Orchestrator (Orchestrator)' was resumed by a timer scheduled for '2019-09-05T05:42:05.9852935Z'. State: TimerExpired
************** Comment 'I LOOOOVEEEE cheese' was not reviewed by a moderator in time, escalating...  ********************
************** Orchestration complete ********************
Executed 'HumanPatternExample_Orchestrator' (Succeeded, Id=eff52ac6-a7e8-4d81-afd5-9125bd5a5aaa)

Notice this time the orchestration was “un-paused” because the timer expired and the moderator didn’t respond: Function 'HumanPatternExample_Orchestrator (Orchestrator)' was resumed by a timer scheduled for '2019-09-05T05:42:05.9852935Z'. State: TimerExpired

The sample code in this article does not contain comprehensive error handling or security, so you’d want to make sure you have both of these in place if you were to implement this kind of workflow.

You can also use the built-in Durable Functions HTTP API to send events, though you’ll need to expose the system key, which you probably won’t want to do – read Part 9: The Asynchronous HTTP API Pattern to learn more.

If you like this article or series be sure to share it :)
