La backpressure du pauvre avec Akka et les futures

Leave a Comment

Il m’est souvent arrivé d’utiliser des acteurs Akka pour interroger des services tiers comme un index Elasticsearch ou une base de données. Je me suis trouvé dans des situations où le service est submergé par les requêtes de l’acteur consommateur. La solution facile consiste à bloquer sur chaque appel au service et à lui envoyer une nouvelle requête seulement s’il a répondu à la précédente. Soit la méthode pour évoquer le service :

 
def callService: Future[Response] = ???

Dans le code cela se traduit comme suit :

 
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

val response = Await.result(callService, 3 seconds) 

Afin de comprendre la solution non bloquante que je vais vous présenter prenons cet exemple. Nous avons un service exposant une API pour récupérer des données en spécifiant un index de départ et le nombre d’entrées que l’on souhaite récupérer. La contrainte est que si on demande un nombre trop important d’entrées d’un coup le service peut se casser la gueule ! De même il peut s’écrouler et ou avoir des temps de réponse trop élevés s’il reçoit trop de requêtes simultanément.

La solution consiste en un système d’acteurs constitué d’un master et de plusieurs workers. Le master crée un ensemble de tâches qu’il fait exécuter par les workers. Une tâche est représentée par la structure suivante :

 
case class Task(from: Int, size: Int)

Au démarrage de l’application le master crée les tâches à exécuter :

 
class Master extends Actor with ActorLogging {
  var tasks = Tasks.empty
  def receive: Receive = {
    case message @ FetchData(from, to) =>
      tasks = Tasks.from(from, to, step)
      context.become(working)
      createAndStartWorkers()
  }
}

J’ai créé une petite structure, Tasks, afin de simplifier la gestion des tâches :

 
case class Tasks(values: Vector[Task]) {
  def isDone = values.isEmpty

  def next: Option[(Task, Tasks)] =
    if(isDone) None
    else Option((values.head, copy(values = values.drop(1))))
}

A la réception d’un message de type Task le worker récupère les données correspondantes :

 
class Worker(master: ActorRef) extends Actor {
  def receive: Receive = {
    case task @ Task(from, size) =>
      fetchData(from, size) onSuccess {
        case data =>  master ! GetTask
      }
  }

  def fetchData(from: Int, size: Int): Future[Data] = ???
}

Une fois que le worker finit de traiter une tâche il notifie le master qui lui assigne une nouvelle tâche ou arrête si tous les workers ont fini de bosser :

 
class Master extends Actor with ActorLogging {
  var tasks = Tasks.empty
  val step = 10
  val numberOfWorkers = 2
  var remainingWorkingWorkers = numberOfWorkers

  def waiting: Receive = {
    case message @ FetchData(from, to) =>
      tasks = Tasks.from(from, to, step)
      context.become(working)
      createAndStartWorkers()
  }

  def receive = waiting

  def working: Receive = {
    case GetTask =>
      tasks.next.fold({
        log.info("No more tasks... ;-)")
        remainingWorkingWorkers = remainingWorkingWorkers - 1
        shutdownIfAllTasksCompleted()
      }) {
        case (task, newTasks) =>
          tasks = newTasks
          sender ! task
      }
  }

  def createAndStartWorkers(): Unit = {
    (0 until numberOfWorkers) foreach(_ => context.actorOf(Props(new Worker(self))) ! StartWorking)
  }

  def allTasksCompleted = remainingWorkingWorkers == 0

  def shutdownIfAllTasksCompleted() =
    if(allTasksCompleted) context.system.shutdown()
}

Tout le code est disponible sur Github : https://github.com/nouhoum/akka-stuff/tree/poor-man-backpressure-akka.

© Nouhoum TRAORE.. Fourni par Blogger.