Procesamiento simultáneo de InputStream único con consumidores independientes

Necesito generar N hilos para el consumidor, que procesan el mismo InputStream al mismo tiempo, por ejemplo, transformarlo de alguna manera, calcular la sum de comprobación o la firma digital, etc. Estos consumidores no dependen unos de otros y todos utilizan bibliotecas de terceros, que aceptan InputStream como fuente de datos.

Entonces, lo que puedo hacer es crear una implementación de InputStream, que

  • leer parte de los datos del flujo “padre”
  • desbloquear consumidores
  • Espera hasta que todos los consumidores lean todo el fragmento.
  • lee el siguiente fragmento

Aunque parece simple, puede plantear varios problemas, como el locking vital cuando ciertos consumidores fallecen, implementar todos los métodos de InputStream, controlar el enlace de los consumidores utilizando barreras / cierres, etc.

Un amigo me dijo que la implementación de la mitad de una hora era mi tarde.

Preferiría usar algo suficientemente maduro (Google no obtuvo resultados, por lo tanto, ¿mi google-fu no es lo suficientemente bueno?) O no molestar y copiar todo el flujo de “fuente” en un archivo temporal y usarlo como fuente de datos. La última solución parece ser más confiable, pero puede terminar en la creación de archivos de gigabytes (cuando se procesa transmisión de audio, por ejemplo).

A mi modo de ver, debería tener al menos algún tipo de almacenamiento intermedio para que diferentes consumidores puedan moverse a través de la secuencia a un ritmo diferente sin que el consumidor más lento se atasque constantemente. Básicamente, esto garantiza el peor desempeño y muy poco beneficio de la concurrencia.

Podría, digamos, etiquetar cada fragmento con los consumidores que lo han usado hasta ahora y luego eliminar aquellos que están completamente agotados. Tal vez esto pueda lograrse si cada consumidor tiene una referencia a cada fragmento que aún no ha utilizado, lo que permitiría que GC se encargue automáticamente de los fragmentos usados. El productor puede mantener una lista de WeakReference s a los trozos para que tenga un control sobre el número de trozos que aún no se han utilizado y basar su estrangulamiento en eso.

También estoy pensando en tener una instancia separada de InputStream por subproceso, que se comunique internamente con el productor InputStream . De esta manera, tiene una solución fácil para su peligro de locking de vida: try ... finally { is.close(); } try ... finally { is.close(); } – el consumidor moribundo cierra su propia cadena de entrada. Esto se comunica al productor.

Tengo algunas ideas con el uso de un ArrayBlockingQueue por consumidor. Habría alguna dificultad en asegurar que todos los consumidores reciban la alimentación adecuada, sin que el productor se bloquee o esté ocupado.

¿Has considerado usar streams de tubería? Su productor puede tener uno o más PipedOuputStream en el que lanza todo lo que lee del archivo. En el otro lado de las tuberías, tiene diferentes hilos de consumidores que leen en un PipedInputstream correspondiente (que es un InputStream que puede compartir con sus bibliotecas).

Su hilo productor puede decidir a través de qué datos de las tuberías deben enviarse, mediante esto, proporcionar datos para ser procesados ​​para una lectura del hilo del consumidor en el otro lado de la tubería.

Si necesita recuperar datos de sus hilos de consumo, puede crear otro conducto, en la dirección opuesta, para enviarle los datos.

Puede probar alguna implementación de Java Messaging Service (JMS) como Apache ActiveMQ .

En su caso, tendría que crear un llamado Tema (ver Temas frente a Colas ). El productor crea un tema y se publica para N consumidores, que puede ejecutarse simultáneamente, y cada consumidor recibe exactamente los mismos datos.

Dado que desea utilizar InputStream s, hay un capítulo sobre cómo enviar mensajes a través de secuencias .

Supongo que, normalmente, los productores y los consumidores serían procesos separados, probablemente ejecutándose en diferentes máquinas en la red. Sin embargo, creo que puedes configurarlo para que se ejecute completamente en una única JVM. Esto dependería de la implementación de JMS. Estos también son bastante famosos: HornetQ por JBoss , RabbitMQ y muchos otros.