// conc_queue_tests.cpp
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2019 Intel Corporation
#include "../test_precomp.hpp"
#include <unordered_set>
#include <thread>
#include "executor/conc_queue.hpp"
namespace opencv_test
{
using namespace cv::gapi;
// Single-threaded ordering check: values pushed as 0..kNumItems-1 must be
// popped back in exactly the same order.
TEST(ConcQueue, PushPop)
{
    constexpr int kNumItems = 100;

    own::concurrent_bounded_queue<int> q;
    for (int i = 0; i < kNumItems; i++)
    {
        q.push(i);
    }
    for (int i = 0; i < kNumItems; i++)
    {
        // Initialized like in the sibling tests, and to a value which is
        // never expected here (all pushed items are >= 0), so the comparison
        // below can never read an indeterminate int.
        int x = -1;
        q.pop(x);
        EXPECT_EQ(x, i);
    }
}
// try_pop() must report failure on a queue nothing has been pushed to and,
// once a value has been pushed, report success and hand that value back.
TEST(ConcQueue, TryPop)
{
    own::concurrent_bounded_queue<int> queue;
    int value = 0;

    // Nothing has been pushed yet
    EXPECT_FALSE(queue.try_pop(value));

    // One element in - the same element out
    queue.push(1);
    EXPECT_TRUE(queue.try_pop(value));
    EXPECT_EQ(1, value);
}
// After clear(), try_pop() must fail even though ten elements were pushed
// to the queue before.
TEST(ConcQueue, Clear)
{
    own::concurrent_bounded_queue<int> queue;

    int next = 0;
    while (next < 10)
    {
        queue.push(next);
        next++;
    }
    queue.clear();

    int leftover = 0;
    EXPECT_FALSE(queue.try_pop(leftover));
}
// In this test, every writer thread produces its own range of integer
// numbers, writing those to a shared queue.
//
// Every reader thread pops elements from the queue (until -1 is
// reached) and stores those in its own associated set.
//
// Finally, the master thread waits for completion of all other
// threads and verifies that all the necessary data is
// produced/obtained.
namespace
{
// Stress test parameters, in tuple order:
//   <0> number of writer threads
//   <1> number of elements produced by every writer
//   <2> number of reader threads
//   <3> queue capacity (the test calls set_capacity() only if it is non-zero)
using StressParam = std::tuple<int, int, int, std::size_t>;

// Value pushed to the queue after the payload; a reader returns once it pops it
constexpr int STOP_SIGN = -1;

// Writer number w produces values starting at w*BASE
constexpr int BASE = 1000;
} // anonymous namespace
// Fixture of the parameterized stress test. Provides the queue/set type
// aliases and the functions executed by writer and reader threads.
struct ConcQueue_: public ::testing::TestWithParam<StressParam>
{
    using Q = own::concurrent_bounded_queue<int>;
    using S = std::unordered_set<int>;

    // Writer body: pushes `writes` consecutive values starting at `base`
    // to `q`, then terminates its own stream with a single STOP_SIGN.
    static void writer(int base, int writes, Q& q)
    {
        for (int offset = 0; offset < writes; ++offset)
        {
            q.push(base + offset);
        }
        q.push(STOP_SIGN);
    }

    // Reader body: keeps popping from `q` and collects every value in `s`;
    // returns as soon as a STOP_SIGN is popped (STOP_SIGN is not stored).
    static void reader(Q& q, S& s)
    {
        for (int value = 0; ; )
        {
            q.pop(value);
            if (value == STOP_SIGN)
            {
                break;
            }
            s.insert(value);
        }
    }
};
// Multi-threaded stress test. Writers push disjoint ranges of integers to
// a shared queue, readers (and, when needed, the master thread) pop them.
// At the end, the union of everything popped must match the reference set,
// and no value may have been delivered to more than one consumer.
TEST_P(ConcQueue_, Test)
{
    int num_writers = 0;
    int num_writes  = 0;
    int num_readers = 0;
    std::size_t capacity = 0u;
    std::tie(num_writers, num_writes, num_readers, capacity) = GetParam();

    CV_Assert(num_writers < 20);
    // Writer w produces [w*BASE, w*BASE + num_writes), so this keeps the
    // ranges of different writers from overlapping
    CV_Assert(num_writes < BASE);

    Q q;
    if (capacity)
    {
        // see below (2)
        CV_Assert(static_cast<int>(capacity) > (num_writers - num_readers));
        q.set_capacity(capacity);
    }

    // Start reader threads
    std::vector<S> storage(num_readers);
    std::vector<std::thread> readers;
    readers.reserve(storage.size());
    for (S& s : storage)
    {
        readers.emplace_back(reader, std::ref(q), std::ref(s));
    }

    // Start writer threads, also pre-generate reference numbers
    S reference;
    std::vector<std::thread> writers;
    if (num_writers > 0)
    {
        writers.reserve(static_cast<std::size_t>(num_writers));
    }
    for (int w = 0; w < num_writers; w++)
    {
        writers.emplace_back(writer, w*BASE, num_writes, std::ref(q));
        for (int r = 0; r < num_writes; r++)
        {
            reference.insert(w*BASE + r);
        }
    }

    // Every writer puts a STOP_SIGN at the end.
    // There are three cases:
    // 1) num_writers == num_readers
    //    Every reader gets its own STOP_SIGN from one of the writers.
    //
    // 2) num_writers > num_readers
    //    Every reader gets a STOP_SIGN, but more STOP_SIGNs than that
    //    are pushed to the queue - and if their number exceeds the
    //    capacity, writers block (up to a deadlock).
    //    The latter situation must be avoided at the parameters level.
    //    [a] Also, not all data produced by the writers will be consumed
    //        by the readers in this case. The master thread reads the rest.
    //
    // 3) num_readers > num_writers
    //    In this case some readers would get stuck and never receive
    //    a STOP_SIGN. The master thread pushes the missing STOP_SIGNs
    //    to the queue.

    // Solution to (2a)
    S remnants;
    if (num_writers > num_readers)
    {
        int extra = num_writers - num_readers;
        while (extra)
        {
            int x = 0;
            q.pop(x);
            if (x == STOP_SIGN) extra--;
            else remnants.insert(x);
        }
    }

    // Solution to (3)
    if (num_readers > num_writers)
    {
        int extra = num_readers - num_writers;
        while (extra--) q.push(STOP_SIGN);
    }

    // Wait for completions
    for (auto &t : readers) t.join();
    for (auto &t : writers) t.join();

    // Accumulate and validate the result
    S result(remnants.begin(), remnants.end());
    std::size_t num_received = remnants.size();
    for (const auto &s : storage)
    {
        result.insert(s.begin(), s.end());
        num_received += s.size();
    }
    EXPECT_EQ(reference, result);

    // The merged set alone can't tell if the same value was delivered to
    // two different consumers. Every value is pushed exactly once, so the
    // per-consumer sets must be disjoint and their sizes must sum up to
    // the size of the reference set.
    EXPECT_EQ(reference.size(), num_received);
}
// Parameter grid of the stress test; the value lists follow the order of
// the StressParam fields.
INSTANTIATE_TEST_CASE_P(ConcQueueStress, ConcQueue_,
                        Combine(Values(1, 2, 4, 8, 16),    // writers
                                Values(1, 32, 96, 256),    // writes
                                Values(1, 2, 10),          // readers
                                Values(0u, 16u, 32u)));    // capacity
} // namespace opencv_test