Articles

Tutoriel Java 8 sur la concurence : Threads et exécuteurs

Posted on

Bienvenue dans la première partie de mon tutoriel Java 8 Concurrence. Ce guide vous apprend la programmation concurrente en Java 8 avec des exemples de code faciles à comprendre. C’est la première partie d’une série de tutoriels couvrant l’API Java Concurrency. Dans les 15 min suivantes, vous apprendrez à exécuter du code en parallèle via des threads, des tâches et des services d’exécution.

  • Partie 1 : Threads et exécuteurs
  • Partie 2 : Synchronisation et verrous
  • Partie 3 : Variables atomiques et ConcurrentMap

L’API Concurrence a été introduite pour la première fois avec la sortie de Java 5, puis progressivement améliorée avec chaque nouvelle version de Java. La majorité des concepts présentés dans cet article fonctionnent également dans les anciennes versions de Java. Cependant, mes exemples de code se concentrent sur Java 8 et font un usage intensif des expressions lambda et d’autres nouvelles fonctionnalités. Si vous n’êtes pas encore familier avec les lambdas, je vous recommande de lire d’abord mon tutoriel Java 8.

Threads et Runnables

Tous les systèmes d’exploitation modernes supportent la concurrence à la fois via les processus et les threads. Les processus sont des instances de programmes qui s’exécutent généralement indépendamment les uns des autres, par exemple, si vous lancez un programme java, le système d’exploitation génère un nouveau processus qui s’exécute en parallèle des autres programmes. À l’intérieur de ces processus, nous pouvons utiliser des threads pour exécuter du code de manière simultanée, afin de tirer le meilleur parti des cœurs disponibles du CPU.

Java prend en charge les threads depuis le JDK 1.0. Avant de démarrer un nouveau thread, vous devez spécifier le code qui sera exécuté par ce thread, souvent appelé la tâche. Cela se fait en implémentant Runnable – une interface fonctionnelle définissant une seule méthode void no-args run() comme le démontre l’exemple suivant :

Runnable task = () -> { String threadName = Thread.currentThread().getName(); System.out.println("Hello " + threadName);};task.run();Thread thread = new Thread(task);thread.start();System.out.println("Done!");

Puisque Runnable est une interface fonctionnelle, nous pouvons utiliser des expressions lambda Java 8 pour imprimer le nom du thread actuel sur la console. Tout d’abord, nous exécutons le runnable directement sur le thread principal avant de démarrer un nouveau thread.

Le résultat sur la console pourrait ressembler à ceci:

Hello mainHello Thread-0Done!

Ou cela :

Hello mainDone!Hello Thread-0

En raison de l’exécution concurrente, nous ne pouvons pas prédire si le runnable sera invoqué avant ou après l’impression de ‘done’. L’ordre est non déterministe, ce qui fait de la programmation concurrente une tâche complexe dans les grandes applications.

Les threads peuvent être mis en sommeil pendant une certaine durée. Ceci est assez pratique pour simuler des tâches à long terme dans les exemples de code suivants de cet article :

Runnable runnable = () -> { try { String name = Thread.currentThread().getName(); System.out.println("Foo " + name); TimeUnit.SECONDS.sleep(1); System.out.println("Bar " + name); } catch (InterruptedException e) { e.printStackTrace(); }};Thread thread = new Thread(runnable);thread.start();

Lorsque vous exécutez le code ci-dessus, vous remarquerez le délai d’une seconde entre la première et la deuxième instruction d’impression. TimeUnit est un enum utile pour travailler avec des unités de temps. Alternativement, vous pouvez obtenir la même chose en appelant Thread.sleep(1000).

Travailler avec la classe Thread peut être très fastidieux et sujet aux erreurs. C’est pour cette raison que l’API Concurrency a été introduite en 2004 avec la sortie de Java 5. L’API se trouve dans le package java.util.concurrent et contient de nombreuses classes utiles pour gérer la programmation concurrente. Depuis lors, l’API Concurrency a été améliorée à chaque nouvelle version de Java et même Java 8 fournit de nouvelles classes et méthodes pour traiter la concurrence.

Maintenant, examinons plus en profondeur l’une des parties les plus importantes de l’API Concurrency – les services d’exécution.

Exécuteurs

L’API Concurrency introduit le concept d’un ExecutorService comme remplacement de plus haut niveau pour travailler directement avec les threads. Les exécuteurs sont capables d’exécuter des tâches asynchrones et gèrent généralement un pool de threads, de sorte que nous n’avons pas à créer de nouveaux threads manuellement. Tous les threads du pool interne seront réutilisés sous le capot pour les tâches revenantes, nous pouvons donc exécuter autant de tâches simultanées que nous le souhaitons tout au long du cycle de vie de notre application avec un seul service exécuteur.

Voici à quoi ressemble le premier exemple de thread utilisant des exécuteurs:

ExecutorService executor = Executors.newSingleThreadExecutor();executor.submit(() -> { String threadName = Thread.currentThread().getName(); System.out.println("Hello " + threadName);});// => Hello pool-1-thread-1

La classe Executors fournit des méthodes de fabrique pratiques pour créer différents types de services exécuteurs. Dans cet échantillon, nous utilisons un exécuteur avec un pool de threads de taille un.

Le résultat ressemble à l’échantillon ci-dessus, mais en exécutant le code, vous remarquerez une différence importante : le processus java ne s’arrête jamais ! Les exécuteurs doivent être arrêtés explicitement – sinon ils continuent à écouter de nouvelles tâches.

Une ExecutorService fournit deux méthodes à cet effet : shutdown() attend que les tâches en cours d’exécution se terminent tandis que shutdownNow() interrompt toutes les tâches en cours d’exécution et arrête l’exécuteur immédiatement.

C’est la façon préférée dont j’arrête généralement les exécuteurs:

try { System.out.println("attempt to shutdown executor"); executor.shutdown(); executor.awaitTermination(5, TimeUnit.SECONDS);}catch (InterruptedException e) { System.err.println("tasks interrupted");}finally { if (!executor.isTerminated()) { System.err.println("cancel non-finished tasks"); } executor.shutdownNow(); System.out.println("shutdown finished");}

L’exécuteur s’arrête en douceur en attendant un certain temps la fin des tâches en cours d’exécution. Après un maximum de cinq secondes, l’exécuteur s’éteint finalement en interrompant toutes les tâches en cours d’exécution.

Callables et Futures

En plus de Runnable les exécuteurs supportent un autre type de tâche nommé Callable. Les callables sont des interfaces fonctionnelles tout comme les runnables mais au lieu d’être void ils retournent une valeur.

Cette expression lambda définit un callable renvoyant un entier après avoir dormi pendant une seconde:

Callable<Integer> task = () -> { try { TimeUnit.SECONDS.sleep(1); return 123; } catch (InterruptedException e) { throw new IllegalStateException("task interrupted", e); }};

Les callables peuvent être soumis à des services exécuteurs tout comme les runnables. Mais qu’en est-il du résultat des callables ? Puisque submit() n’attend pas que la tâche se termine, le service exécuteur ne peut pas retourner directement le résultat du callable. Au lieu de cela, l’exécuteur renvoie un résultat spécial de type Future qui peut être utilisé pour récupérer le résultat réel à un moment ultérieur.

ExecutorService executor = Executors.newFixedThreadPool(1);Future<Integer> future = executor.submit(task);System.out.println("future done? " + future.isDone());Integer result = future.get();System.out.println("future done? " + future.isDone());System.out.print("result: " + result);

Après avoir soumis le callable à l’exécuteur, nous vérifions d’abord si le futur a déjà terminé son exécution via isDone(). Je suis presque sûr que ce n’est pas le cas puisque le callable ci-dessus dort pendant une seconde avant de renvoyer l’entier.

L’appel de la méthode get() bloque le thread actuel et attend que le callable se termine avant de renvoyer le résultat réel 123. Maintenant, le futur est enfin terminé et nous voyons le résultat suivant sur la console:

future done? falsefuture done? trueresult: 123

Les futurs sont étroitement couplés au service exécuteur sous-jacent. N’oubliez pas que chaque futur non terminé lèvera des exceptions si vous arrêtez l’exécuteur :

executor.shutdownNow();future.get();

Vous avez peut-être remarqué que la création de l’exécuteur diffère légèrement de l’exemple précédent. Nous utilisons newFixedThreadPool(1) pour créer un service exécuteur soutenu par un thread-pool de taille un. Ceci est équivalent à newSingleThreadExecutor() mais nous pourrions plus tard augmenter la taille du pool en passant simplement une valeur supérieure à un.

Timeouts

Tout appel à future.get() bloquera et attendra jusqu’à ce que le callable sous-jacent soit terminé. Dans le pire des cas, un callable s’exécute éternellement – rendant ainsi votre application non réactive. Vous pouvez simplement contrer ces scénarios en passant un timeout :

ExecutorService executor = Executors.newFixedThreadPool(1);Future<Integer> future = executor.submit(() -> { try { TimeUnit.SECONDS.sleep(2); return 123; } catch (InterruptedException e) { throw new IllegalStateException("task interrupted", e); }});future.get(1, TimeUnit.SECONDS);

L’exécution du code ci-dessus donne lieu à un TimeoutException :

Exception in thread "main" java.util.concurrent.TimeoutException at java.util.concurrent.FutureTask.get(FutureTask.java:205)

Vous avez peut-être déjà deviné pourquoi cette exception est levée : Nous avons spécifié un temps d’attente maximal d’une seconde mais le callable a en fait besoin de deux secondes avant de renvoyer le résultat.

InvokeAll

Les exécuteurs prennent en charge la soumission par lot de plusieurs callables à la fois via invokeAll(). Cette méthode accepte une collection de callables et retourne une liste de futurs.

ExecutorService executor = Executors.newWorkStealingPool();List<Callable<String>> callables = Arrays.asList( () -> "task1", () -> "task2", () -> "task3");executor.invokeAll(callables) .stream() .map(future -> { try { return future.get(); } catch (Exception e) { throw new IllegalStateException(e); } }) .forEach(System.out::println);

Dans cet exemple, nous utilisons les flux fonctionnels de Java 8 afin de traiter tous les futurs retournés par l’invocation de invokeAll. Nous mappons d’abord chaque futur à sa valeur de retour, puis nous imprimons chaque valeur sur la console. Si vous n’êtes pas encore familier avec les flux, lisez mon tutoriel Java 8 Stream.

InvokeAny

Une autre façon de soumettre des appelables par lots est la méthode invokeAny() qui fonctionne légèrement différemment de invokeAll(). Au lieu de renvoyer des objets futurs, cette méthode bloque jusqu’à ce que le premier callable se termine et renvoie le résultat de ce callable.

Afin de tester ce comportement, nous utilisons cette méthode d’aide pour simuler des callables avec des durées différentes. La méthode renvoie un callable qui dort pendant un certain temps jusqu’à renvoyer le résultat donné:

Callable<String> callable(String result, long sleepSeconds) { return () -> { TimeUnit.SECONDS.sleep(sleepSeconds); return result; };}

Nous utilisons cette méthode pour créer un tas de callables avec des durées différentes de une à trois secondes. La soumission de ces callables à un exécuteur via invokeAny() renvoie le résultat de la chaîne du callable le plus rapide – dans ce cas, la tâche2 :

ExecutorService executor = Executors.newWorkStealingPool();List<Callable<String>> callables = Arrays.asList( callable("task1", 2), callable("task2", 1), callable("task3", 3));String result = executor.invokeAny(callables);System.out.println(result);// => task2

L’exemple ci-dessus utilise encore un autre type d’exécuteur créé via newWorkStealingPool(). Cette méthode de fabrique fait partie de Java 8 et renvoie un exécuteur de type ForkJoinPool qui fonctionne légèrement différemment des exécuteurs normaux. Au lieu d’utiliser un pool de threads de taille fixe, les ForkJoinPools sont créés pour une taille de parallélisme donnée qui, par défaut, est le nombre de cœurs disponibles du CPU des hôtes.

Les ForkJoinPools existent depuis Java 7 et seront couverts en détail dans un tutoriel ultérieur de cette série. Terminons ce tutoriel en examinant plus en profondeur les exécuteurs planifiés.

Exécuteurs planifiés

Nous avons déjà appris à soumettre et à exécuter des tâches une fois sur un exécuteur. Afin d’exécuter périodiquement des tâches communes plusieurs fois, nous pouvons utiliser des pools de threads planifiés.

Une ScheduledExecutorService est capable de planifier des tâches pour les exécuter soit périodiquement, soit une fois après un certain temps.

Cet exemple de code planifie l’exécution d’une tâche après l’écoulement d’un délai initial de trois secondes :

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);TimeUnit.MILLISECONDS.sleep(1337);long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);System.out.printf("Remaining Delay: %sms", remainingDelay);

La planification d’une tâche produit un futur spécialisé de type ScheduledFuture qui – en plus de Future – fournit la méthode getDelay() pour récupérer le délai restant. Une fois ce délai écoulé, la tâche sera exécutée simultanément.

Afin de planifier les tâches à exécuter périodiquement, les exécuteurs fournissent les deux méthodes scheduleAtFixedRate() et scheduleWithFixedDelay(). La première méthode est capable d’exécuter des tâches avec une cadence temporelle fixe, par exemple une fois par seconde comme le démontre cet exemple :

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());int initialDelay = 0;int period = 1;executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);

En outre, cette méthode accepte un délai initial qui décrit le temps d’attente principal avant que la tâche ne soit exécutée pour la première fois.

Veuillez garder à l’esprit que scheduleAtFixedRate() ne prend pas en compte la durée réelle de la tâche. Ainsi, si vous spécifiez une période d’une seconde mais que la tâche a besoin de 2 secondes pour être exécutée, alors le pool de threads fonctionnera à pleine capacité très rapidement.

Dans ce cas, vous devriez envisager d’utiliser scheduleWithFixedDelay() à la place. Cette méthode fonctionne exactement comme la contrepartie décrite ci-dessus. La différence est que le délai d’attente s’applique entre la fin d’une tâche et le début de la tâche suivante. Par exemple :

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);Runnable task = () -> { try { TimeUnit.SECONDS.sleep(2); System.out.println("Scheduling: " + System.nanoTime()); } catch (InterruptedException e) { System.err.println("task interrupted"); }};executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);

Cet exemple programme une tâche avec un délai fixe d’une seconde entre la fin d’une exécution et le début de l’exécution suivante. Le délai initial est de zéro et la durée de la tâche est de deux secondes. Nous nous retrouvons donc avec un intervalle d’exécution de 0s, 3s, 6s, 9s et ainsi de suite. Comme vous pouvez le voir, scheduleWithFixedDelay() est pratique si vous ne pouvez pas prédire la durée des tâches programmées.

Ceci était la première partie d’une série de tutoriels sur la concurrence. Je vous recommande de pratiquer les échantillons de code montrés par vous-même. Vous trouvez tous les échantillons de code de cet article sur GitHub, donc n’hésitez pas à forker le repo et à me donner une étoile.

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée. Les champs obligatoires sont indiqués avec *