Blame view

3rdparty/spdlog-1.9.2/include/spdlog/details/mpmc_blocking_q.h 3.56 KB
3d2ab595   Hu Chunming   支持gb28181
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
  // Copyright(c) 2015-present, Gabi Melman & spdlog contributors.
  // Distributed under the MIT License (http://opensource.org/licenses/MIT)
  
  #pragma once
  
  // Multi-producer, multi-consumer blocking queue.
  // enqueue(..) - blocks until there is room for the new message.
  // enqueue_nowait(..) - returns immediately; if no room is left, the oldest
  // message in the queue is overrun.
  // dequeue_for(..) - blocks until the queue is not empty or the timeout has
  // passed.
  
  #include <spdlog/details/circular_q.h>
  
  #include <condition_variable>
  #include <mutex>
  
  namespace spdlog {
  namespace details {
  
  // Blocking queue shared between producer and consumer threads.
  // Every access to the underlying circular_q happens while queue_mutex_ is
  // held. push_cv_ is signalled after an item has been pushed, pop_cv_ after an
  // item has been popped.
  template<typename T>
  class mpmc_blocking_queue
  {
  public:
      using item_type = T;

      // max_items is forwarded unchanged to the underlying circular_q.
      explicit mpmc_blocking_queue(size_t max_items)
          : q_(max_items)
      {}

      // Blocking enqueue: waits on pop_cv_ for as long as the queue reports
      // full, then moves `item` in and wakes one thread waiting on push_cv_.
      void enqueue(T &&item)
      {
          std::unique_lock<std::mutex> guard(queue_mutex_);
          while (q_.full())
          {
              pop_cv_.wait(guard);
          }
          q_.push_back(std::move(item));
          wake_one(guard, push_cv_);
      }

      // Non-blocking enqueue: pushes `item` right away without checking for
      // room, then wakes one thread waiting on push_cv_. When no room is left
      // the oldest message in the queue is overrun (that part is done by
      // circular_q::push_back, not here).
      void enqueue_nowait(T &&item)
      {
          std::unique_lock<std::mutex> guard(queue_mutex_);
          q_.push_back(std::move(item));
          wake_one(guard, push_cv_);
      }

      // Timed dequeue: waits up to `wait_duration` for the queue to become
      // non-empty.
      // Returns true after moving the front item into `popped_item` and waking
      // one thread waiting on pop_cv_; returns false, leaving `popped_item`
      // untouched, if the queue is still empty when the wait ends.
      bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
      {
          std::unique_lock<std::mutex> guard(queue_mutex_);
          const bool item_available = push_cv_.wait_for(guard, wait_duration, [this] { return !this->q_.empty(); });
          if (item_available)
          {
              popped_item = std::move(q_.front());
              q_.pop_front();
              wake_one(guard, pop_cv_);
          }
          return item_available;
      }

      // Returns the underlying queue's overrun counter, read under the mutex.
      size_t overrun_counter()
      {
          std::lock_guard<std::mutex> guard(queue_mutex_);
          return q_.overrun_counter();
      }

      // Returns the underlying queue's size, read under the mutex.
      size_t size()
      {
          std::lock_guard<std::mutex> guard(queue_mutex_);
          return q_.size();
      }

  private:
      // Wakes a single thread blocked on `cv`. `guard` must own queue_mutex_
      // when this is called.
      static void wake_one(std::unique_lock<std::mutex> &guard, std::condition_variable &cv)
      {
  #ifndef __MINGW32__
          // Hand the mutex back before notifying.
          guard.unlock();
  #else
          // apparently mingw deadlocks if the mutex is released before
          // cv.notify_one(), so the mutex stays held here and is only released
          // when the caller's lock object goes out of scope.
          (void)guard;
  #endif
          cv.notify_one();
      }

      std::mutex queue_mutex_;
      std::condition_variable push_cv_;
      std::condition_variable pop_cv_;
      spdlog::details::circular_q<T> q_;
  };
  } // namespace details
  } // namespace spdlog