babylon

Execution Queue

Principle

ConcurrentExecutionQueue wraps ConcurrentBoundedQueue to implement an on-demand activation model for MPSC (multi-producer, single-consumer) consumption.

It mainly targets scenarios with a large number of low-activity queues, where it avoids keeping an idle listening consumer thread alive for every inactive queue. Its functionality is similar to bthread::ExecutionQueue, with some differences.
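
To make the on-demand activation idea concrete, here is a minimal standalone sketch. It is not babylon's implementation: `ToyExecutionQueue` and `toy_demo_sum` are hypothetical names, a mutex-protected `std::vector` stands in for ConcurrentBoundedQueue, and a plain `std::thread` stands in for the executor. It only illustrates that a consumer exists while data is pending and that the producer which finds the queue idle is the one that starts it.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Toy model only (assumption): a locked vector replaces the bounded queue,
// and a std::thread replaces the executor.
template <typename T>
class ToyExecutionQueue {
 public:
  using Iterator = typename std::vector<T>::iterator;
  using Consumer = std::function<void(Iterator, Iterator)>;

  explicit ToyExecutionQueue(Consumer consumer)
      : consumer_(std::move(consumer)) {}

  ~ToyExecutionQueue() {
    join();
    if (worker_.joinable()) worker_.join();
  }

  // May be called from any number of producer threads.
  void execute(T value) {
    std::lock_guard<std::mutex> lock(mutex_);
    items_.push_back(std::move(value));
    // Only the producer that moves the counter from 0 to 1 activates a consumer.
    if (pending_.fetch_add(1) == 0) {
      // A previous consumer, if any, has already left its loop; reap it.
      if (worker_.joinable()) worker_.join();
      worker_ = std::thread([this] { consume(); });
    }
  }

  // Waits until everything published so far is consumed; the queue stays usable.
  void join() const {
    while (pending_.load() != 0) std::this_thread::yield();
  }

 private:
  void consume() {
    for (;;) {
      std::vector<T> batch;
      {
        std::lock_guard<std::mutex> lock(mutex_);
        batch.swap(items_);
      }
      assert(!batch.empty());
      consumer_(batch.begin(), batch.end());
      // If this batch was everything pending, the consumer deactivates.
      if (pending_.fetch_sub(batch.size()) == batch.size()) return;
    }
  }

  Consumer consumer_;
  std::mutex mutex_;
  std::vector<T> items_;
  std::atomic<std::size_t> pending_{0};
  std::thread worker_;
};

// Usage: four producers publish the values 1..100; the consumer sums them.
long toy_demo_sum() {
  using Queue = ToyExecutionQueue<int>;
  long sum = 0;  // touched by at most one active consumer at a time
  Queue queue([&sum](Queue::Iterator iter, Queue::Iterator end) {
    for (; iter != end; ++iter) sum += *iter;
  });
  std::vector<std::thread> producers;
  for (int t = 0; t < 4; ++t) {
    producers.emplace_back([&queue, t] {
      for (int i = 1; i <= 25; ++i) queue.execute(t * 25 + i);
    });
  }
  for (auto& producer : producers) producer.join();
  queue.join();
  return sum;
}
```

While the counter is zero the toy queue owns no running consumer, which is what keeps a large number of mostly idle queues cheap. A real implementation would replace the lock with a concurrent queue and submit the consumer to an executor instead of creating a thread.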

Usage Example

#include "babylon/concurrent/execution_queue.h"

using ::babylon::ConcurrentExecutionQueue;

// Explicitly define a queue
using Queue = ConcurrentExecutionQueue<T>;
Queue queue;

// Set the queue capacity to N
// Consumer tasks are submitted to some_executor
// The lambda is the consumer callback; it receives published items as the range [iter, end)
queue.initialize(N, some_executor, [] (Queue::Iterator iter, Queue::Iterator end) {
  // Consume the data in the range
  while (iter != end) {
    T& item = *iter;
    do_sth_with(item);
    ++iter;
  }
});

// Publish one item; background consumption is started if it is not already running
queue.execute("10086");
...

// Wait until all data published so far has been consumed
// Note that join() does not stop the queue; execute and join can be called repeatedly
queue.join();