Articles

Tutorial de concurrencia en Java 8: Hilos y Ejecutores

Posted on

Bienvenidos a la primera parte de mi tutorial de Java 8 Concurrencia. Esta guía te enseña la programación concurrente en Java 8 con ejemplos de código fáciles de entender. Es la primera parte de una serie de tutoriales que cubren la API de Java Concurrency. En los siguientes 15 minutos aprenderás a ejecutar código en paralelo a través de hilos, tareas y servicios de ejecución.

  • Parte 1: Hilos y ejecutores
  • Parte 2: Sincronización y bloqueos
  • Parte 3: Variables atómicas y ConcurrentMap
  • La API de concurrencia se introdujo por primera vez con el lanzamiento de Java 5 y luego se fue mejorando progresivamente con cada nueva versión de Java. La mayoría de los conceptos mostrados en este artículo también funcionan en versiones anteriores de Java. Sin embargo, mis ejemplos de código se centran en Java 8 y hacen un uso intensivo de las expresiones lambda y otras nuevas características. Si aún no estás familiarizado con las lambdas, te recomiendo que leas primero mi Tutorial de Java 8.

    Hilos y Runnables

    Todos los sistemas operativos modernos soportan concurrencia tanto a través de procesos como de hilos. Los procesos son instancias de programas que normalmente se ejecutan de forma independiente a los demás, por ejemplo, si se inicia un programa java el sistema operativo genera un nuevo proceso que se ejecuta en paralelo a otros programas. Dentro de esos procesos podemos utilizar hilos para ejecutar código de forma concurrente, de forma que podemos aprovechar al máximo los núcleos disponibles de la CPU.

    Java soporta hilos desde el JDK 1.0. Antes de iniciar un nuevo hilo hay que especificar el código que va a ejecutar este hilo, a menudo llamado tarea. Esto se hace implementando Runnable – una interfaz funcional que define un único método void no-args run() como se demuestra en el siguiente ejemplo:

    Runnable task = () -> { String threadName = Thread.currentThread().getName(); System.out.println("Hello " + threadName);};task.run();Thread thread = new Thread(task);thread.start();System.out.println("Done!");

    Dado que Runnable es una interfaz funcional podemos utilizar expresiones lambda de Java 8 para imprimir el nombre de los hilos actuales en la consola. Primero ejecutamos el runnable directamente en el hilo principal antes de iniciar un nuevo hilo.

    El resultado en la consola podría ser el siguiente:

    Hello mainHello Thread-0Done!

    O esto:

    Hello mainDone!Hello Thread-0

    Debido a la ejecución concurrente no podemos predecir si el runnable será invocado antes o después de imprimir ‘done’. El orden no es determinista, lo que hace que la programación concurrente sea una tarea compleja en aplicaciones de gran tamaño.

    Los hilos pueden ponerse en reposo durante un tiempo determinado. Esto es bastante útil para simular tareas de larga duración en los siguientes ejemplos de código de este artículo:

    Runnable runnable = () -> { try { String name = Thread.currentThread().getName(); System.out.println("Foo " + name); TimeUnit.SECONDS.sleep(1); System.out.println("Bar " + name); } catch (InterruptedException e) { e.printStackTrace(); }};Thread thread = new Thread(runnable);thread.start();

    Cuando ejecutes el código anterior notarás el retraso de un segundo entre la primera y la segunda sentencia de impresión. TimeUnit es un enum útil para trabajar con unidades de tiempo. Alternativamente puedes conseguir lo mismo llamando a Thread.sleep(1000).

    Trabajar con la clase Thread puede ser muy tedioso y propenso a errores. Por esta razón se introdujo la API de Concurrencia en 2004 con el lanzamiento de Java 5. La API se encuentra en el paquete java.util.concurrent y contiene muchas clases útiles para manejar la programación concurrente. Desde entonces la API de Concurrencia ha sido mejorada con cada nueva versión de Java e incluso Java 8 proporciona nuevas clases y métodos para tratar la concurrencia.

    Ahora vamos a profundizar en una de las partes más importantes de la API de Concurrencia – los servicios de ejecutor.

    Ejecutores

    La API de Concurrencia introduce el concepto de un ExecutorService como un reemplazo de más alto nivel para trabajar con hilos directamente. Los ejecutores son capaces de ejecutar tareas asíncronas y suelen gestionar un pool de hilos, por lo que no tenemos que crear nuevos hilos manualmente. Todos los hilos del pool interno se reutilizarán bajo el capó para las tareas revenant, por lo que podemos ejecutar tantas tareas concurrentes como queramos a lo largo del ciclo de vida de nuestra aplicación con un único servicio ejecutor.

    Así es como se ve el primer ejemplo de hilo usando ejecutores:

    ExecutorService executor = Executors.newSingleThreadExecutor();executor.submit(() -> { String threadName = Thread.currentThread().getName(); System.out.println("Hello " + threadName);});// => Hello pool-1-thread-1

    La clase Executors proporciona métodos de fábrica convenientes para crear diferentes tipos de servicios de ejecutor. En este ejemplo utilizamos un ejecutor con un pool de hilos de tamaño uno.

    El resultado es similar al del ejemplo anterior, pero al ejecutar el código notarás una importante diferencia: ¡el proceso java nunca se detiene! Los ejecutores tienen que ser detenidos explícitamente – de lo contrario siguen escuchando nuevas tareas.

    Un ExecutorService proporciona dos métodos para ese propósito: shutdown() espera a que las tareas que se están ejecutando actualmente terminen mientras que shutdownNow() interrumpe todas las tareas en ejecución y apaga el ejecutor inmediatamente.

    Esta es la forma preferida en la que suelo apagar los ejecutores:

    try { System.out.println("attempt to shutdown executor"); executor.shutdown(); executor.awaitTermination(5, TimeUnit.SECONDS);}catch (InterruptedException e) { System.err.println("tasks interrupted");}finally { if (!executor.isTerminated()) { System.err.println("cancel non-finished tasks"); } executor.shutdownNow(); System.out.println("shutdown finished");}

    El ejecutor se apaga suavemente esperando un cierto tiempo para la terminación de las tareas actualmente en ejecución. Después de un máximo de cinco segundos el ejecutor finalmente se apaga interrumpiendo todas las tareas en ejecución.

    Callables y Futuros

    Además de Runnable los ejecutores soportan otro tipo de tarea llamada Callable. Los callables son interfaces funcionales al igual que los runnables pero en lugar de ser void devuelven un valor.

    Esta expresión lambda define un callable que devuelve un entero después de dormir un segundo:

    Callable<Integer> task = () -> { try { TimeUnit.SECONDS.sleep(1); return 123; } catch (InterruptedException e) { throw new IllegalStateException("task interrupted", e); }};

    Los callables pueden ser enviados a servicios ejecutores al igual que los runnables. Pero, ¿qué pasa con el resultado de los callables? Dado que submit() no espera hasta que la tarea se complete, el servicio ejecutor no puede devolver el resultado de la llamada directamente. En su lugar, el ejecutor devuelve un resultado especial de tipo Future que puede utilizarse para recuperar el resultado real en un momento posterior.

    ExecutorService executor = Executors.newFixedThreadPool(1);Future<Integer> future = executor.submit(task);System.out.println("future done? " + future.isDone());Integer result = future.get();System.out.println("future done? " + future.isDone());System.out.print("result: " + result);

    Después de enviar el callable al ejecutor primero comprobamos si el futuro ya se ha terminado de ejecutar mediante isDone(). Estoy bastante seguro de que este no es el caso ya que el callable anterior duerme durante un segundo antes de devolver el entero.

    La llamada al método get() bloquea el hilo actual y espera hasta que el callable finalice antes de devolver el resultado real 123. Ahora el futuro está finalmente hecho y vemos el siguiente resultado en la consola:

    future done? falsefuture done? trueresult: 123

    Los futuros están estrechamente acoplados al servicio ejecutor subyacente. Ten en cuenta que todo futuro no terminado lanzará excepciones si apagas el ejecutor:

    executor.shutdownNow();future.get();

    Habrás notado que la creación del ejecutor difiere ligeramente del ejemplo anterior. Utilizamos newFixedThreadPool(1) para crear un servicio ejecutor respaldado por un thread-pool de tamaño uno. Esto es equivalente a newSingleThreadExecutor() pero más adelante podríamos aumentar el tamaño del pool simplemente pasando un valor mayor que uno.

    Tiempos de espera

    Cualquier llamada a future.get() se bloqueará y esperará hasta que el callable subyacente haya finalizado. En el peor de los casos un callable se ejecuta para siempre – haciendo así que su aplicación no responda. Puedes simplemente contrarrestar esos escenarios pasando un tiempo de espera:

    ExecutorService executor = Executors.newFixedThreadPool(1);Future<Integer> future = executor.submit(() -> { try { TimeUnit.SECONDS.sleep(2); return 123; } catch (InterruptedException e) { throw new IllegalStateException("task interrupted", e); }});future.get(1, TimeUnit.SECONDS);

    Ejecutando el código anterior se obtiene un TimeoutException:

    Exception in thread "main" java.util.concurrent.TimeoutException at java.util.concurrent.FutureTask.get(FutureTask.java:205)

    Ya habrás adivinado por qué se lanza esta excepción: Hemos especificado un tiempo de espera máximo de un segundo, pero la llamada en realidad necesita dos segundos antes de devolver el resultado.

    InvokeAll

    Los ejecutores admiten el envío por lotes de varias llamadas a la vez a través de invokeAll(). Este método acepta una colección de callables y devuelve una lista de futuros.

    ExecutorService executor = Executors.newWorkStealingPool();List<Callable<String>> callables = Arrays.asList( () -> "task1", () -> "task2", () -> "task3");executor.invokeAll(callables) .stream() .map(future -> { try { return future.get(); } catch (Exception e) { throw new IllegalStateException(e); } }) .forEach(System.out::println);

    En este ejemplo utilizamos flujos funcionales de Java 8 para procesar todos los futuros devueltos por la invocación de invokeAll. Primero mapeamos cada futuro a su valor de retorno y luego imprimimos cada valor en la consola. Si aún no estás familiarizado con los streams lee mi Tutorial de Stream en Java 8.

    InvocarCualquier

    Otra forma de enviar callables por lotes es el método invokeAny() que funciona de forma ligeramente diferente a invokeAll(). En lugar de devolver objetos futuros este método se bloquea hasta que el primer callable termina y devuelve el resultado de ese callable.

    Para probar este comportamiento utilizamos este método de ayuda para simular callables con diferentes duraciones. El método devuelve un callable que duerme durante un tiempo determinado hasta devolver el resultado dado:

    Callable<String> callable(String result, long sleepSeconds) { return () -> { TimeUnit.SECONDS.sleep(sleepSeconds); return result; };}

    Utilizamos este método para crear un montón de callables con diferentes duraciones de uno a tres segundos. Al enviar esos callables a un ejecutor a través de invokeAny() se devuelve el resultado de la cadena del callable más rápido – en ese caso task2:

    ExecutorService executor = Executors.newWorkStealingPool();List<Callable<String>> callables = Arrays.asList( callable("task1", 2), callable("task2", 1), callable("task3", 3));String result = executor.invokeAny(callables);System.out.println(result);// => task2

    El ejemplo anterior utiliza otro tipo de ejecutor creado mediante newWorkStealingPool(). Este método de fábrica es parte de Java 8 y devuelve un ejecutor de tipo ForkJoinPool que funciona de forma ligeramente diferente a los ejecutores normales. En lugar de utilizar un thread-pool de tamaño fijo se crean ForkJoinPools para un tamaño de paralelismo determinado que por defecto es el número de núcleos disponibles de la CPU del host.

    Los ForkJoinPools existen desde Java 7 y serán tratados en detalle en un tutorial posterior de esta serie. Terminemos este tutorial profundizando en los ejecutores programados.

    Ejecutores programados

    Ya hemos aprendido a enviar y ejecutar tareas una vez en un ejecutor. Para ejecutar periódicamente tareas comunes varias veces, podemos utilizar pools de hilos programados.

    Un ScheduledExecutorService es capaz de programar tareas para que se ejecuten de forma periódica o una vez transcurrido un tiempo determinado.

    Este ejemplo de código programa una tarea para que se ejecute después de que haya pasado un retraso inicial de tres segundos:

    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);TimeUnit.MILLISECONDS.sleep(1337);long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);System.out.printf("Remaining Delay: %sms", remainingDelay);

    La programación de una tarea produce un futuro especializado de tipo ScheduledFuture que – además de Future – proporciona el método getDelay() para recuperar el retraso restante. Una vez transcurrido este retardo, la tarea se ejecutará de forma concurrente.

    Para programar tareas que se ejecuten periódicamente, los ejecutores proporcionan los dos métodos scheduleAtFixedRate() y scheduleWithFixedDelay(). El primer método es capaz de ejecutar tareas con un ritmo de tiempo fijo, por ejemplo, una vez cada segundo como se demuestra en este ejemplo:

    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());int initialDelay = 0;int period = 1;executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);

    Además, este método acepta un retardo inicial que describe el tiempo de espera principal antes de que la tarea se ejecute por primera vez.

    Tenga en cuenta que scheduleAtFixedRate() no tiene en cuenta la duración real de la tarea. Así que si se especifica un período de un segundo, pero la tarea necesita 2 segundos para ser ejecutada, entonces el grupo de hilos trabajará a la capacidad muy pronto.

    En ese caso usted debe considerar el uso de scheduleWithFixedDelay() en su lugar. Este método funciona igual que el homólogo descrito anteriormente. La diferencia es que el periodo de tiempo de espera se aplica entre el final de una tarea y el inicio de la siguiente. Por ejemplo:

    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);Runnable task = () -> { try { TimeUnit.SECONDS.sleep(2); System.out.println("Scheduling: " + System.nanoTime()); } catch (InterruptedException e) { System.err.println("task interrupted"); }};executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);

    Este ejemplo programa una tarea con un retraso fijo de un segundo entre el final de una ejecución y el inicio de la siguiente. El retardo inicial es cero y la duración de la tarea es de dos segundos. Así que terminamos con un intervalo de ejecución de 0s, 3s, 6s, 9s y así sucesivamente. Como puedes ver scheduleWithFixedDelay() es útil si no puedes predecir la duración de las tareas programadas.

    Esta fue la primera parte de una serie de tutoriales de concurrencia. Recomiendo practicar los ejemplos de código mostrados por su cuenta. Encontrarás todos los ejemplos de código de este artículo en GitHub, así que siéntete libre de bifurcar el repositorio y darme una estrella.

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *