Articles

Java 8 Concurrency Tutorial: Threads und Executoren

Posted on

Willkommen zum ersten Teil meines Java 8 Concurrency-Tutorials. Dieser Leitfaden bringt Ihnen die nebenläufige Programmierung in Java 8 mit leicht verständlichen Codebeispielen bei. Es ist der erste Teil aus einer Reihe von Tutorials, die die Java Concurrency API behandeln. In den nächsten 15 Minuten lernen Sie, wie Sie Code mit Hilfe von Threads, Tasks und Executor-Diensten parallel ausführen können.

  • Teil 1: Threads und Executors
  • Teil 2: Synchronisation und Locks
  • Teil 3: Atomare Variablen und ConcurrentMap

Die Concurrency-API wurde erstmals mit der Veröffentlichung von Java 5 eingeführt und dann mit jeder neuen Java-Version schrittweise erweitert. Die meisten der in diesem Artikel gezeigten Konzepte funktionieren auch in älteren Java-Versionen. Meine Codebeispiele konzentrieren sich jedoch auf Java 8 und nutzen Lambda-Ausdrücke und andere neue Funktionen intensiv. Wenn Sie noch nicht mit Lambdas vertraut sind, empfehle ich Ihnen, zuerst mein Java 8-Tutorial zu lesen.

Threads und Runnables

Alle modernen Betriebssysteme unterstützen Gleichzeitigkeit sowohl über Prozesse als auch über Threads. Prozesse sind Instanzen von Programmen, die typischerweise unabhängig voneinander laufen, z. B. wenn Sie ein Java-Programm starten, legt das Betriebssystem einen neuen Prozess an, der parallel zu anderen Programmen läuft. Innerhalb dieser Prozesse können wir Threads verwenden, um Code gleichzeitig auszuführen, so dass wir die verfügbaren Kerne der CPU optimal nutzen können.

Java unterstützt Threads seit JDK 1.0. Bevor Sie einen neuen Thread starten, müssen Sie den von diesem Thread auszuführenden Code angeben, der oft als Task bezeichnet wird. Dies geschieht durch die Implementierung von Runnable – einer funktionalen Schnittstelle, die eine einzelne void no-args-Methode run() definiert, wie im folgenden Beispiel gezeigt:

Runnable task = () -> { String threadName = Thread.currentThread().getName(); System.out.println("Hello " + threadName);};task.run();Thread thread = new Thread(task);thread.start();System.out.println("Done!");

Da es sich bei Runnable um eine funktionale Schnittstelle handelt, können wir Java-8-Lambda-Ausdrücke verwenden, um den Namen des aktuellen Threads auf der Konsole auszugeben. Zuerst führen wir die Runnable direkt auf dem Hauptthread aus, bevor wir einen neuen Thread starten.

Das Ergebnis auf der Konsole könnte so aussehen:

Hello mainHello Thread-0Done!

Oder so:

Hello mainDone!Hello Thread-0

Durch die parallele Ausführung können wir nicht vorhersagen, ob das Runnable vor oder nach dem Drucken von ‚done‘ aufgerufen wird. Die Reihenfolge ist nicht-deterministisch, was die nebenläufige Programmierung in größeren Anwendungen zu einer komplexen Aufgabe macht.

Threads können für eine bestimmte Dauer in den Schlaf versetzt werden. Dies ist recht praktisch, um lang laufende Aufgaben in den folgenden Codebeispielen dieses Artikels zu simulieren:

Runnable runnable = () -> { try { String name = Thread.currentThread().getName(); System.out.println("Foo " + name); TimeUnit.SECONDS.sleep(1); System.out.println("Bar " + name); } catch (InterruptedException e) { e.printStackTrace(); }};Thread thread = new Thread(runnable);thread.start();

Wenn Sie den obigen Code ausführen, werden Sie die einsekündige Verzögerung zwischen der ersten und der zweiten Druckanweisung bemerken. TimeUnit ist ein nützliches Enum für die Arbeit mit Zeiteinheiten. Alternativ können Sie dasselbe durch den Aufruf von Thread.sleep(1000) erreichen.

Die Arbeit mit der Klasse Thread kann sehr mühsam und fehleranfällig sein. Aus diesem Grund wurde bereits 2004 mit der Veröffentlichung von Java 5 die Concurrency-API eingeführt. Die API befindet sich im Paket java.util.concurrent und enthält viele nützliche Klassen für den Umgang mit gleichzeitiger Programmierung. Seitdem wurde die Concurrency-API mit jeder neuen Java-Version erweitert und auch Java 8 bietet neue Klassen und Methoden für den Umgang mit Nebenläufigkeit.

Nun wollen wir uns einen der wichtigsten Teile der Concurrency-API genauer ansehen – die Executor-Dienste.

Executors

Die Concurrency-API führt das Konzept eines ExecutorService als übergeordneten Ersatz für das direkte Arbeiten mit Threads ein. Executors sind in der Lage, asynchrone Aufgaben auszuführen und verwalten typischerweise einen Pool von Threads, so dass wir nicht manuell neue Threads erstellen müssen. Alle Threads des internen Pools werden unter der Haube für wiederkehrende Aufgaben wiederverwendet, so dass wir während des gesamten Lebenszyklus unserer Anwendung mit einem einzigen Executor-Dienst so viele gleichzeitige Aufgaben ausführen können, wie wir wollen.

So sieht das erste Thread-Beispiel mit Executors aus:

ExecutorService executor = Executors.newSingleThreadExecutor();executor.submit(() -> { String threadName = Thread.currentThread().getName(); System.out.println("Hello " + threadName);});// => Hello pool-1-thread-1

Die Klasse Executors bietet komfortable Factory-Methoden zum Erstellen verschiedener Arten von Executor-Diensten. In diesem Beispiel verwenden wir einen Executor mit einem Thread-Pool der Größe eins.

Das Ergebnis sieht ähnlich aus wie im obigen Beispiel, aber wenn Sie den Code ausführen, werden Sie einen wichtigen Unterschied bemerken: Der Java-Prozess wird nie gestoppt! Executors müssen explizit gestoppt werden – sonst horchen sie weiter auf neue Tasks.

Ein ExecutorService stellt dafür zwei Methoden zur Verfügung: shutdown() wartet auf das Ende der aktuell laufenden Tasks, während shutdownNow() alle laufenden Tasks unterbricht und den Executor sofort herunterfährt.

Dies ist der bevorzugte Weg, wie ich typischerweise Executors herunterfahre:

try { System.out.println("attempt to shutdown executor"); executor.shutdown(); executor.awaitTermination(5, TimeUnit.SECONDS);}catch (InterruptedException e) { System.err.println("tasks interrupted");}finally { if (!executor.isTerminated()) { System.err.println("cancel non-finished tasks"); } executor.shutdownNow(); System.out.println("shutdown finished");}

Der Executor fährt sanft herunter, indem er eine bestimmte Zeit auf die Beendigung der aktuell laufenden Tasks wartet. Nach maximal fünf Sekunden fährt der Executor schließlich herunter, indem er alle laufenden Tasks unterbricht.

Callables und Futures

Neben Runnable unterstützen Executors eine weitere Art von Task namens Callable. Callables sind funktionale Schnittstellen, genau wie Runnables, aber statt void geben sie einen Wert zurück.

Dieser Lambda-Ausdruck definiert ein Callable, das eine ganze Zahl zurückgibt, nachdem es eine Sekunde lang geschlafen hat:

Callable<Integer> task = () -> { try { TimeUnit.SECONDS.sleep(1); return 123; } catch (InterruptedException e) { throw new IllegalStateException("task interrupted", e); }};

Callables können genau wie Runnables an Executor-Dienste übergeben werden. Aber was ist mit dem Ergebnis der Callables? Da submit() nicht wartet, bis die Aufgabe abgeschlossen ist, kann der Executor-Dienst das Ergebnis des Callables nicht direkt zurückgeben. Stattdessen gibt der Executor ein spezielles Ergebnis vom Typ Future zurück, mit dem das eigentliche Ergebnis zu einem späteren Zeitpunkt abgerufen werden kann.

ExecutorService executor = Executors.newFixedThreadPool(1);Future<Integer> future = executor.submit(task);System.out.println("future done? " + future.isDone());Integer result = future.get();System.out.println("future done? " + future.isDone());System.out.print("result: " + result);

Nach der Übergabe des Callables an den Executor prüfen wir zunächst über isDone(), ob der Future bereits fertig ausgeführt wurde. Ich bin mir ziemlich sicher, dass dies nicht der Fall ist, da das obige Callable eine Sekunde lang schläft, bevor es den Integer zurückgibt.

Der Aufruf der Methode get() blockiert den aktuellen Thread und wartet, bis das Callable fertig ist, bevor es das eigentliche Ergebnis zurückgibt 123. Nun ist der Future endlich fertig und wir sehen folgendes Ergebnis auf der Konsole:

future done? falsefuture done? trueresult: 123

Futures sind eng mit dem zugrundeliegenden Executor-Dienst gekoppelt. Beachten Sie, dass jeder nicht terminierte Future Exceptions wirft, wenn Sie den Executor herunterfahren:

executor.shutdownNow();future.get();

Sie haben vielleicht bemerkt, dass sich die Erstellung des Executors leicht vom vorherigen Beispiel unterscheidet. Wir verwenden newFixedThreadPool(1), um einen Executor-Dienst zu erstellen, der von einem Thread-Pool der Größe eins unterstützt wird. Dies ist äquivalent zu newSingleThreadExecutor(), aber wir könnten später die Pool-Größe erhöhen, indem wir einfach einen Wert größer als eins übergeben.

Timeouts

Jeder Aufruf von future.get() blockiert und wartet, bis die zugrunde liegende Callable beendet wurde. Im schlimmsten Fall läuft eine Callable ewig – und macht damit Ihre Anwendung unansprechbar. Sie können diesen Szenarien einfach entgegenwirken, indem Sie einen Timeout übergeben:

ExecutorService executor = Executors.newFixedThreadPool(1);Future<Integer> future = executor.submit(() -> { try { TimeUnit.SECONDS.sleep(2); return 123; } catch (InterruptedException e) { throw new IllegalStateException("task interrupted", e); }});future.get(1, TimeUnit.SECONDS);

Die Ausführung des obigen Codes führt zu einem TimeoutException:

Exception in thread "main" java.util.concurrent.TimeoutException at java.util.concurrent.FutureTask.get(FutureTask.java:205)

Sie haben vielleicht schon erraten, warum diese Exception geworfen wird: Wir haben eine maximale Wartezeit von einer Sekunde angegeben, aber das Callable braucht tatsächlich zwei Sekunden, bevor es das Ergebnis zurückgibt.

InvokeAll

Executors unterstützen das Batch-Senden mehrerer Callables auf einmal über invokeAll(). Diese Methode akzeptiert eine Sammlung von Callables und gibt eine Liste von Futures zurück.

ExecutorService executor = Executors.newWorkStealingPool();List<Callable<String>> callables = Arrays.asList( () -> "task1", () -> "task2", () -> "task3");executor.invokeAll(callables) .stream() .map(future -> { try { return future.get(); } catch (Exception e) { throw new IllegalStateException(e); } }) .forEach(System.out::println);

In diesem Beispiel verwenden wir Java 8 Funktionsströme, um alle Futures zu verarbeiten, die durch den Aufruf von invokeAll zurückgegeben werden. Wir mappen zunächst jeden Future auf seinen Rückgabewert und geben dann jeden Wert auf der Konsole aus. Wenn Sie noch nicht mit Streams vertraut sind, lesen Sie mein Java 8 Stream Tutorial.

InvokeAny

Eine weitere Möglichkeit, Callables im Stapel zu übergeben, ist die Methode invokeAny(), die etwas anders funktioniert als invokeAll(). Anstatt zukünftige Objekte zurückzugeben, blockiert diese Methode, bis die erste Callable beendet ist und gibt das Ergebnis dieser Callable zurück.

Um dieses Verhalten zu testen, verwenden wir diese Hilfsmethode, um Callables mit unterschiedlichen Laufzeiten zu simulieren. Die Methode gibt ein Callable zurück, das für eine bestimmte Zeit schläft, bis es das angegebene Ergebnis zurückgibt:

Callable<String> callable(String result, long sleepSeconds) { return () -> { TimeUnit.SECONDS.sleep(sleepSeconds); return result; };}

Wir verwenden diese Methode, um eine Reihe von Callables mit unterschiedlichen Dauern von einer bis drei Sekunden zu erzeugen. Die Übergabe dieser Callables an einen Executor über invokeAny() liefert das String-Ergebnis der schnellsten Callable – in diesem Fall Task2:

ExecutorService executor = Executors.newWorkStealingPool();List<Callable<String>> callables = Arrays.asList( callable("task1", 2), callable("task2", 1), callable("task3", 3));String result = executor.invokeAny(callables);System.out.println(result);// => task2

Das obige Beispiel verwendet einen weiteren Typ von Executor, der über newWorkStealingPool() erzeugt wird. Diese Factory-Methode ist Teil von Java 8 und liefert einen Executor vom Typ ForkJoinPool, der etwas anders arbeitet als normale Executors. Anstatt einen Thread-Pool mit fester Größe zu verwenden, werden ForkJoinPools für eine bestimmte Parallelitätsgröße erstellt, die standardmäßig die Anzahl der verfügbaren Kerne der Host-CPU ist.

ForkJoinPools gibt es seit Java 7 und werden in einem späteren Tutorial dieser Serie ausführlich behandelt. Lassen Sie uns dieses Tutorial abschließen, indem wir einen tieferen Blick auf geplante Executors werfen.

Geplante Executors

Wir haben bereits gelernt, wie man Aufgaben einmalig an einen Executor übergibt und ausführt. Um allgemeine Aufgaben periodisch mehrfach auszuführen, können wir geplante Thread-Pools verwenden.

Ein ScheduledExecutorService kann Aufgaben so planen, dass sie entweder periodisch oder einmal nach Ablauf einer bestimmten Zeit ausgeführt werden.

Dieses Codebeispiel plant eine Aufgabe so ein, dass sie nach einer anfänglichen Verzögerung von drei Sekunden ausgeführt wird:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);TimeUnit.MILLISECONDS.sleep(1337);long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);System.out.printf("Remaining Delay: %sms", remainingDelay);

Die Einplanung einer Aufgabe erzeugt einen spezialisierten Future vom Typ ScheduledFuture, der – zusätzlich zu Future – die Methode getDelay() zum Abrufen der verbleibenden Verzögerung bereitstellt. Nach Ablauf dieser Verzögerung wird die Aufgabe parallel ausgeführt.

Um Aufgaben zur periodischen Ausführung einzuplanen, stellt Executor die beiden Methoden scheduleAtFixedRate() und scheduleWithFixedDelay() bereit. Die erste Methode ist in der Lage, Tasks mit einer festen Zeitrate auszuführen, z.B. einmal pro Sekunde wie in diesem Beispiel:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());int initialDelay = 0;int period = 1;executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);

Zusätzlich akzeptiert diese Methode eine initiale Verzögerung, die die führende Wartezeit beschreibt, bevor der Task zum ersten Mal ausgeführt wird.

Bitte beachten Sie, dass scheduleAtFixedRate() nicht die tatsächliche Dauer der Aufgabe berücksichtigt. Wenn Sie also eine Zeitspanne von einer Sekunde angeben, die Aufgabe aber 2 Sekunden zur Ausführung benötigt, wird der Thread-Pool sehr bald ausgelastet sein.

In diesem Fall sollten Sie stattdessen scheduleWithFixedDelay() verwenden. Diese Methode funktioniert genauso wie das oben beschriebene Gegenstück. Der Unterschied ist, dass die Wartezeit zwischen dem Ende eines Tasks und dem Start des nächsten Tasks gilt. Beispiel:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);Runnable task = () -> { try { TimeUnit.SECONDS.sleep(2); System.out.println("Scheduling: " + System.nanoTime()); } catch (InterruptedException e) { System.err.println("task interrupted"); }};executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);

In diesem Beispiel wird ein Task mit einer festen Verzögerung von einer Sekunde zwischen dem Ende einer Ausführung und dem Start der nächsten Ausführung eingeplant. Die anfängliche Verzögerung ist Null und die Dauer des Tasks beträgt zwei Sekunden. Es ergibt sich also ein Ausführungsintervall von 0s, 3s, 6s, 9s usw. Wie Sie sehen können, ist scheduleWithFixedDelay() praktisch, wenn Sie die Dauer der geplanten Aufgaben nicht vorhersagen können.

Dies war der erste Teil einer Reihe von Gleichzeitigkeits-Tutorials. Ich empfehle, die gezeigten Codebeispiele selbst zu üben. Sie finden alle Code-Beispiele aus diesem Artikel auf GitHub, also zögern Sie nicht, das Repo zu forken und mir einen Stern zu geben.

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert.