
Easy concurrent programming with Akka

Concurrent, parallel, or distributed applications always take a lot of effort to build. If you have done multithreading, you will have noticed how many hurdles there are: implementing the Runnable interface, synchronizing methods, handling deadlocks, and plenty of other details. Toolkits like Akka give you an easier and more convenient way of doing the same thing. Akka's Actor Model raises the abstraction level and provides a better platform for building correct, concurrent, and scalable applications.
What is the actor model? We will come to that shortly, but let's first look at the hurdles of doing concurrent programming conventionally. Take banking as an example: money is withdrawn from, deposited into, and transferred between accounts. Below is the code for how we would do that sequentially.

class Account {
  private var balance = 0
  // Adds a positive amount to the balance.
  def deposit(amount: Int): Unit = {
    if (amount > 0) balance = balance + amount
  }
  // Reads the balance, checks it, then writes the new value and returns it.
  def withdraw(amount: Int): Int = {
    val b = balance
    if (0 < amount && amount <= b) {
      balance = b - amount
      balance
    } else {
      throw new Error("insufficient funds")
    }
  }
}
Now suppose the balance in the account is 10000 and two people (two different threads) try to withdraw 5000 and 3000 at the same time. How will the program respond? Each thread first reads the balance. Because both withdrawals start at the same moment, both threads read 10000. One of them then writes back 5000 and the other writes back 7000, and whichever thread finishes second determines the resulting balance. Either way one withdrawal is lost: 8000 has been paid out, yet the account shows 5000 or 7000 instead of 2000, so the recorded balance no longer matches the money that actually moved. How do we solve this? We use synchronization. Below is the code that uses it.
class Account {
  private var balance = 0
  def deposit(amount: Int): Unit = this.synchronized {
    if (amount > 0) balance = balance + amount
  }
  def withdraw(amount: Int): Int = this.synchronized {
    if (0 < amount && amount <= balance) {
      balance = balance - amount
      balance
    } else throw new Error("Insufficient funds")
  }
}
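As a quick check of the synchronized version, the scenario from earlier (a balance of 10000 and two threads withdrawing 5000 and 3000) can be run directly. This is a minimal sketch using only the standard library; `SyncAccount`, `current`, and `SyncDemo` are names made up for the illustration, and the starting balance is passed in as a constructor argument so the example is self-contained.

```scala
// Hypothetical names throughout; only the standard library is used.
class SyncAccount(initial: Int) {
  private var balance = initial

  // Each method holds this object's monitor for its whole body.
  def deposit(amount: Int): Unit = this.synchronized {
    if (amount > 0) balance += amount
  }

  def withdraw(amount: Int): Int = this.synchronized {
    if (0 < amount && amount <= balance) {
      balance -= amount
      balance
    } else throw new IllegalArgumentException("insufficient funds")
  }

  // Reading is synchronized too, so the caller sees the latest write.
  def current: Int = this.synchronized { balance }
}

object SyncDemo {
  // Runs the two withdrawals on separate threads and returns the final balance.
  def finalBalance(): Int = {
    val account = new SyncAccount(10000)
    val t1 = new Thread(new Runnable {
      def run(): Unit = { account.withdraw(5000); () }
    })
    val t2 = new Thread(new Runnable {
      def run(): Unit = { account.withdraw(3000); () }
    })
    t1.start(); t2.start()
    t1.join(); t2.join()
    account.current
  }

  def main(args: Array[String]): Unit = println(finalBalance())
}
```

Whichever thread gets the lock first, the two withdrawals are applied one after the other, so `finalBalance()` returns 2000.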
Threads take the lock at the object level, so the previous problem goes away: while one thread is executing the synchronized withdraw method, no other thread can enter it on the same account. But what about transferring money? Below is the code for how we would do that, and we will see what sort of problem can arise even then.
def transfer(from: Account, to: Account, amount: Int): Unit = {
  from.synchronized {
    to.synchronized {
      from.withdraw(amount)
      to.deposit(amount)
    }
  }
}
To transfer money we need to withdraw it from one account and deposit it into another. Because another thread might access either account between the withdraw and the deposit, both the 'from' and 'to' objects need to be locked for the duration. Here we first take the lock on 'from' and then on 'to', perform the withdraw and deposit in between, and the transfer is done. If we look closely, however, there is a possibility of deadlock. Suppose one thread tries to transfer an amount from accountA to accountB while, at the same time, another thread tries to transfer from accountB to accountA. The first thread takes the lock on accountA but cannot get the lock on accountB, because the second thread already holds it; the second thread in turn waits for accountA. Both threads wait for each other to release their locks, and the program ends in deadlock.
In Java, to solve the problem we would use a thread pool through the Executor framework, which is based on the producer-consumer design pattern: application threads produce tasks and worker threads consume, that is execute, those tasks. It therefore shares the limitations of that pattern: if production is substantially faster than consumption, you may run out of memory (OutOfMemoryError) because of queued tasks, though only if your queue is unbounded. The code also still has to go through blocking synchronization, which is what makes deadlock possible, is bad for CPU utilization, and couples the sender and receiver through synchronous communication.
Now let's take a look at Akka. Akka's actors give you an asynchronous, non-blocking, and highly performant event-driven programming model. We can therefore avoid this kind of deadlock, make better use of the CPU, and decouple the sender from the receiver. We will go through the code for the banking problem, but first let's cover the basics of actors, actor systems, message passing between sender and receiver, and other features of Akka.
Akka provides scalable real-time transaction processing. It is a unified runtime and programming model for scaling up (concurrency), scaling out (remoting), and fault tolerance. The core of Akka, akka-actor, is very small and can easily be dropped into an existing project to get asynchronicity and lockless concurrency without hassle. An actor is implemented by extending the Actor class and implementing the receive method. The receive method should define a series of case statements (it has the type PartialFunction[Any, Unit]) that determine which messages your actor can handle, using standard Scala pattern matching, along with how each message should be processed. Here is an example:
import akka.actor.Actor
import akka.actor.Props
import akka.event.Logging
class MyActor extends Actor {
  val log = Logging(context.system, this)
  def receive = {
    case "test" => log.info("received test")
    case _      => log.info("received unknown message")
  }
}
To know more about Akka actors, see http://doc.akka.io/docs/akka/2.1.0/scala/actors.html. Actors are objects which encapsulate state and behavior; they communicate exclusively by exchanging messages, which are placed into the recipient's mailbox. Like in an economic organization, actors naturally form hierarchies. One actor, which is to oversee a certain function in the program, might want to split up its task into smaller, more manageable pieces. For this purpose it starts child actors, which it supervises. For more information on actor systems, visit http://doc.akka.io/docs/akka/2.1.0/general/actor-systems.html.
Actors communicate through messages. The only way to read or change an actor's state or behaviour is by sending it a message. Messages are sent to known addresses via an ActorRef. An actor knows its own address, which it can include in messages to other actors to tell them where to reply (it can likewise send other ActorRefs within a message). An actor can create a new actor, but that does not mean it can call the new actor's methods directly; it still has to go through message passing. On creating a new actor it gets that actor's ActorRef, and through that it sends messages asking it to do work.
Actors are like human brains: each works independently. The only way they communicate with each other is via messages, just as people cannot read each other's minds but can certainly ask what is on them. Actors do not need any global synchronization for the steps they perform, so they can run concurrently. Each actor processes its messages sequentially, so it gets the benefits of synchronization while avoiding the disadvantages of blocking, because incoming messages are simply enqueued.
Now let's solve the banking problem with Akka actors. Here we have created an object called Amount containing the case classes used as messages; they carry the values needed for processing. In the example below, Deposit carries an amount, which changes the state of the actor when the message is processed.
import akka.actor.ActorRef
object Amount {
  case class Deposit(amount: BigInt)
  case class Withdraw(amount: BigInt)
  case class Transfer(from: ActorRef, to: ActorRef, amount: BigInt)
}
Here we have the Account class, which is an actor (as it extends Actor) and has the main operations of depositing and withdrawing money.
import akka.actor.Actor
class Account extends Actor {
  import Amount._
  var balance = BigInt(0) // the balance of this particular account
  def receive = {
    case Deposit(amount) => // runs when a Deposit message arrives
      balance += amount
      sender ! "Done"
    case Withdraw(amount) => // runs when a Withdraw message arrives
      if (balance >= amount) { // first check that the amount is withdrawable
        balance -= amount
        sender ! "Done"
      } else sender ! "Failed"
    case _ => sender ! "Failed"
  }
}
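The balance variable above needs no lock because an actor handles one message at a time. That idea can be sketched without Akka at all, using a single-threaded executor as a stand-in for the mailbox. This is only an illustration under that assumption, not how Akka is implemented, and `MailboxAccount`, `tell`, `ask`, `stop`, and `MailboxDemo` are hypothetical names.

```scala
import java.util.concurrent.{Callable, Executors}

// Hypothetical message types for the sketch (not the article's Amount object).
object MailboxAccount {
  sealed trait Message
  case class Deposit(amount: BigInt) extends Message
  case class Withdraw(amount: BigInt) extends Message
}

// Stand-in for an actor: every message is queued on one worker thread,
// so the handler below never runs concurrently with itself.
class MailboxAccount {
  import MailboxAccount._

  private val mailbox = Executors.newSingleThreadExecutor()
  private var balance = BigInt(0) // only ever touched by the mailbox thread

  // Fire-and-forget send, loosely analogous to `!` in Akka.
  def tell(message: Message): Unit =
    mailbox.execute(new Runnable {
      def run(): Unit = message match {
        case Deposit(amount)  => balance += amount
        case Withdraw(amount) => if (amount <= balance) balance -= amount
      }
    })

  // Reads the balance by queueing a request behind all earlier messages.
  def ask(): BigInt =
    mailbox.submit(new Callable[BigInt] { def call(): BigInt = balance }).get()

  def stop(): Unit = mailbox.shutdown()
}

object MailboxDemo {
  // Four threads each send 1000 deposits of 1; this code takes no explicit lock.
  def total(): BigInt = {
    val account = new MailboxAccount
    val senders = (1 to 4).map { _ =>
      new Thread(new Runnable {
        def run(): Unit =
          (1 to 1000).foreach(_ => account.tell(MailboxAccount.Deposit(1)))
      })
    }
    senders.foreach(_.start())
    senders.foreach(_.join())
    val result = account.ask()
    account.stop()
    result
  }

  def main(args: Array[String]): Unit = println(total())
}
```

Although four threads send concurrently, the deposits are applied one by one on the mailbox thread, so `total()` returns 4000 with no update lost.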
An actor has a reference to the sender of the message it is processing and can use that reference to send a message back. The sending actor, however, must itself handle that reply in its own receive method. Here we send a success message after depositing or withdrawing (sender ! "Done") and a failure message if the requested operation could not be performed (sender ! "Failed"). Now let's come to the money transfer part. We have created a class called TransferAmount for it. I have added descriptions as comments alongside the code so that it is easier to follow.
import akka.actor.{Actor, ActorRef}
class TransferAmount extends Actor { // creating TransferAmount as an Actor
  import Amount._ // importing Amount so that the message objects can be used easily
  // A message sent to an actor first goes to the receive method,
  // which handles it according to the matching case.
  def receive = {
    case Transfer(from, to, amount) =>
      from ! Withdraw(amount)
      context.become(waitForWithdraw(to, amount, sender))
  }
  /****
  The portion above runs for the Transfer message, which carries the amount to be
  transferred, the account to withdraw from, and the account to deposit into. from
  and to are ActorRefs, so we can send messages to the Account actors through them.
  from ! Withdraw(amount) asks the Account actor to withdraw the amount; the actor
  then changes its behaviour through context.become and waits for the withdrawal
  to be performed. The requester (sender) is passed along as client so that the
  final result can be reported back to it.
  ****/
  def waitForWithdraw(to: ActorRef, amount: BigInt, client: ActorRef): Receive = {
    case "Done" =>
      to ! Deposit(amount)
      context.become(waitForDeposit(client))
    case "Failed" =>
      client ! "Failed"
      context.stop(self)
  }
  /****
  waitForWithdraw takes three parameters: the account to deposit into, the amount
  to be transferred, and the client that requested the transfer. If the Account
  actor replies "Done", it asks the other Account to deposit the amount and then
  waits for that deposit to finish. If the reply is "Failed", it reports the
  failure to the client and stops.
  ****/
  def waitForDeposit(client: ActorRef): Receive = {
    case "Done" =>
      client ! "Done"
      context.stop(self)
  }
  /****
  When the account replies "Done" for the deposit, waitForDeposit receives it,
  sends a "Done" message to the client that requested the transfer, and stops.
  ****/
}
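The behaviour switching done by context.become can be pictured as swapping the function that handles the next message. The following library-free sketch models the three states of the transfer with plain strings as messages; `TransferStateMachine`, `log`, and the log texts are invented for the illustration, and it is a mental model rather than Akka's implementation.

```scala
// Hypothetical model of context.become: the current behaviour is a function
// held in a variable, and "becoming" replaces that function.
class TransferStateMachine {
  type Receive = PartialFunction[Any, Unit]

  // Records what the machine did, so the effect of each message is visible.
  var log: Vector[String] = Vector.empty

  private var behaviour: Receive = idle

  private def become(next: Receive): Unit = {
    behaviour = next
  }

  // Initial behaviour: only a transfer request is understood.
  private def idle: Receive = {
    case "Transfer" =>
      log :+= "withdraw requested"
      become(waitForWithdraw)
  }

  private def waitForWithdraw: Receive = {
    case "Done" =>
      log :+= "deposit requested"
      become(waitForDeposit)
    case "Failed" =>
      log :+= "transfer failed"
      become(stopped)
  }

  private def waitForDeposit: Receive = {
    case "Done" =>
      log :+= "transfer done"
      become(stopped)
  }

  // Final behaviour: nothing is handled any more.
  private def stopped: Receive = PartialFunction.empty

  // In this sketch, messages the current behaviour does not handle are dropped.
  def receive(message: Any): Unit =
    if (behaviour.isDefinedAt(message)) behaviour(message)
}
```

Sending "Transfer", "Done", "Done" in that order leaves `log` holding the three steps in sequence. The same "Done" message means something different depending on the current behaviour, and in this sketch a "Done" that arrives before any transfer is simply dropped.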
We can execute or test the operations above through the Main class below.
import akka.actor.{Actor, Props}
class Main extends Actor {
  import Amount._
  // The two accounts between which the transaction will happen.
  val accountA = context.actorOf(Props[Account], "accountA")
  val accountB = context.actorOf(Props[Account], "accountB")
  accountA ! Deposit(1000) // depositing an amount of 1000 in the first account
  // When this actor receives "Done" for the deposit, it calls the transfer method.
  def receive = {
    case "Done" => transfer(500)
  }
  def transfer(amount: BigInt): Unit = {
    // Here we create the TransferAmount actor; transfer is the ActorRef
    // through which the message will be sent to it.
    val transfer = context.actorOf(Props[TransferAmount], "transfer")
    // The source account, the target account, and the amount are sent as arguments.
    transfer ! Transfer(accountA, accountB, amount)
    context.become {
      case "Done" =>
        sender ! "success"
        context.stop(self)
    }
  }
}
Unlike multithreaded code, it is very easy to write unit test cases for actors (for solutions and opinions on unit testing multithreaded code, have a look at http://stackoverflow.com/questions/12159/how-should-i-unit-test-threaded-code). Below is a simple test case.
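import akka.actor.{ActorSystem, Props}
import akka.testkit.TestProbe
import Amount._
implicit val system = ActorSystem("TestSys") // creating an ActorSystem and keeping it implicit
val accountA = system.actorOf(Props[Account], "accountA") // the two accounts involved
val accountB = system.actorOf(Props[Account], "accountB")
val transAmnt = system.actorOf(Props[TransferAmount]) // created the actor under test
val p = TestProbe() // TestProbe is a class from the Akka TestKit
p.send(accountA, Deposit(1000)) // fund the source account first
p.expectMsg("Done")
p.send(transAmnt, Transfer(accountA, accountB, 500)) // asking TransferAmount to do the operation
p.expectMsg("Done") // the result TransferAmount is expected to report back to the requester
system.shutdown() // and finally shutting down the actor system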
Akka has many more features that we can use to make our applications concurrent and fault tolerant. I have only written about the very basics here. The unit test case shown is a simple example; the Akka TestKit allows much more precise testing.

Category : App Development