/home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/fixedSize/mpsc/nonlocking/consumer.hpp Source File

HiCR: /home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/fixedSize/mpsc/nonlocking/consumer.hpp Source File
HiCR
consumer.hpp
Go to the documentation of this file.
1/*
2 * Copyright 2025 Huawei Technologies Co., Ltd.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
24#pragma once
25
26#include <queue>
27#include <hicr/core/definitions.hpp>
31
32namespace HiCR::channel::fixedSize::MPSC::nonlocking
33{
34
42{
43 public:
44
62 Consumer(CommunicationManager &coordinationCommunicationManager,
63 CommunicationManager &payloadCommunicationManager,
64 std::vector<std::shared_ptr<GlobalMemorySlot>> tokenBuffers,
65 std::vector<std::shared_ptr<LocalMemorySlot>> internalCoordinationBuffers,
66 std::vector<std::shared_ptr<GlobalMemorySlot>> producerCoordinationBuffers,
67 const size_t tokenSize,
68 const size_t capacity)
69 : _tokenBuffers(tokenBuffers),
70 _coordinationCommunicationManager(&coordinationCommunicationManager),
71 _payloadCommunicationManager(&payloadCommunicationManager)
72 {
73 // make sure producer and consumer sides provide p elements, equalling
74 // the number of producers
75 assert(internalCoordinationBuffers.size() == producerCoordinationBuffers.size());
76 assert(internalCoordinationBuffers.size() == tokenBuffers.size());
77 // create p (= number of producers) SPSC channels
78 for (size_t i = 0; i < internalCoordinationBuffers.size(); i++)
79 {
80 std::shared_ptr<fixedSize::SPSC::Consumer> consumerPtr(new fixedSize::SPSC::Consumer(
81 coordinationCommunicationManager, payloadCommunicationManager, tokenBuffers[i], internalCoordinationBuffers[i], producerCoordinationBuffers[i], tokenSize, capacity));
82 _spscList.push_back(consumerPtr);
83 _depths.push_back(0);
84 }
85 }
86
// Defaulted destructor: the members are standard containers, shared pointers and
// raw pointers; nothing is released explicitly here, and the two communication
// manager pointers are not deleted.
87 ~Consumer() = default;
88
102 __INLINE__ std::array<size_t, 2> peek(const size_t pos = 0)
103 {
104 std::array<size_t, 2> ret = {0};
105 // @ToDo: to support pos > 0, we need to modify _channelPushes to
106 // be of type std::vector instead of std::queue
107 if (pos > 0) HICR_THROW_LOGIC("Nonblocking MPSC not yet implemented for peek with n!=0");
108
109 _coordinationCommunicationManager->flushReceived();
110 _payloadCommunicationManager->flushReceived();
111 updateDepth();
112 if (_channelPushes.empty()) HICR_THROW_RUNTIME("Attempting to peek position (%lu) but supporting queue has size (%lu)", pos, _channelPushes.size());
113
114 size_t channelId = _channelPushes.front(); // front() returns the first (i.e. oldest) element
115 if (channelId >= _spscList.size()) { HICR_THROW_LOGIC("channelId (%lu) >= _spscList.size() (%lu)", channelId, _spscList.size()); }
116 ret[0] = channelId;
117 ret[1] = _spscList[channelId]->peek();
118
119 return ret;
120 }
121
128 __INLINE__ size_t getDepth()
129 {
130 size_t totalDepth = 0;
131 for (auto d : _depths) { totalDepth += d; }
132 // if (totalDepth != _channelPushes.size()) {
133 // HICR_THROW_LOGIC("Helper FIFO and channels are out of sync, implemenation issue! getDepth (%lu) != _channelPushes.size() (%lu)", totalDepth, _channelPushes.size());
134 // }
135 return totalDepth;
136 }
137
142 __INLINE__ bool isEmpty() { return (getDepth() == 0); }
143
149 __INLINE__ void pop(const size_t n = 1)
150 {
151 updateDepth();
152 // If the exchange buffer does not have enough tokens, reject operation
153 if (n > getDepth())
154 HICR_THROW_RUNTIME("Attempting to pop (%lu) tokens, which is more than the number of current tokens in the channel (%lu)", n, getDepth());
155 else
156 {
157 size_t channelFirstPushed = _channelPushes.front();
158 if (channelFirstPushed >= _spscList.size())
159 HICR_THROW_LOGIC("Index of latest push channel incorrect!");
160 else
161 {
162 // pop n elements from the SPSCs in the order recorded in the helper
163 // FIFO _channelPushes, and also update the FIFO itself
164 for (size_t i = 0; i < n; i++)
165 {
166 _spscList[channelFirstPushed]->pop();
167 _depths[channelFirstPushed]--;
168 _channelPushes.pop();
169 }
170 }
171 }
172 }
173
182 __INLINE__ void updateDepth()
183 {
184 std::vector<size_t> newDepths(_spscList.size());
185 // Note that after calling updateDepth() on each SPSC channel,
186 // we must accept this state as a new temporary snapshot in newDepths
187 // It is possible that during our iterating through newDepths, producers have
188 // sent more elements already, which will be handled in later
189 // updateDepth calls
190 for (size_t i = 0; i < _spscList.size(); i++)
191 {
192 _spscList[i]->updateDepth();
193 newDepths[i] = _spscList[i]->getDepth();
194 }
195
196 for (size_t i = 0; i < _spscList.size(); i++)
197 {
198 for (size_t j = _depths[i]; j < newDepths[i]; j++) { _channelPushes.push(i); }
199 }
200 std::swap(_depths, newDepths);
201 if (getDepth() != _channelPushes.size()) { HICR_THROW_LOGIC("getDepth (%lu) != _channelPushes.size() (%lu)", getDepth(), _channelPushes.size()); }
202 }
203
210 [[nodiscard]] __INLINE__ std::vector<std::shared_ptr<GlobalMemorySlot>> getTokenBuffers() const { return _tokenBuffers; }
211
212 private:
213
// Copy of the token buffers passed to the constructor; returned by getTokenBuffers()
218 const std::vector<std::shared_ptr<HiCR::GlobalMemorySlot>> _tokenBuffers;
219
// One SPSC consumer per producer, created in the constructor; indexed by channel id
223 std::vector<std::shared_ptr<channel::fixedSize::SPSC::Consumer>> _spscList;
// Helper FIFO of channel ids: updateDepth() appends one entry per newly seen token,
// peek() reads the front and pop() removes entries from the front
227 std::queue<size_t> _channelPushes;
228
// Cached depth of each SPSC channel, refreshed by updateDepth() and decremented by pop()
232 std::vector<size_t> _depths;
233
// Address of the coordination communication manager given to the constructor; flushed in peek()
237 CommunicationManager *const _coordinationCommunicationManager;
238
// Address of the payload communication manager given to the constructor; flushed in peek()
242 CommunicationManager *const _payloadCommunicationManager;
243};
244
245} // namespace HiCR::channel::fixedSize::MPSC::nonlocking
Definition communicationManager.hpp:54
virtual __INLINE__ void flushReceived()
Definition communicationManager.hpp:469
Consumer(CommunicationManager &coordinationCommunicationManager, CommunicationManager &payloadCommunicationManager, std::vector< std::shared_ptr< GlobalMemorySlot > > tokenBuffers, std::vector< std::shared_ptr< LocalMemorySlot > > internalCoordinationBuffers, std::vector< std::shared_ptr< GlobalMemorySlot > > producerCoordinationBuffers, const size_t tokenSize, const size_t capacity)
Definition consumer.hpp:62
__INLINE__ std::array< size_t, 2 > peek(const size_t pos=0)
Definition consumer.hpp:102
__INLINE__ size_t getDepth()
Definition consumer.hpp:128
__INLINE__ void pop(const size_t n=1)
Definition consumer.hpp:149
__INLINE__ std::vector< std::shared_ptr< GlobalMemorySlot > > getTokenBuffers() const
Definition consumer.hpp:210
__INLINE__ bool isEmpty()
Definition consumer.hpp:142
__INLINE__ void updateDepth()
Definition consumer.hpp:182
Provides a failure model and corresponding exception classes.
#define HICR_THROW_RUNTIME(...)
Definition exceptions.hpp:74
#define HICR_THROW_LOGIC(...)
Definition exceptions.hpp:67
Provides base functionality for a fixed-size MPSC channel over HiCR.
Provides consumer functionality for a fixed size SPSC channel over HiCR.