Posted on 2014-12-03 11:27
魔のkyo 阅读(1648)
评论(1) 编辑 收藏 引用
#include <semaphore.h>
#include <atomic>
#include <assert.h>
#include <pthread.h>
#include <thread>
#include <unistd.h>
#include <sys/time.h>
// 多生产者单消费者无锁共享队列
template<typename T>
class shared_queue
{
shared_queue& operator=(const shared_queue&) = delete;
shared_queue(const shared_queue& other) = delete;
public:
shared_queue(size_t max_size) {
_capacity = max_size + 1;
// 把capacity对齐到2的幂, 为了快速用&mask替代%capacity操作
if ((_capacity & (_capacity - 1)) != 0)
{
size_t n = _capacity;
while (n) {
_capacity = n;
n &= n - 1;
}
_capacity <<= 1;
}
// 断言_capacity是比传入max_size大的最小的2的幂
assert(_capacity != 0 && (_capacity & (_capacity - 1)) == 0);
assert((_capacity >> 1) <= max_size && max_size < _capacity);
_mask = _capacity - 1;
_data = new T[_capacity];
_used = new bool[_capacity];
std::fill(_used, _used+_capacity, false);
_begin = _end = 0;
sem_init(&_readable_count, 0, 0);
}
~shared_queue() {
sem_destroy(&_readable_count);
}
void push(T item) {
uint32_t cur = _end++;
cur &= _mask; // 等价于 cur %= _capacity;
_data[cur] = item;
_used[cur] = true;
sem_post(&_readable_count);
}
void wait_and_pop(T& popped_item) {
sem_wait(&_readable_count);
uint32_t cur = _begin++;
cur &= _mask; // 等价于 cur %= _capacity;
volatile const bool& used = _used[cur];
while (!used); // 忙等待,这是必须的,考虑两个线程同时调用push的情况,先进入的可能后结束
// 我仍然不能保证现在的处理是100%正确的,或许在push中_data[cur]和_used[cur]的赋值之间需要加入内存屏障
popped_item = std::move(_data[cur]);
_used[cur] = false;
}
private:
T* _data;
bool* _used;
size_t _capacity;
size_t _mask;
uint32_t _begin;
std::atomic<uint32_t> _end;
sem_t _readable_count;
};
shared_queue<int> queue(128);
void producer1()
{
for(int i=0;i<100;i++) {
queue.push(i);
usleep(1);
}
}
void producer2()
{
for(int i=100;i<200;i++) {
queue.push(i);
usleep(1);
}
}
void consumer()
{
for(int i=0;i<200;i++) {
int x;
queue.wait_and_pop(x);
printf("%d ", x);
fflush(stdout);
}
}
int main()
{
std::thread th1(producer1);
std::thread th2(producer2);
std::thread th3(consumer);
th1.join();
th2.join();
th3.join();
}