Azure for Developers Tutorial Step 6: Processing the queue

This is the sixth step of the Azure for Developers tutorial, in which we set up a WCF service running in Azure to provide CRUD operations to a client application. For more information, please check out the Introduction.

In the last post, we changed our service to write messages to a queue. In this post, we will set up a worker role to pull the messages off of the queue and process them. But first, a few words about queues.

How would you use a queue?

GoldMail is an application that lets you add “slides” (pictures, screenshots, PowerPoint slides, etc.), record audio (voice or a sound file) over them, and then “share” the GoldMail, which sends your assets up to the cloud. When someone plays your GoldMail, the player retrieves those assets. When it's played on a mobile device, we don’t want to return the same large images we use for our desktop player.

We could resize the images on the customer’s computer and send them up with the originals, but why should he have to wait for that to happen? So we use a queue and a worker role to handle this. When the service gets the “finished” message from the desktop application, it puts a message on the queue. A worker role picks the message off of the queue, downloads the large images and resizes them to small images, and then uploads the small images to the same folder where the original images reside and flips a boolean in the database.

Another use of queues is to offload database updates that don’t need to be written immediately. If someone sends a GoldMail to a thousand people, and half of them view it, we could get 500 updates to our database at nearly the same time. To meter these updates, we write the update requests to a queue, and a worker role pulls the entries off of the queue and updates the database.

What is Invisibility?

I have a lot of trace logging, and when I added the processing of the mobile slides, I noticed a problem. It looked like the first instance of the worker role was picking the entry off the queue, and before it could finish, the second instance was picking the same entry off the queue and trying to process it, too.

When you read a message from the queue, it isn’t actually removed from the queue; it’s marked as invisible. If the message hasn’t been deleted after a certain amount of time, it reappears and can be processed again. The default time is 30 seconds. It takes about 1 minute and 30 seconds to process a hundred slides, so the message was becoming visible again and the second instance of the worker role was picking it up and trying to process it, even though the first instance was still working on it. Oops.

When you take the message off of the queue, you can set the amount of time you want it to be invisible. I ran hundreds of GoldMails through the worker role and determined that the longest conversion time was a minute and a half, so I set the invisibility time to 2 minutes. (Better safe than sorry).
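To make the invisibility behavior concrete, here is a small in-memory model of it. This is a hypothetical sketch, not the Azure API: SimulatedQueue, ReceivedMessage, and their members are made up for illustration, and the clock is passed in so the behavior is deterministic. It mimics three things the real queue does: reading a message hides it for the visibility timeout instead of removing it, every read increments the dequeue count and issues a new pop receipt, and a delete using an out-of-date pop receipt fails (the Run method later in this post handles that failure when it calls DeleteMessage).

```csharp
using System;
using System.Collections.Generic;

// HYPOTHETICAL in-memory model of queue visibility; this is not the Azure API.
// What a reader gets back: a snapshot plus the pop receipt it must use to delete.
public sealed class ReceivedMessage
{
    public ReceivedMessage(int id, string body, string popReceipt, int dequeueCount, DateTime nextVisibleTime)
    {
        Id = id;
        Body = body;
        PopReceipt = popReceipt;
        DequeueCount = dequeueCount;
        NextVisibleTime = nextVisibleTime;
    }

    public int Id { get; }
    public string Body { get; }
    public string PopReceipt { get; }
    public int DequeueCount { get; }
    public DateTime NextVisibleTime { get; }
}

public sealed class SimulatedQueue
{
    private sealed class Entry
    {
        public int Id;
        public string Body = "";
        public DateTime NextVisibleTime;
        public int DequeueCount;
        public string PopReceipt = "";
    }

    private readonly List<Entry> _entries = new List<Entry>();
    private int _lastId;
    private int _lastReceipt;

    public int Count => _entries.Count;

    public void AddMessage(string body, DateTime now)
    {
        _entries.Add(new Entry { Id = ++_lastId, Body = body, NextVisibleTime = now });
    }

    // Returns the first visible message, or null if none is visible.
    // Reading does NOT remove the message: it hides it until now + visibilityTimeout.
    public ReceivedMessage GetMessage(DateTime now, TimeSpan visibilityTimeout)
    {
        foreach (Entry entry in _entries)
        {
            if (entry.NextVisibleTime > now)
            {
                continue; // still invisible to other readers
            }
            entry.NextVisibleTime = now + visibilityTimeout;
            entry.DequeueCount++;
            entry.PopReceipt = "receipt-" + (++_lastReceipt); // older receipts are now stale
            return new ReceivedMessage(entry.Id, entry.Body, entry.PopReceipt,
                entry.DequeueCount, entry.NextVisibleTime);
        }
        return null;
    }

    // Deletes the message only if this is the latest pop receipt for it.
    public bool DeleteMessage(ReceivedMessage msg)
    {
        int index = _entries.FindIndex(e => e.Id == msg.Id && e.PopReceipt == msg.PopReceipt);
        if (index < 0)
        {
            return false; // stale receipt: the message was read again after it reappeared
        }
        _entries.RemoveAt(index);
        return true;
    }
}
```

With a 30-second timeout and 90 seconds of work, a second reader gets the message at the 31-second mark and the first reader's delete fails; with a 120-second timeout, the message stays hidden until the first reader deletes it.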

How can I mess up my production queue processing?

Queues live in a storage account, so any instance of any service that has the account credentials can access them. In one case, we published a new version of our service to the staging deployment in preparation for going into production, and it started processing entries from the production queue. Since we had changed the format of the messages, the processing failed. Oops.

Since we publish our worker roles and web roles in the same Azure service, we can’t stop one without stopping the other, and we don’t want to stop production. So when we’re publishing new versions, we change the queue name (which is in the Azure role configuration) for the worker role to an unused queue and publish it to staging. After doing the VIP swap, we stop the old production service now in staging, then change the queue name in the production configuration, at which point the worker role will start processing any accumulated messages.

What if a queue message just won’t process?

If we have a problem processing an entry from the queue, we don’t delete it; we let it come back around and try processing it again. You obviously don’t want to do this indefinitely, or one bad message could end up clogging your queue. Check the dequeue count for the message, and if it’s over some threshold, add the message to an error queue and delete it from the regular queue. Then check the error queue every now and then to see if you’ve had any problems.
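Here's a minimal sketch of that threshold check. QueuedMessage, PoisonMessageRouter, and the two delegates are hypothetical stand-ins for the real queue operations. With the StorageClient library used in this tutorial, CloudQueueMessage exposes a DequeueCount property, and the delegates would wrap something like errorQueue.AddMessage(new CloudQueueMessage(msg.AsString)) and queue.DeleteMessage(msg). The threshold of 5 is an arbitrary example.

```csharp
using System;

// HYPOTHETICAL message shape; with the StorageClient library this would be
// CloudQueueMessage, whose DequeueCount counts how many times it has been retrieved.
public sealed class QueuedMessage
{
    public string Body { get; set; } = "";
    public int DequeueCount { get; set; }
}

public static class PoisonMessageRouter
{
    // Arbitrary example threshold; tune it to how many retries make sense for your work.
    public const int DefaultMaxDequeueCount = 5;

    // If the message has been dequeued more than maxDequeueCount times, copies it to
    // the error queue, deletes it from the regular queue, and returns true (skip it).
    // Otherwise returns false and leaves both queues alone.
    public static bool TryRouteToErrorQueue(QueuedMessage msg, int maxDequeueCount,
        Action<string> addToErrorQueue, Action<QueuedMessage> deleteFromQueue)
    {
        if (msg == null) throw new ArgumentNullException(nameof(msg));
        if (maxDequeueCount <= 0) throw new ArgumentOutOfRangeException(nameof(maxDequeueCount));
        if (msg.DequeueCount <= maxDequeueCount)
        {
            return false;
        }
        // Copy first, then delete: if the role dies in between, the worst case is a
        // duplicate in the error queue rather than a lost message.
        addToErrorQueue(msg.Body);
        deleteFromQueue(msg);
        return true;
    }
}
```

You would call this right after GetMessage returns a message and skip the normal processing when it returns true.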

Can we get back to the code now?

With all that information, let’s now add processing for our queue. We’ll add a worker role. Open our AzureCustomerServices solution. In the cloud project, right-click on Roles, and select “Add New Worker Role Project”.

You’ll be prompted for the role to add.

Pick the Worker Role, and name it CustomerWorker, then click Add.

First, let’s set up the configuration. Double-click on the ServiceConfiguration.Local.cscfg in the cloud project. If you go down to the bottom, you will find that it has added a new section for the worker role. Add these configuration settings:

      <Setting name="Microsoft.WindowsAzure.Plugins.Diagnostics.ConnectionString" 
               value="UseDevelopmentStorage=true" />
      <Setting name="DataConnectionString" value="UseDevelopmentStorage=true" />
      <!-- frequency, in seconds, to retrieve the perf counters -->
      <Setting name="PerfMonSampleRate" value="60" />
      <!-- frequency, in seconds, to transfer the perf counters to the logs from the system-->
      <Setting name="PerfMonScheduledTransferPeriod" value="120" />
      <Setting name="ProcessQueueName" value="codecampqueue" />
      <Setting name="QueueMessageVisibilityTime" value="120" />

 

Then open ServiceConfiguration.Cloud.cscfg and find the worker role section at the bottom, then make the same changes, or put in your real values if you want to test the service in the cloud:

      <Setting name="Microsoft.WindowsAzure.Plugins.Diagnostics.ConnectionString" 
           value="DefaultEndpointsProtocol=https;AccountName=PUTYOURACCOUNTNAMEHERE;AccountKey=PUTYOURACCOUNTKEYHERE" />
      <Setting name="DataConnectionString" 
           value="DefaultEndpointsProtocol=https;AccountName=PUTYOURACCOUNTNAMEHERE;AccountKey=PUTYOURACCOUNTKEYHERE" />
      <!-- frequency, in seconds, to retrieve the perf counters -->
      <Setting name="PerfMonSampleRate" value="60" />
      <!-- frequency, in seconds, to transfer the perf counters to the logs from the system-->
      <Setting name="PerfMonScheduledTransferPeriod" value="120" />
      <Setting name="ProcessQueueName" value="codecampqueue" />
      <Setting name="QueueMessageVisibilityTime" value="120" />

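Now open ServiceDefinition.csdef, find the worker role section at the bottom, and add a section for the configuration settings. (You don’t need to declare Microsoft.WindowsAzure.Plugins.Diagnostics.ConnectionString here; the Diagnostics module import that the template adds to the role defines it.)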

    <ConfigurationSettings>
      <Setting name="DataConnectionString" />
      <Setting name="PerfMonSampleRate" />
      <Setting name="PerfMonScheduledTransferPeriod" />
      <Setting name="ProcessQueueName" />
      <Setting name="QueueMessageVisibilityTime" />
    </ConfigurationSettings>

I’ll explain the settings as we use them. Now let’s add a class called GlobalStaticProperties to our worker role project. Rather than retrieve the configuration values from the role repeatedly, I retrieve them from this class, which reads each value from the role configuration only the first time it is requested. Right-click on the CustomerWorker project and choose Add Class. Name it GlobalStaticProperties and click OK. Change the class from public to internal static.

Add these using statements at the top:

using Microsoft.WindowsAzure.ServiceRuntime;
using System.Diagnostics;

And here is the code you need to put in the class. I’ve put comments inline to explain what each variable is.

internal static class GlobalStaticProperties
{

  private static string _ProcessQueueName;
  /// <summary>
  /// name of the queue
  /// </summary>
  internal static string ProcessQueueName
  {
    get
    {
      if (string.IsNullOrEmpty(_ProcessQueueName))
      {
        _ProcessQueueName = RoleEnvironment.GetConfigurationSettingValue("ProcessQueueName");
        Trace.TraceInformation("[CustomerWorker.GlobalStaticProperties] "
          + "Setting ProcessQueueName to {0}", _ProcessQueueName);
      }
      return _ProcessQueueName;
    }
  }

  private static int _QueueMessageVisibilityTime;
  /// <summary>
  /// This is the amount of time the message remains invisible after being
  /// read from the queue, before it becomes visible again (unless it is deleted)
  /// </summary>
  internal static int QueueMessageVisibilityTime
  {
    get
    {
      if (_QueueMessageVisibilityTime <= 0)
      {
        //hasn't been loaded yet, so load it 
        string VisTime = 
          RoleEnvironment.GetConfigurationSettingValue("QueueMessageVisibilityTime");
        int intTest = 0;
        bool success = int.TryParse(VisTime, out intTest);
        if (!success || intTest <= 0)
        {
          _QueueMessageVisibilityTime = 120;
        }
        else
        {
          _QueueMessageVisibilityTime = intTest;
        }
        Trace.TraceInformation("[CustomerWorker.GlobalStaticProperties] " 
          + "Setting QueueMessageVisibilityTime to {0}", _QueueMessageVisibilityTime);
      }
      return _QueueMessageVisibilityTime;
    }
  }
}
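The check-then-set getters above aren’t thread-safe, although the worst that can happen here is that a setting gets read twice. If you’d rather let the framework handle the read-once logic, here’s a sketch using Lazy&lt;T&gt;. CachedSettings is a hypothetical name, and the Func&lt;string, string&gt; stands in for RoleEnvironment.GetConfigurationSettingValue so the class can be exercised outside Azure. ParsePositiveIntOrDefault applies the same fallback rule as QueueMessageVisibilityTime above, and the same rule OnStart applies to the PerfMon settings.

```csharp
using System;
using System.Globalization;
using System.Threading;

// HYPOTHETICAL variant of GlobalStaticProperties. readSetting stands in for
// RoleEnvironment.GetConfigurationSettingValue so the class can run outside Azure.
public sealed class CachedSettings
{
    private readonly Lazy<string> _processQueueName;
    private readonly Lazy<int> _queueMessageVisibilityTime;

    public CachedSettings(Func<string, string> readSetting)
    {
        if (readSetting == null) throw new ArgumentNullException(nameof(readSetting));

        // ExecutionAndPublication: each factory runs at most once, even if several
        // threads ask for the value at the same moment.
        _processQueueName = new Lazy<string>(
            () => readSetting("ProcessQueueName"),
            LazyThreadSafetyMode.ExecutionAndPublication);
        _queueMessageVisibilityTime = new Lazy<int>(
            () => ParsePositiveIntOrDefault(readSetting("QueueMessageVisibilityTime"), 120),
            LazyThreadSafetyMode.ExecutionAndPublication);
    }

    public string ProcessQueueName => _processQueueName.Value;

    // Seconds a message stays invisible after it is read; defaults to 120.
    public int QueueMessageVisibilityTime => _queueMessageVisibilityTime.Value;

    // A missing, non-numeric, or non-positive value falls back to the default.
    public static int ParsePositiveIntOrDefault(string value, int defaultValue)
    {
        int parsed;
        bool ok = int.TryParse(value, NumberStyles.Integer, CultureInfo.InvariantCulture, out parsed);
        return ok && parsed > 0 ? parsed : defaultValue;
    }
}
```

In the role, you would create a single instance with new CachedSettings(RoleEnvironment.GetConfigurationSettingValue) and keep it in a static field.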

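Now that we have the configuration set up and handled, we want to add code to set up the diagnostics configuration. We need the diagnostics configuration code to execute when the worker role starts up, so open WorkerRole.cs. Add this using statement at the top:

using Microsoft.WindowsAzure.Diagnostics.Management;

The code in this post also relies on Microsoft.WindowsAzure, Microsoft.WindowsAzure.Diagnostics, Microsoft.WindowsAzure.ServiceRuntime, and Microsoft.WindowsAzure.StorageClient. The worker role template normally includes these, but add them if they’re missing.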

In the OnStart() method, right after this code:

// Set the maximum number of concurrent connections 
ServicePointManager.DefaultConnectionLimit = 12;

add the diagnostics configuration code. I’ve included comments to explain the code. This is identical to the code we put in the web role back in part 1.

// Name of the setting that holds the connection string for the diagnostics storage account.
string wadConnectionString = "Microsoft.WindowsAzure.Plugins.Diagnostics.ConnectionString";

// First, get a reference to the storage account where the diagnostics will be written. 
// It is recommended that you use a separate account for diagnostics and data, so the 
//   performance of your data access is not impacted by the diagnostics.
CloudStorageAccount storageAccount =
    CloudStorageAccount.Parse(
    RoleEnvironment.GetConfigurationSettingValue(wadConnectionString));

// Get a reference to the diagnostic manager for the role instance, 
//   and then get the default initial configuration, which we will then change.
RoleInstanceDiagnosticManager roleInstanceDiagnosticManager =
    storageAccount.CreateRoleInstanceDiagnosticManager(RoleEnvironment.DeploymentId,
    RoleEnvironment.CurrentRoleInstance.Role.Name, RoleEnvironment.CurrentRoleInstance.Id);
DiagnosticMonitorConfiguration config = DiagnosticMonitor.GetDefaultInitialConfiguration();

// Change the polling interval for checking for configuration changes
//   and the buffer quota for the logs. 
config.ConfigurationChangePollInterval = TimeSpan.FromSeconds(30.0);
config.DiagnosticInfrastructureLogs.BufferQuotaInMB = 256;

// The diagnostics data is written locally and then transferred to Azure Storage. 
// These are the transfer intervals for doing that operation.
config.Logs.ScheduledTransferPeriod = TimeSpan.FromMinutes(1.0); //for trace logs
config.Directories.ScheduledTransferPeriod = TimeSpan.FromMinutes(1.0); //for directory-based logs (IIS logs in a web role)

// Configure the monitoring of one Windows performance counter
// and add it to the configuration.
int sampleRate = 0;
int scheduledTransferPeriod = 0;
bool success = false;
//this is sample rate, in seconds, for the performance monitoring in %CPU.
//By making this configurable, you can change the azure config rather than republish the role.
success = int.TryParse(RoleEnvironment.GetConfigurationSettingValue("PerfMonSampleRate"),
  out sampleRate);
if (!success || sampleRate <= 0)
  sampleRate = 60;  //default is 60 seconds
success =
  int.TryParse(RoleEnvironment.GetConfigurationSettingValue("PerfMonScheduledTransferPeriod"),
  out scheduledTransferPeriod);
if (!success || scheduledTransferPeriod <= 0)
  scheduledTransferPeriod = 120;  //default is 120 seconds

PerformanceCounterConfiguration perfConfig
    = new PerformanceCounterConfiguration();
perfConfig.CounterSpecifier = @"\Processor(*)\% Processor Time";
perfConfig.SampleRate = TimeSpan.FromSeconds((double)sampleRate);
config.PerformanceCounters.DataSources.Add(perfConfig);
config.PerformanceCounters.ScheduledTransferPeriod =
  TimeSpan.FromSeconds((double)scheduledTransferPeriod);

// Configure monitoring of Windows Application and System Event logs,
// including the quota and scheduled transfer interval, and add them 
// to the configuration.
WindowsEventLogsBufferConfiguration eventsConfig
    = new WindowsEventLogsBufferConfiguration();
eventsConfig.BufferQuotaInMB = 256;
eventsConfig.ScheduledTransferLogLevelFilter = LogLevel.Undefined; //Undefined = transfer all levels
eventsConfig.ScheduledTransferPeriod = TimeSpan.FromMinutes(2.0); //transfer every 2 minutes
eventsConfig.DataSources.Add("Application!*");
eventsConfig.DataSources.Add("System!*");
config.WindowsEventLog = eventsConfig;

//set the configuration to be used by the current role instance
roleInstanceDiagnosticManager.SetCurrentConfiguration(config);

//add an event handler for the configuration being changed while the role is running
RoleEnvironment.Changing += 
  new EventHandler<RoleEnvironmentChangingEventArgs>(RoleEnvironment_Changing);
return base.OnStart();

Add the event handler for the RoleEnvironment Changing event:

void RoleEnvironment_Changing(object sender, RoleEnvironmentChangingEventArgs e)
{
  // If a configuration setting is changing
  if (e.Changes.Any(change => change is RoleEnvironmentConfigurationSettingChange))
  {
    // Set e.Cancel to true to restart this role instance
    e.Cancel = true;
  }
}

That takes care of the diagnostics. Now let’s put in the code for handling the queue. We need to define our queue, so add this at the top of the WorkerRole class:

CloudQueue queue;

Now we need to add a method that runs only once, after the worker role starts up, to make sure the queue exists and create it if it doesn’t. Let’s call it StartUpQueue(). I’ve added comments to explain what the code is doing.

private void StartUpQueue()
{
  //get a reference to the data storage account (DataConnectionString), not the
  //diagnostics account; the web role must write its queue messages to this same account
  CloudStorageAccount storageAccount =
      CloudStorageAccount.Parse(RoleEnvironment.GetConfigurationSettingValue(
      "DataConnectionString"));

  // initialize the queue client that will be used to access the queue 
  CloudQueueClient queueStorage = storageAccount.CreateCloudQueueClient();
  string queueName = GlobalStaticProperties.ProcessQueueName;
  //get a reference to the queue
  queue = queueStorage.GetQueueReference(queueName);

  //only initialize this once after the role starts up
  //so check this boolean and loop until it manages to make sure the queue is present 
  //because this role can't run without the queue
  bool storageInitialized = false;
  while (!storageInitialized)
  {
    try
    {
      // create the message queue if it doesn't already exist
      queue.CreateIfNotExist();
      // set this to true, because at this point, we know it's there
      storageInitialized = true;
    }
    catch (StorageClientException ex)
    {
      // for this error, give a reminder about the dev storage service being started
      if (ex.ErrorCode == StorageErrorCode.TransportError)
      {
        Trace.TraceError("[CustomerWorker.StartUpQueue] Storage services initialization failure."
          + " Check your storage account configuration settings. If running locally,"
          + " ensure that the Development Storage service is running. Message: '{0}'", 
          ex.Message);
        //sleep 5 seconds and then loop back around and try again
        System.Threading.Thread.Sleep(5000);
      }
      else
      {
        Trace.TraceError("[CustomerWorker.StartUpQueue] StorageClientException thrown. "
          + "Ex = {0}", ex.ToString());
        throw;
      }
    }
    catch (Exception ex)
    {
      Trace.TraceError("[CustomerWorker.StartUpQueue] Exception thrown "
        + "trying to initialize the queue. Ex = {0}", ex.ToString());
      throw;
    }
  }
}
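The loop in StartUpQueue is a retry-until-success pattern: keep trying on an error you expect to clear up (a TransportError, usually because development storage isn’t running yet), and rethrow anything else. Here’s a sketch of that pattern pulled out into a reusable helper. StartupRetry and RunUntilSuccess are hypothetical names; in the role, isTransient would check for a StorageClientException whose ErrorCode is StorageErrorCode.TransportError, and you would pass Thread.Sleep as the sleep delegate. Injecting the delay just makes the loop testable without waiting.

```csharp
using System;

public static class StartupRetry
{
    // Calls action until it succeeds and returns the number of attempts it took.
    // Exceptions for which isTransient returns true are retried after sleep(delay);
    // any other exception, or a transient one on the last allowed attempt, propagates.
    public static int RunUntilSuccess(Action action, Func<Exception, bool> isTransient,
        TimeSpan delay, Action<TimeSpan> sleep, int maxAttempts = int.MaxValue)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                action();
                return attempt;
            }
            catch (Exception ex) when (attempt < maxAttempts && isTransient(ex))
            {
                sleep(delay);
            }
        }
    }
}
```

The default of int.MaxValue attempts mirrors the original loop, which keeps trying for as long as the error is transient; a smaller limit lets the role fail fast if storage never comes up.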

Now we need to add the processing. At the top of the worker role, you have a Run() method that looks like this:

public override void Run()
{
  // This is a sample worker implementation. Replace with your logic.
  Trace.WriteLine("CustomerWorker entry point called", "Information");

  while (true)
  {
    Thread.Sleep(10000);
    Trace.WriteLine("Working", "Information");
  }
}

After all these years of programming, it goes totally against the grain for me to put in an infinite loop, but that’s how the worker role works. It runs until it breaks out of the loop or the service goes down. We’re going to replace this code. Our code will call our StartUpQueue method, then loop infinitely, looking for a message on the queue and processing it when it finds one. If processing fails, the message is left on the queue so it will be retried, as described earlier.

public override void Run()
{
  //start up the queue
  StartUpQueue();

  //loop infinitely until the service shuts down
  while (true)
  {
    try
    {
      // retrieve a new message from the queue and set how long it stays invisible;
      // the TimeSpan constructor takes hours, minutes, seconds, and the
      // global static property is in seconds
      TimeSpan visTimeout = 
        new TimeSpan(0, 0, GlobalStaticProperties.QueueMessageVisibilityTime);
      CloudQueueMessage msg = queue.GetMessage(visTimeout);
      if (msg != null)
      {
        Trace.TraceInformation("[CustomerWorker.Run] message = {0}, time = {1}, "
          + "next visible time = {2}",
          msg.AsString, DateTime.UtcNow, msg.NextVisibleTime.Value.ToString());
        string errorMessage = string.Empty;
        //process the message 
        //assume comma-delimited, first is command. check it and handle the message accordingly
        string[] msgFields = msg.AsString.Split(new char[] { ',' });
        string command = msgFields[0];
        switch (command)
        {
          case "process":
            if (msgFields.Length < 5)
            {
              errorMessage = "Expected 5 fields for the process command.";
              break;
            }
            string firstName = msgFields[1];
            string lastName = msgFields[2];
            string favoriteMovie = msgFields[3];
            string favoriteLanguage = msgFields[4];
            ProcessQueue pq = new ProcessQueue();
            //ProcessQueueEntry returns an empty string if it succeeds
            errorMessage = pq.ProcessQueueEntry(firstName, lastName, favoriteMovie, 
              favoriteLanguage, container);
            break;
          default:
            //retrying won't help with a command we don't know, so log it and let it be deleted
            Trace.TraceWarning("[CustomerWorker.Run] Unknown command = {0}", command);
            break;
        }

        if (!string.IsNullOrEmpty(errorMessage))
        {
          //leave the message on the queue; it becomes visible again after the
          //visibility timeout and will be retried
          Trace.TraceError("[CustomerWorker.Run] Processing failed, message left on the queue. "
            + "Error = {0}", errorMessage);
        }
        else
        {
          // remove message from queue
          //http://blog.smarx.com/posts/deleting-windows-azure-queue-messages-handling-exceptions
          try
          {
            queue.DeleteMessage(msg);
          }
          catch (StorageClientException ex)
          {
            if (ex.ExtendedErrorInformation != null
              && ex.ExtendedErrorInformation.ErrorCode == "MessageNotFound")
            {
              // pop receipt must be invalid (the message reappeared and was read again);
              // log it so we can tune the visibility timeout
              Trace.TraceWarning("[CustomerWorker.Run] Message not found when deleting; "
                + "consider a longer visibility timeout. Message = {0}", msg.AsString);
            }
            else
            {
              // not the error we were expecting
              throw;
            }
          }
        }
      }
      else
      {
        //no message found, sleep for 5 seconds
        Thread.Sleep(5000);
      }
    }
    catch (Exception ex)
    {
      Trace.TraceError("[CustomerWorker.Run] Exception thrown "
        + "trying to read from the queue = {0}", ex.ToString());
      Thread.Sleep(5000);
    }
  }    
}

This reads the message off of the queue. GetMessage returns null if no message is available. We know the message is a comma-delimited string, so we split it into an array to get the different values. I always send a command as the first field. Your worker role may only process one command, but if you set it up this way, you can send other commands to your worker role to be handled as well.
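Indexing straight into the split array assumes every message is well formed. Here’s a sketch of a more defensive parser, assuming the five-field process format used in this tutorial; ProcessCommand and QueueMessageParser are hypothetical names. It returns false instead of throwing, which also exposes a limitation of the comma-delimited format: a value that contains a comma, such as some movie titles, shifts every field after it.

```csharp
using System;

// HYPOTHETICAL typed form of a "process" queue message.
public sealed class ProcessCommand
{
    public ProcessCommand(string firstName, string lastName, string favoriteMovie, string favoriteLanguage)
    {
        FirstName = firstName;
        LastName = lastName;
        FavoriteMovie = favoriteMovie;
        FavoriteLanguage = favoriteLanguage;
    }

    public string FirstName { get; }
    public string LastName { get; }
    public string FavoriteMovie { get; }
    public string FavoriteLanguage { get; }
}

public static class QueueMessageParser
{
    // Expected format: "process,firstName,lastName,favoriteMovie,favoriteLanguage".
    // Returns false (instead of throwing) for an empty message, another command,
    // or the wrong number of fields, so the caller can decide what to do with it.
    public static bool TryParseProcess(string body, out ProcessCommand command)
    {
        command = null;
        if (string.IsNullOrEmpty(body))
        {
            return false;
        }
        string[] fields = body.Split(',');
        if (fields.Length != 5 || fields[0] != "process")
        {
            return false;
        }
        command = new ProcessCommand(fields[1], fields[2], fields[3], fields[4]);
        return true;
    }
}
```

When TryParseProcess returns false, you could treat the message like any other failure and let the dequeue-count check described earlier move it to the error queue.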

We want our ProcessQueue class to write the data to blob storage, so the worker role needs a reference to the blob container to pass in to ProcessQueueEntry. Define the container at the top of the WorkerRole class, under the definition for the queue:

CloudBlobContainer container;

In the OnStart method, right before the call to base.OnStart(), add some code to set up our container.

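//****************Container****************
//get a reference to the data storage account, rather than the diagnostics account
CloudStorageAccount dataStorageAccount = CloudStorageAccount.Parse(
  RoleEnvironment.GetConfigurationSettingValue("DataConnectionString"));
//get a reference to the blob client
CloudBlobClient cbc = dataStorageAccount.CreateCloudBlobClient();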
//get a reference to the container, and create it if it doesn't exist
container = cbc.GetContainerReference("codecamp");
container.CreateIfNotExist();
//now set the permissions so the container is private, 
//  but the blobs are public, so they can be accessed with a specific URL
BlobContainerPermissions permissions = new BlobContainerPermissions();
permissions.PublicAccess = BlobContainerPublicAccessType.Blob;
container.SetPermissions(permissions);

This makes sure the container exists and sets the reference to it. A newly created container is private by default, so if you don’t set the permissions after creating it, you won’t be able to access the blobs through a URL. This code sets the permissions so that the container is private but the blobs are public: nobody can list the blobs in the container, but anyone who has the URL for a blob can access it.
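As for what that URL looks like: a blob’s address is the storage account’s blob endpoint, followed by the container name and the blob name. Here’s a sketch that builds it, assuming the standard cloud endpoint format (https://accountname.blob.core.windows.net) and the development storage endpoint (http://127.0.0.1:10000/devstoreaccount1). BlobUrls is a hypothetical helper; in the StorageClient library, a blob reference’s Uri property gives you the same address without building it by hand.

```csharp
using System;

// HYPOTHETICAL helper showing how public blob URLs are put together.
// The blob name is escaped as a single path segment.
public static class BlobUrls
{
    // Cloud storage: https://{account}.blob.core.windows.net/{container}/{blob}
    public static string ForCloud(string accountName, string containerName, string blobName)
    {
        return string.Format("https://{0}.blob.core.windows.net/{1}/{2}",
            accountName, containerName, Uri.EscapeDataString(blobName));
    }

    // Development storage: the account name is part of the path, not the host name.
    public static string ForDevelopmentStorage(string containerName, string blobName)
    {
        return string.Format("http://127.0.0.1:10000/devstoreaccount1/{0}/{1}",
            containerName, Uri.EscapeDataString(blobName));
    }
}
```

With BlobContainerPublicAccessType.Blob, a request to one of these URLs returns the blob, while a request to list the container is refused.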

Now let’s set up a method to process our queue entry. Right-click on the CustomerWorker project and select Add Class. Call the class ProcessQueue, and add using statements for System.Diagnostics and Microsoft.WindowsAzure.StorageClient at the top. You need a method that takes the four attributes, formats them, and writes the result to the container in blob storage. Here’s the ProcessQueueEntry method:

public string ProcessQueueEntry(string firstName, string lastName, string favoriteMovie, 
  string favoriteLanguage, CloudBlobContainer container)
{
  string errorMessage = string.Empty;

  try
  {
    Trace.TraceInformation("[ProcessQueueEntry] for command [process], " +
        "firstName = {0}, lastName = {1}, favoriteMovie = {2}, favoriteLanguage = {3}",
        firstName, lastName, favoriteMovie, favoriteLanguage);

    //let's write the information to blob storage. First, create the message.
    string messageToWrite = string.Format("FirstName = {0}{1}LastName = {2}{1}" +
      "FavoriteMovie = {3}{1}FavoriteLanguage = {4}",
      firstName, Environment.NewLine, lastName,
      favoriteMovie, favoriteLanguage);

    //now create the file name with a UTC date/time stamp in it. "HH" is the 24-hour clock
    //("hh" would give 1 AM and 1 PM the same name), and the milliseconds plus a GUID keep
    //two entries processed in the same second from overwriting each other.
    string fileName = "test_" + DateTime.UtcNow.ToString("yyyyMMdd-HH-mm-ss-fff",
      System.Globalization.CultureInfo.InvariantCulture)
      + "_" + Guid.NewGuid().ToString("N") + ".txt";

    //get a reference to the blob 
    var blob = container.GetBlobReference(fileName);

    //upload the text to the blob 
    blob.UploadText(messageToWrite);

  }
  catch (Exception ex)
  {
    errorMessage = "Error processing entry.";
    Trace.TraceError("[ProcessQueueEntry] Exception thrown = {0}", ex);
  }
  return errorMessage;
}

Now run the service. The entries that were in the queue before this step will now be processed. Look in the development storage blob storage account for a container called “codecamp”, and you should see your blobs in there. Each one is a small text file that you can open in Notepad.

Now run the client, fill in a first and last name, and click AddToQueue. The client adds the entry to the queue, and the worker role picks it up, processes it, and writes it to blob storage. If it doesn’t work right, put some breakpoints into the service and debug it while it’s running.

So we now have a WCF service running in a web role that reads from and writes to a Windows Azure SQL Database. It submits messages to an Azure queue, and there is a worker role that retrieves the entries from the queue and writes them to blob storage. We have a client that we can use to test the service.

We also have the diagnostics working. If I check the WADLogsTable in development storage, I can see the messages that are being logged. Besides the message itself, each entry stores the deployment ID, the role instance, and so on. There are also tables for the other diagnostics, and in blob storage, I can see my IIS logs.

What if you want to use Windows Azure table storage instead of a SQL database? In the next part, we’ll change our service to do exactly that, and be able to toggle back and forth between the two methods.

5 Responses to “Azure for Developers Tutorial Step 6: Processing the queue”

  1. Keith Montgomery Says:

    I’m following your Azure series closely, I think… but where does ProcessQueue come from? I can’t seem to find it in the .NET docs, and other google searches don’t really return anything that matches your usage here.

    • robindotnet Says:

      Hi Keith,
      Close to the end of this article, it says “…let’s set up a method to process our queue entry. Right-click on the CustomerWorker project and select AddClass. Call the class ProcessQueue…” and then it provides the code for the ProcessQueueEntry that goes in that class. So you should be able to add the class and then paste the code into it.
      Robin
