Blame view

tsl_aiplatform/helpers/buffer.hpp 2.54 KB
85cc8cb9   Hu Chunming   ๅŽŸ็‰ˆไปฃ็ 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
  /*
   * @Author: yangzilong
   * @Date: 2021-11-30 11:09:25
   * @Last Modified by: yangzilong
   * @Last Modified time: Do not edit
   * @Email: yangzilong@objecteye.com
   * @Description:
   */
  #pragma once
  
  #include <mutex>
  #include <queue>
  #include <memory>
  #include <condition_variable>
  
  
  namespace helpers
  {
      namespace buffer
      {
  
          template<typename T>
          class basic_thread_safe_queue
          {
          /**
           * @brief thread safe data buffer.
           * */
          public:
              /* remove copy construct and copy assignment. */
              basic_thread_safe_queue(const basic_thread_safe_queue&) = delete;
              basic_thread_safe_queue& operator=(const basic_thread_safe_queue&) = delete;
  
  
              basic_thread_safe_queue()
              {}
  
              ~basic_thread_safe_queue()
              {}
  
              bool push(const T &data)
              {
                  std::lock_guard<std::mutex> lk(mutex_);
                  data_queue_.push(data);
              }
  
              bool push(T &&data)
              {
                  std::lock_guard<std::mutex> lk(mutex_);
                  data_queue_.push(std::move(data));
                  cv_.notify_one();
              }
  
              bool try_pop(T &data)
              {
                  std::lock_guard<std::mutex> lk(mutex_);
                  if (data_queue_.empty())
                      return false;
                  data = std::move(data_queue_.front());
                  data_queue_.pop();
                  return true;
              }
  
              bool wait_and_pop(T &data)
              {
                  std::unique_lock<std::mutex> lk(mutex_);
                  cv_.wait(lk, [this] {return !data_queue_.empty()});
                  data = data_queue_.front();
                  data_queue_.pop();
              }
  
              std::shared_ptr<T> get()
              {
                  std::lock_guard<std::mutex> lk(mutex_);
                  if (data_queue_.empty())
                      return std::shared_ptr<T>();
                  std::shared_ptr<T> res(std::make_shared<T>(data_queue_.front()));
                  data_queue_.pop();
                  return res;
              }
  
              int size() const
              {
                  std::lock_guard<std::mutex> lk(mutex_);
                  return data_queue_.size();
              }
  
  
              int empty() const
              {
                  return data_queue_.size() == 0;
              }
  
          private:
              mutable std::mutex mutex_;
              std::queue<T> data_queue_;
              std::condition_variable cv_;
  
          };
  
  
      }  // namespace buffer
  }  // namespace helpers