Aggregator Pattern in Akka using Scala

May - 30 2016 | By

The Aggregator Pattern is pretty useful for adding / removing / tracking akka actors.

(kinda like treating an functional array like an array of actors)

 

Let start with the base code from Akka here. Its not part of the akka library by default

https://github.com/akka/akka/blob/master/akka-contrib/src/main/scala/akka/contrib/pattern/Aggregator.scala

 

Once we import this into our project we can create a class like this to interact with it

package Training

import akka.actor._
import akka.testkit.{ImplicitSender, TestKit}
import org.scalatest.{FlatSpecLike, BeforeAndAfterAll, FlatSpec, Matchers}
import Training.AggregatorActorManager.{GetAllItems, RemoveItem, AddItem}
import Training.AggregatorInterface.{GetEntities, AggregatedList}
import Training.actors.aggregator.Aggregator
import scala.concurrent.duration._
import scala.collection.mutable.ArrayBuffer


// we will use this as our data object to work with inside our Aggregator to add/ remove / track
case class MyDataObject(name : String, key :String) extends ObjectWithKey {
  override def getKey(): String = key
}
trait ObjectWithKey {
   def getKey(): String
}






object AggregatorInterface {
  case class GetEntities(pattern: String)
  case class AggregateReply(entity: Any)
  case class Give()
  case class AggregatedList(label: String, list: List[Any])
  val all = "*"
  def props(orgSender: ActorRef, size: Int, label: String): Props = Props(new AggregatorInterface(orgSender, size, label))
}


  /**
  * @param orgSender the actor that this aggregator will return results too
  * @param size the expected number of records to be returned
  * @param label labels the search, used for case checks and logging
  */
class AggregatorInterface(orgSender: ActorRef, size: Int, label: String) extends Actor
with Aggregator with ActorLogging {
  import AggregatorInterface._
  import context._

  case object TimedOut

  //expectOnce is part of the Aggregator Pattern
  expectOnce
  {
    case GetEntities(pattern) =>      new AggregatorHelper(orgSender, s"../$pattern")
    case _                    =>      context.stop(self) // fail fast
  }

  //create a generic class to handle actors of type T
  class AggregatorHelper[T](originalSender: ActorRef, pattern: String) {

    println("Fired up the Aggregator Helper for "+label)

    val results = ArrayBuffer.empty[T]
    fetchEntities()

    // we wait a second and see if we get all our actor information
    context.system.scheduler.scheduleOnce(1.second, self, TimedOut)

    //expect is part of the Aggregator Pattern
    expect {
      case TimedOut =>
        // aggregator has timed out while searching for the pattern passed in
        collect(force = true)
    }

    def fetchEntities() {
      //go though each entity in the pattern and request them to give us there data of Type T
      context.actorSelection(pattern) ! Give()
      expect {
        case AggregateReply(g) ⇒
          results += g.asInstanceOf[T]
          collect()
      }
    }

    def collect(force: Boolean = false)
    {
      if (results.size == size || force)
      {
        val asList = results.toList // Make sure it becomes immutable
        //Send the actor who requested the data - all the collected Data objects
        originalSender ! AggregatedList(label, asList)
        context.stop(self)
      }
    }
  }
}








//lets create actors that the AggregatorInterface can add / remove / track
object AggregatorActor {
  def props[T](data: T, key: String): Props = Props(new AggregatorActor(data, key))
}

class AggregatorActor[T](var data: T, key: String) extends Actor with ActorLogging {
  import AggregatorInterface._

  println("Created Aggregator Actor for Data Object "+data.toString)

  def receive = {
    //The AggregatorInterface has asked us for our Data - send it back to him
  case Give() =>
      sender ! AggregateReply(data)

    //can do update commands to the data also if needed
  }
}




object AggregatorActorManager {
  case class AddItem(entity : ObjectWithKey)
  case class RemoveItem(key : String)
  case class GetAllItems()
  def props() = Props(new AggregatorActorManager())
}

//make sure the objects we work with have a unique key "extends ObjectWithKey"
class AggregatorActorManager[T >: ObjectWithKey]() extends Actor with ActorLogging {

  var currentCount = 0
  def AddActor(entity : ObjectWithKey) =
  {
    //add new actor and watch for failure
    context.watch(context.actorOf(AggregatorActor.props(entity, entity.getKey), entity.getKey))
    currentCount = currentCount +1
  }

  def RemoveActor(key : String) =
  {
    currentCount = currentCount -1  }

  def GetAllActors() =
  {
    //ask the aggregator to return "currentCount" rows to "self" and label the request "Request1", request all entites that match the "all" string
    context.actorOf(AggregatorInterface.props(self, currentCount, "Request1")) ! GetEntities(AggregatorInterface.all)

  }


   def receive = {
     case AggregatedList(label, list) =>
       println("We got all actors in our Aggregator -> " + list.toString)

     case AddItem(entity: ObjectWithKey) => {
       println("Received AddItem Request")
       AddActor(entity)
     }

     case RemoveItem(key: String) => {
       RemoveActor(key)
     }

     case GetAllItems() =>
       println("Received GetAllItems Request")
       GetAllActors()
   }
 }



class AggregatorTest extends TestKit(ActorSystem("AggregatorActorManagerTest"))
  with ImplicitSender
  with FlatSpecLike
  with Matchers
  with BeforeAndAfterAll {

  val aggregatorActorManager = AggregatorActorManager.props()
  val actorUnderTest = system.actorOf(aggregatorActorManager, "AggregatorActorManager")

  behavior of "The Manger is to utilize the Aggregator"
  it should "Use the Aggregator to Add and get Actors with our data Objects" in {

    actorUnderTest ! AddItem(new MyDataObject("test", "MyKey"))

    //wait for the actor to be created (we could do this on a reply bases)
    Thread.sleep(500)
    actorUnderTest ! GetAllItems()

    expectNoMsg(20 seconds)
  }


  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }
}

 

And this will print out

 

Received AddItem Request
Created Aggregator Actor for Data Object MyDataObject(test,MyKey)
Received GetAllItems Request
Fired up the Aggregator Helper for Request1
We got all actors in our Aggregator -> List(MyDataObject(test,MyKey))

Comments are closed. Please see front page on how to contact me