/home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/fixedSize/mpsc/nonlocking/consumer.hpp Source File

HiCR: /home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/fixedSize/mpsc/nonlocking/consumer.hpp Source File
HiCR
consumer.hpp
Go to the documentation of this file.
1/*
2 * Copyright 2025 Huawei Technologies Co., Ltd.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
24#pragma once
25
26#include <queue>
27#include <hicr/core/definitions.hpp>
31
32namespace HiCR::channel::fixedSize::MPSC::nonlocking
33{
34
42{
43 public:
44
61 Consumer(CommunicationManager &communicationManager,
62 std::vector<std::shared_ptr<GlobalMemorySlot>> tokenBuffers,
63 std::vector<std::shared_ptr<LocalMemorySlot>> internalCoordinationBuffers,
64 std::vector<std::shared_ptr<GlobalMemorySlot>> producerCoordinationBuffers,
65 const size_t tokenSize,
66 const size_t capacity)
67 : _tokenBuffers(tokenBuffers),
68 _communicationManager(&communicationManager)
69 {
70 // make sure producer and consumer sides provide p elements, equalling
71 // the number of producers
72 assert(internalCoordinationBuffers.size() == producerCoordinationBuffers.size());
73 assert(internalCoordinationBuffers.size() == tokenBuffers.size());
74 // create p (= number of producers) SPSC channels
75 for (size_t i = 0; i < internalCoordinationBuffers.size(); i++)
76 {
77 std::shared_ptr<fixedSize::SPSC::Consumer> consumerPtr(
78 new fixedSize::SPSC::Consumer(communicationManager, tokenBuffers[i], internalCoordinationBuffers[i], producerCoordinationBuffers[i], tokenSize, capacity));
79 _spscList.push_back(consumerPtr);
80 _depths.push_back(0);
81 }
82 }
83
84 ~Consumer() = default;
85
99 __INLINE__ std::array<size_t, 2> peek(const size_t pos = 0)
100 {
101 std::array<size_t, 2> ret = {0};
102 // @ToDo: to support pos > 0, we need to modify _channelPushes to
103 // be of type std::vector instead of std::queue
104 if (pos > 0) HICR_THROW_LOGIC("Nonblocking MPSC not yet implemented for peek with n!=0");
105
106 _communicationManager->flushReceived();
107 updateDepth();
108 if (_channelPushes.empty()) HICR_THROW_RUNTIME("Attempting to peek position (%lu) but supporting queue has size (%lu)", pos, _channelPushes.size());
109
110 size_t channelId = _channelPushes.front(); // front() returns the first (i.e. oldest) element
111 if (channelId >= _spscList.size()) { HICR_THROW_LOGIC("channelId (%lu) >= _spscList.size() (%lu)", channelId, _spscList.size()); }
112 ret[0] = channelId;
113 ret[1] = _spscList[channelId]->peek();
114
115 return ret;
116 }
117
124 __INLINE__ size_t getDepth()
125 {
126 size_t totalDepth = 0;
127 for (auto d : _depths) { totalDepth += d; }
128 // if (totalDepth != _channelPushes.size()) {
129 // HICR_THROW_LOGIC("Helper FIFO and channels are out of sync, implemenation issue! getDepth (%lu) != _channelPushes.size() (%lu)", totalDepth, _channelPushes.size());
130 // }
131 return totalDepth;
132 }
133
138 __INLINE__ bool isEmpty() { return (getDepth() == 0); }
139
145 __INLINE__ void pop(const size_t n = 1)
146 {
147 updateDepth();
148 // If the exchange buffer does not have enough tokens, reject operation
149 if (n > getDepth())
150 HICR_THROW_RUNTIME("Attempting to pop (%lu) tokens, which is more than the number of current tokens in the channel (%lu)", n, getDepth());
151 else
152 {
153 size_t channelFirstPushed = _channelPushes.front();
154 if (channelFirstPushed >= _spscList.size())
155 HICR_THROW_LOGIC("Index of latest push channel incorrect!");
156 else
157 {
158 // pop n elements from the SPSCs in the order recorded in the helper
159 // FIFO _channelPushes, and also update the FIFO itself
160 for (size_t i = 0; i < n; i++)
161 {
162 _spscList[channelFirstPushed]->pop();
163 _depths[channelFirstPushed]--;
164 _channelPushes.pop();
165 }
166 }
167 }
168 }
169
178 __INLINE__ void updateDepth()
179 {
180 std::vector<size_t> newDepths(_spscList.size());
181 // Note that after calling updateDepth() on each SPSC channel,
182 // we must accept this state as a new temporary snapshot in newDepths
183 // It is possible that during our iterating through newDepths, producers have
184 // sent more elements already, which will be handled in later
185 // updateDepth calls
186 for (size_t i = 0; i < _spscList.size(); i++)
187 {
188 _spscList[i]->updateDepth();
189 newDepths[i] = _spscList[i]->getDepth();
190 }
191
192 for (size_t i = 0; i < _spscList.size(); i++)
193 {
194 for (size_t j = _depths[i]; j < newDepths[i]; j++) { _channelPushes.push(i); }
195 }
196 std::swap(_depths, newDepths);
197 if (getDepth() != _channelPushes.size()) { HICR_THROW_LOGIC("getDepth (%lu) != _channelPushes.size() (%lu)", getDepth(), _channelPushes.size()); }
198 }
199
206 [[nodiscard]] __INLINE__ std::vector<std::shared_ptr<GlobalMemorySlot>> getTokenBuffers() const { return _tokenBuffers; }
207
208 private:
209
214 const std::vector<std::shared_ptr<HiCR::GlobalMemorySlot>> _tokenBuffers;
215
219 std::vector<std::shared_ptr<channel::fixedSize::SPSC::Consumer>> _spscList;
223 std::queue<size_t> _channelPushes;
224
228 std::vector<size_t> _depths;
232 CommunicationManager *const _communicationManager;
233};
234
235} // namespace HiCR::channel::fixedSize::MPSC::nonlocking
Definition communicationManager.hpp:54
virtual __INLINE__ void flushReceived()
Definition communicationManager.hpp:492
__INLINE__ std::array< size_t, 2 > peek(const size_t pos=0)
Definition consumer.hpp:99
__INLINE__ size_t getDepth()
Definition consumer.hpp:124
Consumer(CommunicationManager &communicationManager, std::vector< std::shared_ptr< GlobalMemorySlot > > tokenBuffers, std::vector< std::shared_ptr< LocalMemorySlot > > internalCoordinationBuffers, std::vector< std::shared_ptr< GlobalMemorySlot > > producerCoordinationBuffers, const size_t tokenSize, const size_t capacity)
Definition consumer.hpp:61
__INLINE__ void pop(const size_t n=1)
Definition consumer.hpp:145
__INLINE__ std::vector< std::shared_ptr< GlobalMemorySlot > > getTokenBuffers() const
Definition consumer.hpp:206
__INLINE__ bool isEmpty()
Definition consumer.hpp:138
__INLINE__ void updateDepth()
Definition consumer.hpp:178
Provides a failure model and corresponding exception classes.
#define HICR_THROW_RUNTIME(...)
Definition exceptions.hpp:74
#define HICR_THROW_LOGIC(...)
Definition exceptions.hpp:67
Provides base functionality for a fixed-size MPSC channel over HiCR.
Provides consumer functionality for a fixed size SPSC channel over HiCR.