Articles

Java 8 Samouczek współbieżności: Threads and Executors

Posted on

Witamy w pierwszej części mojego samouczka współbieżności w Javie 8. Ten przewodnik nauczy Cię programowania współbieżnego w Javie 8 z łatwymi do zrozumienia przykładami kodu. Jest to pierwsza część z serii tutoriali omawiających API współbieżności w Javie. W ciągu następnych 15 minut dowiesz się, jak wykonywać kod równolegle za pomocą wątków, zadań i usług wykonawczych.

  • Część 1: Wątki i egzekutorzy
  • Część 2: Synchronizacja i zamki
  • Część 3: Zmienne atomowe i ConcurrentMap

The Concurrency API zostało po raz pierwszy wprowadzone wraz z wydaniem Javy 5, a następnie stopniowo ulepszane z każdym nowym wydaniem Javy. Większość koncepcji pokazanych w tym artykule działa również w starszych wersjach Javy. Jednak moje próbki kodu skupiają się na Javie 8 i mocno wykorzystują wyrażenia lambda oraz inne nowe funkcje. Jeśli nie jesteś jeszcze zaznajomiony z lambdami, polecam najpierw przeczytać mój Samouczek Javy 8.

Wątki i runnable

Wszystkie nowoczesne systemy operacyjne obsługują współbieżność zarówno poprzez procesy jak i wątki. Procesy są instancjami programów, które zazwyczaj działają niezależnie od siebie, np. jeśli uruchamiasz program java, system operacyjny tworzy nowy proces, który działa równolegle do innych programów. Wewnątrz tych procesów możemy wykorzystywać wątki do współbieżnego wykonywania kodu, dzięki czemu możemy w pełni wykorzystać dostępne rdzenie procesora.

Java obsługuje wątki od JDK 1.0. Przed rozpoczęciem nowego wątku należy określić kod, który ma być przez ten wątek wykonywany, często nazywany zadaniem. Robi się to poprzez implementację Runnable – funkcjonalnego interfejsu definiującego pojedynczą metodę void no-args run() jak zademonstrowano w poniższym przykładzie:

Runnable task = () -> { String threadName = Thread.currentThread().getName(); System.out.println("Hello " + threadName);};task.run();Thread thread = new Thread(task);thread.start();System.out.println("Done!");

Ponieważ Runnable jest interfejsem funkcjonalnym możemy wykorzystać wyrażenia lambda Javy 8 do wypisania nazwy bieżącego wątku na konsolę. Najpierw wykonujemy runnable bezpośrednio na głównym wątku przed rozpoczęciem nowego wątku.

Wynik na konsoli może wyglądać następująco:

Hello mainHello Thread-0Done!

Albo to:

Hello mainDone!Hello Thread-0

Z powodu współbieżnego wykonywania nie możemy przewidzieć, czy runnable zostanie wywołany przed czy po wydrukowaniu 'done'. Kolejność jest niedeterministyczna, przez co programowanie współbieżne staje się skomplikowanym zadaniem w większych aplikacjach.

Wątki mogą być usypiane na określony czas. Jest to bardzo przydatne do symulowania długo działających zadań w kolejnych próbkach kodu tego artykułu:

Runnable runnable = () -> { try { String name = Thread.currentThread().getName(); System.out.println("Foo " + name); TimeUnit.SECONDS.sleep(1); System.out.println("Bar " + name); } catch (InterruptedException e) { e.printStackTrace(); }};Thread thread = new Thread(runnable);thread.start();

Po uruchomieniu powyższego kodu zauważysz sekundowe opóźnienie pomiędzy pierwszą a drugą instrukcją print. TimeUnit jest użytecznym enum do pracy z jednostkami czasu. Alternatywnie możesz osiągnąć to samo, wywołując Thread.sleep(1000).

Praca z klasą Thread może być bardzo żmudna i podatna na błędy. Z tego powodu w 2004 roku, wraz z wydaniem Javy 5, wprowadzono API współbieżności. API znajduje się w pakiecie java.util.concurrent i zawiera wiele przydatnych klas do obsługi programowania współbieżnego. Od tego czasu API współbieżności było ulepszane z każdym nowym wydaniem Javy i nawet Java 8 dostarcza nowych klas i metod do radzenia sobie ze współbieżnością.

Przyjrzyjrzyjmy się teraz dokładniej jednej z najważniejszych części API współbieżności – usługom wykonawczym.

Wykonawcy

Aplikacja API współbieżności wprowadza pojęcie ExecutorService jako zamiennik wyższego poziomu do bezpośredniej pracy z wątkami. Executory są zdolne do uruchamiania asynchronicznych zadań i zazwyczaj zarządzają pulą wątków, więc nie musimy tworzyć nowych wątków ręcznie. Wszystkie wątki z wewnętrznej puli będą ponownie wykorzystane pod maską do zadań revenant, więc możemy uruchomić tyle współbieżnych zadań, ile chcemy w całym cyklu życia naszej aplikacji z pojedynczą usługą executora.

Tak wygląda pierwszy przykład wątku z wykorzystaniem executorów:

ExecutorService executor = Executors.newSingleThreadExecutor();executor.submit(() -> { String threadName = Thread.currentThread().getName(); System.out.println("Hello " + threadName);});// => Hello pool-1-thread-1

Klasa Executors udostępnia wygodne metody fabryczne do tworzenia różnego rodzaju usług executorów. W tym przykładzie używamy executora z pulą wątków o rozmiarze jeden.

Wynik wygląda podobnie jak w powyższym przykładzie, ale podczas uruchamiania kodu zauważysz ważną różnicę: proces java nigdy się nie zatrzymuje! Executory muszą być zatrzymywane w sposób jawny – w przeciwnym razie ciągle nasłuchują na nowe zadania.

An ExecutorService udostępnia dwie metody do tego celu: shutdown() czeka na zakończenie aktualnie uruchomionych zadań, natomiast shutdownNow() przerywa wszystkie uruchomione zadania i natychmiast zamyka executor.

To jest preferowany sposób, w jaki zazwyczaj zamykam executory:

try { System.out.println("attempt to shutdown executor"); executor.shutdown(); executor.awaitTermination(5, TimeUnit.SECONDS);}catch (InterruptedException e) { System.err.println("tasks interrupted");}finally { if (!executor.isTerminated()) { System.err.println("cancel non-finished tasks"); } executor.shutdownNow(); System.out.println("shutdown finished");}

Wykonawca zamyka się łagodnie, czekając określoną ilość czasu na zakończenie aktualnie uruchomionych zadań. Po maksymalnie pięciu sekundach executor ostatecznie wyłącza się przerywając wszystkie działające zadania.

Wywołania i kontrakty terminowe

Oprócz Runnable executory obsługują jeszcze jeden rodzaj zadań o nazwie Callable. Callables są funkcjonalnymi interfejsami tak jak runnables, ale zamiast być void zwracają wartość.

To wyrażenie lambda definiuje callable zwracającą liczbę całkowitą po uśpieniu przez jedną sekundę:

Callable<Integer> task = () -> { try { TimeUnit.SECONDS.sleep(1); return 123; } catch (InterruptedException e) { throw new IllegalStateException("task interrupted", e); }};

Callables mogą być przekazywane do usług executora tak samo jak runnable. Ale co z wynikiem wywołań? Ponieważ submit() nie czeka aż zadanie się zakończy, usługa wykonawcza nie może zwrócić wyniku wywołania bezpośrednio. Zamiast tego executor zwraca specjalny wynik typu Future , który może być użyty do pobrania rzeczywistego wyniku w późniejszym czasie.

ExecutorService executor = Executors.newFixedThreadPool(1);Future<Integer> future = executor.submit(task);System.out.println("future done? " + future.isDone());Integer result = future.get();System.out.println("future done? " + future.isDone());System.out.print("result: " + result);

Po przekazaniu callable do executora najpierw sprawdzamy, czy future zostało już zakończone wykonywanie poprzez isDone(). Jestem pewien, że tak nie jest, ponieważ powyższa wywołanie uśpione jest przez jedną sekundę przed zwróceniem liczby całkowitej.

Wywołanie metody get() blokuje bieżący wątek i czeka, aż wywołanie zakończy się przed zwróceniem rzeczywistego wyniku 123. Teraz future jest w końcu wykonany i widzimy następujący wynik na konsoli:

future done? falsefuture done? trueresult: 123

Futures są ściśle powiązane z bazową usługą executora. Należy pamiętać, że każdy nie zakończony future będzie rzucał wyjątkami, jeśli zamkniemy executor:

executor.shutdownNow();future.get();

Można zauważyć, że tworzenie executora różni się nieco od poprzedniego przykładu. Używamy newFixedThreadPool(1) aby utworzyć usługę executora wspieraną przez pulę wątków o rozmiarze jeden. Jest to odpowiednik newSingleThreadExecutor() ale możemy później zwiększyć rozmiar puli po prostu przekazując wartość większą niż jeden.

Timeouty

Każde wywołanie future.get() będzie blokować i czekać aż bazowa wywoływalna zostanie zakończona. W najgorszym przypadku wywołanie będzie działać wiecznie – czyniąc twoją aplikację nieresponsywną. Możesz po prostu przeciwdziałać takim scenariuszom, przekazując timeout:

ExecutorService executor = Executors.newFixedThreadPool(1);Future<Integer> future = executor.submit(() -> { try { TimeUnit.SECONDS.sleep(2); return 123; } catch (InterruptedException e) { throw new IllegalStateException("task interrupted", e); }});future.get(1, TimeUnit.SECONDS);

Wykonanie powyższego kodu skutkuje powstaniem TimeoutException:

Exception in thread "main" java.util.concurrent.TimeoutException at java.util.concurrent.FutureTask.get(FutureTask.java:205)

Możesz się już domyślić, dlaczego ten wyjątek jest rzucany: Określiliśmy maksymalny czas oczekiwania wynoszący jedną sekundę, ale wywołanie w rzeczywistości potrzebuje dwóch sekund przed zwróceniem wyniku.

InvokeAll

Eksperci wspierają wsadowe składanie wielu wywołań jednocześnie poprzez invokeAll(). Metoda ta akceptuje kolekcję wywołań i zwraca listę futures.

ExecutorService executor = Executors.newWorkStealingPool();List<Callable<String>> callables = Arrays.asList( () -> "task1", () -> "task2", () -> "task3");executor.invokeAll(callables) .stream() .map(future -> { try { return future.get(); } catch (Exception e) { throw new IllegalStateException(e); } }) .forEach(System.out::println);

W tym przykładzie wykorzystujemy strumienie funkcjonalne Javy 8 do przetwarzania wszystkich futures zwróconych przez wywołanie invokeAll. Najpierw mapujemy każdy future na jego wartość zwracaną, a następnie wypisujemy każdą wartość na konsolę. Jeśli nie jesteś jeszcze zaznajomiony ze strumieniami, przeczytaj mój poradnik Java 8 Stream Tutorial.

InvokeAny

Innym sposobem na wsadowe przesyłanie callables jest metoda invokeAny(), która działa nieco inaczej niż invokeAll(). Zamiast zwracać przyszłe obiekty, metoda ta blokuje się do momentu, aż pierwsza wywołanie zakończy się i zwraca wynik tego wywołania.

Aby przetestować to zachowanie, używamy tej metody pomocniczej do symulowania wywołań o różnym czasie trwania. Metoda zwraca wywołanie, które śpi przez określony czas, aż do zwrócenia podanego wyniku:

Callable<String> callable(String result, long sleepSeconds) { return () -> { TimeUnit.SECONDS.sleep(sleepSeconds); return result; };}

Użyjemy tej metody do stworzenia kilku wywołań o różnym czasie trwania od jednej do trzech sekund. Przekazanie tych callables do executora poprzez invokeAny() zwraca wynik łańcuchowy najszybszej callable – w tym przypadku task2:

ExecutorService executor = Executors.newWorkStealingPool();List<Callable<String>> callables = Arrays.asList( callable("task1", 2), callable("task2", 1), callable("task3", 3));String result = executor.invokeAny(callables);System.out.println(result);// => task2

Powyższy przykład wykorzystuje jeszcze jeden typ executora tworzonego za pośrednictwem newWorkStealingPool(). Ta metoda fabryczna jest częścią Javy 8 i zwraca executor typu ForkJoinPool, który działa nieco inaczej niż zwykłe executory. Zamiast używać stałej wielkości puli wątków, ForkJoinPools są tworzone dla danego rozmiaru równoległości, który domyślnie jest liczbą dostępnych rdzeni procesora hosta.

ForkJoinPools istnieją od Javy 7 i będą szczegółowo omówione w późniejszym tutorialu z tej serii. Na koniec przyjrzyjmy się bliżej zaplanowanym executorom.

Zaplanowane executory

Uczyliśmy się już jak przesyłać i uruchamiać zadania raz na executorze. Aby okresowo uruchamiać wspólne zadania wiele razy, możemy wykorzystać zaplanowane pule wątków.

A ScheduledExecutorService jest w stanie zaplanować zadania do uruchamiania okresowo lub jednorazowo po upływie określonego czasu.

Ta próbka kodu ustawia zadanie do uruchomienia po upłynięciu początkowego opóźnienia trzech sekund:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);TimeUnit.MILLISECONDS.sleep(1337);long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);System.out.printf("Remaining Delay: %sms", remainingDelay);

Zaplanowanie zadania powoduje wytworzenie wyspecjalizowanej przyszłości typu ScheduledFuture, która -. oprócz Future – udostępnia metodę getDelay() do pobierania pozostałego opóźnienia. Po upływie tego opóźnienia zadania będą wykonywane współbieżnie.

Aby zaplanować okresowe wykonywanie zadań, executory udostępniają dwie metody scheduleAtFixedRate() i scheduleWithFixedDelay(). Pierwsza z metod jest w stanie wykonywać zadania z ustaloną częstotliwością, np. raz na sekundę, jak pokazano w tym przykładzie:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());int initialDelay = 0;int period = 1;executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);

Dodatkowo metoda ta akceptuje początkowe opóźnienie, które opisuje wiodący czas oczekiwania, zanim zadanie zostanie wykonane po raz pierwszy.

Pamiętaj, że scheduleAtFixedRate() nie bierze pod uwagę rzeczywistego czasu trwania zadania. Jeśli więc określisz czas trwania zadania na jedną sekundę, ale zadanie będzie wymagało 2 sekund na wykonanie, to pula wątków bardzo szybko osiągnie swoją pojemność.

W takim przypadku powinieneś rozważyć użycie scheduleWithFixedDelay(). Ta metoda działa tak samo jak jej odpowiednik opisany powyżej. Różnica polega na tym, że czas oczekiwania dotyczy okresu pomiędzy zakończeniem zadania a rozpoczęciem kolejnego zadania. Na przykład:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);Runnable task = () -> { try { TimeUnit.SECONDS.sleep(2); System.out.println("Scheduling: " + System.nanoTime()); } catch (InterruptedException e) { System.err.println("task interrupted"); }};executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);

Ten przykład planuje zadanie ze stałym opóźnieniem jednej sekundy między końcem wykonania a początkiem następnego wykonania. Początkowe opóźnienie wynosi zero, a czas trwania zadania to dwie sekundy. Więc kończymy z interwałem wykonania 0s, 3s, 6s, 9s i tak dalej. Jak widzisz scheduleWithFixedDelay() jest przydatny, jeśli nie możesz przewidzieć czasu trwania zaplanowanych zadań.

To była pierwsza część z serii tutoriali dotyczących współbieżności. Polecam samodzielne przećwiczenie pokazanych próbek kodu. Wszystkie próbki kodu z tego artykułu znajdziesz na GitHubie, więc nie krępuj się rozwidlić repo i dać mi gwiazdkę.

Dodaj komentarz

Twój adres email nie zostanie opublikowany. Pola, których wypełnienie jest wymagane, są oznaczone symbolem *