/home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/variableSize/mpsc/locking/consumer.hpp Source File

HiCR: /home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/variableSize/mpsc/locking/consumer.hpp Source File
HiCR
consumer.hpp
Go to the documentation of this file.
1/*
2 * Copyright 2025 Huawei Technologies Co., Ltd.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
24#pragma once
25
27#include <utility>
28
29namespace HiCR::channel::variableSize::MPSC::locking
30{
31
38class Consumer final : public variableSize::Base
39{
40 public:
41
65 Consumer(CommunicationManager &communicationManager,
66 std::shared_ptr<GlobalMemorySlot> payloadBuffer,
67 std::shared_ptr<GlobalMemorySlot> tokenBuffer,
68 const std::shared_ptr<LocalMemorySlot> &internalCoordinationBufferForCounts,
69 const std::shared_ptr<LocalMemorySlot> &internalCoordinationBufferForPayloads,
70 const std::shared_ptr<GlobalMemorySlot> &consumerCoordinationBufferForCounts,
71 std::shared_ptr<GlobalMemorySlot> consumerCoordinationBufferForPayloads,
72 const size_t payloadCapacity,
73 const size_t capacity)
74 : variableSize::Base(communicationManager, internalCoordinationBufferForCounts, internalCoordinationBufferForPayloads, capacity, payloadCapacity),
75 _payloadBuffer(std::move(payloadBuffer)),
76 _tokenSizeBuffer(std::move(tokenBuffer)),
77 _consumerCoordinationBufferForCounts(consumerCoordinationBufferForCounts),
78 _consumerCoordinationBufferForPayloads(std::move(consumerCoordinationBufferForPayloads))
79 {
80 assert(internalCoordinationBufferForCounts != nullptr);
81 assert(internalCoordinationBufferForPayloads != nullptr);
82 assert(consumerCoordinationBufferForCounts != nullptr);
83 getCommunicationManager()->queryMemorySlotUpdates(_tokenSizeBuffer->getSourceLocalMemorySlot());
84 getCommunicationManager()->queryMemorySlotUpdates(_payloadBuffer->getSourceLocalMemorySlot());
85 }
86
107 __INLINE__ size_t basePeek(const size_t pos = 0)
108 {
109 // Check if the requested position exceeds the capacity of the channel
110 if (pos >= getCircularBufferForCounts()->getCapacity())
111 HICR_THROW_LOGIC("Attempting to peek for a token with position (%lu), which is beyond than the channel capacity (%lu)", pos, getCircularBufferForCounts()->getCapacity());
112
113 // Check if there are enough tokens in the buffer to satisfy the request
114 if (pos >= getCircularBufferForCounts()->getDepth())
115 HICR_THROW_RUNTIME("Attempting to peek position (%lu) but not enough tokens (%lu) are in the buffer", pos, getCircularBufferForCounts()->getDepth());
116
117 // Calculating buffer position
118 const size_t bufferPos = (getCircularBufferForCounts()->getTailPosition() + pos) % getCircularBufferForCounts()->getCapacity();
119
120 // Succeeded in pushing the token(s)
121 return bufferPos;
122 }
123
137 __INLINE__ std::array<size_t, 2> peek(const size_t pos = 0)
138 {
140 std::array<size_t, 2> result{};
141 if (pos != 0) { HICR_THROW_FATAL("peek only implemented for n = 0 at the moment!"); }
142 if (pos >= getCircularBufferForCounts()->getDepth())
143 {
144 HICR_THROW_RUNTIME("Attempting to peek position (%lu) but not enough tokens (%lu) are in the buffer", pos, getCircularBufferForCounts()->getDepth());
145 }
146
147 result[0] = getCircularBufferForPayloads()->getTailPosition() % getCircularBufferForPayloads()->getCapacity();
148 size_t *tokenBufferPtr = static_cast<size_t *>(_tokenSizeBuffer->getSourceLocalMemorySlot()->getPointer());
149 auto tokenPos = basePeek(pos);
150 result[1] = tokenBufferPtr[tokenPos];
151 return result;
152 }
153
160 __INLINE__ size_t getOldPayloadBytes(size_t n)
161 {
162 if (n == 0) return 0;
163 size_t *tokenBufferPtr = static_cast<size_t *>(_tokenSizeBuffer->getSourceLocalMemorySlot()->getPointer());
164
165 size_t payloadBytes = 0;
166 for (size_t i = 0; i < n; i++)
167 {
168 assert(i >= 0);
169 size_t pos = basePeek(i);
170 auto payloadSize = tokenBufferPtr[pos];
171 payloadBytes += payloadSize;
172 }
173 return payloadBytes;
174 }
175
181 __INLINE__ size_t getNewPayloadBytes(size_t n)
182 {
183 if (n == 0) return 0;
184 size_t *tokenBufferPtr = static_cast<size_t *>(_tokenSizeBuffer->getSourceLocalMemorySlot()->getPointer());
185 size_t payloadBytes = 0;
186
187 for (size_t i = 0; i < n; i++)
188 {
189 size_t ind = getCircularBufferForCounts()->getDepth() - 1 - i;
190 assert(ind >= 0);
191 size_t pos = basePeek(ind);
192 auto payloadSize = tokenBufferPtr[pos];
193 payloadBytes += payloadSize;
194 }
195
196 return payloadBytes;
197 }
198
210 __INLINE__ bool pop(const size_t n = 1)
211 {
212 bool successFlag = false;
213
214 // Locking remote coordination buffer slot
215 if (getCommunicationManager()->acquireGlobalLock(_consumerCoordinationBufferForCounts) == false) return successFlag;
216
217 if (n > getCircularBufferForCounts()->getCapacity())
218 HICR_THROW_LOGIC("Attempting to pop (%lu) tokens, which is larger than the channel capacity (%lu)", n, getCircularBufferForCounts()->getCapacity());
219 // If the exchange buffer does not have n tokens pushed, reject operation
221 HICR_THROW_RUNTIME("Attempting to pop (%lu) tokens, which is more than the number of current tokens in the channel (%lu)", n, getCircularBufferForCounts()->getDepth());
222
223 size_t *tokenBufferPtr = static_cast<size_t *>(_tokenSizeBuffer->getSourceLocalMemorySlot()->getPointer());
224
225 size_t bytesOldestEntry = tokenBufferPtr[getCircularBufferForCounts()->getTailPosition()];
226
227 getCircularBufferForCounts()->advanceTail(n);
228 getCircularBufferForPayloads()->advanceTail(bytesOldestEntry);
229
230 getCommunicationManager()->releaseGlobalLock(_consumerCoordinationBufferForCounts);
231 successFlag = true;
232 return successFlag;
233 }
234
251 size_t getDepth() { return getCircularBufferForCounts()->getDepth(); }
252
261 bool isEmpty() { return (getDepth() == 0); }
262
268 [[nodiscard]] std::shared_ptr<GlobalMemorySlot> getPayloadBufferMemorySlot() const { return _payloadBuffer; }
269
270 private:
271
275 std::shared_ptr<GlobalMemorySlot> _payloadBuffer;
276
282 const std::shared_ptr<GlobalMemorySlot> _tokenSizeBuffer;
283
288 const std::shared_ptr<GlobalMemorySlot> _consumerCoordinationBufferForCounts;
289
294 const std::shared_ptr<GlobalMemorySlot> _consumerCoordinationBufferForPayloads;
295};
296
297} // namespace HiCR::channel::variableSize::MPSC::locking
Definition communicationManager.hpp:54
virtual __INLINE__ void flushReceived()
Definition communicationManager.hpp:492
__INLINE__ void releaseGlobalLock(const std::shared_ptr< GlobalMemorySlot > &memorySlot)
Definition communicationManager.hpp:462
__INLINE__ void queryMemorySlotUpdates(std::shared_ptr< LocalMemorySlot > memorySlot)
Definition communicationManager.hpp:248
__INLINE__ CommunicationManager * getCommunicationManager() const
Definition base.hpp:217
Definition base.hpp:41
__INLINE__ auto getCircularBufferForPayloads() const
Definition base.hpp:99
__INLINE__ auto getCircularBufferForCounts() const
Definition base.hpp:93
__INLINE__ size_t basePeek(const size_t pos=0)
Definition consumer.hpp:107
std::shared_ptr< GlobalMemorySlot > getPayloadBufferMemorySlot() const
Definition consumer.hpp:268
__INLINE__ bool pop(const size_t n=1)
Definition consumer.hpp:210
size_t getDepth()
Definition consumer.hpp:251
Consumer(CommunicationManager &communicationManager, std::shared_ptr< GlobalMemorySlot > payloadBuffer, std::shared_ptr< GlobalMemorySlot > tokenBuffer, const std::shared_ptr< LocalMemorySlot > &internalCoordinationBufferForCounts, const std::shared_ptr< LocalMemorySlot > &internalCoordinationBufferForPayloads, const std::shared_ptr< GlobalMemorySlot > &consumerCoordinationBufferForCounts, std::shared_ptr< GlobalMemorySlot > consumerCoordinationBufferForPayloads, const size_t payloadCapacity, const size_t capacity)
Definition consumer.hpp:65
__INLINE__ std::array< size_t, 2 > peek(const size_t pos=0)
Definition consumer.hpp:137
__INLINE__ size_t getNewPayloadBytes(size_t n)
Definition consumer.hpp:181
bool isEmpty()
Definition consumer.hpp:261
__INLINE__ size_t getOldPayloadBytes(size_t n)
Definition consumer.hpp:160
#define HICR_THROW_RUNTIME(...)
Definition exceptions.hpp:74
#define HICR_THROW_LOGIC(...)
Definition exceptions.hpp:67
#define HICR_THROW_FATAL(...)
Definition exceptions.hpp:81
extends channel::Base into a base enabling var-size messages