/home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/variableSize/mpsc/nonlocking/consumer.hpp Source File

HiCR: /home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/variableSize/mpsc/nonlocking/consumer.hpp Source File
HiCR
consumer.hpp
1/*
2 * Copyright 2025 Huawei Technologies Co., Ltd.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
24#pragma once
25
26#include <queue>
28
29namespace HiCR::channel::variableSize::MPSC::nonlocking
30{
31
37{
38 public:
39
64 Consumer(CommunicationManager &communicationManager,
65 const std::vector<std::shared_ptr<GlobalMemorySlot>> &payloadBuffers,
66 const std::vector<std::shared_ptr<GlobalMemorySlot>> &tokenBuffers,
67 const std::vector<std::shared_ptr<LocalMemorySlot>> &internalCoordinationBufferForCounts,
68 const std::vector<std::shared_ptr<LocalMemorySlot>> &internalCoordinationBufferForPayloads,
69 const std::vector<std::shared_ptr<GlobalMemorySlot>> &producerCoordinationBufferForCounts,
70 const std::vector<std::shared_ptr<GlobalMemorySlot>> &producerCoordinationBufferForPayloads,
71 const size_t payloadCapacity,
72 const size_t payloadSize,
73 const size_t capacity)
74 : _communicationManager(&communicationManager)
75 {
76 // make sure producer and consumer sides have the same element size
77 // the size is hopefully the producer count
78 assert(!internalCoordinationBufferForCounts.empty());
79 auto producerCount = internalCoordinationBufferForCounts.size();
80 assert(producerCount == internalCoordinationBufferForPayloads.size());
81 assert(producerCount == producerCoordinationBufferForCounts.size());
82 assert(producerCount == producerCoordinationBufferForPayloads.size());
83
84 // create p (= number of producers) SPSC channels
85 for (size_t i = 0; i < producerCount; i++)
86 {
87 std::shared_ptr<variableSize::SPSC::Consumer> consumerPtr(new variableSize::SPSC::Consumer(communicationManager,
88 payloadBuffers[i],
89 tokenBuffers[i],
90 internalCoordinationBufferForCounts[i],
91 internalCoordinationBufferForPayloads[i],
92 producerCoordinationBufferForCounts[i],
93 producerCoordinationBufferForPayloads[i],
94 payloadCapacity,
95 capacity));
96 _spscList.push_back(consumerPtr);
97
98 /*
99 * Note that it is important to record messages that might already have been received
100 * immediately upon creation of the SPSC channel. Therefore we do not reset
101 * _depths to zero, and check for "early" received messages
102 */
103 _depths.push_back(consumerPtr->getDepth());
104 for (size_t j = 0; j < _depths.back(); j++) { _channelPushes.push(i); }
105 }
106 }
107
108 ~Consumer() = default;
109
125 __INLINE__ std::array<size_t, 3> peek(const size_t pos = 0)
126 {
127 std::array<size_t, 3> ret = {0};
128 // @ToDo: to support pos > 0, we need to modify _channelPushes to
129 // be of type std::vector instead of std::queue
130 if (pos > 0) HICR_THROW_LOGIC("Nonblocking MPSC not yet implemented for peek with n!=0");
131
132 _communicationManager->flushReceived();
133 updateDepth();
134 if (_channelPushes.empty()) HICR_THROW_RUNTIME("Attempting to peek position (%lu) but supporting queue has size (%lu)", pos, _channelPushes.size());
135
136 size_t channelId = _channelPushes.front(); // front() returns the first (i.e. oldest) element
137 if (channelId >= _spscList.size()) { HICR_THROW_LOGIC("channelId (%lu) >= _spscList.size() (%lu)", channelId, _spscList.size()); }
138 ret[0] = channelId;
139 ret[1] = _spscList[channelId]->peek()[0];
140 ret[2] = _spscList[channelId]->peek()[1];
141
142 return ret;
143 }
144
151 __INLINE__ size_t getDepth()
152 {
153 size_t totalDepth = 0;
154 for (auto d : _depths) { totalDepth += d; }
155
156 if (totalDepth != _channelPushes.size())
157 {
158 HICR_THROW_LOGIC("Helper FIFO and channels are out of sync, implemenation issue! getDepth (%lu) != _channelPushes.size() (%lu)", totalDepth, _channelPushes.size());
159 }
160 return totalDepth;
161 }
162
167 __INLINE__ bool isEmpty() { return (getDepth() == 0); }
168
174 __INLINE__ void pop(const size_t n = 1)
175 {
176 updateDepth();
177 // If the exchange buffer does not have enough tokens, reject operation
178 if (n > getDepth())
179 HICR_THROW_RUNTIME("Attempting to pop (%lu) tokens, which is more than the number of current tokens in the channel (%lu)", n, getDepth());
180 else
181 {
182 size_t channelFirstPushed = _channelPushes.front();
183 if (channelFirstPushed >= _spscList.size())
184 HICR_THROW_LOGIC("Index of latest push channel incorrect!");
185 else
186 {
187 // pop n elements from the SPSCs in the order recorded in the helper
188 // FIFO _channelPushes, and also update the FIFO itself
189 for (size_t i = 0; i < n; i++)
190 {
191 _spscList[channelFirstPushed]->pop();
192 _depths[channelFirstPushed]--;
193 _channelPushes.pop();
194 }
195 }
196 }
197 }
198
207 __INLINE__ void updateDepth()
208 {
209 std::vector<size_t> newDepths(_spscList.size());
210 /*
211 * Note that after calling updateDepth() on each SPSC channel,
212 * we must accept this state as a new temporary snapshot in newDepths.
213 * It is possible that during our iterating through newDepths, producers have
214 * sent more elements already, which will be handled in later updateDepth calls.
215 */
216
217 for (size_t i = 0; i < _spscList.size(); i++)
218 {
219 _spscList[i]->updateDepth();
220 newDepths[i] = _spscList[i]->getDepth();
221 }
222
223 for (size_t i = 0; i < _spscList.size(); i++)
224 {
225 assert(_depths[i] <= newDepths[i]);
226 for (size_t j = _depths[i]; j < newDepths[i]; j++) { _channelPushes.push(i); }
227 }
228 std::swap(_depths, newDepths);
229 if (getDepth() != _channelPushes.size()) { HICR_THROW_LOGIC("getDepth (%lu) != _channelPushes.size() (%lu)", getDepth(), _channelPushes.size()); }
230 }
231
232 private:
233
// One variable-size SPSC consumer per producer; the index in this list is the channel id
237 std::vector<std::shared_ptr<channel::variableSize::SPSC::Consumer>> _spscList;
// Helper FIFO of channel ids, one entry per recorded token, in the order tokens were recorded;
// the front is the oldest token
241 std::queue<size_t> _channelPushes;
242
// Cached depth of each SPSC channel, indexed like _spscList; refreshed by updateDepth()
246 std::vector<size_t> _depths;
// Communication manager passed to the constructor; used by peek() to flush received messages
250 CommunicationManager *const _communicationManager;
251};
252
253} // namespace HiCR::channel::variableSize::MPSC::nonlocking
Definition communicationManager.hpp:54
virtual __INLINE__ void flushReceived()
Definition communicationManager.hpp:492
__INLINE__ size_t getDepth()
Definition consumer.hpp:151
__INLINE__ std::array< size_t, 3 > peek(const size_t pos=0)
Definition consumer.hpp:125
__INLINE__ bool isEmpty()
Definition consumer.hpp:167
__INLINE__ void updateDepth()
Definition consumer.hpp:207
__INLINE__ void pop(const size_t n=1)
Definition consumer.hpp:174
Consumer(CommunicationManager &communicationManager, const std::vector< std::shared_ptr< GlobalMemorySlot > > &payloadBuffers, const std::vector< std::shared_ptr< GlobalMemorySlot > > &tokenBuffers, const std::vector< std::shared_ptr< LocalMemorySlot > > &internalCoordinationBufferForCounts, const std::vector< std::shared_ptr< LocalMemorySlot > > &internalCoordinationBufferForPayloads, const std::vector< std::shared_ptr< GlobalMemorySlot > > &producerCoordinationBufferForCounts, const std::vector< std::shared_ptr< GlobalMemorySlot > > &producerCoordinationBufferForPayloads, const size_t payloadCapacity, const size_t payloadSize, const size_t capacity)
Definition consumer.hpp:64
#define HICR_THROW_RUNTIME(...)
Definition exceptions.hpp:74
#define HICR_THROW_LOGIC(...)
Definition exceptions.hpp:67
Provides functionality for a var-size SPSC consumer channel.