Introduction à Akka

2 comments
Dans ce billet, je vous présente Akka, un framework pour écrire des applications concurrentes, scalabes et robustes. Nous allons écrire une petite application qui reprend l'exemple de mon billet sur les expressions régulières. Il s'agit d'une application qui prend en entrée un ensemble de fichiers et les parse pour en extraire des numéros de téléphone suivant le plan de numérotation français. L'API d'Akka est disponible à la fois en Java et en Scala.

Quelques mots sur Akka et sa philosophie


Les concepteurs d'Akka sont partis du constat qu'écrire une application concurrente et scalable est difficile, manipuler directement des threads et des verrous (locks) est difficile pour nous fournir des abstractions comme les acteurs ou la STM (Software Transactional Memory) pour aider dans cette tâche. En outre Akka adopte une attitude particulièrement intéressante de la gestion des erreurs en disant la chose suivante:

  • Les erreurs sont inévitables - apprenons à les gérer au lieu d'essayer de les empêcher d'arriver. Ce qui donne dans la langue de Shakespeare: "Let it crash or embrace failure". En savoir plus: StackOverflow ou c2.com.
Cette philosophie, plus des outils comme l'API des Acteurs et celle de la STM simplifient grandement l'écriture d'applications concurrentes, scalables et robustes avec Akka.
Dans cet article nous mettrons en pratique le modèle des acteurs (les autres points seraient traités dans d'autres billets à venir).

Les acteurs


Le modèle des acteurs offre un modèle "simple" de faire communiquer différents processus par échange asynchrone de messages grâce à une abstraction plus haut niveau que celle que nous rencontrons avec l'API des threads et des verrous. Nous le verrons avec notre application le modèle des acteurs est particulièrement adapté à l'exécution de tâches indépendantes. Une application conçue avec ce modèle est constituée d'acteurs qui communiquent de manière asynchrone par échange de messages immuables. Un acteur n'expose pas son état interne au monde extérieur, il le garde jalousement pour lui seul évitant ainsi la nécessité de synchroniser lors de l'accès à des données partagées et les erreurs courantes comme les "dead locks" ou autres "race conditions". Chaque acteur dispose d'une boîte à messages qui contient les messages que le monde extérieur lui a envoyés. La vie d'un acteur est relativement routinière :  il reçoit des messages, fait des traitements, envoie des messages. C'est presque tout ce qu'il sait faire.
Voici comment implémenter un acteur avec l'API d'Akka: on étend le trait akka.actor.Actor et implémente la méthode receive.

import akka.actor.Actor

class MyActor extends Actor {
  def receive = {
    case "Hello" => println("Hello there!")
    case _ => println("What to do?")
  }
}
Que fait cet acteur? Il affiche "Hello there!" lorsqu'il reçoit le message "Hello". Pour tout autre message différent de "Hello" il affiche "What to do?". Pigé? Oui oui!

Pour créer une instance de notre acteur nous nous servons de la méthode akka.actor.Actor.actorOf. Cette méthode retourne un objet immuable de type akka.actor.ActorRef.

import akka.actor.Actor.actorOf

val myActor = actorOf[MyActor]

On peut aussi créer et démarrer un acteur en même temps:

import akka.actor.Actor.actorOf

val myActor = actorOf[MyActor].start()

Attends! Comment j'envoie un message à mon acteur? Beh il suffit d'invoquer la méthode ! (bang) sur l'objet de type ActorRef obtenu lors de la création de l'instance de notre classe MyActor. Voyons par exemple comment envoyer "Hello" à myActor:

import akka.actor.Actor.actorOf

val myActor = actorOf[MyActor].start()
myActor ! "Hello" //Send "Hello" message to the actor

Une dernière pour la route: la création d'un acteur anonyme:

import akka.actor.Actor
import Actor._

val myActor = actorOf(
  new Actor {
    def receive = {
      case "Hello" => println("Hello Toto!!")
      case _ => println("Oops!")
    }
  }
).start()  

myActor ! "Hello"

Dans l'application d'exemple nous allons implémenter deux types d'acteurs:

  • Un Master qui distribue les tâches entre les Workers et agrège les résultats pour créer le résultat final.
  • Des Workers qui effectuent les tâches c'est-à-dire parsent le contenu des fichiers


La supervision


La philosophie "Let it crash" implique la nécessité de pouvoir détecter les crashes des composants de l'application. En clair nous devons "monitorer", superviser le système et prendre les décisions qui s'imposent en cas de panne d'un ou de plusieurs composants. Pour cela il faut choisir quels composants de l'application doivent être supervisés et quels autres jouent le rôle de superviseurs. Une fois ce choix opéré nous établissons les liaisons entre composants superviseurs et ceux supervisés.
A ce jour, Akka offre deux stratégies de supervision

  • OneForOne: Dans cette stratégie le superviseur redémarre seulement le composant ayant crashé les autres n'étant pas impactés.
  • AllForOne:Ici contrairement à la stratégie précédente lorsqu'un composant supervisé tombe, celui-ci ainsi que tous les autres sont redémarrés.

Comme on peut le voir sur la figure ci-dessus un superviseur peut être, à son tour, supervisé.
Référez-vous à la page d'Akka pour plus d'information.

Le choix de la stratégie de supervision et celui des composants à superviser dépendent grandement du type d'application.

La STM


Un autre outil dans la boite à outils d'Akka est la STM (Software Transactional Memory). La STM offre une alternative à la synchronisation basée sur les verrous. Elle est utile pour contrôler l'accès à une zone mémoire partagée par plusieurs acteurs. Elle traite la mémoire comme une base de données et on retrouve la syntaxe habituelle pour les transactions: begin, commit et rollback en cas de souci. Pour supporter la STM Akka n'est pas parti de zéro, il s'est basé sur le projet Multiverse. C'est un sujet qui reste à creuser pour moi j'en dis pas plus donc.

L'application PhoneNumberSearchApp


Voyons maintenant en image l'application que nous allons développer:

Comment marche cette application? Le client dispose d'un ensemble de fichiers dont il souhaite extraire des numéros de téléphone (je dois avouer que cette application m'a été commandée la société Spamming Inc.). Il soumet donc ces fichiers à notre super application qui accomplit cette tâche si gratifiante. Puisque les fichiers peuvent être parsés indépendamment et parallèlement les uns des autres nous créons une tâche par fichier. Les tâches sont traitées suivant le pattern Master/Worker avec un Master qui distribue les tâches entre les Workers qui qant à eux parsent le contenu des fichiers.


Un mot de Simple Build Tool - SBT

Nous allons utiliser SBT qui est un outil de build pour Scala. Pour avoir avoir une introduction en règle à SBT rendez-vous sur sa page officielle.
Procédez comme suit pour créer le projet SBT:

  1. Créez un répertoire PhoneNumberSearchApp, placez-vous dedans et invoquez la commande sbt.
    moi@pc:~/PhoneNumberSearchApp$ sbt
    
    Puis suivez les instructions.
    Voici l’arborescence du projet que l'on obtient au final.
  2. Nous allons maintenant configurer notre projet SBT en créant sa classe de définition dans le fichier project/build/PhoneNumberSearchAppProject.
    import sbt._
    import de.element34.sbteclipsify._
    
    class PhoneNumberSearchAppProject(info: ProjectInfo) 
        extends DefaultProject(info) 
        with Eclipsify 
        with AkkaProject
    
    Remarquez nous ajoutons en même temps le support d'Akka et d'Eclipify qui est un plugin SBT pour Eclipse.
    Ensuite nous créons la classe project/plugins/Plugins.scala qui contiendra les définitions des plugins Akka et Eclipsify.

    import sbt._
    
    class Plugins(info: ProjectInfo) extends PluginDefinition(info) {
      //Add Eclipsify plugin
      lazy val eclipse = "de.element34" % "sbt-eclipsify" % "0.7.0"
      val akkaRepo = "Akka Repo" at "http:akka.io/repository"
      //Add Akka plugin
      val akkaPlugin = "se.scalablesolutions.akka" % "akka-sbt-plugin" % "1.1"
    }
    
    
  3. Ensuite faite tapez les commandes:
    moi@pc:~/PhoneNumberSearchApp$ sbt update 
    moi@pc:~/PhoneNumberSearchApp$ sbt eclipse
    
    A partir de là vous avez les dépendances Akka et vous pouvez aussi importer votre projet dans Eclipse.


Maintenant les mains dans le cambouis

Je vous sens impatient! Mais voici le moment tant attendu arrivé: le code!! Ça va aller très vite notamment grâce à l'expressivité de Scala.
Nous avons deux types de messages dans notre application: Ceux qui représentent une tâche à exécuter et ceux qui contiennent les résultats fournis par les Workers. Nous définissons les messages comme des case classes.

abstract class Message
case class Task(fileContent: String) extends Message
case class Result(numbers: Set[String]) extends Message
Chaque Worker met les numéros qu'il a trouvés dans une collection de type Set.

Maintenant voici le code des Workers:

/**
* A worker parses a file and returns found phone numbers as
* a set to the master actor.
*/
class Worker extends Actor {
  val phoneRegex = "0[1-9]([ .-]?[0-9]{2}){4}".r
 
  def receive = {
    case Task(fileContent) =>
      println("Worker doing hard work!!...")
      var phoneNumbers = Set[String]()
      
      for(phoneNumber <- phoneRegex findAllIn fileContent) {
        phoneNumbers += phoneNumber
      }
      self reply Result(phoneNumbers)
  }

  override def preStart() = {
    println("=> Worker pre start step!!")
  }

  override def postStop() = {
    println("=> Worker post stop step!!")
  }
}

Vous remarquerez les méthodes preStart() et postStop() qui sont des "hooks" (callbacks) où vous placez du code qui s'exécute avant le démarrage d'un acteur et après son arrêt.

Le Worker extrait le contenu du fichier sous forme de String et lui applique l'expression régulière fournie et renvoie le résultat à l'acteur qui lui a envoyé le message Task. La ligne qui envoie la réponse est:

self reply Result(phoneNumbers)

Le code du Master étant relativement complexe nous l'introduisons petit à petit. Commençons d'abord par la création des Workers.

val workers = Vector.fill(nbOfWorkers)(actorOf[Worker].start())
Ici nous utilisons l'une des méthodes du "companion object" de la classe Vector pour créer et démarrer en même temps autant de Workers qu'on a de tâches à effectuer. Eh oui! Tout cela en une ligne!
Nous en arrivons maintenant au Load Balancer, le composant qui distribue les tâches entre nos Workers. Akka fournit une API qui facilite grandement la création d'un tel composant comme en témoigne la ligne suivante:
val loadBalancer = Routing.loadBalancerActor(CyclicIterator(workers))
La méthode akka.routing.Routing#loadBalancer prend en argument un objet de type akka.routing.InfiniteIterator qui est un itérateur sur un ensemble d'acteurs. Cet objet implémente l'algorithme de distribution des tâches aux Workers. Nous avons choisi akka.routing.CyclicIterator qui implémente le round-robin et convient parfaitement à notre besoin. Si vous avez un algorithme de load balancing particulier pensez à étendre le trait akka.routing.InfiniteIterator.

Le prochain bout de code lit le contenu des fichiers fournis en ligne de commande à notre application, crée les tâches puis crée et démarre l'acteur Master avant de lui envoyer les tâches.

val tasks =
   for {
    file <- args
    fileContent = Source.fromFile(file).mkString
   } yield Task(fileContent)

val master = actorOf(new Master(tasks.length)).start()

val response = master !! tasks

Remarquez l'usage du double bang (!!) pour envoyer les tâches au Master. Cette méthode permet d'envoyer un message à un acteur et bloque l'appelant sur l'attente de la réponse.

Le Master forwarde les tâches aux Workers en passant par le Load Balancer. Il conserve aussi une référence sur l'expéditeur du message afin de pouvoir lui répondre plus tard quand toutes les réponses intermédiaires seront arrivées:

sender = self.senderFuture

for (task <- tasks) {
  loadBalancer ! task
}
Voici ce que fait le Master à la réception des résultats intermédiaires:
def receive = {
  case Result(intermediateNumbers) =>
    phoneNumbers = phoneNumbers ++ intermediateNumbers
    responseCount += 1

    if (responseCount == nbOfWorkers) {
      senderFuture.foreach(_.completeWithResult(phoneNumbers))
      loadBalancer ! Broadcast(PoisonPill)
      loadBalancer ! PoisonPill
      self.stop()
    }
  
  case tasks: Array[Task] => ...
}
Le Master agrège les résultats intermédiaires dans une collection et une fois que tous les Workers ont fini leur travail, le résultat final est renvoyé à l'application grâce à la méthode completeWithResult du Future de l'appelant:
senderFuture.foreach(_.completeWithResult(phoneNumbers))
Puisque c'est bien élevé de nettoyer derrière soi, le Master envoie d'abord un message spécial PoisonPill à tous les Workers via le load balancer, puis le même message à ce dernier avant de s'arrêter soi-même. Le PoisonPill (un vrai poison cette chose) a pour effet l'arrêt de l'acteur récepteur. Les 3 lignes correspondantes sont:
senderFuture.foreach(_.completeWithResult(phoneNumbers))
loadBalancer ! Broadcast(PoisonPill)
loadBalancer ! PoisonPill

Enfin le bout de code qui affiche le résultat final sur la sortie standard:

val response = master !! tasks

response match {
  case Some(numbers) =>
    numbers match {
      case numbersAsIter:Iterable[Any] =>
        println(numbersAsIter mkString("\n"))
      case _ @response =>
        println("Unknown response format : " + response)
    }   
       
  case None => println("No phone number found")
}
Nous utilisons la méthode scala.collection.immutable.Iterable#mkString pour afficher un numéro par ligne.

Allons exécuter ce programme!

Placez-vous dans le répertoire du projet et lancez SBT puis lancer la commande suivante:

>run test.txt test1.txt
Nous obtenons une sortie similaire à ce qui suit:
> run test.txt test1.txt
[info] 
[info] == copy-resources ==
[info] == copy-resources ==
[info] 
[info] == compile ==
[info]   Source analysis: 1 new/modified, 0 indirectly invalidated, 0 removed.
[info] Compiling main sources...
[warn] there were 1 unchecked warnings; re-run with -unchecked for details
[warn] one warning found
[info] Compilation successful.
[info]   Post-analysis: 21 classes.
[info] == compile ==
[info] 
[info] == run ==
[info] Running com.nouhoum.akka.PhoneNumberSearchApp test.txt test1.txt
=========================
AKKA_HOME is defined as [/home/toto/tools/akka-1.1/akka-core-1.1], loading config from [/home/toto/tools/akka-1.1/akka-core-1.1/config/akka.conf].
=> preStart() of the Worker
=> preStart() of the Worker
=> preStart() of the Master
Waiting for the master response....
Received task list size = 2
Worker doing hard work!!...
Worker doing hard work!!...
====The following numbers has been found====
0102030405
08 05 05 05 03
0405050503
09 45 33 54 22
06-05-05-05-03
0304458966
0404458966
Total time : 220 ms
=========================
=> postStop() of the Master
=> postStop() of the Worker
=> postStop() of the Worker
[info] == run ==
[success] Successful.
[info] 

Code source et conclusion

Je souhaitais, à travers cet article, vous donner un aperçu d'Akka. L'exemple traité ici permet d'avoir une idée de la puissance d'Akka. J'espère qu'il vous a donné envie d'explorer l'outil. Quant à moi je continue à l'explorer et vous ferais découvrir ici mes futures découvertes et aventures avec Akka.

Le code source est disponible ici.

Happy hAkking!

2 commentaires :

PhiLho a dit…

Très bonne introduction à Akka, vivante et concrète, ça donne envie d'essayer. Le problème est de trouver une tâche à paralléliser dans un petit projet à échelle individuelle.
L'exemple donné ici est donc bien choisi...

Note : j'ai vu quelques "fautes de frappe", comme:
s/routière/routinière/
s/aggrève/agrège/
s/double bag/double bang/

Nouhoum TRAORE a dit…

Merci PhiLho! J'ai corrigé les fautes signalées. Effectivement c'est vrai que l'exemple se prête bien à la parallélisation des tâches. Happy hAkking comme on dit.

© Nouhoum TRAORE.. Fourni par Blogger.