Ejemplo de Time Series Data y CompletableFuture en Java

El código completo de todos los ejemplos se encuentra aquí.

Cómo ejecutar el programa

mvn clean compile verify exec:java

Prefacio

Supongamos que queremos calcular un índice de calidad del aire basado en dos valores:

  • temperatura del aire
  • porcentaje de monóxido de carbono en el aire

Dados los siguientes símbolos:

símbolosignificado
AQiíndice de calidad del aire
Ttemperatura del aire en grados Celsius
Tmtemperatura máxima del aire en C°
Cporcentaje de monóxido de carbono en el aire

Podemos calcular el AQi con esta especie de fórmula:

fórmula de calidad del aire

DISCLAIMER: esta fórmula no es de ninguna manera científica y está pensada exclusivamente con fines educativos. No quiero que ambientalistas y científicos de verdad me persigan con fórmulas matemáticas y acusaciones de charlatanería. Además, vi la oportunidad de crear una bonita ecuación en LaTeX y la aproveché, por motivos estéticos y porque me hace parecer inteligente, cosa que ciertamente no soy1.

Lo que la fórmula intenta expresar es que al subir la temperatura y el porcentaje de monóxido de carbono, la calidad del aire decrece. Sí, esto es totalmente anticientífico pero veréis que tiene sentido a efectos de mi argumentación.

Asumo una temperatura máxima de 40C°. Así que, por ejemplo:

$ bc -l
bc 1.06
Copyright 1991-1994, 1997, 1998, 2000 Free Software Foundation, Inc.
This is free software with ABSOLUTELY NO WARRANTY.
For details type 'warranty'.
t=60; c=100; tm=40; (((t * 100) / tm) + c) / 2
125.00000000000000000000
t=60; c=50; tm=40; (((t * 100) / tm) + c) / 2
100.00000000000000000000
t=40; c=50; tm=40; (((t * 100) / tm) + c) / 2
75.00000000000000000000
t=40; c=10; tm=40; (((t * 100) / tm) + c) / 2
55.00000000000000000000
t=20; c=10; tm=40; (((t * 100) / tm) + c) / 2
30.00000000000000000000
t=10; c=5; tm=40; (((t * 100) / tm) + c) / 2
15.00000000000000000000
t=10; c=0.5; tm=40; (((t * 100) / tm) + c) / 2
12.75000000000000000000

De esto podemos derivar la siguiente tabla totalmente anticientífica:

AQisignificado
de 125 a ∞muerte horrible
de 100 a 125muerte dolorosa
de 75 a 100muerte
de 55 a 75es aceptable2
de 30 a 55todo bien3
de 15 a 30se está bien
de 12.75 a 15fresquito
de -∞ a 12.75bienvenido a Yakutsk, probablemente

Proveedores de servicios

Supongamos que tenemos servicios de internet que exponen datos de monitorización de temperatura y niveles de monóxido de carbono. Estos servicios podrían exponer una API que nos proporciona datos de tipo time series4.

Así, por ejemplo, podríamos llamar a un servicio de monitorización de temperatura, y el servicio nos respondería con datos time series como estos:

timestampvalor
2021-01-20T08:00:00Z10.1
2021-01-20T08:02:00Z10.3
2021-01-20T08:05:00Z10.7
2021-01-20T08:06:00Z10.9
2021-01-20T08:06:19Z11.0
2021-01-20T08:06:42Z11.1
2021-01-20T08:09:00Z11.3

Un servicio de monitorización de porcentaje de monóxido de carbono podría en cambio responder con datos como estos:

timestampvalor
2021-01-20T08:01:00Z2.0
2021-01-20T08:02:00Z2.3
2021-01-20T08:06:00Z2.8
2021-01-20T08:07:00Z2.9
2021-01-20T08:08:00Z3.3

Ten en cuenta que he ordenado los datos por timestamp para hacerlos un poco más legibles, pero no deberías asumir nada sobre el orden de los datos devueltos por un proveedor externo. No es que esto tenga importancia aquí porque...

El algoritmo

...nuestro algoritmo ahora requiere:

  1. concatenar los datos de temperatura y porcentaje de monóxido de carbono
  2. ordenar por timestamp
idtimestampvalortipo
12021-01-20T08:00:00Z10.1T
22021-01-20T08:01:00Z 2.0C
32021-01-20T08:02:00Z10.3T
42021-01-20T08:02:00Z 2.3C
52021-01-20T08:05:00Z10.7T
62021-01-20T08:06:00Z10.9T
72021-01-20T08:06:00Z 2.8C
82021-01-20T08:06:19Z11.0T
92021-01-20T08:06:42Z11.1T
102021-01-20T08:07:00Z 2.9C
112021-01-20T08:08:00Z 3.3C
122021-01-20T08:09:00Z11.3T

tipo: T es temperatura y C es porcentaje de monóxido de carbono

Nuestra tarea ahora es recorrer los datos, empezando desde el principio, una fila a la vez, calculando el índice de calidad del aire a medida que avanzamos, paso a paso.

Lo primero que hay que notar aquí es que para calcular nuestra fórmula del AQi necesitamos tener ambos valores para T y C. En otras palabras, el primer punto en el que podemos aplicar nuestra fórmula es el del id 2, ya que tenemos un valor para T en el id 1 y un valor para C en el id 2. Así que tomamos nuestros valores (10.1 para T y 2.0 para C), aplicamos la fórmula, y obtenemos un primer valor de AQi de 13.625 que asociamos con el timestamp del id 2, ya que ese es el momento al que se refiere nuestro cálculo. Nuestra primera entrada de AQi en la serie resultante debería verse así:

timestampvalor
2021-01-20T08:01:00Z13.625

De ahora en adelante, nuestra fórmula puede aplicarse a cada elemento restante en la serie, teniendo en cuenta que debemos correlacionar cada valor con el valor más reciente del otro tipo. En otras palabras:

para el idtomar valores de los id
21, 2
32, 3
43, 4
54, 5
64, 6
76, 7
87, 8
97, 9
109, 10
119, 11
1211, 12

Puedes pensar en este tipo de movimiento como una rolling time window ya que tienes una ventana que se mueve en el tiempo enfocándose en los datos más recientes para nuestras medidas específicas de T y C en cada paso5.

Rolling Time Window

Adelante, desplázate hacia abajo. Deberías verla.

Step 01: T = 10.1, C = 2.0, AQi = 13.625

id123456789101112
T10.110.310.710.911.011.111.3
C2.02.32.82.93.3

Ventana: id 1-2

Step 02: T = 10.3, C = 2.0, AQi = 13.875

id123456789101112
T10.110.310.710.911.011.111.3
C2.02.32.82.93.3

Ventana: id 2-3

Step 03: T = 10.3, C = 2.3, AQi = 14.025

id123456789101112
T10.110.310.710.911.011.111.3
C2.02.32.82.93.3

Ventana: id 3-4

Step 04: T = 10.7, C = 2.3, AQi = 14.525

id123456789101112
T10.110.310.710.911.011.111.3
C2.02.32.82.93.3

Ventana: id 4-5

Step 05: T = 10.9, C = 2.3, AQi = 14.775

id123456789101112
T10.110.310.710.911.011.111.3
C2.02.32.82.93.3

Ventana: id 4-6

Step 06: T = 10.9, C = 2.8, AQi = 15.025

id123456789101112
T10.110.310.710.911.011.111.3
C2.02.32.82.93.3

Ventana: id 6-7

Step 07: T = 11.0, C = 2.8, AQi = 15.150

id123456789101112
T10.110.310.710.911.011.111.3
C2.02.32.82.93.3

Ventana: id 7-8

Step 08: T = 11.1, C = 2.8, AQi = 15.275

id123456789101112
T10.110.310.710.911.011.111.3
C2.02.32.82.93.3

Ventana: id 7-9

Step 09: T = 11.1, C = 2.9, AQi = 15.325

id123456789101112
T10.110.310.710.911.011.111.3
C2.02.32.82.93.3

Ventana: id 9-10

Step 10: T = 11.1, C = 3.3, AQi = 15.525

id123456789101112
T10.110.310.710.911.011.111.3
C2.02.32.82.93.3

Ventana: id 9-11

Step 11: T = 11.3, C = 3.3, AQi = 15.775

id123456789101112
T10.110.310.710.911.011.111.3
C2.02.32.82.93.3

Ventana: id 11-12

Dado lo anterior, nuestra serie temporal completa para el AQi es:

timestampvalor
2021-01-20T08:01:00Z13.625
2021-01-20T08:02:00Z13.875
2021-01-20T08:02:00Z14.025
2021-01-20T08:05:00Z14.525
2021-01-20T08:06:00Z14.775
2021-01-20T08:06:00Z15.025
2021-01-20T08:06:19Z15.150
2021-01-20T08:06:42Z15.275
2021-01-20T08:07:00Z15.325
2021-01-20T08:08:00Z15.525
2021-01-20T08:09:00Z15.775

Si has mirado con atención, habrás notado que tenemos un par de timestamps duplicados en los resultados, concretamente 2021-01-20T08:02:00Z y 2021-01-20T08:06:00Z. Estos representan una paradoja temporal ya que parece que nuestro AQi tiene dos valores diferentes en el mismo instante.

I find your lack of logic disturbing

Ambos sabemos que estos datos van a acabar en una página web. No querrás que uno de esos desarrolladores hipster del frontend nos señale una falta de lógica o, peor aún, una inconsistencia en nuestros datos, ¿verdad?

Eso pensaba. Pues bien, mi idea es que podemos descartar tranquilamente la primera entrada de un timestamp duplicado ya que se refiere a un cálculo con datos obsoletos. ¿Por qué? Bueno, considera los valores del primer timestamp duplicado: 2021-01-20T08:02:00Z. La primera vez que calculamos el AQi, tomamos los datos de los id 2 y 3 y el id 2 se refiere a un timestamp anterior, concretamente 2021-01-20T08:01:00Z. La segunda vez que calculamos el AQi, usamos los datos de los id 3 y 4, que se refieren ambos al timestamp 2021-01-20T08:02:00Z, por lo que el resultado de este cálculo es más relevante que el anterior para el que produjimos el mismo timestamp de 2021-01-20T08:02:00Z.

Lo mismo se aplica a la entrada del AQi con timestamp 2021-01-20T08:06:00Z ya que el primer cálculo usaba los id 4 y 6 mientras que el segundo consideraba los id 6 y 7 que son más recientes que el timestamp del id 4.

Así que eliminamos un par de entradas y nuestro resultado limpio queda así:

timestampvalor
2021-01-20T08:01:00Z13.625
2021-01-20T08:02:00Z14.025
2021-01-20T08:05:00Z14.525
2021-01-20T08:06:00Z15.025
2021-01-20T08:06:19Z15.150
2021-01-20T08:06:42Z15.275
2021-01-20T08:07:00Z15.325
2021-01-20T08:08:00Z15.525
2021-01-20T08:09:00Z15.775

Igual que una ecuación es una buena excusa para repasar un poco de LaTeX, una buena serie de datos temporales es un excelente candidato para gnuplot.

gráfico de los datos hasta ahora

Los datos en el mundo real son, por supuesto, mucho más caóticos que esto, y podrías querer normalizar el resultado usando un intervalo temporal arbitrario, por ejemplo un minuto:

timestampvalor
2021-01-20T08:01:00Z13.625
2021-01-20T08:02:00Z14.025
2021-01-20T08:03:00Z14.025
2021-01-20T08:04:00Z14.025
2021-01-20T08:05:00Z14.525
2021-01-20T08:06:00Z15.025
2021-01-20T08:07:00Z15.325
2021-01-20T08:08:00Z15.525
2021-01-20T08:09:00Z15.775

gráfico de los datos normalizados

¿Tiene sentido? Espero que sí.

sí

Escribamos el código

Escribamos algo de código. Primero, definamos una interfaz para nuestro calculador de AQi, para poder proporcionar diferentes implementaciones más adelante.

El código de esta interfaz se puede ver aquí.

La interfaz es un lugar conveniente donde implementar la fórmula del AQi:

static double airQualityIndex(double temperature, double carbonMonoxidePercentage, double maxTemperature) {
    return (((temperature * 100) / maxTemperature) + carbonMonoxidePercentage) / 2;
}

Este método toma una temperatura, un porcentaje de monóxido de carbono, una temperatura máxima y devuelve el AQi. Bien.

La parte interesante sin embargo es este método:

List<TimeValue> calculate(List<TimeValue> temperatures, List<TimeValue> carbonMonoxidePercentages);

Esto nos dice que el método calculate toma dos listas de TimeValue: la primera es una lista de temperaturas y la otra es una lista de porcentajes de monóxido de carbono. Luego devuelve una lista de TimeValue, solo que esta vez la lista representa los índices de calidad del aire.

¿Qué es un TimeValue? Podemos ver su definición aquí. Aunque todo esto parece horriblemente complicado debido a la verbosidad del lenguaje Java y algunos detalles de implementación, puedes pensar en un TimeValue como una forma cómoda de representar un Instant en el tiempo y su valor asociado. Nada del otro mundo, realmente.

Programar como si fuera 1984

Ahora que tenemos un framework básico para nuestros cálculos, escribamos una primera implementación usando el estilo vieja escuela. El código completo está aquí. Echémosle un vistazo.

Nuestro calculador toma la temperatura máxima en su constructor y almacena su valor en la constante de instancia maxTemperature ya que necesitaremos su valor más adelante cuando invoquemos la función del AQi.

Nuestro método calculate debe comenzar con estos dos pasos:

  1. concatenar los datos de temperatura y porcentaje de monóxido de carbono en una sola estructura de datos
  2. ordenar el resultado por timestamp

El primer paso está implementado en este bloque de código:

// key = time value type (C = carbonMonoxidePercentage, T = temperature)
// concatenated with the timestamp as a string
Map<String, TimeValue> timeValuesByType = new HashMap<>();

for (TimeValue temperature : temperatures) {
    timeValuesByType.put("T".concat(temperature.ts()), temperature);
}

for (TimeValue carbonMonoxidePercentage : carbonMonoxidePercentages) {
    timeValuesByType.put("C".concat(carbonMonoxidePercentage.ts()), carbonMonoxidePercentage);
}

La clave en nuestra variable timeValuesByType es una concatenación en cadena de la letra T para temperatura o C para porcentaje de monóxido de carbono, seguida del timestamp. Necesitamos hacer esto para poder luego distinguir entre los dos tipos de datos. Las cadenas de la clave tendrán este aspecto: T2021-02-03T08:00:00.000Z.

El ordenamiento se realiza en este bloque:

Map<String, TimeValue> timeValuesByTypeSortedByTimestamp = new LinkedHashMap<>();
List<String> keysSortedByTimestamp = new ArrayList<>(timeValuesByType.keySet());
keysSortedByTimestamp.sort(comparing(s -> timeValuesByType.get(s).timestamp()));

for (String key : keysSortedByTimestamp) {
    timeValuesByTypeSortedByTimestamp.put(key, timeValuesByType.get(key));
}

Esto es solo la forma supercomplicada de Java para ordenar nuestro mapa según el timestamp que tenemos en los valores del mapa. Declaramos un mapa timeValuesByTypeSortedByTimestamp, implementado por un LinkedHashMap porque queremos preservar el orden de iteración de las entradas del mapa. Luego envolvemos todas las claves de nuestro mapa original timeValuesByType en un ArrayList ya que necesitamos una List para poder invocar sort. La función de comparación que pasamos a sort toma el timestamp de la entrada relativa en el mapa original timeValuesByType. Luego iteramos keysSortedByTimestamp, añadiendo entradas a nuestro mapa timeValuesByTypeSortedByTimestamp.

Ahora declaramos un mapa para los resultados de nuestros cálculos del AQi y un par de variables que necesitaremos después:

Map<Instant, Double> airQualityIndexMap = new HashMap<>();
TimeValue lastTemperature = null;
TimeValue lastCarbonMonoxidePercentage = null;

Aquí empieza la parte divertida. Recorremos las entradas del mapa en nuestra variable timeValuesByTypeSortedByTimestamp previamente definida.

for (Map.Entry<String, TimeValue> entry : timeValuesByTypeSortedByTimestamp.entrySet()) {
    ...

Sabemos que si la clave empieza con una T, tenemos un valor de temperatura y, en tal caso, lo almacenamos en la variable lastTemperature. De lo contrario, el valor debe ser de tipo C para carbono, así que hacemos lo mismo con la variable lastCarbonMonoxidePercentage.

if (entry.getKey().startsWith("T")) {
    lastTemperature = entry.getValue();
} else if (entry.getKey().startsWith("C")) {
    lastCarbonMonoxidePercentage = entry.getValue();
}

En este punto, si tenemos un valor tanto para T como para C, podemos proceder a calcular nuestro AQi y almacenar su valor en la variable airQualityIndexMap.

if (lastTemperature != null && lastCarbonMonoxidePercentage != null) {
    airQualityIndexMap.put(
        mostRecent(lastTemperature.timestamp(), lastCarbonMonoxidePercentage.timestamp()),
        airQualityIndex(lastTemperature.value(), lastCarbonMonoxidePercentage.value(), maxTemperature)
    );
}

Estamos tomando el timestamp más reciente entre los dos TimeValue usando una pequeña función auxiliar que definimos anteriormente en la interfaz del calculador.

Un efecto colateral deseado de usar un mapa para esta estructura de datos es que, cuando insertamos un nuevo valor para un timestamp existente, la entrada se sobrescribe con el más reciente. Esto resuelve nuestro problema con los timestamps duplicados.

Al final del ciclo, nuestros resultados están casi listos. Solo necesitamos ordenarlos de nuevo por timestamp y devolver los valores como una List de TimeValue.

List<Instant> keys = new ArrayList<>(airQualityIndexMap.keySet());
keys.sort(Instant::compareTo);
List<TimeValue> results = new ArrayList<>();

for (Instant key : keys) {
    results.add(TimeValue.of(key, airQualityIndexMap.get(key)));
}

Elegancia funcional

¿Podemos hacerlo mejor? Por supuesto. Usemos un arma elegante para tiempos más civilizados: la programación funcional. Nuestro FunctionalAirQualityIndexCalculator está reducido casi al mínimo, pero eso es solo porque la lógica principal detrás de los cálculos ahora se encuentra en el AirQualityIndexCollector.

Nuestro nuevo calculador es mucho más simple ahora. La primera parte es un poco compleja así que veámosla primero:

List<TypedTimeValue> timeSeries = Stream.concat(
   temperatures.stream().map(e -> new TypedTimeValue(TypedTimeValue.Type.T, e)),
   carbonMonoxidePercentages.stream().map(e -> new TypedTimeValue(TypedTimeValue.Type.C, e))
).collect(Collectors.toUnmodifiableList());

Hay varios patrones funcionales trabajando aquí:

  • los datos de temperatura y porcentaje de monóxido de carbono se transmiten y mapean en un contenedor de tipo para poder luego entender si el dato que estamos mirando es de tipo T o C

  • los dos streams resultantes se concatenan usando Stream.concat

  • al final recolectamos el stream concatenado en una List<TypedTimeValue> no modificable

return timeSeries.stream().parallel()
    .collect(AirQualityIndexCollector.toUnmodifiableList(maxTemperature));

La variable timeSeries se transmite entonces en paralelo a un colector que hace el trabajo pesado y devuelve una List<TimeValue> no modificable con los índices de calidad del aire.

Echemos un vistazo al colector.

public class AirQualityIndexCollector
        implements Collector<TypedTimeValue, Queue<TypedTimeValue>, List<TimeValue>> {
    ...

Estamos implementando la interfaz Collector. Los parámetros de tipo que proporcionamos expresan tres cosas:

  • estamos recolectando valores de tipo TypedTimeValue
  • nuestro acumulador interno usa una Queue<TypedTimeValue>
  • al final del trabajo, devolvemos una List<TimeValue>

Una Queue es simplemente una List thread safe. Proporcionamos la implementación usando el método supplier:

@Override
public Supplier<Queue<TypedTimeValue>> supplier() {
    return ConcurrentLinkedQueue::new;
}

En este caso, la implementación es una ConcurrentLinkedQueue que, de nuevo, es solo una especie de ArrayList thread safe.

@Override
public BiConsumer<Queue<TypedTimeValue>, TypedTimeValue> accumulator() {
    return Queue::add;
}

El acumulador debe devolver una función que el colector usa para acumular los datos de entrada. Como puedes ver, simplemente devolvemos una referencia al método add de Queue.

@Override
public BinaryOperator<Queue<TypedTimeValue>> combiner() {
    return (typedTimeValues, typedTimeValues2) -> {
        typedTimeValues.addAll(typedTimeValues2);
        return typedTimeValues;
    };
}

El método combiner debe devolver una función que combina dos acumuladores. La implementación debe tomar todos los elementos del segundo acumulador y añadirlos al primero, lo cual no suena muy funcional en términos de inmutabilidad pero en este caso es un comportamiento esperado, y está perfectamente bien.

@Override
public Function<Queue<TypedTimeValue>, List<TimeValue>> finisher() {
    ...

Finalmente, el finisher debe devolver una función que toma todos los valores acumulados en nuestra Queue<TypedTimeValue> y devuelve una List<TimeValue> con nuestros índices de calidad del aire.

final Map<Instant, TimeValue> aqiAccumulator = new HashMap<>();

Este es un mapa que sirve para recolectar los índices de calidad del aire. Como puedes ver, está indexado por timestamp, así que no tendremos entradas duplicadas cuando cálculos más recientes para un mismo timestamp se inserten en el mapa reemplazando los anteriores.

return accumulator -> {
   accumulator.stream()
           .map(TypedTimeValue::timestamp)
           .sorted()
           .forEach(entryTS -> {
               final TimeValue lastTemperature = getClosest(accumulator, TypedTimeValue.Type.T, entryTS);
               final TimeValue lastCarbonMonoxidePercentage = getClosest(accumulator, TypedTimeValue.Type.C, entryTS);

               if (lastTemperature != null && lastCarbonMonoxidePercentage != null) {
                   Instant timestamp = mostRecent(lastTemperature.timestamp(), lastCarbonMonoxidePercentage.timestamp());
                   aqiAccumulator.put(timestamp, TimeValue.of(timestamp, airQualityIndex(lastTemperature.value(), lastCarbonMonoxidePercentage.value(), maxTemperature)));
               }
           });

   return aqiAccumulator.values().stream()
           .sorted()
           .collect(Collectors.toUnmodifiableList());
};

Esto es un buen bloque de código pero veámoslo poco a poco. Estamos transmitiendo los datos acumulados, extrayendo el timestamp, ordenando por este y, para cada timestamp, buscamos los datos de temperatura y porcentaje de monóxido de carbono con el timestamp más cercano. Más cercano significa que el timestamp que estamos evaluando debe ser anterior o igual al timestamp en cuestión.

Si tenemos ambos datos (T y C), podemos proceder al cálculo del AQi y poner su valor en el mapa aqiAccumulator.

Finalmente, todo lo que tenemos que hacer es tomar los valores del mapa aqiAccumulator, ordenarlos por timestamp y recolectarlos en una List<TimeValue> no modificable.

El ordenamiento es posible gracias a que nuestra clase TimeValue implementa Comparable<TimeValue>.

Hay varios puntos en el método finisher donde miro dentro de las estructuras de datos sobre las que estoy iterando, lo cual, de nuevo, no parece muy kosher en términos de programación funcional, pero está bien porque sé que los datos que estoy examinando no son susceptibles de ser modificados entre bastidores por threads concurrentes.

¿Es este calculador mejor que el de la vieja escuela? No estoy seguro. Esto sigue siendo bastante verboso, pero me parece más fácil de leer ya que gran parte del código está escrito en un estilo declarativo en lugar de imperativo.

Consideraciones sobre la concurrencia

Como necesitamos recuperar dos conjuntos de datos diferentes de dos proveedores distintos (uno para los datos de temperatura y otro para los datos de porcentaje de monóxido de carbono), podríamos querer ejecutar los clientes en paralelo. Esto tiene una ventaja sobre la ejecución en un solo hilo donde tendrías que serializar las llamadas a los proveedores.

En un entorno de un solo hilo, podrías escribir:

TimeValueProvider providerT = new TemperatureTimeValueProvider();
TimeValueProvider providerC = new CarbonMonoxidePercentageProvider();
List<TimeValue> timeValuesT = providerT.get();
List<TimeValue> timeValuesC = providerC.get();

Esto se traduce en el siguiente modelo de ejecución serial:

diagrama de secuencia para la ejecución serial

Como dijimos, podemos hacerlo mejor. En un entorno multihilo, podemos lanzar dos clientes concurrentes y comenzar el procesamiento de los datos en cuanto recibamos respuesta de ambos. Esto nos ahorra algo de tiempo y potencialmente acelera nuestros tiempos de respuesta.

diagrama de secuencia para la ejecución paralela

¿Cómo implementamos este modelo de ejecución en nuestro código? Hay varias opciones, pero la más popular, y la que personalmente prefiero, es usar CompletableFuture, que fueron introducidos en Java 8, si no recuerdo mal.

Un CompletableFuture es un contenedor para una computación. Le proporcionas el código que quieres ejecutar y el runtime de Java se encarga de ejecutarlo en concurrencia en un scheduler multihilo. El scheduler es, por supuesto, configurable pero los valores por defecto están bien para nuestro caso. Puedes ver el ejemplo completo aquí.

En mi ejemplo he declarado mis CompletableFuture así:

CompletableFuture<List<TimeValue>> timedValuesFuture1 = CompletableFuture.supplyAsync(() -> {
   log("Calling provider1...");
   List<TimeValue> timeValues = provider1.get();
   log(String.format("provider 1 returned: %s\n", timeValues));
   return timeValues;
});

Esto es un poco verboso ya que quería incluir algunos logs para mostrar cómo el código se ejecuta en paralelo. Podría haber escrito simplemente:

CompletableFuture<List<TimeValue>> timedValuesFuture1 = CompletableFuture.supplyAsync(provider1::get);

Esto sigue siendo verboso pero definitivamente mejor que antes. Como la computación en nuestro CompletableFuture devuelve una List<TimeValue>, el método supplyAsync devuelve un CompletableFuture<List<TimeValue>>, que es la forma de Java de decir que la variable timedValuesFuture1 es un CompletableFuture que contiene una List<TimeValue>. Ten en cuenta que el código que estamos pasando al método supplyAsync está dentro de una lambda. Esto significa que nuestro código no se ejecuta en el método supplyAsync sino que el runtime de Java es libre de elegir el mejor momento para ejecutarlo. El scheduler por defecto generalmente iniciará los CompletableFuture tan pronto como se definan, pero debes saber que no es necesariamente así y que definir una lambda no significa que se ejecute en el punto de declaración.

Ahora necesitamos una forma de asegurarnos de que nuestros CompletableFuture hayan terminado su ejecución antes de poder continuar. Esto se hace componiendo los futures y llamando al método join sobre el future resultante:

CompletableFuture.allOf(timedValuesFuture1, timedValuesFuture2).join();

El método allOf devuelve un nuevo CompletableFuture que envuelve los futures que le estamos pasando. Sobre este nuevo future llamamos join que bloquea la ejecución hasta que todos los futures internos hayan terminado su trabajo.

Después de esta línea estamos seguros de que nuestros threads se han ejecutado, así que podemos obtener los datos que necesitamos de nuestros futures originales usando el método join:

List<TimeValue> timeValues1 = timedValuesFuture1.join();
List<TimeValue> timeValues2 = timedValuesFuture2.join();

Ejemplo de output

Cuando ejecutes la aplicación, deberías ver un output similar a este:

2021-02-03T17:50:26.772545406 --- [main] Hello concurrent world!
2021-02-03T17:50:26.801737530 --- [ForkJoinPool.commonPool-worker-3] Calling provider1...
2021-02-03T17:50:26.802105151 --- [main] Calling allOf(...).join()
2021-02-03T17:50:26.802202415 --- [ForkJoinPool.commonPool-worker-5] Calling provider2...
2021-02-03T17:50:27.834127796 --- [ForkJoinPool.commonPool-worker-5] provider 2 returned: [TimeValue{timestamp=2021-01-18T08:00:22Z, value=76.629}, TimeValue{timestamp=2021-01-18T08:00:45Z, value=90.241}]
2021-02-03T17:50:27.834702562 --- [ForkJoinPool.commonPool-worker-3] provider 1 returned: [TimeValue{timestamp=2021-01-18T08:00:24Z, value=30.318}, TimeValue{timestamp=2021-01-18T08:00:35Z, value=13.521}, TimeValue{timestamp=2021-01-18T08:00:35Z, value=29.518}, TimeValue{timestamp=2021-01-18T08:00:36Z, value=0.818}, TimeValue{timestamp=2021-01-18T08:00:46Z, value=8.695}, TimeValue{timestamp=2021-01-18T08:00:50Z, value=31.233}, TimeValue{timestamp=2021-01-18T08:00:51Z, value=24.675}, TimeValue{timestamp=2021-01-18T08:00:53Z, value=38.477}]
2021-02-03T17:50:27.835040844 --- [main] After allOf(...).join()
2021-02-03T17:50:27.852793190 --- [main] timeValues = [TimeValue{timestamp=2021-01-18T08:00:24Z, value=76.212}, TimeValue{timestamp=2021-01-18T08:00:35Z, value=75.212}, TimeValue{timestamp=2021-01-18T08:00:36Z, value=39.337}, TimeValue{timestamp=2021-01-18T08:00:45Z, value=46.143}, TimeValue{timestamp=2021-01-18T08:00:46Z, value=55.989}, TimeValue{timestamp=2021-01-18T08:00:50Z, value=84.161}, TimeValue{timestamp=2021-01-18T08:00:51Z, value=75.964}, TimeValue{timestamp=2021-01-18T08:00:53Z, value=93.217}]

Puedes ver que hay tres hilos diferentes trabajando aquí:

  1. main
  2. ForkJoinPool.commonPool-worker-3
  3. ForkJoinPool.commonPool-worker-5

Es interesante notar que, en esta ejecución específica, allOf(...).join() fue llamado mucho antes de que se llamara al provider 2 y de que ambos resultados fueran devueltos por los proveedores.

Tu output será ciertamente diferente ya que:

  1. el orden de ejecución de los hilos es no determinista
  2. los valores de los proveedores se generan aleatoriamente

Conclusión

¡Lo lograste! Ha sido un buen recorrido. Espero que haya sido entretenido. Dediqué bastante tiempo a esto cuando estaba intentando profundizar en algunos aspectos que me encontré en el trabajo. Te sugiero que hagas lo mismo cuando te encuentres con problemas que necesiten ser investigados más a fondo. Espero que te haya resultado útil.

Bonus

leyendo memes en github

Credits

1

esta es mi venganza por todas las malas notas en matemáticas en la escuela.

3

this is fine meme

2

it is acceptable meme

4

Los time series data, también conocidos como time-stamped data, son una secuencia de datos indexados en orden temporal. Time-stamped son datos recopilados en diferentes momentos. Estos datos consisten típicamente en mediciones sucesivas realizadas desde la misma fuente a lo largo de un intervalo de tiempo y se utilizan para rastrear cambios a lo largo del tiempo.

5

Me gusta pensar en este movimiento como una especie de danza, y la encuentro sexy. Creo que I'm a creep, I'm a weirdo.