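ConcurrentExecutionQueue wraps ConcurrentBoundedQueue to implement an on-demand activation model for MPSC (multi-producer, single-consumer) consumers: the consumer is started only when there is data to consume, instead of being kept running permanently.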
It is mainly intended for scenarios with a large number of low-activity queues, where it avoids keeping an idle listening consumer thread alive for each queue. Its functionality is similar to bthread::ExecutionQueue, but:
#include "babylon/concurrent/execution_queue.h"
using ::babylon::ConcurrentExecutionQueue;
// Explicitly define a queue
using Queue = ConcurrentExecutionQueue<T>;
Queue queue;
// Set the queue capacity to N
// The consumer uses some_executor for execution
// Register a lambda function for consumption
queue.initialize(N, some_executor, [] (Queue::Iterator iter, Queue::Iterator end) {
    // Consume the data in the range [iter, end)
    while (iter != end) {
        T& item = *iter;
        do_sth_with(item);
        ++iter;
    }
});
// Publish one item; background consumption is started on demand
queue.execute("10086");
...
// Wait until all data published so far has been consumed
// Note: join() carries no "stop" semantics; execute() and join() can be called repeatedly
queue.join();
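
The on-demand activation idea described at the top can be illustrated with a small standalone sketch. This is not babylon's implementation: `OnDemandQueue`, `activations()` and `demo_collect()` are hypothetical names, a mutex-protected `std::vector` stands in for ConcurrentBoundedQueue, and a plain `std::thread` stands in for the executor. It only aims to show the shape of the model: a consumer is started when data arrives while none is active, it exits once the queue is drained, and `join()` waits without stopping the queue.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical illustration only; NOT babylon's implementation.
// Assumptions: a mutex-protected std::vector replaces ConcurrentBoundedQueue,
// and std::thread replaces the executor.
template <typename T>
class OnDemandQueue {
public:
    using Iterator = typename std::vector<T>::iterator;
    using Consumer = std::function<void(Iterator, Iterator)>;

    explicit OnDemandQueue(Consumer consume) : _consume(std::move(consume)) {}
    OnDemandQueue(const OnDemandQueue&) = delete;
    OnDemandQueue& operator=(const OnDemandQueue&) = delete;
    ~OnDemandQueue() { join(); }

    // Publish one item; start a consumer only if none is currently active.
    void execute(T item) {
        std::lock_guard<std::mutex> lock(_mutex);
        _items.push_back(std::move(item));
        if (!_running) {
            // A previous consumer, if any, has already left its loop,
            // so joining it here returns promptly.
            if (_worker.joinable()) _worker.join();
            _running = true;
            ++_activations;
            _worker = std::thread([this] { consume_until_empty(); });
        }
    }

    // Wait until everything published so far has been consumed.
    // No stop semantics: execute() may be called again afterwards.
    // Must not be called from inside the consume callback.
    void join() {
        std::unique_lock<std::mutex> lock(_mutex);
        _idle.wait(lock, [this] { return !_running; });
        if (_worker.joinable()) _worker.join();
    }

    // Number of times a consumer has been started so far.
    std::size_t activations() {
        std::lock_guard<std::mutex> lock(_mutex);
        return _activations;
    }

private:
    // Consumer body: drain batches until the queue is empty, then exit.
    void consume_until_empty() {
        for (;;) {
            std::vector<T> batch;
            {
                std::lock_guard<std::mutex> lock(_mutex);
                if (_items.empty()) {
                    _running = false;  // deactivate; next execute() restarts
                    _idle.notify_all();
                    return;
                }
                batch.swap(_items);
            }
            _consume(batch.begin(), batch.end());  // runs outside the lock
        }
    }

    Consumer _consume;
    std::mutex _mutex;
    std::condition_variable _idle;
    std::vector<T> _items;
    std::thread _worker;
    bool _running = false;
    std::size_t _activations = 0;
};

// Usage sketch mirroring the synopsis above: execute & join, twice.
inline std::vector<std::string> demo_collect() {
    using Queue = OnDemandQueue<std::string>;
    std::vector<std::string> seen;
    Queue queue([&seen](Queue::Iterator iter, Queue::Iterator end) {
        for (; iter != end; ++iter) seen.push_back(*iter);
    });
    queue.execute("10086");
    queue.execute("10087");
    queue.join();  // both items have been consumed at this point
    queue.execute("10088");
    queue.join();  // execute & join can be repeated
    return seen;
}
```

In this sketch no thread exists while the queue is idle, which is the property that matters when many queues see little traffic. The single mutex keeps the example short; it says nothing about how the real queue synchronizes its producers.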