Articles

Java 8 Concurrency チュートリアル。 Threads and Executors

Posted on

Java 8 Concurrency チュートリアルの第 1 部にようこそ。 このガイドでは、わかりやすいコード例を用いて、Java 8 での並行プログラミングを学びます。 これは、Java Concurrency API をカバーする一連のチュートリアルのうちの最初の部分です。

  • Part 1: Threads and Executors
  • Part 2: Synchronization and Locks
  • Part 3: Atomic Variables and ConcurrentMap

Concurrency API は、Java 5 のリリース時に初めて導入され、その後、新しい Java のリリースごとに徐々に強化されてきました。 この記事で紹介されているコンセプトの大半は、古いバージョンの Java でも機能します。 しかし、私のコードサンプルはJava 8に焦点を当て、ラムダ式やその他の新機能を多用しています。

スレッドとランナブル

すべての最新のオペレーティング システムは、プロセスとスレッドの両方による同時実行をサポートしています。 プロセスは、通常、互いに独立して実行されるプログラムのインスタンスです。たとえば、Java プログラムを起動すると、オペレーティング システムは、他のプログラムと並行して実行される新しいプロセスを生成します。

Java は JDK 1.0 からスレッドをサポートしています。 新しいスレッドを開始する前に、このスレッドで実行されるコードを指定する必要がありますが、これはしばしばタスクと呼ばれます。 これは、Runnablerun()は、次の例で示すように、単一のvoid no-argsメソッドを定義する機能的なインターフェイスです。

Runnable task = () -> { String threadName = Thread.currentThread().getName(); System.out.println("Hello " + threadName);};task.run();Thread thread = new Thread(task);thread.start();System.out.println("Done!");

Runnable は関数型インターフェースなので、Java 8 のラムダ式を利用して、現在のスレッド名をコンソールに表示することができます。 まず、新しいスレッドを開始する前に、メイン スレッド上で実行可能なプログラムを直接実行します。

コンソールに表示される結果は次のようになります。

Hello mainHello Thread-0Done!

あるいは、次のようになります。

Hello mainDone!Hello Thread-0

同時実行のため、ランナブルが「done」と表示される前に呼び出されるのか、それとも「done」と表示された後に呼び出されるのかは予測できません。

スレッドは一定時間スリープ状態にすることができます。

Runnable runnable = () -> { try { String name = Thread.currentThread().getName(); System.out.println("Foo " + name); TimeUnit.SECONDS.sleep(1); System.out.println("Bar " + name); } catch (InterruptedException e) { e.printStackTrace(); }};Thread thread = new Thread(runnable);thread.start();

上記のコードを実行すると、最初の print 文と 2 番目の print 文の間に 1 秒の遅延があることに気づくでしょう。 TimeUnitは、時間の単位を扱うのに便利な列挙体です。

Thread クラスでの作業は非常に面倒で、エラーが発生しやすいものです。 このような理由から、2004年にJava 5がリリースされたときに、Concurrency APIが導入されました。 この API は java.util.concurrent パッケージに含まれており、並行プログラミングを処理するための便利なクラスが多数含まれています。

さて、Concurrency API の最も重要な部分の 1 つであるエクゼキュータ サービスについて詳しく見てみましょう。

エクゼキュータ

Concurrency API では、スレッドを直接操作するための高レベルな代替手段として ExecutorService の概念を導入しています。 エクゼキュータは非同期タスクを実行することができ、通常はスレッドのプールを管理するので、新しいスレッドを手動で作成する必要はありません。 内部プールのすべてのスレッドは、リベンジ・タスクのために再利用されますので、アプリケーションのライフサイクルを通じて、1つのエクゼキュータ・サービスで必要なだけの同時タスクを実行することができます。

エクゼキュータを使用した最初のスレッドサンプルは次のようになります。

ExecutorService executor = Executors.newSingleThreadExecutor();executor.submit(() -> { String threadName = Thread.currentThread().getName(); System.out.println("Hello " + threadName);});// => Hello pool-1-thread-1

クラス Executors は、さまざまな種類のエクゼキュータ サービスを作成するための便利なファクトリ メソッドを提供しています。

結果は上記のサンプルと似ていますが、コードを実行してみると、重要な違いに気づくでしょう。

ExecutorServiceshutdown()shutdownNow()は実行中のすべてのタスクに割り込んでエクゼキュータを直ちにシャットダウンします。

これは、私が一般的に実行者をシャットダウンする好ましい方法です。

try { System.out.println("attempt to shutdown executor"); executor.shutdown(); executor.awaitTermination(5, TimeUnit.SECONDS);}catch (InterruptedException e) { System.err.println("tasks interrupted");}finally { if (!executor.isTerminated()) { System.err.println("cancel non-finished tasks"); } executor.shutdownNow(); System.out.println("shutdown finished");}

実行者は、現在実行中のタスクの終了を一定時間待つことで、ソフト的にシャットダウンします。

Callables and Futures

RunnableCallableという別の種類のタスクをサポートしています。 Callableはrunnableと同様に機能的なインターフェースですが、voidではなく、値を返します。

以下のラムダ式は、1秒間スリープした後に整数を返す呼び出し可能なものを定義しています:

Callable<Integer> task = () -> { try { TimeUnit.SECONDS.sleep(1); return 123; } catch (InterruptedException e) { throw new IllegalStateException("task interrupted", e); }};

呼び出し可能なものは、ランナブルのように実行サービスに提出することができます。 しかし、callablesの結果はどうでしょうか? submit()Future 型の特別な結果を返し、後で実際の結果を取得するために使用できます。

ExecutorService executor = Executors.newFixedThreadPool(1);Future<Integer> future = executor.submit(task);System.out.println("future done? " + future.isDone());Integer result = future.get();System.out.println("future done? " + future.isDone());System.out.print("result: " + result);

実行者に呼び出し可能なファイルを送信した後、まずisDone()を介して未来の実行が既に終了しているかどうかを確認します。

メソッドを呼び出すと get() 現在のスレッドがブロックされ、実際の結果を返す前に呼び出しが完了するまで待機します。

future done? falsefuture done? trueresult: 123

フューチャーは、基盤となるエクゼキュータ サービスと緊密に結合しています。

executor.shutdownNow();future.get();

エクゼキュータの作成方法が前の例と少し違うことにお気づきでしょうか。 newFixedThreadPool(1)を使用して、サイズ1のスレッドプールでバックアップされたエクゼキュータサービスを作成します。

タイムアウト

future.get()への呼び出しは、基礎となる callable が終了するまでブロックして待機します。 最悪の場合、callable は永遠に実行され、アプリケーションが応答しなくなります。

ExecutorService executor = Executors.newFixedThreadPool(1);Future<Integer> future = executor.submit(() -> { try { TimeUnit.SECONDS.sleep(2); return 123; } catch (InterruptedException e) { throw new IllegalStateException("task interrupted", e); }});future.get(1, TimeUnit.SECONDS);

上記のコードを実行すると、TimeoutException が生成されます。

Exception in thread "main" java.util.concurrent.TimeoutException at java.util.concurrent.FutureTask.get(FutureTask.java:205)

この例外がスローされる理由は、もうお分かりかと思います。

InvokeAll

エクスクルーシブでは、invokeAll()を介して複数の呼び出し可能なオブジェクトを一度に一括送信することができます。

ExecutorService executor = Executors.newWorkStealingPool();List<Callable<String>> callables = Arrays.asList( () -> "task1", () -> "task2", () -> "task3");executor.invokeAll(callables) .stream() .map(future -> { try { return future.get(); } catch (Exception e) { throw new IllegalStateException(e); } }) .forEach(System.out::println);

この例では、invokeAllの呼び出しによって返されたすべてのフューチャーを処理するために、Java 8の関数ストリームを利用します。 まず、各フューチャーをその戻り値にマッピングし、次に各値をコンソールに出力します。

InvokeAny

呼び出し可能なものを一括して送信するもう 1 つの方法は、invokeAny()invokeAll() とは若干動作が異なります。

この動作をテストするために、このヘルパー メソッドを使用してさまざまな持続時間の callable をシミュレートします。

この動作をテストするために、このヘルパー メソッドを使用して、異なる持続時間を持つ呼び出し可能性をシミュレートします。このメソッドは、指定された結果を返すまで一定の時間スリープする呼び出し可能性を返します。 invokeAny()を介してこれらの呼び出し可能なものを実行者に送信すると、最も速い呼び出し可能なものの文字列結果が返されます(この場合はtask2)。

ExecutorService executor = Executors.newWorkStealingPool();List<Callable<String>> callables = Arrays.asList( callable("task1", 2), callable("task2", 1), callable("task3", 3));String result = executor.invokeAny(callables);System.out.println(result);// => task2

上記の例では、newWorkStealingPool()を介して作成された別のタイプのエクゼキュータを使用しています。 このファクトリーメソッドはJava 8に含まれており、ForkJoinPoolタイプのエクゼキュータを返しますが、通常のエクゼキュータとは動作が若干異なります。 固定サイズのスレッドプールを使用する代わりに、ForkJoinPoolは、デフォルトではホストCPUの利用可能なコア数である、指定された並列化サイズに対して作成されます。

スケジューリングされたエクゼキュータ

エクゼキュータ上でタスクを一度だけ投入して実行する方法はすでに学びました。

ScheduledExecutorService は、タスクを定期的に実行するか、または一定の時間が経過した後に一度だけ実行するようにスケジューリングすることができます。

このコードサンプルでは、最初に3秒の遅延が発生した後にタスクを実行するようにスケジュールしています。

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);TimeUnit.MILLISECONDS.sleep(1337);long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);System.out.printf("Remaining Delay: %sms", remainingDelay);

タスクをスケジューリングすると、ScheduledFutureFuturegetDelay()が用意されています。

定期的に実行されるタスクをスケジュールするために、エクゼキュータはscheduleAtFixedRate()scheduleWithFixedDelay()の2つのメソッドを提供します。

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());int initialDelay = 0;int period = 1;executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);

さらに、このメソッドは、タスクが最初に実行されるまでの待機時間を表す初期遅延を受け入れます。

scheduleAtFixedRate()は、タスクの実際の期間を考慮していないことに留意してください。

そのような場合は、代わりに scheduleWithFixedDelay() の使用を検討してください。 この方法は、前述の方法と同じように動作します。 違いは、待ち時間が、タスクの終了から次のタスクの開始までの間に適用されることです。 例:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);Runnable task = () -> { try { TimeUnit.SECONDS.sleep(2); System.out.println("Scheduling: " + System.nanoTime()); } catch (InterruptedException e) { System.err.println("task interrupted"); }};executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);

この例では、実行の終了と次の実行の開始の間に1秒の固定遅延があるタスクをスケジュールします。 最初の遅延はゼロで、タスクの継続時間は2秒です。 つまり、実行間隔は0秒、3秒、6秒、9秒……となっています。

これは、一連の同時実行チュートリアルのうちの最初の部分でした。 表示されているコードサンプルをご自身で練習されることをお勧めします。 この記事に掲載されているすべてのコードサンプルはGitHubにありますので、ご自由にリポジトリをフォークして、私にスターをつけてください

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です