Windows Azure Service Bus Scatter-Gather Implementation
- by Alan Smith
One of the more challenging enterprise integration patterns that developers may wish to implement is the Scatter-Gather pattern. In this article I will show a basic implementation of the scatter-gather pattern using the topic-subscription model of the Windows Azure Service Bus. I’ll be using the implementation in demos, and also as a lab in my training courses, and the pattern will also be included in the next release of my free e-book, the “Windows Azure Service Bus Developer Guide”.

The Scatter-Gather pattern answers the following scenario.

How do you maintain the overall message flow when a message needs to be sent to multiple recipients, each of which may send a reply?

Use a Scatter-Gather that broadcasts a message to multiple recipients and re-aggregates the responses back into a single message.

The Enterprise Integration Patterns website provides a description of the Scatter-Gather pattern.

The scatter-gather pattern is a composite of the publish-subscribe channel pattern and the aggregator pattern. The publish-subscribe channel is used to broadcast messages to a number of receivers, and the aggregator is used to gather the response messages and combine them into a single message.

Scatter-Gather Scenario

The scenario for this scatter-gather implementation is an application that allows users to answer questions in a poll-based voting scenario. A poll manager application will be used to broadcast questions to users. The users will use a voting application that receives and displays the questions and sends the votes back to the poll manager. The poll manager application will receive the users’ votes and aggregate them to display the results. The scenario should be able to scale to support a large number of users.

Scatter-Gather Implementation

The diagram below shows the overall architecture for the scatter-gather implementation.
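Independently of the Service Bus, the shape of the pattern can be sketched in a few lines of plain C#. The following is only an illustration under simplifying assumptions: the ScatterGatherSketch class and its ScatterGather method are hypothetical names that do not appear in the application, the recipients are in-process functions rather than subscribers on a topic, and the calls are synchronous, whereas the implementation in this article is asynchronous and message-based.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for the pattern; this is not Service Bus code.
public static class ScatterGatherSketch
{
    // Broadcasts one request to every recipient (scatter), then
    // combines the individual replies into a single result (gather).
    public static TResult ScatterGather<TRequest, TReply, TResult>(
        TRequest request,
        IEnumerable<Func<TRequest, TReply>> recipients,
        Func<IEnumerable<TReply>, TResult> aggregate)
    {
        // Scatter: each recipient receives the same request.
        List<TReply> replies = recipients.Select(r => r(request)).ToList();

        // Gather: the aggregator reduces all replies to one value.
        return aggregate(replies);
    }
}
```

In the voting scenario the request corresponds to a question, each recipient to a voting application, each reply to a yes or no vote, and the aggregate function to the counting of votes in the poll manager.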
Messaging Entities

Looking at the scatter-gather pattern diagram, it can be seen that the topic-subscription architecture is well suited for broadcasting a message to a number of subscribers. The poll manager application can send the question messages to a topic, and each voting application can receive the question message on its own subscription. The static limit of 2,000 subscriptions per topic in the current release means that up to 2,000 voting applications can receive question messages and take part in voting.

The vote messages can then be sent to the poll manager application using a queue. The voting applications will send their vote messages to the queue, and the poll manager will receive and process the vote messages.

The questions topic and answer queue are created using the Windows Azure Developer Portal. Each instance of the voting application will create its own subscription in the questions topic when it starts, allowing the question messages to be broadcast to all subscribing voting applications.

Data Contracts

Two simple data contracts will be used to serialize the questions and votes as brokered messages. The code for these is shown below.

    // Requires: using System.Runtime.Serialization;
    [DataContract]
    public class Question
    {
        [DataMember]
        public string QuestionText { get; set; }
    }

To keep the implementation of the voting functionality simple and focus on the pattern implementation, the users can only vote yes or no to the questions.

    [DataContract]
    public class Vote
    {
        [DataMember]
        public string QuestionText { get; set; }

        [DataMember]
        public bool IsYes { get; set; }
    }

Poll Manager Application

The poll manager application has been implemented as a simple WPF application; the user interface is shown below.
A question can be entered in the text box, and sent to the topic by clicking the Add button. The topic and subscriptions used for broadcasting the messages are shown in a TreeView control. The questions that have been broadcast and the resulting votes are shown in a ListView control.

When the application is started, any existing subscriptions are cleared from the topic. Clients are then created for the questions topic and votes queue, along with background workers for receiving and processing the vote messages and for updating the display of subscriptions.

    public MainWindow()
    {
        InitializeComponent();

        // Create a new results list and data bind it.
        Results = new ObservableCollection<Result>();
        lsvResults.ItemsSource = Results;

        // Create a token provider with the relevant credentials.
        TokenProvider credentials =
            TokenProvider.CreateSharedSecretTokenProvider
            (AccountDetails.Name, AccountDetails.Key);

        // Create a URI for the service bus.
        Uri serviceBusUri = ServiceBusEnvironment.CreateServiceUri
            ("sb", AccountDetails.Namespace, string.Empty);

        // Clear out any old subscriptions.
        NamespaceManager = new NamespaceManager(serviceBusUri, credentials);
        IEnumerable<SubscriptionDescription> subs =
            NamespaceManager.GetSubscriptions(AccountDetails.ScatterGatherTopic);
        foreach (SubscriptionDescription sub in subs)
        {
            NamespaceManager.DeleteSubscription(sub.TopicPath, sub.Name);
        }

        // Create the MessagingFactory
        MessagingFactory factory = MessagingFactory.Create(serviceBusUri, credentials);

        // Create the topic and queue clients.
        ScatterGatherTopicClient =
            factory.CreateTopicClient(AccountDetails.ScatterGatherTopic);
        ScatterGatherQueueClient =
            factory.CreateQueueClient(AccountDetails.ScatterGatherQueue);

        // Start the background worker threads.
        VotesBackgroundWorker = new BackgroundWorker();
        VotesBackgroundWorker.DoWork += new DoWorkEventHandler(ReceiveMessages);
        VotesBackgroundWorker.RunWorkerAsync();

        SubscriptionsBackgroundWorker = new BackgroundWorker();
        SubscriptionsBackgroundWorker.DoWork += new DoWorkEventHandler(UpdateSubscriptions);
        SubscriptionsBackgroundWorker.RunWorkerAsync();
    }

When the poll manager user enters a question in the text box and clicks the Add button, a question message is created and sent to the topic. This message will be broadcast to all the subscribing voting applications. An instance of the Result class is also created to keep track of the votes cast; this is then added to an observable collection named Results, which is data-bound to the ListView control.

    private void btnAddQuestion_Click(object sender, RoutedEventArgs e)
    {
        // Create a new result for recording votes.
        Result result = new Result()
        {
            Question = txtQuestion.Text
        };
        Results.Add(result);

        // Send the question to the topic
        Question question = new Question()
        {
            QuestionText = result.Question
        };
        BrokeredMessage msg = new BrokeredMessage(question);
        ScatterGatherTopicClient.Send(msg);

        txtQuestion.Text = "";
    }

The Result class is implemented as follows.
    public class Result : INotifyPropertyChanged
    {
        public string Question { get; set; }

        private int m_YesVotes;
        private int m_NoVotes;

        public event PropertyChangedEventHandler PropertyChanged;

        public int YesVotes
        {
            get { return m_YesVotes; }
            set
            {
                m_YesVotes = value;
                NotifyPropertyChanged("YesVotes");
            }
        }

        public int NoVotes
        {
            get { return m_NoVotes; }
            set
            {
                m_NoVotes = value;
                NotifyPropertyChanged("NoVotes");
            }
        }

        private void NotifyPropertyChanged(string prop)
        {
            if (PropertyChanged != null)
            {
                PropertyChanged(this, new PropertyChangedEventArgs(prop));
            }
        }
    }

The INotifyPropertyChanged interface is implemented so that changes to the number of yes and no votes will be updated in the ListView control.

Receiving the vote messages from the voting applications is done asynchronously, using a background worker thread.

    // This runs on a background worker.
    private void ReceiveMessages(object sender, DoWorkEventArgs e)
    {
        while (true)
        {
            // Receive a vote message from the queue
            BrokeredMessage msg = ScatterGatherQueueClient.Receive();
            if (msg != null)
            {
                // Deserialize the message.
                Vote vote = msg.GetBody<Vote>();

                // Update the results.
                foreach (Result result in Results)
                {
                    if (result.Question.Equals(vote.QuestionText))
                    {
                        if (vote.IsYes)
                        {
                            result.YesVotes++;
                        }
                        else
                        {
                            result.NoVotes++;
                        }
                        break;
                    }
                }

                // Mark the message as complete.
                msg.Complete();
            }
        }
    }

When a vote message is received, the result that matches the vote question is updated with the vote from the user. The message is then marked as complete.

A second background thread is used to update the display of subscriptions in the TreeView, with a dispatcher used to update the user interface.

    // This runs on a background worker.
    private void UpdateSubscriptions(object sender, DoWorkEventArgs e)
    {
        while (true)
        {
            // Get a list of subscriptions.
            IEnumerable<SubscriptionDescription> subscriptions =
                NamespaceManager.GetSubscriptions(AccountDetails.ScatterGatherTopic);

            // Update the user interface.
            SimpleDelegate setQuestion = delegate()
            {
                trvSubscriptions.Items.Clear();
                TreeViewItem topicItem = new TreeViewItem()
                {
                    Header = AccountDetails.ScatterGatherTopic
                };

                foreach (SubscriptionDescription subscription in subscriptions)
                {
                    TreeViewItem subscriptionItem = new TreeViewItem()
                    {
                        Header = subscription.Name
                    };
                    topicItem.Items.Add(subscriptionItem);
                }
                trvSubscriptions.Items.Add(topicItem);

                topicItem.ExpandSubtree();
            };
            this.Dispatcher.BeginInvoke(DispatcherPriority.Send, setQuestion);

            Thread.Sleep(3000);
        }
    }

Voting Application

The voting application is implemented as another WPF application. This one is more basic, and allows the user to vote “Yes” or “No” for the questions sent by the poll manager application. The user interface for that application is shown below.

When an instance of the voting application is created, it will create a subscription in the questions topic using a GUID as the subscription name. The application can then receive copies of every question message that is sent to the topic.

Clients for the new subscription and the votes queue are created, along with a background worker to receive the question messages. The voting application is set to receiving mode, meaning it is ready to receive a question message from the subscription.

    public MainWindow()
    {
        InitializeComponent();

        // Set the mode to receiving.
        IsReceiving = true;

        // Create a token provider with the relevant credentials.
        TokenProvider credentials =
            TokenProvider.CreateSharedSecretTokenProvider
            (AccountDetails.Name, AccountDetails.Key);

        // Create a URI for the service bus.
        Uri serviceBusUri = ServiceBusEnvironment.CreateServiceUri
            ("sb", AccountDetails.Namespace, string.Empty);

        // Create the MessagingFactory
        MessagingFactory factory = MessagingFactory.Create(serviceBusUri, credentials);

        // Create a subscription for this instance
        NamespaceManager mgr = new NamespaceManager(serviceBusUri, credentials);
        string subscriptionName = Guid.NewGuid().ToString();
        mgr.CreateSubscription(AccountDetails.ScatterGatherTopic, subscriptionName);

        // Create the subscription and queue clients.
        ScatterGatherSubscriptionClient = factory.CreateSubscriptionClient
            (AccountDetails.ScatterGatherTopic, subscriptionName);
        ScatterGatherQueueClient =
            factory.CreateQueueClient(AccountDetails.ScatterGatherQueue);

        // Start the background worker thread.
        BackgroundWorker = new BackgroundWorker();
        BackgroundWorker.DoWork += new DoWorkEventHandler(ReceiveMessages);
        BackgroundWorker.RunWorkerAsync();
    }

I took the inspiration for creating the subscriptions in the voting application from the chat application using topics and subscriptions that Ovais Akhter blogged about.

The method that receives the question messages runs on a background thread.
If the application is in receive mode, a question message will be received from the subscription, the question will be displayed in the user interface, the voting buttons enabled, and IsReceiving set to false to prevent more questions from being received before the current one is answered.

    // This runs on a background worker.
    private void ReceiveMessages(object sender, DoWorkEventArgs e)
    {
        while (true)
        {
            if (IsReceiving)
            {
                // Receive a question message from the topic.
                BrokeredMessage msg = ScatterGatherSubscriptionClient.Receive();
                if (msg != null)
                {
                    // Deserialize the message.
                    Question question = msg.GetBody<Question>();

                    // Update the user interface.
                    SimpleDelegate setQuestion = delegate()
                    {
                        lblQuestion.Content = question.QuestionText;
                        btnYes.IsEnabled = true;
                        btnNo.IsEnabled = true;
                    };
                    this.Dispatcher.BeginInvoke(DispatcherPriority.Send, setQuestion);
                    IsReceiving = false;

                    // Mark the message as complete.
                    msg.Complete();
                }
            }
            else
            {
                Thread.Sleep(1000);
            }
        }
    }

When the user clicks on the Yes or No button, the btnVote_Click method is called. This will create a new Vote data contract with the appropriate question and answer and send the message to the poll manager application using the votes queue.
The user voting buttons are then disabled, the question text cleared, and the IsReceiving flag set to true to allow a new message to be received.

    private void btnVote_Click(object sender, RoutedEventArgs e)
    {
        // Create a new vote.
        Vote vote = new Vote()
        {
            QuestionText = (string)lblQuestion.Content,
            IsYes = ((sender as Button).Content as string).Equals("Yes")
        };

        // Send the vote message.
        BrokeredMessage msg = new BrokeredMessage(vote);
        ScatterGatherQueueClient.Send(msg);

        // Update the user interface.
        lblQuestion.Content = "";
        btnYes.IsEnabled = false;
        btnNo.IsEnabled = false;
        IsReceiving = true;
    }

Testing the Application

In order to test the application, an instance of the poll manager application is started; the user interface is shown below.

As no instances of the voting application have been created, there are no subscriptions present in the topic. When an instance of the voting application is created, the subscription will be displayed in the poll manager.

Now that a voting application is subscribing, a question can be sent from the poll manager application. When the message is sent to the topic, the voting application will receive the message and display the question.

The voter can then answer the question by clicking on the appropriate button. The results of the vote are updated in the poll manager application.

When two more instances of the voting application are created, the poll manager will display the new subscriptions. More questions can then be broadcast to the voting applications.

As the question messages are queued up in the subscription for each voting application, the users can answer the questions in their own time.
The vote messages will be received by the poll manager application and aggregated to display the results. The screenshots of the applications part way through voting are shown below.

The messages for each voting application are queued up in sequence on the voting application subscriptions, allowing the questions to be answered at different speeds by the voters.
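The aggregation step performed by the poll manager can also be looked at in isolation from the messaging and user interface code. The sketch below is a hypothetical VoteTally helper that is not part of the application described above; it assumes a plain Vote class that mirrors the data contract without the serialization attributes. It keys the counts by question text in a dictionary, whereas the poll manager loops over its Results collection, and it starts counting for a question the first time a vote for it arrives, whereas the poll manager only counts votes for questions it has already added.

```csharp
using System;
using System.Collections.Generic;

// Mirrors the Vote data contract, without the serialization attributes.
public class Vote
{
    public string QuestionText { get; set; }
    public bool IsYes { get; set; }
}

// Hypothetical helper: gathers individual votes into per-question totals.
public class VoteTally
{
    // Maps question text to a two-element array: [0] = yes votes, [1] = no votes.
    private readonly Dictionary<string, int[]> m_Counts =
        new Dictionary<string, int[]>();

    // Adds one vote to the totals for its question.
    public void Add(Vote vote)
    {
        int[] counts;
        if (!m_Counts.TryGetValue(vote.QuestionText, out counts))
        {
            // First vote seen for this question: start both counts at zero.
            counts = new int[2];
            m_Counts[vote.QuestionText] = counts;
        }
        counts[vote.IsYes ? 0 : 1]++;
    }

    // Returns the number of yes votes, or zero for an unknown question.
    public int YesVotes(string question)
    {
        int[] counts;
        return m_Counts.TryGetValue(question, out counts) ? counts[0] : 0;
    }

    // Returns the number of no votes, or zero for an unknown question.
    public int NoVotes(string question)
    {
        int[] counts;
        return m_Counts.TryGetValue(question, out counts) ? counts[1] : 0;
    }
}
```

A receive loop like the one in the poll manager could call Add for each deserialized vote before marking the message as complete, and the user interface could read YesVotes and NoVotes when it refreshes. The class is not thread-safe, so calls from more than one thread would need to be synchronized.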