I have a producer-consumer scenario where the producers produce much faster than the consumers can consume. The usual solution is to make the producers block, since a producer/consumer pipeline runs only as fast as its slowest component. In our case, though, throttling or blocking the producers is not a good solution, because our application provides enough time later for the consumers to catch up.
Here’s a diagram depicting a full “phase” in our application vs. a more common scenario:
```
        Our Application                   Common Scenario
2N  +--------+--------+
    |PPPPPPPP|oooooooo|                                           P = Producer
    |PPPPPPPP|oooooooo|                                           C = Consumer
N   +--------+--------+    N  +--------+--------+--------+        o = Other Work
    |CPCPCPCP|CCCCCCCC|       |CPCPCPCP|CPCPCPCP|oooooooo|        N = number of tasks
    |CPCPCPCP|CCCCCCCC|       |CPCPCPCP|CPCPCPCP|oooooooo|
    -------------------       ----------------------------
    0       T/2       T       0       T/2       T      3T/2
```
The idea is to maximize throughput by not inhibiting the producers.
The data on which our tasks operate is easily serialized, so I plan to implement a filesystem solution for spilling all the tasks that can’t be immediately satisfied.
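To illustrate the spilling idea, here is a minimal sketch of writing a task's data to disk and reading it back with standard Java serialization; the `TaskData` class is a hypothetical stand-in for whatever the real tasks carry:

```java
import java.io.*;
import java.nio.file.*;

public class SpillExample {
    // Hypothetical task payload; stands in for the real, easily serialized data.
    static class TaskData implements Serializable {
        final String payload;
        TaskData(String payload) { this.payload = payload; }
    }

    // Serialize one task's data into its own file under the spill directory.
    static Path spill(TaskData data, Path dir) throws IOException {
        Path file = Files.createTempFile(dir, "task", ".ser");
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(file))) {
            out.writeObject(data);
        }
        return file;
    }

    // Read a spilled task's data back from disk.
    static TaskData load(Path file) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(file))) {
            return (TaskData) in.readObject();
        }
    }
}
```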
I’m using Java’s `ThreadPoolExecutor` with a `BlockingQueue` that has a maximum capacity, to ensure we don’t run out of memory. The problem lies in implementing such a “tiered” queue, where tasks are queued in memory when there is room, and the data is queued on disk otherwise.
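For reference, the bounded setup described above might look like the following sketch; the queue capacity and pool size are illustrative, not values from the original application:

```java
import java.util.concurrent.*;

public class BoundedExecutorExample {
    public static void main(String[] args) throws Exception {
        // In-memory task queue with a hard capacity, so heap usage stays bounded.
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000);

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                4, 4,                       // fixed pool of 4 consumer threads
                0L, TimeUnit.MILLISECONDS,
                queue,
                // The default policy throws RejectedExecutionException when the
                // queue is full; the "tiered" queue is meant to avoid exactly this.
                new ThreadPoolExecutor.AbortPolicy());

        executor.execute(() -> System.out.println("task ran"));
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
    }
}
```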
I’ve come up with two possible solutions:
- Implement a `BlockingQueue` from scratch, using the `ArrayBlockingQueue` implementation as a reference. This may be as simple as copying the implementation in the standard library and adding filesystem reads/writes.
- Continue using a standard `BlockingQueue` implementation, implement a separate `FilesystemQueue` for storing my data, and use one or more threads to dequeue files, create `Runnable`s, and enqueue them using the `ThreadPoolExecutor`.
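The second option could be sketched roughly as follows. Everything here is an assumption for illustration: the `TieredSubmitter` name, the one-file-per-spilled-task layout, and the simplification that each task's data serializes to a `String`:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.*;
import java.util.concurrent.*;
import java.util.function.Consumer;

public class TieredSubmitter {
    private final ThreadPoolExecutor executor;
    private final Path spillDir;
    private final Consumer<String> work;               // what each task does with its data
    // Only small Path objects are held in memory for spilled tasks; the data is on disk.
    private final BlockingQueue<Path> spilledFiles = new LinkedBlockingQueue<>();

    public TieredSubmitter(ThreadPoolExecutor executor, Path spillDir, Consumer<String> work) {
        this.executor = executor;
        this.spillDir = spillDir;
        this.work = work;
        Thread drainer = new Thread(this::drain, "fs-queue-drainer");
        drainer.setDaemon(true);
        drainer.start();
    }

    /** Producers call this: try the in-memory queue first, spill to disk on overflow. */
    public void submit(String taskData) throws IOException {
        try {
            executor.execute(() -> work.accept(taskData));
        } catch (RejectedExecutionException queueFull) {
            Path file = Files.createTempFile(spillDir, "task", ".dat");
            Files.writeString(file, taskData);
            spilledFiles.add(file);
        }
    }

    /** Dequeues spilled files and re-enqueues their tasks as capacity frees up. */
    private void drain() {
        try {
            while (true) {
                Path file = spilledFiles.take();
                String taskData = Files.readString(file);
                Files.delete(file);
                while (true) {
                    try {
                        executor.execute(() -> work.accept(taskData));
                        break;
                    } catch (RejectedExecutionException stillFull) {
                        Thread.sleep(50);              // back off until a slot opens
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The retry-with-backoff loop in `drain` is the crude part of this sketch; a production version would rather block on the executor's queue directly, or use a `RejectedExecutionHandler` to route overflow to disk.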
Is either of these reasonable, and is there potentially a better approach?
The first option is to increase the available heap space, as suggested by Dimitar Dimitrov, using the JVM memory flag `-Xmx` (for example, `-Xmx4g`).
From Oracle’s documentation:

> Note that the JVM uses more memory than just the heap. For example Java methods, thread stacks and native handles are allocated in memory separate from the heap, as well as JVM internal data structures.
Here is also a diagram of how the Java heap memory is categorized.
The second option is to use a library that implements the requested functionality. For that purpose you can use ashes-queue.
From the project’s overview:

> This is a simple FIFO implementation in Java which has persistent support. That is, if the queue is full, the overflowing messages will be persisted and when there are available slots, they will be put back into memory.
The third option is to create your own implementation. For that, you can refer to this thread, which will guide you through the process.
Your suggestions fall under this last, third option. Both are reasonable. From an implementation point of view, you should go with the first option, as it will guarantee an easier implementation and a cleaner design.