Articles

Tutorial sulla concorrenza in Java 8: Thread ed esecutori

Posted on

Benvenuti alla prima parte del mio tutorial su Java 8 Concurrency. Questa guida ti insegna la programmazione concorrente in Java 8 con esempi di codice facilmente comprensibili. È la prima parte di una serie di tutorial che coprono l’API Java Concurrency. Nei prossimi 15 minuti imparerete come eseguire codice in parallelo tramite thread, task e servizi executor.

  • Parte 1: Threads ed Executors
  • Parte 2: Sincronizzazione e Locks
  • Parte 3: Variabili Atomiche e ConcurrentMap

La Concurrency API è stata introdotta con il rilascio di Java 5 e poi progressivamente migliorata con ogni nuova release di Java. La maggior parte dei concetti mostrati in questo articolo funzionano anche nelle vecchie versioni di Java. Tuttavia i miei esempi di codice si concentrano su Java 8 e fanno largo uso di espressioni lambda e altre nuove caratteristiche. Se non avete ancora familiarità con le lambda vi consiglio di leggere prima il mio tutorial su Java 8.

Threads e Runnables

Tutti i sistemi operativi moderni supportano la concorrenza sia tramite processi che tramite threads. I processi sono istanze di programmi che tipicamente vengono eseguiti indipendentemente l’uno dall’altro, ad esempio se si avvia un programma java il sistema operativo genera un nuovo processo che viene eseguito in parallelo ad altri programmi. All’interno di questi processi possiamo utilizzare i thread per eseguire il codice simultaneamente, in modo da poter sfruttare al meglio i core disponibili della CPU.

Java supporta i thread dal JDK 1.0. Prima di iniziare un nuovo thread è necessario specificare il codice da eseguire da questo thread, spesso chiamato task. Questo viene fatto implementando Runnable – un’interfaccia funzionale che definisce un singolo metodo void no-args run() come dimostrato nel seguente esempio:

Runnable task = () -> { String threadName = Thread.currentThread().getName(); System.out.println("Hello " + threadName);};task.run();Thread thread = new Thread(task);thread.start();System.out.println("Done!");

Siccome Runnable è un’interfaccia funzionale possiamo utilizzare le espressioni lambda di Java 8 per stampare il nome del thread corrente sulla console. Prima eseguiamo il runnable direttamente sul thread principale prima di iniziare un nuovo thread.

Il risultato sulla console potrebbe essere come questo:

Hello mainHello Thread-0Done!

Ovvero:

Hello mainDone!Hello Thread-0

A causa dell’esecuzione concorrente non possiamo prevedere se il runnable sarà invocato prima o dopo aver stampato ‘done’. L’ordine non è deterministico, rendendo così la programmazione concorrente un compito complesso nelle applicazioni più grandi.

I thread possono essere messi a riposo per una certa durata. Questo è abbastanza utile per simulare attività di lunga durata nei successivi esempi di codice di questo articolo:

Runnable runnable = () -> { try { String name = Thread.currentThread().getName(); System.out.println("Foo " + name); TimeUnit.SECONDS.sleep(1); System.out.println("Bar " + name); } catch (InterruptedException e) { e.printStackTrace(); }};Thread thread = new Thread(runnable);thread.start();

Quando si esegue il codice di cui sopra si noterà il ritardo di un secondo tra la prima e la seconda istruzione di stampa. TimeUnit è un enum utile per lavorare con le unità di tempo. In alternativa potete ottenere lo stesso risultato chiamando Thread.sleep(1000).

Lavorare con la classe Thread può essere molto noioso e soggetto a errori. Per questo motivo la Concurrency API è stata introdotta nel 2004 con il rilascio di Java 5. L’API si trova nel pacchetto java.util.concurrent e contiene molte classi utili per gestire la programmazione concorrente. Da allora la Concurrency API è stata migliorata con ogni nuova release di Java e anche Java 8 fornisce nuove classi e metodi per gestire la concorrenza.

Ora diamo uno sguardo più approfondito a una delle parti più importanti della Concurrency API – i servizi executor.

Executors

La Concurrency API introduce il concetto di un ExecutorService come sostituzione di livello superiore per lavorare direttamente con i thread. Gli esecutori sono in grado di eseguire compiti asincroni e tipicamente gestiscono un pool di thread, quindi non dobbiamo creare nuovi thread manualmente. Tutti i thread del pool interno saranno riutilizzati sotto il cofano per i compiti revenant, così possiamo eseguire tutti i compiti concorrenti che vogliamo durante il ciclo di vita della nostra applicazione con un singolo servizio executor.

Ecco come appare il primo esempio di thread usando gli executor:

ExecutorService executor = Executors.newSingleThreadExecutor();executor.submit(() -> { String threadName = Thread.currentThread().getName(); System.out.println("Hello " + threadName);});// => Hello pool-1-thread-1

La classe Executors fornisce comodi metodi factory per creare diversi tipi di servizi executor. In questo esempio usiamo un executor con un pool di thread di dimensione uno.

Il risultato sembra simile all’esempio precedente ma quando si esegue il codice si nota un’importante differenza: il processo java non si ferma mai! Gli esecutori devono essere fermati esplicitamente – altrimenti continuano ad ascoltare per nuovi compiti.

Un ExecutorService fornisce due metodi per questo scopo: shutdown() attende che i compiti attualmente in esecuzione finiscano mentre shutdownNow() interrompe tutti i compiti in esecuzione e spegne l’esecutore immediatamente.

Questo è il modo preferito in cui tipicamente spengo gli esecutori:

try { System.out.println("attempt to shutdown executor"); executor.shutdown(); executor.awaitTermination(5, TimeUnit.SECONDS);}catch (InterruptedException e) { System.err.println("tasks interrupted");}finally { if (!executor.isTerminated()) { System.err.println("cancel non-finished tasks"); } executor.shutdownNow(); System.out.println("shutdown finished");}

L’esecutore si spegne dolcemente aspettando una certa quantità di tempo per la fine dei compiti in esecuzione. Dopo un massimo di cinque secondi l’esecutore si spegne definitivamente interrompendo tutti i compiti in esecuzione.

Callables e Futures

In aggiunta a Runnable gli esecutori supportano un altro tipo di compito chiamato Callable. I callables sono interfacce funzionali proprio come i runnables ma invece di essere void restituiscono un valore.

Questa espressione lambda definisce un callable che restituisce un intero dopo aver dormito per un secondo:

Callable<Integer> task = () -> { try { TimeUnit.SECONDS.sleep(1); return 123; } catch (InterruptedException e) { throw new IllegalStateException("task interrupted", e); }};

I callables possono essere sottoposti a servizi executor proprio come i runnables. Ma che dire del risultato dei callable? Dato che submit() non aspetta che il task sia completato, il servizio executor non può restituire direttamente il risultato del callable. Invece l’esecutore restituisce un risultato speciale di tipo Future che può essere usato per recuperare il risultato effettivo in un momento successivo.

ExecutorService executor = Executors.newFixedThreadPool(1);Future<Integer> future = executor.submit(task);System.out.println("future done? " + future.isDone());Integer result = future.get();System.out.println("future done? " + future.isDone());System.out.print("result: " + result);

Dopo aver sottoposto il callable all’executor controlliamo prima se il futuro ha già terminato l’esecuzione tramite isDone(). Sono abbastanza sicuro che questo non sia il caso dato che il callable di cui sopra dorme per un secondo prima di restituire l’intero.

Chiamando il metodo get() si blocca il thread corrente e si aspetta che il callable completi prima di restituire il risultato effettivo 123. Ora il future è finalmente finito e vediamo il seguente risultato sulla console:

future done? falsefuture done? trueresult: 123

I futures sono strettamente accoppiati al servizio executor sottostante. Tenete a mente che ogni futuro non terminato lancerà delle eccezioni se spegnete l’executor:

executor.shutdownNow();future.get();

Avrete notato che la creazione dell’executor differisce leggermente dall’esempio precedente. Usiamo newFixedThreadPool(1) per creare un servizio executor supportato da un thread-pool di dimensione uno. Questo è equivalente a newSingleThreadExecutor() ma potremmo in seguito aumentare la dimensione del pool semplicemente passando un valore maggiore di uno.

Timeout

Ogni chiamata a future.get() si bloccherà e aspetterà finché la callable sottostante non sarà terminata. Nel peggiore dei casi un callable viene eseguito per sempre – rendendo così la vostra applicazione non reattiva. Potete semplicemente contrastare questi scenari passando un timeout:

ExecutorService executor = Executors.newFixedThreadPool(1);Future<Integer> future = executor.submit(() -> { try { TimeUnit.SECONDS.sleep(2); return 123; } catch (InterruptedException e) { throw new IllegalStateException("task interrupted", e); }});future.get(1, TimeUnit.SECONDS);

Eseguendo il codice precedente si ottiene un TimeoutException:

Exception in thread "main" java.util.concurrent.TimeoutException at java.util.concurrent.FutureTask.get(FutureTask.java:205)

Avrete già capito perché viene lanciata questa eccezione: Abbiamo specificato un tempo massimo di attesa di un secondo, ma il callable in realtà ha bisogno di due secondi prima di restituire il risultato.

InvokeAll

Gli esecutori supportano l’invio in batch di più callable in una volta sola tramite invokeAll(). Questo metodo accetta una collezione di callable e restituisce una lista di future.

ExecutorService executor = Executors.newWorkStealingPool();List<Callable<String>> callables = Arrays.asList( () -> "task1", () -> "task2", () -> "task3");executor.invokeAll(callables) .stream() .map(future -> { try { return future.get(); } catch (Exception e) { throw new IllegalStateException(e); } }) .forEach(System.out::println);

In questo esempio utilizziamo gli stream funzionali di Java 8 per elaborare tutti i future restituiti dall’invocazione di invokeAll. Prima mappiamo ogni futuro al suo valore di ritorno e poi stampiamo ogni valore nella console. Se non avete ancora familiarità con gli stream leggete il mio Java 8 Stream Tutorial.

InvokeAny

Un altro modo di inviare callable in batch è il metodo invokeAny() che funziona in modo leggermente diverso da invokeAll(). Invece di restituire oggetti futuri, questo metodo si blocca finché il primo callable non termina e restituisce il risultato di quel callable.

Per testare questo comportamento usiamo questo metodo helper per simulare callable con durate diverse. Il metodo restituisce un callable che dorme per una certa quantità di tempo fino a restituire il risultato dato:

Callable<String> callable(String result, long sleepSeconds) { return () -> { TimeUnit.SECONDS.sleep(sleepSeconds); return result; };}

Utilizziamo questo metodo per creare un gruppo di callable con durate diverse da uno a tre secondi. Inviando questi callable a un executor tramite invokeAny() si ottiene il risultato della stringa del callable più veloce – in questo caso il task2:

ExecutorService executor = Executors.newWorkStealingPool();List<Callable<String>> callables = Arrays.asList( callable("task1", 2), callable("task2", 1), callable("task3", 3));String result = executor.invokeAny(callables);System.out.println(result);// => task2

L’esempio precedente usa ancora un altro tipo di executor creato tramite newWorkStealingPool(). Questo metodo di fabbrica fa parte di Java 8 e restituisce un esecutore di tipo ForkJoinPool che funziona leggermente diverso dai normali esecutori. Invece di usare un thread-pool di dimensioni fisse, i ForkJoinPools vengono creati per una data dimensione di parallelismo che per default è il numero di core disponibili della CPU host.

I ForkJoinPools esistono da Java 7 e saranno trattati in dettaglio in un tutorial successivo di questa serie. Concludiamo questo tutorial dando uno sguardo più approfondito agli esecutori programmati.

Esecutori programmati

Abbiamo già imparato come presentare ed eseguire compiti una volta su un esecutore. Per eseguire periodicamente compiti comuni più volte, possiamo utilizzare i pool di thread programmati.

Un ScheduledExecutorService è in grado di programmare compiti da eseguire periodicamente o una volta dopo che è trascorso un certo periodo di tempo.

Questo esempio di codice programma un compito da eseguire dopo un ritardo iniziale di tre secondi:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);TimeUnit.MILLISECONDS.sleep(1337);long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);System.out.printf("Remaining Delay: %sms", remainingDelay);

La programmazione di un task produce un futuro specializzato di tipo ScheduledFuture che – oltre a Future – fornisce il metodo getDelay() per recuperare il ritardo rimanente. Dopo che questo ritardo è trascorso, il compito sarà eseguito contemporaneamente.

Per programmare i compiti da eseguire periodicamente, gli esecutori forniscono i due metodi scheduleAtFixedRate() e scheduleWithFixedDelay(). Il primo metodo è in grado di eseguire compiti con una frequenza temporale fissa, ad esempio una volta al secondo come dimostrato in questo esempio:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());int initialDelay = 0;int period = 1;executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);

Inoltre questo metodo accetta un ritardo iniziale che descrive il tempo di attesa prima che il compito venga eseguito per la prima volta.

Tenete presente che scheduleAtFixedRate() non tiene conto della durata effettiva del task. Quindi, se si specifica un periodo di un secondo ma il compito ha bisogno di 2 secondi per essere eseguito, allora il pool di thread lavorerà a pieno regime molto presto.

In questo caso si dovrebbe considerare l’utilizzo di scheduleWithFixedDelay() invece. Questo metodo funziona proprio come la controparte descritta sopra. La differenza è che il periodo di attesa si applica tra la fine di un compito e l’inizio del compito successivo. Per esempio:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);Runnable task = () -> { try { TimeUnit.SECONDS.sleep(2); System.out.println("Scheduling: " + System.nanoTime()); } catch (InterruptedException e) { System.err.println("task interrupted"); }};executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);

Questo esempio programma un task con un ritardo fisso di un secondo tra la fine di un’esecuzione e l’inizio dell’esecuzione successiva. Il ritardo iniziale è zero e la durata del compito è di due secondi. Così ci ritroviamo con un intervallo di esecuzione di 0s, 3s, 6s, 9s e così via. Come potete vedere scheduleWithFixedDelay() è utile se non potete prevedere la durata dei compiti programmati.

Questa era la prima parte di una serie di tutorial sulla concorrenza. Raccomando di mettere in pratica gli esempi di codice mostrati da soli. Trovate tutti gli esempi di codice di questo articolo su GitHub, quindi sentitevi liberi di fare un fork del repo e datemi una stella.

Lascia un commento

Il tuo indirizzo email non sarà pubblicato. I campi obbligatori sono contrassegnati *