Producteurs/Consommateurs

Avant-propos

Dans ce TP, on cherche à mettre en application le principe du producteur/consommateur. L'idée est de ne jamais faire de synchronisation "à la main", mais plutôt d'apprendre à utiliser judicieusement l'API java.util.concurrent.

Chaine de décodage

Dans cet exercice, on dispose d'une API simplifiée à l'extrême de décodage de messages reçus depuis internet. Elle fournit 3 méthodes static :

Récupérer la classe CodeAPI.java

Écrire dans le main d'une classe Codex un programme qui va utiliser 3 threads pour récupérer en boucle les messages codés depuis internet, 2 threads pour les décoder et 1 thread pour archiver les messages décodés. De plus, chaque thread affiche ce qu'il fait. Pour l'instant, on ignore (on ne fait rien dans ce cas) les messages dont le décodage lève une exception.

(à la maison) Copiez votre classe Codex dans une classe CodexWithInterruption et modifier le main pour que le programme s'arrête si l'un des messages codés produit une exception lors de son décodage.

API Request

Le but de cet exercice est d'apprendre à utiliser une API fictive de requêtes Request.java (ce n'est pas un exercice de concurrence). Cette API très simplifiée permet de demander le prix d'un objet sur un site de vente en ligne. Pour faire cette demande, on crée une Request avec le nom de l'objet et le nom du site. La méthode request(int timeoutMilli) effectue la demande et renvoie la réponse sous la forme d'une Answer.java. La méthode bloque en attendant la réponse mais cette attente est bornée à au plus timeoutMilli millisecondes.

Request request = new Request("amazon.fr", "pikachu");
Optional<Answer> answer = request.request(5_000);
if (answer.isPresent()) {
    System.out.println("The price is " + answer.orElseThrow().price());
} else {
    System.out.println("The price could not be retrieved from the site");
}

La réponse renvoyée par request() n'est pas nécessairement fructueuse, c'est pourquoi elle renvoie un Optional<Answer>. Si une réponse est fournie, on peut récupérer le prix avec answer.price().

La liste (List<String>) de tous les sites connus par l'API Request peut-être obtenue par Request.getAllSites().

Compléter la classe CheapestSequential.java pour que la méthode retrieve demande tour à tour le prix de l'item sur tous les sites et renvoie une Answer correspondant au prix le plus faible. Si l'item n'est disponible sur aucun site, la méthode renvoie Optional.empty().

Pour vous aider, l'API affiche des messages de debuggage. Bien entendu, avec une vraie API, ces affichages ne seraient pas présents.

Conseil : ne cherchez pas à optimiser le calcul du minimum, vous pouvez simplement stocker les résultats des requêtes et calculer le minimum à la fin. En Java, il existe les méthodes Stream.min(comparator) et Comparator.comparingInt(function).

Exercice 2

Dans cet exercice, on cherche à accélérer la recherche du meilleur prix en utilisant des threads.

Dans un premier temps, on veut écrire une classe Fastest qui lance un thread par site et renvoie la réponse du premier site qui donne un prix. Si l'article, n'est présent sur aucun site, la méthode renvoie Optional.empty()

var agregator = new Fastest("tortank", 2_000);
var answer = agregator.retrieve();
System.out.println(answer); // Optional[tortank@... : ...]

Écrire la classe Fastest.

  • Tous les threads démarrés doivent être interrompus à la sortie de retrieve.
  • Pensez à vérifier que votre méthode renvoie bien Optional.empty() si l'article n'est disponible sur aucun site. Vous pouvez tester avec l'objet pokeball.

On veut maintenant écrire une classe Cheapest qui lance un thread par site et collecte toutes les réponses pour renvoyer la moins chère. Si l'article, n'est présent sur aucun site, la méthode renvoie Optional.empty()

Écrire la classe Cheapest.

  System.out.println(new Cheapest("tortank",2_000).retrieve()); // Optional[tortank@laredoute.fr : 219]

Si Request.getAllSites() contient 1000 sites, combien de threads seront lancés simultanément. Est-ce bien raisonnable ?

Exercice 3

Dans cet exercice, on cherche à pallier le problème du trop grand nombre de threads démarrés simultanément.

L'idée est de démarrer un nombre fixé poolSize de threads, quel que soit le nombre de sites à interroger. Dans la suite, nous appellerons ces threads des worker threads. L'idée est que les worker threads vont, en boucle, exécuter des requêtes sur les différents sites et communiquer la réponse au thread qui exécute la méthode retrieve. Bien sûr, on ne veut pas que deux worker threads effectuent une requête pour le même site.

Pour communiquer entre le thread qui exécute la méthode retrieve et les worker threads, on utilise deux BlockingQueue sitesQueue et answersQueue. La file sitesQueue contient initialement tous les sites. Les worker threads vont en boucle prendre dans cette file un site, effectuer la Request et mettre la réponse dans la answersQueue.

La classe CheapestPooled prend à sa construction le nombre poolSize de worker threads et la valeur du timeout pour chacune des requêtes. Elle implémente une méthode retrieve() qui renvoie le prix le moins élevé, comme la classe Cheapest écrite précédemment.

Quel type de BlockingQueue peut-on utiliser pour sitesQueue et answersQueue ?

En partant de Cheapest.java, écrire la classe CheapestPooled.

Tous les threads démarrés doivent être interrompus à la sortie de retrieve.

Exercice 4 (Optionel)

Dans cet exercice, on cherche à écrire un classe FixedThreadPool qui abstrait le mécanisme de création des worker threads, l'envoi des tâches à exécuter et l'arrêt des worker threads.

Un objet FixedThreadPool est créé avec un nombre poolSize qui correspond au nombre de worker threads. La méthode submit permet d'ajouter des tâches à exécuter par les worker threads. Une tâche est une lambda du type Task.java:

@FunctionalInterface
public interface Task {
    void run() throws InterruptedException;
}

En cas d'interruption, les Task fournies à FixedThreadPool sont supposées s’arrêter soit en levant l'exception InterruptedException soit en laissant le statut d'interruption positionné.

La méthode start démarre les worker threads et la méthode stop les interrompt.

Récupérer le fichierFixedThreadPool.java et le compléter afin d'avoir une classe FixedThreadPool qui fonctionne.

En utilisant FixedThreadPool, réécrire la classe CheapestPooled dans une classe que vous nommerez CheapestPooledWithFixedThreadPool.