gstreamingexecutor.hpp
6.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2019-2020 Intel Corporation
#ifndef OPENCV_GAPI_GSTREAMING_EXECUTOR_HPP
#define OPENCV_GAPI_GSTREAMING_EXECUTOR_HPP
#ifdef _MSC_VER
#pragma warning(disable: 4503) // "decorated name length exceeded"
// on concurrent_bounded_queue
#endif
#include <cstddef> // size_t
#include <memory> // unique_ptr, shared_ptr
#include <thread> // thread
#include <tuple> // tuple
#include <unordered_map>
#include <unordered_set>
#include <vector>
// QueueClass<T> names the concurrent bounded queue used by the streaming
// executor: the TBB one when HAVE_TBB is defined, G-API's own one otherwise.
#ifdef HAVE_TBB
#  include <tbb/concurrent_queue.h> // FIXME: drop it from here!
template<typename T>
using QueueClass = tbb::concurrent_bounded_queue<T>;
#else
#  include "executor/conc_queue.hpp"
template<typename T>
using QueueClass = cv::gapi::own::concurrent_bounded_queue<T>;
#endif // HAVE_TBB
#include "executor/last_value.hpp"
#include <ade/graph.hpp>
#include "backends/common/gbackend.hpp"
namespace cv {
namespace gimpl {
namespace stream {
// Command tag without any payload: tells emitters to start working.
struct Start
{
};
// Command asking to stop working; comes in two flavours (see Kind).
struct Stop
{
    enum class Kind
    {
        HARD, // a hard-stop: end-of-pipeline reached or stop() called
        CNST, // a soft-stop emitted for/by constant sources (see QueueReader)
    };

    Kind        kind = Kind::HARD; // a default-constructed Stop is a hard one
    cv::GRunArg cdata;             // const data for CNST stop
};
// Output bundle of the pipeline: the values themselves plus a per-value
// availability mark.
struct Result
{
    cv::GRunArgs      args;  // Full results vector
    std::vector<bool> flags; // Availability flags (in case of desync)
};
// The message type passed through the stream queues (see Q below).
// The alternatives are listed in their original order on purpose.
using Cmd = cv::util::variant<
    cv::util::monostate,
    Start,        // Tells emitters to start working. Not broadcasted to workers.
    Stop,         // Tells emitters to stop working. Broadcasted to workers.
    cv::GRunArg,  // Workers data payload to process.
    Result        // Pipeline's data for gout()
>;
// Abstract interface over a queue of Cmd messages. The underlying
// implementation may differ: this class exists to abstract over the
// real queues (bounded, in-order) and over the desynchronized data
// slots which are required to implement cv::gapi::desync.
class Q
{
public:
    // Puts a command into the queue.
    virtual void push(const Cmd &cmd) = 0;

    // Takes a command out of the queue and stores it in cmd.
    virtual void pop(Cmd &cmd) = 0;

    // Non-waiting flavour of pop().
    // NOTE(review): presumably returns true only if cmd was filled -
    // confirm against the underlying queue implementations.
    virtual bool try_pop(Cmd &cmd) = 0;

    // Drops the queue contents.
    virtual void clear() = 0;

    // Queues are handled via Q* pointers, hence the virtual destructor.
    virtual ~Q() = default;
};
// A regular queue implementation
class SyncQueue final: public Q {
QueueClass<Cmd> m_q; // FIXME: OWN or WRAP??
public:
virtual void push(const Cmd &cmd) override { m_q.push(cmd); }
virtual void pop(Cmd &cmd) override { m_q.pop(cmd); }
virtual bool try_pop(Cmd &cmd) override { return m_q.try_pop(cmd); }
virtual void clear() override { m_q.clear(); }
void set_capacity(std::size_t c) { m_q.set_capacity(c);}
};
// Desynchronized "queue" implementation
// Every push overwrites value which is not yet popped
// This container can hold 0 or 1 element
// Special handling for Stop is implemented (FIXME: not really)
class DesyncQueue final: public Q {
cv::gapi::own::last_written_value<Cmd> m_v;
public:
virtual void push(const Cmd &cmd) override { m_v.push(cmd); }
virtual void pop(Cmd &cmd) override { m_v.pop(cmd); }
virtual bool try_pop(Cmd &cmd) override { return m_v.try_pop(cmd); }
virtual void clear() override { m_v.clear(); }
};
} // namespace stream
// FIXME: Currently all GExecutor comments apply also
// to this one. Please document it separately in the future.
//
// Streaming executor: holds the graph(s), the threads and the queues
// connecting them, and exposes the setSource()/start()/pull()/stop()
// control interface.
class GStreamingExecutor final
{
private: // the class is final, so there can be no derived classes to protect for
    // GStreamingExecutor is a state machine described as follows
    //
    //              setSource() called
    //   STOPPED:  - - - - - - - - - ->READY:
    //   --------                      ------
    //   Initial state                 Input data specified
    //   No threads running            Threads are created and IDLE
    //   ^                             (Currently our emitter threads
    //   :                             are bound to input data)
    //   : stop() called               No processing happening
    //   : OR                          :
    //   : end-of-stream reached       :  start() called
    //   : during pull()/try_pull()    V
    //   :                             RUNNING:
    //   :                             --------
    //   :                             Actual pipeline execution
    //    - - - - - - - - - - - - - -  Threads are running
    //
    enum class State {
        STOPPED,
        READY,
        RUNNING,
    };
    State state = State::STOPPED; // see the diagram above

    std::unique_ptr<ade::Graph> m_orig_graph;
    std::shared_ptr<ade::Graph> m_island_graph;
    cv::GCompileArgs m_comp_args;
    cv::GMetaArgs m_last_metas;
    util::optional<bool> m_reshapable;

    cv::gimpl::GIslandModel::Graph m_gim; // FIXME: make const?
    const bool m_desync;

    // FIXME: Naive executor details are here for now
    // but then it should be moved to another place
    struct OpDesc
    {
        std::vector<RcDesc> in_objects;
        std::vector<RcDesc> out_objects;
        cv::GMetaArgs       out_metas;
        ade::NodeHandle     nh;

        cv::GRunArgs in_constants;

        std::shared_ptr<GIslandExecutable> isl_exec;
    };
    std::vector<OpDesc> m_ops;

    struct DataDesc
    {
        ade::NodeHandle slot_nh;
        ade::NodeHandle data_nh;
    };
    std::vector<DataDesc> m_slots;

    cv::GRunArgs m_const_vals;

    // Order in these vectors follows the GComputation's protocol
    std::vector<ade::NodeHandle> m_emitters;
    std::vector<ade::NodeHandle> m_sinks;

    // Only declared here; the definition is expected elsewhere
    // (kept behind a pointer, so the incomplete type is fine here).
    class Synchronizer;
    std::unique_ptr<Synchronizer> m_sync;

    std::vector<std::thread> m_threads;
    std::vector<stream::SyncQueue> m_emitter_queues;

    // a view over m_emitter_queues
    std::vector<stream::SyncQueue*> m_const_emitter_queues;

    std::vector<stream::Q*> m_sink_queues;

    // desync path tags for outputs. -1 means that output
    // doesn't belong to a desync path
    std::vector<int> m_sink_sync;

    std::unordered_set<stream::Q*> m_internal_queues;
    stream::SyncQueue m_out_queue;

    // Describes mapping from desync paths to collector threads
    struct CollectorThreadInfo {
        std::vector<stream::Q*> queues;
        std::vector<int> mapping;
    };
    std::unordered_map<int, CollectorThreadInfo> m_collector_map;

    // NOTE(review): presumably waits for the threads in m_threads to
    // finish - confirm against the implementation file.
    void wait_shutdown();

    cv::GTypesInfo out_info;

public:
    explicit GStreamingExecutor(std::unique_ptr<ade::Graph> &&g_model,
                                const cv::GCompileArgs &comp_args);
    ~GStreamingExecutor();

    // The executor owns threads and unique resources and has a
    // user-declared destructor: copying/moving was never available
    // (implicitly deleted / not declared). State it explicitly.
    GStreamingExecutor(const GStreamingExecutor&) = delete;
    GStreamingExecutor& operator=(const GStreamingExecutor&) = delete;
    GStreamingExecutor(GStreamingExecutor&&) = delete;
    GStreamingExecutor& operator=(GStreamingExecutor&&) = delete;

    // Specifies the input data (STOPPED -> READY in the diagram above).
    void setSource(GRunArgs &&args);

    // Launches the actual pipeline execution (READY -> RUNNING).
    void start();

    // Blocking-style output retrieval into the given output objects.
    // NOTE(review): the bool result presumably turns false once
    // end-of-stream is reached - confirm against the implementation.
    bool pull(cv::GRunArgsP &&outs);
    bool pull(cv::GOptRunArgsP &&outs);

    // Same as above, but the results are returned by value instead of
    // being written to caller-provided objects.
    std::tuple<bool, cv::util::variant<cv::GRunArgs, cv::GOptRunArgs>> pull();

    // try_ flavour of pull().
    // NOTE(review): presumably does not wait for data - confirm.
    bool try_pull(cv::GRunArgsP &&outs);

    // Stops the pipeline (-> STOPPED in the diagram above).
    void stop();

    // NOTE(review): presumably reports whether the state is RUNNING - confirm.
    bool running() const;
};
} // namespace gimpl
} // namespace cv
#endif // OPENCV_GAPI_GSTREAMING_EXECUTOR_HPP