Futures in Scala/Akka

Madanasekaran

In a number of articles earlier,  we have  discussed “C10K” problem- explosion in the number of mobile clients- and the inability of multi-threading/ “one –thread-for one connection” model of OOP languages - C#, Java and Ruby- to scale and handle it. We also discussed how the JavaScript framework Node.js and the languages Erlang and Go, wherein one thread serviced numerous connections were able to handle the above “C10K” problem and got into lime-light. The newer frameworks on JVM were influenced by the “sub-thread” level concurrency management models of these framework/languages –event loop and events, process/actor and fiber and channel. Look at the Figure-1 below which shows the Reactive frameworks on JVM-Vertx, Akka, Quasar and Clojure Core.Async- and how they are influenced by these languages/frameworks. Tony Hoare—of “Quicksort” fame—in his 1978 paper Communicating Sequential Processes provided the theoretical/mathematical basis for the abstractions Process and Channels and the messages as the medium of communication.

In Erlang, which is closer to the above paper, one process sends message directly to the PID or registered name of another process. Channels in Go decouples the producer from the consumer of the message. Node.js brought the event-based concurrency model found on the Browser to the Server. Though Vertx exhibits the influence of all the three, its resemblance to Node.js is more. As in Node.js, it is very easy to create a web-application or Rest Service backed by a database with Vert.x .Akka resembles Erlang more, though Quasar was created on the ground that Akka implementation of actors is not that close enough to Erlang Process. Quasar supports Java and Kotlin languages and its Clojure version is Pulsar. Vert.x has moved away from the strict actor-model by doing away with the concept of a mail-box for an actor and its event-bus resembles the channel. If your use- case can be broken into small pieces, each of which can be handled by a message passing actor, Erlang/Akka model appears to be a better fit. One actor can directly send the result of its computation to another actor through messages. The supervisor hierarchy is another advantage. For Rest Services Akka requires Spray ,for web requires Play and for sending messages to external systems, a broker, like ZeroMQ.  Vert.x does not need any of these, as its built-in features can take care of the functionalities. Rich Hickey, the creator of Clojure, was not enthusiastic about actor model, which in his view couples the producer and consumer of a message. Hence he provided for Agent in Clojure and its library “core.Async” is similar to Go channel. We will see reactive programming with Clojure later and now we will see Akka.

Akka :

It got the JAX innovation award for the best open source technology in 2015.(Vert.x got in 2014). Akka is a framework written in Scala and for Scala, though it has a Java API also. It is a framework built on top of “java.util.concurrent” package to simplify concurrency programming on JVM and make it more scalable. Concurrency, as you is about is having several tasks running simultaneously  As shown in the Figure-1 above, it is influenced by the concurrency model of Erlang –process/actor-and it provides concurrency, scalability and fault-tolerance similar to what is found in Erlang OTP.  Akka has a number of concurrency constructs apart from Actors, like Future, Agent, STM etc. But the most-known ones are Future and Actor and we will have a re-look at them. Future provides a simpler way to run your algorithm concurrently. It is similar to making a call to a higher order function that will send result at some point in future. You can pass a named function or a block of code while calling Future and Scala provides several ways, as we are going to see, to obtain the results of future computations and use them. The “java.util.concurrent” provides an elementary futures implementation; Google's Guava provides a slightly better implementation. The Akka and Twitter implementations are much more advanced. For example, transformations and operations on collections of futures are supported, along with numerous asynchronous callbacks. Akka futures are integrated with Akka actors for distributed computing. Now Java-8 has introduced Completable Future which have these features.  Akka Futures also have become part of Scala distribution and hence Scala Futures. To the query, when to use a future, the answer is that in general, futures are preferred for one-off, uncoordinated, stateless jobs, which can range in duration from a web request of a few minutes to data processing jobs of hours or more. Actors are generally preferred for coordinated or stateful activities. After getting the results from futures,it is for you to compose the results of these futures. An actor can contain state, but the actor should not share its state and interact with other actors only through messages. We will see more about actors later. Future can be used separately or can be integrated with actors. As shown in the figure-2 below both Actors and Futures are built on top of java.util.concurrent.

Samples with Eclipse Scala IDE

Eclipse Scala IDE is available as a zip file and it is a bundle consisting of Eclipse Luna, Scala IDE, Scala version 2.11.8, sbt etc. You should have JDK6 or above installed in your system. You can unzip the bundle. Eclipse requires a Workspace. Create a new folder and rename it for example scalaApps and we will use this as workspace for our Scala applications.

Within the eclipse folder of your installation you can see

eclipse.exe

You can click on it and start Eclipse and see the Workspace Launcher as in Figure-3 below: ( Browse for scalaApps you created )

If you click “OK” the Scala IDE will appear as in Figure-4 as under:

Now you can Choose File->New -> Scala Project. In the New Scala Project wizard that follows as in Figure-5, fill up the Project Name and click Finish.

Figure-5

You can right click on “src” folder and choose New -> Package and in the wizard that follows fill up the name as “lrn” and Click Finish. Now right click on “lrn” and choose New -> Scala App

Give the name as Main and click Finish. The code for Main.scala can be modified as under:

lrn/Main.scala

import scala.concurrent._ //---------1)
import scala.util.{Success,Failure} //--------4)
import ExecutionContext.Implicits.global  //----3)
object Main extends App {  //-------2)
Future{ 1 +
2 } onComplete {    //-------4)
case Success(x) => println(x)   //-----5)
case Failure(ex) => println("${ex}")
}
}

  • The implementation for future has been moved from Akka to scala.concurrent package from scala version 2.10.The implementation for Actor is expected to move from Akka to scala.concurrent in later versions.
  • As object Main extends “App” trait in Scala, we need not provide main() method . Trait Application declares a main method of the appropriate signature, which our singleton object inherits, making it usable as a Scala application.
  • Future requires an Execution context/threadpool and it will be executed on a thread other than the calling thread.

        4) For side-effects Future Trait provides the OnComplete, OnSuccess,and OnFailure callbacks. A callback method is executed by some thread, sometime after the future is completed. Scala documentation says
“There is no guarantee that it will be called by the thread that completed the future or the thread that created the callback. The order in which callbacks are executed is not guaranteed.

  • For pattern matching case objects Success and Failure in scala.util are used.

You can right click on Main.scala and choose Run as à Scala Application and see the result.
Futures are asynchronous and they take a block of code as input and returns a Future[T]. Future requires an ExecutionContext/threadpool to execute it. Future is executed on a separate thread and the calling thread is released immediately. Future can be seen as a placeholder to hold the value returned by the code block passed to it , when the code block is executed. That is Future is a placeholder for a value that may become available later. It is comparable to a key of car that may be manufactured sometime later. It is the job of Promise to write the value to the future/placeholder when the code-bock passed to a future is executed. Future[T], Option[T], Try[T] all are monads in Scala. Remember that Monads, for the language/library designers, are data structures that satisfy some “algebraic” laws and they should provide certain operations like map, filter and flatMap while defining them. For us, what is more important is that it handles, in the words of Eric Meijer, “the unhappy path for us”.  You need not explicitly “catch” errors, as the monads will do it for you when you use them. Try will return Success/Failure. Option(otherwise called “maybe monad”) will return Some/None. You need not explicitly code to check for “Null” ,as you do in Java before Java- 8. It is pertinent to point out that these constructs have found their way into Java-8, which has joined the ranks of functional languages. Just remember that the monads give functional languages power to “combine” the abstractions and create a more concise and powerful programming idioms without bothering about the “unhappy” path.

One more sample:


We will see one more sample for Future. The code for the sample is given below:

Main.scala

package lrnfuture
import scala.util._
import scala.concurrent._
import scala.util.{Success,Failure}
import ExecutionContext.Implicits.global

object Main extends App { 
// this function will find the maximum of two numbers
def max(x: Int, y: Int): Int = {
if (x > y) x
else y
}
// two random numbers are generated
val a = Random.nextInt(100)
println(a)
val b = Random.nextInt()
println(b)
// one way of creating a Future; just like calling a function and using the return value
val x: Future[Int]= Future{
max(a,b) // the generated Randoms are passed here to max()
}
x.onComplete {
case Success(e) => println(e)
case Failure(excep) => excep.printStackTrace()
}           

Thread.sleep(2000)  // --------1)
}




1) The sleep statement at the end of the code is used so the program will keep running while the Future is being calculated. We won’t need this in real-world programs, but in small example programs like this, we have to keep the JVM running.

Composing Futures:

One of the features that distinguish a Future from an actor is that Futures are composable while actors are not. That is we can compose the results from future computations. As the return values are also  futures ,we can use map and flatMap operations on them but we will use instead  the “syntactic” sugar “for”, as  we will see in the samples below:

package futurefor
import scala.concurrent._
import scala.util.{Success,Failure}
import ExecutionContext.Implicits.global
object Main extends App{

 val f = for {                             //----1)
a <- Future(20 / 2) // you can use () in place of curly braces
b <- Future(a + 2)
c <- Future(a - 2)
if c > 3 // Future.filter
} yield {
b * c
}
Thread.sleep(1000) // without this you may see only None as f.value
println(f.value)  //-----2)
f foreach println //----3)
Thread.sleep(2000)
}

  • For comprehension when combining monads uses under the hood flatMap , Map and Filter. Future is a monad as mentioned earlier.
  • The output is Some(Success(96))- you can see that value() returns an Option[Try[T]].
  • The output is 96 forEach is able to extract the computed value from the future.

The Futures created within for are executed only sequentially. If you want parallel computation, create Futures first and then combine them together within “for expression”, as shown in the next sample.

package learn
import scala.concurrent._
import ExecutionContext.Implicits.global

case class Profile(name: String, score: Double, friends: List[String])
object Main  {

def Composition: Future[Profile] = {  // --------1)

   // NOTE: We declare these up here so they start in parallel.

val fName = Future {"Venkat" }  //-----------2)
val fScore = Future { 50.0}
val fFriends = Future{ List("Vekat","Subbu","Ganesh") }

val f =  for {       //----------------3)
rName <- fName
rScore <- fScore
rFriends <- fFriends
} yield {
Profile(rName, rScore,rFriends)
}
f // return value of the function. The key-word return is not required
}
def main(args: Array[String]):Unit ={
val f1 = Composition  //  ----4)
f1 foreach println
Thread.sleep(2000)
}
}

  • We are defining a function to return a Future of the type Profile.
  • Though Futures run on separate threads, if you create Futures within “for” comprehension the code will execute only sequentially.
  • The real benefit comes only when the Futures were created first and then combined together within “for expression”.  – another way, in fact a better way of combining futures is the new “Async-await” block.
  • Function is called and its is value assigned to a val. System can infer its type

Conclusion

We saw “Future”, a concurrency construct in Scala. It was moved from Akka to Scala from version 2.10.   A Future[T] is a container that runs a computation concurrently, and at some future time may return either (a) a result of type T or (b) an exception. Computation of our algorithm starts at some nondeterministic time after the future is created, running on a thread assigned to it by the execution context. The result of the computation becomes available once the future completes. When it returns a result, a future is said to be completed. It may either be successfully completed, or failed. It is the Promise, the companion object of Future, that is responsible for completing a Future and putting the computed value in the Future.  It is executed on a thread different from the one in which the calling method(main()) executes. As shown in the samples, a future provides an interface for reading the value that has been computed. This includes callback methods and other approaches, such as for comprehension, map, flatMap, etc. Future requires an ExecutionContext to run. An ExecutionContext executes a task it’s given. It is like a thread pool. The default global execution context “ExecutionContext.Implicits.global  “ was used by our samples. This is not required when we use Future with actors, as the default dispatcher in akka library will provide the execution context. The Callback methods are called asynchronously when a future completes. The callback methods onComplete, onSuccess, onFailure, are provided in the Future Trait. We will see more of Future and Actors in Scala/Akka in the coming articles.








}