ConcurrentQueue.h
Go to the documentation of this file.
1 /*--------------------------------------------------------------------------------------+
2 |
3 | Supplied under applicable software license agreement.
4 |
5 | Copyright (c) 2018 Bentley Systems, Incorporated. All rights reserved.
6 |
7 +---------------------------------------------------------------------------------------*/
8 #pragma once
9 
10 #include <Bentley/Tasks/Tasks.h>
11 #include <Bentley/Bentley.h>
12 #include <queue>
13 
15 
16 /*--------------------------------------------------------------------------------------+
17 * @bsiclass Bentley Systems
18 +---------------+---------------+---------------+---------------+---------------+------*/
19 template<typename D>
21  {
22  private:
23  struct DataContainer
24  {
25  D data;
26  uint64_t index;
27  };
28 
29  struct DataContainerCompare
30  {
31  bool operator()(const DataContainer &a, const DataContainer &b) const
32  {
33  if (a.data->GetPriority () < b.data->GetPriority ())
34  {
35  return true;
36  }
37  else if (a.data->GetPriority () > b.data->GetPriority ())
38  {
39  return false;
40  }
41  else
42  {
43  return a.index > b.index;
44  }
45  }
46  };
47 
48  typedef std::priority_queue<DataContainer, bvector<DataContainer>, DataContainerCompare> DataContainerQueue;
49 
50  DataContainerQueue m_queue;
51 
52  struct EmptyQueuePredicate : IConditionVariablePredicate
53  {
54  private:
55  DataContainerQueue const& m_queue;
56  bool m_expectedEmpty;
57  public:
58  EmptyQueuePredicate(DataContainerQueue const& queue, bool expectedEmpty) : m_queue(queue), m_expectedEmpty(expectedEmpty) {}
59  virtual bool _TestCondition(BeConditionVariable &cv) override { return m_expectedEmpty==m_queue.empty (); }
60  };
61 
62  uint64_t m_queueIndex;
63 
64  mutable BeMutex m_mutex;
65  BeConditionVariable m_isEmptyCV;
66  BeConditionVariable m_isNotEmptyCV;
67 
68  public:
70  m_queueIndex (0)
71  {
72  };
73 
74  BeMutex& GetMutex () const
75  {
76  return m_mutex;
77  }
78 
79  void Push (const D& data)
80  {
81  BeMutexHolder l (m_mutex);
82 
83  DataContainer dataContainer;
84  dataContainer.data = data;
85  dataContainer.index = m_queueIndex++;
86 
87  m_queue.push (dataContainer);
88 
89  m_isNotEmptyCV.notify_one ();
90  }
91 
92  bool Empty () const
93  {
94  BeMutexHolder lk (m_mutex);
95  return ProtectedEmpty ();
96  }
97 
98  bool ProtectedEmpty () const
99  {
100  return m_queue.empty ();
101  }
102 
103  inline int Size () const
104  {
105  BeMutexHolder lk (m_mutex);
106  return (int)m_queue.size ();
107  }
108 
109  void WaitAndPop (D& poppedValue)
110  {
111  BeMutexHolder lk (m_mutex);
112  ProtectedWaitAndPop (poppedValue);
113  }
114 
115  bool TryPop (D& poppedValue)
116  {
117  BeMutexHolder lk (m_mutex);
118  return ProtectedTryPop (poppedValue);
119  }
120 
121  bool ProtectedTryPop (D& poppedValue)
122  {
123  if (m_queue.empty ())
124  {
125  return false;
126  }
127 
128  DataContainer dataContainer = m_queue.top ();
129  m_queue.pop ();
130 
131  poppedValue = dataContainer.data;
132 
133  return true;
134  }
135 
136 
137  void ProtectedWaitAndPop (D& poppedValue, BeMutexHolder& lk)
138  {
139  EmptyQueuePredicate predicate(m_queue, false);
140  m_isNotEmptyCV.ProtectedWaitOnCondition(lk, &predicate, BeConditionVariable::Infinite);
141 
142  DataContainer dataContainer = m_queue.top ();
143  m_queue.pop ();
144 
145  poppedValue = dataContainer.data;
146 
147  if (m_queue.empty ())
148  m_isEmptyCV.notify_all ();
149  }
150 
151  void WaitUntilEmpty () const
152  {
153  EmptyQueuePredicate predicate(m_queue, true);
154  m_isEmptyCV.WaitOnCondition(&predicate, BeConditionVariable::Infinite);
155  }
156  };
A synchronization primitive that can be used to block a thread, or multiple threads at the same time...
Definition: BeThread.h:132
bool ProtectedTryPop(D &poppedValue)
Definition: ConcurrentQueue.h:121
void WaitUntilEmpty() const
Definition: ConcurrentQueue.h:151
A BeMutex ownership wrapper.
Definition: BeThread.h:82
bool Empty() const
Definition: ConcurrentQueue.h:92
Definition: ConcurrentQueue.h:20
void ProtectedWaitAndPop(D &poppedValue, BeMutexHolder &lk)
Definition: ConcurrentQueue.h:137
unsigned long long uint64_t
Definition: Bentley.r.h:95
#define BEGIN_BENTLEY_TASKS_NAMESPACE
Definition: Tasks.h:13
BeMutex & GetMutex() const
Definition: ConcurrentQueue.h:74
bool TryPop(D &poppedValue)
Definition: ConcurrentQueue.h:115
int Size() const
Definition: ConcurrentQueue.h:103
Defines typedefs and constants that can be used across other namespaces. All Bentley-authored C++ sou...
void WaitAndPop(D &poppedValue)
Definition: ConcurrentQueue.h:109
bool ProtectedEmpty() const
Definition: ConcurrentQueue.h:98
A synchronization primitive that can be used to protect shared data from being simultaneously accesse...
Definition: BeThread.h:57
void Push(const D &data)
Definition: ConcurrentQueue.h:79
ConcurrentQueue()
Definition: ConcurrentQueue.h:69
#define END_BENTLEY_TASKS_NAMESPACE
Definition: Tasks.h:14
Provides implementation of predicate for a BeConditionVariable.
Definition: BeThread.h:117

Copyright © 2017 Bentley Systems, Incorporated. All rights reserved.