Articles

Java 8 Handleiding over Concurrency: Threads en Executors

Posted on

Welkom bij het eerste deel van mijn Java 8 Concurrency tutorial. Deze gids leert je gelijktijdig programmeren in Java 8 met gemakkelijk te begrijpen codevoorbeelden. Dit is het eerste deel uit een serie tutorials over de Java Concurrency API. In de komende 15 minuten leer je hoe je code parallel kunt uitvoeren via threads, tasks en executor services.

  • Deel 1: Threads en Executors
  • Deel 2: Synchronisatie en Locks
  • Deel 3: Atomic Variables en ConcurrentMap

De Concurrency API werd voor het eerst geïntroduceerd met de release van Java 5 en vervolgens geleidelijk verbeterd met elke nieuwe Java release. De meeste concepten in dit artikel werken ook in oudere versies van Java. Mijn codevoorbeelden zijn echter gericht op Java 8 en maken veel gebruik van lambda expressies en andere nieuwe functies. Als je nog niet bekend bent met lambda’s raad ik je aan eerst mijn Java 8 Tutorial te lezen.

Threads en Runnables

Alle moderne besturingssystemen ondersteunen concurrency zowel via processen als via threads. Processen zijn instanties van programma’s die onafhankelijk van elkaar draaien, bijvoorbeeld als je een java programma start, start het besturingssysteem een nieuw proces dat parallel aan andere programma’s draait. Binnen deze processen kunnen we threads gebruiken om code gelijktijdig uit te voeren, zodat we het meeste uit de beschikbare cores van de CPU kunnen halen.

Java ondersteunt threads sinds JDK 1.0. Voordat je een nieuwe thread start, moet je de code specificeren die door deze thread moet worden uitgevoerd, vaak de task genoemd. Dit wordt gedaan door Runnable te implementeren – een functionele interface die een enkele void no-args methode run() definieert, zoals in het volgende voorbeeld wordt gedemonstreerd:

Runnable task = () -> { String threadName = Thread.currentThread().getName(); System.out.println("Hello " + threadName);};task.run();Thread thread = new Thread(task);thread.start();System.out.println("Done!");

Omdat Runnable een functionele interface is, kunnen we Java 8 lambda expressies gebruiken om de naam van de huidige threads op de console af te drukken. Eerst voeren we de runnable direct op de main thread uit voordat we een nieuwe thread starten.

Het resultaat op de console kan er als volgt uitzien:

Hello mainHello Thread-0Done!

Of dat:

Hello mainDone!Hello Thread-0

Door gelijktijdige uitvoering kunnen we niet voorspellen of de runnable zal worden aangeroepen vóór of na het afdrukken van ‘done’. De volgorde is niet-deterministisch, waardoor gelijktijdig programmeren een complexe taak wordt in grotere toepassingen.

Threads kunnen voor een bepaalde tijd in slaapstand worden gezet. Dit is erg handig om langlopende taken te simuleren in de volgende codevoorbeelden van dit artikel:

Runnable runnable = () -> { try { String name = Thread.currentThread().getName(); System.out.println("Foo " + name); TimeUnit.SECONDS.sleep(1); System.out.println("Bar " + name); } catch (InterruptedException e) { e.printStackTrace(); }};Thread thread = new Thread(runnable);thread.start();

Wanneer u de bovenstaande code uitvoert, zult u merken dat er een vertraging van een seconde zit tussen het eerste en het tweede printstatement. TimeUnit is een handige enum voor het werken met eenheden van tijd. Als alternatief kunt u hetzelfde bereiken door Thread.sleep(1000) aan te roepen.

Werken met de Thread class kan erg vervelend en foutgevoelig zijn. Om die reden is de Concurrency API al in 2004 geïntroduceerd met de release van Java 5. De API bevindt zich in package java.util.concurrent en bevat vele nuttige klassen om gelijktijdig programmeren te behandelen. Sinds die tijd is de Concurrency API met elke nieuwe Java-release verbeterd en zelfs Java 8 biedt nieuwe klassen en methoden voor het omgaan met concurrency.

Nu gaan we dieper in op een van de belangrijkste onderdelen van de Concurrency API – de executor services.

Executors

De Concurrency API introduceert het concept van een ExecutorService als een hoger niveau vervanging voor het direct werken met threads. Executors zijn in staat om asynchrone taken uit te voeren en beheren meestal een pool van threads, zodat we niet handmatig nieuwe threads hoeven aan te maken. Alle threads van de interne pool worden onder de motorkap hergebruikt voor wraaknemende taken, zodat we zoveel gelijktijdige taken kunnen uitvoeren als we willen gedurende de hele levenscyclus van onze applicatie met een enkele executor service.

Zo ziet het eerste thread-voorbeeld eruit met behulp van executors:

ExecutorService executor = Executors.newSingleThreadExecutor();executor.submit(() -> { String threadName = Thread.currentThread().getName(); System.out.println("Hello " + threadName);});// => Hello pool-1-thread-1

De klasse Executors biedt handige fabrieksmethoden voor het maken van verschillende soorten executor-services. In dit voorbeeld gebruiken we een executor met een thread pool van grootte één.

Het resultaat ziet er ongeveer hetzelfde uit als in het bovenstaande voorbeeld, maar als je de code uitvoert zul je een belangrijk verschil opmerken: het java proces stopt nooit! Executors moeten expliciet worden gestopt – anders blijven ze luisteren naar nieuwe taken.

Een ExecutorService biedt twee methoden voor dat doel: shutdown() wacht tot de momenteel lopende taken klaar zijn, terwijl shutdownNow() alle lopende taken onderbreekt en de executor onmiddellijk afsluit.

Dit is de manier waarop ik executors gewoonlijk afsluit:

try { System.out.println("attempt to shutdown executor"); executor.shutdown(); executor.awaitTermination(5, TimeUnit.SECONDS);}catch (InterruptedException e) { System.err.println("tasks interrupted");}finally { if (!executor.isTerminated()) { System.err.println("cancel non-finished tasks"); } executor.shutdownNow(); System.out.println("shutdown finished");}

De executor wordt zachtjes afgesloten door een bepaalde tijd te wachten op de beëindiging van de momenteel lopende taken. Na maximaal vijf seconden sluit de executor uiteindelijk af door alle lopende taken te onderbreken.

Callables en Futures

Naast Runnable ondersteunen executors nog een ander soort taken genaamd Callable. Callables zijn functionele interfaces net als runnables maar in plaats van void geven ze een waarde terug.

Deze lambda-expressie definieert een callable die een geheel getal teruggeeft na één seconde te hebben geslapen:

Callable<Integer> task = () -> { try { TimeUnit.SECONDS.sleep(1); return 123; } catch (InterruptedException e) { throw new IllegalStateException("task interrupted", e); }};

Callables kunnen net als runnables aan executor services worden voorgelegd. Maar hoe zit het met het resultaat van de calllables? Omdat submit() niet wacht tot de taak is voltooid, kan de executor service het resultaat van de calllable niet direct retourneren. In plaats daarvan retourneert de executor een speciaal resultaat van het type Future dat kan worden gebruikt om het werkelijke resultaat op een later tijdstip op te halen.

ExecutorService executor = Executors.newFixedThreadPool(1);Future<Integer> future = executor.submit(task);System.out.println("future done? " + future.isDone());Integer result = future.get();System.out.println("future done? " + future.isDone());System.out.print("result: " + result);

Nadat we de calllable aan de uitvoerder hebben voorgelegd, controleren we eerst of de uitvoering van de future al is voltooid via isDone(). Ik ben er vrij zeker van dat dit niet het geval is, omdat de bovenstaande calllable een seconde slaapt voordat hij het gehele getal terugstuurt.

Het aanroepen van de methode get() blokkeert de huidige thread en wacht tot de calllable is voltooid voordat hij het daadwerkelijke resultaat terugstuurt 123. Nu is de future eindelijk klaar en zien we het volgende resultaat op de console:

future done? falsefuture done? trueresult: 123

Futures zijn nauw gekoppeld aan de onderliggende executor service. Houd er rekening mee dat elke niet-beëindigde future uitzonderingen zal gooien als u de uitvoerder afsluit:

executor.shutdownNow();future.get();

Het is u misschien opgevallen dat het aanmaken van de uitvoerder iets verschilt van het vorige voorbeeld. We gebruiken newFixedThreadPool(1) om een executor-service te maken die wordt ondersteund door een thread-pool van grootte één. Dit is equivalent aan newSingleThreadExecutor() maar we kunnen later de poolgrootte vergroten door simpelweg een waarde groter dan één door te geven.

Timeouts

Elke aanroep van future.get() zal blokkeren en wachten tot de onderliggende calllable is beëindigd. In het ergste geval blijft een calllable eeuwig lopen – en wordt je applicatie niet meer responsief. U kunt deze scenario’s eenvoudig tegengaan door een time-out door te geven:

ExecutorService executor = Executors.newFixedThreadPool(1);Future<Integer> future = executor.submit(() -> { try { TimeUnit.SECONDS.sleep(2); return 123; } catch (InterruptedException e) { throw new IllegalStateException("task interrupted", e); }});future.get(1, TimeUnit.SECONDS);

Uitvoering van de bovenstaande code resulteert in een TimeoutException:

Exception in thread "main" java.util.concurrent.TimeoutException at java.util.concurrent.FutureTask.get(FutureTask.java:205)

Je hebt misschien al geraden waarom deze uitzondering wordt gegooid: We hebben een maximale wachttijd van één seconde opgegeven, maar de calllable heeft in werkelijkheid twee seconden nodig voordat het resultaat wordt geretourneerd.

InvokeAll

Executors ondersteunen het batchgewijs indienen van meerdere calllables tegelijk via invokeAll(). Deze methode accepteert een verzameling van calllables en retourneert een lijst van futures.

ExecutorService executor = Executors.newWorkStealingPool();List<Callable<String>> callables = Arrays.asList( () -> "task1", () -> "task2", () -> "task3");executor.invokeAll(callables) .stream() .map(future -> { try { return future.get(); } catch (Exception e) { throw new IllegalStateException(e); } }) .forEach(System.out::println);

In dit voorbeeld gebruiken we Java 8 functionele streams om alle futures te verwerken die zijn geretourneerd door de aanroep van invokeAll. We mappen eerst elke future naar zijn return waarde en printen dan elke waarde naar de console. Als u nog niet bekend bent met streams lees dan mijn Java 8 Stream Tutorial.

InvokeAny

Een andere manier van batch-submit van calllables is de methode invokeAny() die iets anders werkt dan invokeAll(). In plaats van toekomstige objecten te retourneren, blokkeert deze methode totdat de eerste calllable eindigt en retourneert het resultaat van die calllable.

Om dit gedrag te testen gebruiken we deze helper-methode om calllables met verschillende looptijden te simuleren. De methode retourneert een calllable die een bepaalde tijd slaapt totdat het opgegeven resultaat wordt geretourneerd:

Callable<String> callable(String result, long sleepSeconds) { return () -> { TimeUnit.SECONDS.sleep(sleepSeconds); return result; };}

We gebruiken deze methode om een aantal calllables te maken met verschillende looptijden, variërend van één tot drie seconden. Als we deze calllables aan een uitvoerder voorleggen via invokeAny(), wordt het resultaat van de string van de snelste calllable geretourneerd – in dit geval task2:

ExecutorService executor = Executors.newWorkStealingPool();List<Callable<String>> callables = Arrays.asList( callable("task1", 2), callable("task2", 1), callable("task3", 3));String result = executor.invokeAny(callables);System.out.println(result);// => task2

Het bovenstaande voorbeeld maakt gebruik van nog een ander type executor dat is gemaakt via newWorkStealingPool(). Deze fabrieksmethode maakt deel uit van Java 8 en retourneert een executor van het type ForkJoinPool die iets anders werkt dan normale executors. In plaats van een thread-pool met een vaste grootte te gebruiken, worden ForkJoinPools gemaakt voor een bepaalde parallellisatiegrootte, die standaard het aantal beschikbare cores van de CPU van de host is.

ForkJoinPools bestaan sinds Java 7 en zullen in detail worden behandeld in een latere tutorial van deze serie. Laten we deze tutorial afsluiten met een diepere blik op geplande executors.

Geplande Executors

We hebben al geleerd hoe je taken eenmalig kunt indienen en uitvoeren op een executor.

Een ScheduledExecutorService kan taken plannen om ze periodiek uit te voeren, of eenmalig nadat een bepaalde tijd is verstreken.

Dit codevoorbeeld roostert een taak om te worden uitgevoerd nadat een eerste vertraging van drie seconden is verstreken:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);TimeUnit.MILLISECONDS.sleep(1337);long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);System.out.printf("Remaining Delay: %sms", remainingDelay);

Het plannen van een taak levert een gespecialiseerde future op van het type ScheduledFuture die – naast Future – de methode getDelay() bevat om de resterende vertraging op te vragen. Als deze vertraging is verstreken, wordt de taak gelijktijdig uitgevoerd.

Om taken in te plannen die periodiek moeten worden uitgevoerd, bieden uitvoerders de twee methoden scheduleAtFixedRate() en scheduleWithFixedDelay(). Met de eerste methode kunnen taken met een vaste tijdsfrequentie worden uitgevoerd, bijvoorbeeld eenmaal per seconde, zoals in dit voorbeeld wordt gedemonstreerd:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());int initialDelay = 0;int period = 1;executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);

Extra accepteert deze methode een initiële vertraging die de wachttijd beschrijft voordat de taak voor de eerste keer wordt uitgevoerd.

Bedenk wel dat scheduleAtFixedRate() geen rekening houdt met de werkelijke duur van de taak. Dus als u een periode van een seconde opgeeft, maar de taak heeft 2 seconden nodig om te worden uitgevoerd, dan zal de thread pool zeer snel vollopen.

In dat geval kunt u overwegen om scheduleWithFixedDelay() te gebruiken. Deze methode werkt net als de hierboven beschreven tegenhanger. Het verschil is dat de wachttijd geldt tussen het einde van een taak en het begin van de volgende taak. Bijvoorbeeld:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);Runnable task = () -> { try { TimeUnit.SECONDS.sleep(2); System.out.println("Scheduling: " + System.nanoTime()); } catch (InterruptedException e) { System.err.println("task interrupted"); }};executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);

Dit voorbeeld plant een taak met een vaste vertraging van één seconde tussen het einde van een uitvoering en het begin van de volgende uitvoering. De beginvertraging is nul en de duur van de taak is twee seconden. Dus we eindigen met een uitvoeringsinterval van 0s, 3s, 6s, 9s enzovoort. Zoals je ziet is scheduleWithFixedDelay() handig als je de duur van de geplande taken niet kunt voorspellen.

Dit was het eerste deel uit een serie van concurrency tutorials. Ik raad je aan de getoonde code voorbeelden zelf te oefenen. Je vindt alle codevoorbeelden uit dit artikel op GitHub, dus voel je vrij om de repo te forken en me een ster te geven.

Geef een reactie

Het e-mailadres wordt niet gepubliceerd. Vereiste velden zijn gemarkeerd met *