Akka Actors

P Madanasekaran

Introduction

As mentioned in earlier articles, the Akka actor is influenced by the Erlang process and suits use cases where the domain model can be broken into small classes that share no data and communicate only through messages. The main difference is that in Erlang processes are spawned from functions, whereas in Akka actors are objects, like everything else in Scala, including functions. In this model one thread handles many processes/actors, so it scales well to the “C10K” problem of serving ten thousand or more concurrent connections, for example when the number of mobile users of an application explodes. The number of threads used depends on the number of cores in the system. Messages received are not executed immediately but are queued in the mailbox of the actor by the Dispatcher. The Dispatcher uses the services of java.util.concurrent.ExecutorService, which has been available since JDK 5 and hides lower-level thread-management details, such as thread creation and scheduling, from the user. The Dispatcher carries out the following jobs:

  1. It controls and coordinates the dispatching of messages to the mailboxes of actors.
  2. It allots threads/resources and time for the execution of actors that have messages in their mailboxes.
  3. It re-allots the thread to another ready actor once an actor has finished processing the messages in its mailbox.

Look at Figure-1 below. The Dispatcher runs in a thread and allots threads from the thread pool to actors that have messages in their mailboxes.

Thus the threads/resources are efficiently managed for the concurrent/parallel execution of actors. We will see more about this later; for now, let us look at some samples.
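To make the idea of "a few threads serving many units of work" concrete, here is a minimal sketch that uses java.util.concurrent.ExecutorService directly. It is not how the Akka Dispatcher is written; the object name PoolSketch and the method runTasks are made up for illustration.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: runs many small tasks on a small fixed pool.
object PoolSketch {
  // Returns (number of tasks completed, number of distinct threads that ran them).
  def runTasks(tasks: Int, threads: Int): (Int, Int) = {
    val pool = Executors.newFixedThreadPool(threads)
    val seen = new ConcurrentHashMap[String, java.lang.Boolean]()
    val done = new AtomicInteger(0)
    for (_ <- 1 to tasks) {
      pool.execute(new Runnable {
        def run(): Unit = {
          // Record which pool thread picked up this task.
          seen.put(Thread.currentThread.getName, java.lang.Boolean.TRUE)
          done.incrementAndGet()
        }
      })
    }
    pool.shutdown()                              // accept no new tasks
    pool.awaitTermination(10, TimeUnit.SECONDS)  // wait for the queued ones
    (done.get, seen.size)
  }
}
```

Calling PoolSketch.runTasks(1000, 4) completes all 1000 tasks while never using more than four threads, which is the same economy the Dispatcher aims for with actors.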

Sample

We will create the sample in Scala IDE 4.4, which comes with Scala 2.11. The IDE has a plug-in for SBT, the build and dependency management tool, but we will use the build facilities of the IDE itself. So we have to download two jar files separately: 1) akka-actor_2.11-2.3.15.jar 2) config-1.3.10.jar.

In the IDE choose File → New → Scala Project, and in the New Scala Project wizard give the Project Name as lrnActor and click Finish. The project skeleton will be created by the IDE. Go to the workspace/lrnActor folder, choose New → Folder and name the folder lib. Copy the two jar files you downloaded into it. Now go back to the IDE, right-click on lrnActor in the Project Explorer and choose Refresh. The lib folder will now appear in the Project Explorer. We have to add the jar files it contains to our build path. Right-click on lrnActor and choose Build Path → Configure Build Path. In the Properties for lrnActor wizard, click the Libraries tab; the wizard will appear as in Figure-2 below:

Figure-2

Click the Add JARs button. In the JAR Selection wizard that appears, expand lrnActor and then lib; it will appear as in Figure-3 below:

Figure-3
Select the two jars in it and click OK. The jars will now appear in the Properties for lrnActor wizard. Click OK.
Now our build environment is ready. Though you can put your Scala code directly under src, it is good practice to create a package under it and put your code there. Right-click on src and choose New → Package. The wizard will suggest a name; change it to something more familiar, say ram, and click Finish.
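For readers who would rather let SBT fetch the jars than copy them by hand, the dependency could be declared roughly as follows. This is only a sketch: the Akka version is taken from the jar name used above, while the exact Scala patch version (2.11.8) is an assumption. The config jar does not need its own entry because SBT resolves it as a transitive dependency of akka-actor.

```scala
// build.sbt (sketch; adjust versions to match your installation)
name := "lrnActor"

// Assumed patch release of the Scala 2.11 line
scalaVersion := "2.11.8"

// %% appends the Scala binary version, giving akka-actor_2.11
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.3.15"
```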

Scala App

If you right click on ram and choose New you can see a number of options. If you click on Scala App you can see the New File Wizard as in Figure-4 below:

Figure-4
Fill in the Name as ram.Main and click Finish. The generated code will look as follows:

ram/Main.scala

package ram

object Main extends App {

}

A Scala object with a main() method is the entry point of a Scala application. There are two ways to create this launching point for our application: 1) define an object that extends the App trait, or 2) define an object with a properly defined main method. By selecting New → Scala App we chose the first option, so the generated code defines an object that extends the App trait. We need not provide a main() in our code, as it will use the main() that comes with the App trait. Had we chosen New → Scala Object, we would have had to define main() with the following signature:
def main(args: Array[String]): Unit

By choosing Scala App we have avoided this, and our object will use the main() built into the App trait that it extends.
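For comparison, the second option might look like the sketch below. The object name MainWithMethod and the helper greeting are hypothetical and are not generated by the IDE; the package line is left out so that the snippet stands on its own.

```scala
// Alternative to `extends App`: an object with an explicit entry point.
object MainWithMethod {
  // Small helper so that the behaviour can be checked without capturing stdout.
  def greeting(args: Array[String]): String =
    if (args.isEmpty) "Hello" else s"Hello ${args.mkString(" ")}"

  // The JVM looks for exactly this signature when launching the application.
  def main(args: Array[String]): Unit =
    println(greeting(args))
}
```

With the App trait the command-line arguments are still available, through the inherited args member.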

Actor code

Main.scala

package ram

import akka.actor._

case class Greet(name: String)  //-----4)

class HelloActor extends Actor {  //---------1)
  def receive = {
    case Greet(name) => println(s"You are great $name")
    case "hello"     => println("How are You")
    case _           => println("Fine")
  }
}

object Main extends App {
  val system = ActorSystem("HelloSystem")     //---------2)
  val helloActor = system.actorOf(Props[HelloActor], name = "helloActor") //------3)
  helloActor ! Greet("Venkat")  //--------5)
  helloActor ! "hello" //----6)
  helloActor ! "hi"  //-------7)
}

  1. An actor object is instantiated from a class that extends the Actor trait. The class must implement the abstract method receive of the Actor trait, which defines the behaviour of the actor. The receive block should provide a pattern match for every message the actor can accept, and a default case to handle unknown messages.
  2. Before creating an actor, an actor system must be created; it enables the creation of a hierarchical structure of actors. We can pass a name of our choice to its constructor (plain letters and digits are safe).
  3. Actors are created by passing a Props instance to the actorOf factory method, which is available in ActorSystem. The actorOf() method creates an actor but returns an ActorRef object, which acts as a proxy to the actor.
  4. In the receive method you can pattern match on plain strings. But the suggested practice is to use an immutable case class when the message takes parameters, or a case object when there are no parameters.
  5. The ! (tell) method is used here to send messages to the actor. This is a fire-and-forget, one-way message model, where the producer of the message expects no reply from the consumer. The message is sent asynchronously and the method returns immediately. When a reply from the actor to the sender is expected, there is another method, ask (?), which combines Futures with the actor model. The message sent here is pattern matched to case Greet(name).
  6. This is pattern matched to case "hello".
  7. This is pattern matched to the default case.
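The ask (?) method mentioned above can be sketched as follows against the Akka 2.3 API. ReplyingActor, AskSketch and askOnce are hypothetical names introduced only for this illustration; Greet is redefined here so that the snippet is self-contained.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

case class Greet(name: String)

// Hypothetical actor that replies to the sender instead of printing.
class ReplyingActor extends Actor {
  def receive = {
    case Greet(name) => sender() ! s"You are great $name"
  }
}

object AskSketch {
  def askOnce(name: String): String = {
    val system = ActorSystem("AskSystem")
    try {
      val actor = system.actorOf(Props[ReplyingActor], name = "replyingActor")
      // ask needs an implicit timeout after which the Future fails.
      implicit val timeout: Timeout = Timeout(3.seconds)
      val future = (actor ? Greet(name)).mapTo[String]
      // Blocking keeps the sketch short; real code would normally
      // compose the Future rather than wait for it.
      Await.result(future, timeout.duration)
    } finally {
      system.shutdown() // Akka 2.3 API
    }
  }
}
```

Unlike tell, ask returns a Future immediately, and the reply is delivered into that Future when the actor answers through sender().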

See the figure below which explains various steps involved.

  1. The client, here Main, creates and uses the ActorSystem. This is the first step before sending a message to an actor.
  2. In Akka, an actor is created using the actorOf(...) call. For safety, Akka does not return the actual actor but an actor reference, here helloActor. The message is sent to this actor reference.
  3. The actor reference helloActor sends this message to the Dispatcher.
  4. The Dispatcher then enqueues the message in helloActor’s mailbox.
  5. The Dispatcher then schedules the mailbox on a thread.
  6. Finally, the mailbox dequeues the message and passes it to helloActor’s receive method.
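The steps above can be imitated with a toy model built only on java.util.concurrent. This is emphatically not Akka's implementation; ToyActor and ToyActorDemo are hypothetical names, and the sketch only shows how a mailbox can be queued, scheduled on a pool thread and drained one message at a time.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, ExecutorService, Executors, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Toy model only: a mailbox plus a flag saying whether it is already scheduled.
class ToyActor(pool: ExecutorService, receive: AnyRef => Unit) {
  private val mailbox = new ConcurrentLinkedQueue[AnyRef]()
  private val scheduled = new AtomicBoolean(false)

  // Steps 3 and 4: hand the message over and enqueue it in the mailbox.
  def !(msg: AnyRef): Unit = {
    mailbox.add(msg)
    trySchedule()
  }

  // Step 5: put the mailbox on a pool thread unless it is already running.
  private def trySchedule(): Unit =
    if (scheduled.compareAndSet(false, true)) {
      pool.execute(new Runnable { def run(): Unit = drain() })
    }

  // Step 6: dequeue messages one at a time and pass them to receive.
  private def drain(): Unit = {
    try {
      var msg = mailbox.poll()
      while (msg != null) {
        receive(msg)
        msg = mailbox.poll()
      }
    } finally {
      scheduled.set(false)
      // A message may have arrived while we were finishing; reschedule if so.
      if (!mailbox.isEmpty) trySchedule()
    }
  }
}

object ToyActorDemo {
  // Sends `messages` strings to one toy actor and returns how many were handled.
  def run(messages: Int): Int = {
    val pool = Executors.newFixedThreadPool(2)
    val latch = new CountDownLatch(messages)
    val handled = new AtomicInteger(0)
    val actor = new ToyActor(pool, _ => { handled.incrementAndGet(); latch.countDown() })
    for (i <- 1 to messages) actor ! s"msg-$i"
    latch.await(10, TimeUnit.SECONDS) // wait until every message is processed
    pool.shutdown()
    handled.get
  }
}
```

The scheduled flag ensures that the mailbox is drained by at most one thread at a time, which is why code inside receive never has to worry about two messages of the same actor running concurrently.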

Run the application
Right-click on Main.scala and choose Run As → Scala Application. You can see the following output:

You are great Venkat // output of first case
How are You //second case
Fine  // default case

One more sample

In this sample the Waiter actor is created within the ActorSystem, and the Chef actor is created within the context of the Waiter actor, as its child actor. Akka has facilities to watch or supervise child actors, but this sample does not make use of them.

ram/Main.scala

package ram

import akka.actor.{ Actor, ActorRef, Props, ActorSystem }

import Messages._

class Chef extends Actor {
  def receive = {
    case ExtraMasal => println("I am stuffing Extra Masal")
    case OnionDosa  => println("Onion Dosa Ready!")
  }
}

class Waiter extends Actor {

  // The Chef is created in the Waiter's context, so it is a child of the Waiter
  val chef = context.actorOf(Props[Chef], "Chef")

  def receive = {
    case MasalDosa =>
      println("I have a Masala dosa request with extra masal!")
      chef ! ExtraMasal

    case OnionDosa =>
      println("I have a request for Onion dosa!")
      chef ! OnionDosa

    case DosaException =>
      println("Dosa burnt!")
  }
}

object Customer extends App {
  val system = ActorSystem("Dosa")
  val dosa: ActorRef = system.actorOf(Props[Waiter], "Waiter")

  dosa ! MasalDosa
  dosa ! OnionDosa

  // shutdown() is asynchronous; this crude pause lets the actors finish first
  Thread.sleep(1000)
  system.shutdown()
}



The code for Messages Object which is imported in the above code is given below:

ram/Messages.scala

package ram

object Messages {
  case object OnionDosa
  case object ExtraMasal
  case object MasalDosa
  case object DosaException
}



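Run Customer as a Scala application. The output will be similar to the following, though the order in which the Waiter and Chef lines interleave can vary:

I have a Masala dosa request with extra masal!
I have a request for Onion dosa!
I am stuffing Extra Masal
Onion Dosa Ready!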
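The sample above does not use Akka's watch or supervision facilities. A minimal sketch of how a parent might use them with the Akka 2.3 API is given below. The names WatchedChef, SupervisingWaiter and Order are hypothetical, and the choice of which exception leads to a restart is only an assumption made for illustration.

```scala
import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props, Terminated}
import akka.actor.SupervisorStrategy.{Restart, Stop}
import scala.concurrent.duration._

case object Order

class WatchedChef extends Actor {
  def receive = {
    case Order => println("Onion Dosa Ready!")
  }
}

class SupervisingWaiter extends Actor {
  // Supervision: decide what happens when a child throws an exception.
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
      case _: IllegalStateException => Restart // assumed to be recoverable
      case _: Exception             => Stop
    }

  val chef: ActorRef = context.actorOf(Props[WatchedChef], "Chef")

  // Watching: ask to be sent Terminated(chef) when the child stops.
  context.watch(chef)

  def receive = {
    case Order           => chef ! Order
    case Terminated(ref) => println(s"${ref.path.name} has stopped")
  }
}
```

Supervision governs how failures of a child are handled, whereas watch only notifies the watcher that another actor has stopped; the two can be used independently.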

Conclusion

The important point about Akka is that the actor is only one of the many concurrency constructs available in it, alongside Futures, Streams and others, so you can choose the construct that suits your use case. The actor is asynchronous, non-blocking and event-driven: sending a message and processing it are both done in an asynchronous and non-blocking fashion. The dispatcher allots one of the available threads to an actor when it has a message in its mailbox. This is closely related to (and uses, behind the scenes) Java's java.util.concurrent.ExecutorService facility. The purpose is to abstract out how threads are managed and how actors are given time and resources, in the form of these threads, in which to perform their tasks. The mailbox uses this thread to pass the message to the actor’s receive() method, and receive() is executed by this thread. The actor occupies the thread to which it is allocated for as long as it continues to process messages; the throughput setting of the dispatcher in the default configuration specifies the maximum number of messages it can process before releasing the thread. Though this does not block the senders of the messages or the dispatcher, lengthy operations can degrade overall performance, as all the other actors have to be scheduled for processing messages on one of the remaining threads.

By default Akka uses the Fork/Join framework of Java and manages the threads efficiently. The number of threads available for execution depends on the minimum, maximum and factor for parallelism specified in the default configuration and on the available cores in the system. The Fork/Join framework is well suited for actor-based workflows because of the work-stealing approach it uses to keep all threads in the pool busy when possible.
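How the minimum, maximum and factor combine with the core count can be sketched as a small calculation. The function below mirrors, as far as I understand it, the way Akka scales its pool: the core count is multiplied by the factor, rounded up, and clamped between the minimum and maximum. The sample values 8, 3.0 and 64 are assumed to be the defaults for parallelism-min, parallelism-factor and parallelism-max under fork-join-executor; please check the reference.conf of your Akka version. PoolSizeSketch is a hypothetical name.

```scala
object PoolSizeSketch {
  // Clamp ceil(cores * factor) into the range [min, max].
  def parallelism(cores: Int, min: Int, factor: Double, max: Int): Int =
    math.min(math.max(math.ceil(cores * factor).toInt, min), max)
}

// With the assumed defaults:
// PoolSizeSketch.parallelism(4, 8, 3.0, 64)   // 12 threads on a 4-core machine
// PoolSizeSketch.parallelism(2, 8, 3.0, 64)   // 8, because the minimum applies
// PoolSizeSketch.parallelism(32, 8, 3.0, 64)  // 64, because the maximum applies
```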

After the success story of WhatsApp and the 2 million simultaneous connections it handled on a single node, a lot of debate is going on about Akka vs Erlang OTP. Erlang was created for a domain in which concurrency, distributed architecture and fault tolerance are requirements, and it supports them in the language and the VM. Hence the programming model is simple. In Akka these functionalities have to be provided by the library. The Dispatcher in the Akka library provides the functionality provided by the Scheduler in the Erlang VM. It enqueues each message received in the mailbox of the actor. As mentioned above, it uses java.util.concurrent.ExecutorService underneath and allots a thread/resources and time for an actor and its mailbox to execute. A thread is allotted to an actor only when there are messages in its mailbox. Once the messages are processed, the thread is re-allotted to the next ready actor or released to the pool. That is, a single thread handles many actors; actors are higher-level abstractions that handle concurrency in a neater way than multi-threaded programming. As the Fork/Join framework, available in Java since JDK 7, is used, the dispatcher obtains threads depending on the cores in the system and the parallelism information in the configuration file, and manages them efficiently using a “work-stealing approach”. You may recall that the same approach is used by Erlang schedulers; it does not allow any thread to remain idle while other threads are overloaded. But the thread used to handle many actors in Akka is an OS thread, whereas in Erlang lightweight user threads execute processes. As the JVM has no native support for lightweight user threads, the Quasar framework uses a Java agent to provide fibers (lightweight user threads) and builds its actors upon them; we will look at it some time later.
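The Fork/Join framework mentioned above can be tried on its own, outside Akka. The sketch below sums a range of numbers by splitting the work into halves; forked halves sit in a worker's queue, from where idle workers may steal them. SumTask and sumUpTo are hypothetical names, and the threshold of 10000 is an arbitrary choice.

```scala
import java.util.concurrent.{ForkJoinPool, RecursiveTask}

// Sums the half-open range [from, until) by recursive splitting.
class SumTask(from: Long, until: Long) extends RecursiveTask[java.lang.Long] {
  override def compute(): java.lang.Long =
    if (until - from <= SumTask.Threshold) {
      // Small enough: add the numbers up directly.
      var total = 0L
      var i = from
      while (i < until) { total += i; i += 1 }
      java.lang.Long.valueOf(total)
    } else {
      val mid = from + (until - from) / 2
      val left = new SumTask(from, mid)
      left.fork() // queue the left half; an idle worker may steal it
      val right = new SumTask(mid, until).compute().longValue
      java.lang.Long.valueOf(right + left.join().longValue)
    }
}

object SumTask {
  val Threshold = 10000L

  // Sums 0..n on a pool with the given number of worker threads.
  def sumUpTo(n: Long, parallelism: Int): Long = {
    val pool = new ForkJoinPool(parallelism)
    try pool.invoke(new SumTask(0L, n + 1)).longValue
    finally pool.shutdown()
  }
}
```

For example, SumTask.sumUpTo(1000000L, 4) returns 500000500000, the same value as n(n+1)/2, whichever worker ends up computing each chunk.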







