/home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/variableSize/spsc/producer.hpp Source File

HiCR: /home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/variableSize/spsc/producer.hpp Source File
HiCR
producer.hpp
Go to the documentation of this file.
1/*
2 * Copyright 2025 Huawei Technologies Co., Ltd.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
24#pragma once
25
27#include <utility>
28
29namespace HiCR::channel::variableSize::SPSC
30{
31
39{
40 public:
41
62 Producer(CommunicationManager &communicationManager,
63 std::shared_ptr<LocalMemorySlot> sizeInfoBuffer,
64 std::shared_ptr<GlobalMemorySlot> payloadBuffer,
65 std::shared_ptr<GlobalMemorySlot> tokenBuffer,
66 const std::shared_ptr<LocalMemorySlot> &internalCoordinationBufferForCounts,
67 const std::shared_ptr<LocalMemorySlot> &internalCoordinationBufferForPayloads,
68 std::shared_ptr<GlobalMemorySlot> consumerCoordinationBufferForCounts,
69 std::shared_ptr<GlobalMemorySlot> consumerCoordinationBufferForPayloads,
70 const size_t payloadCapacity,
71 const size_t payloadSize,
72 const size_t capacity)
73 : variableSize::Base(communicationManager, internalCoordinationBufferForCounts, internalCoordinationBufferForPayloads, capacity, payloadCapacity),
74 _payloadBuffer(std::move(payloadBuffer)),
75 _sizeInfoBuffer(std::move(sizeInfoBuffer)),
76 _payloadSize(payloadSize),
77 _tokenBuffer(std::move(tokenBuffer)),
78 _consumerCoordinationBufferForCounts(std::move(consumerCoordinationBufferForCounts)),
79 _consumerCoordinationBufferForPayloads(std::move(consumerCoordinationBufferForPayloads))
80 {}
81
82 ~Producer() = default;
83
88 __INLINE__ void updateDepth() {}
89
94 __INLINE__ void advancePayloadTail(const size_t n = 1) { getCircularBufferForPayloads()->advanceTail(n); }
95
100 [[nodiscard]] __INLINE__ size_t getPayloadHeadPosition() const noexcept { return getCircularBufferForPayloads()->getHeadPosition(); }
101
106 __INLINE__ size_t getPayloadSize() { return _payloadSize; }
107
112 __INLINE__ size_t getPayloadDepth() { return getCircularBufferForPayloads()->getDepth(); }
113
118 __INLINE__ size_t getPayloadCapacity() { return getCircularBufferForPayloads()->getCapacity(); }
119
143 __INLINE__ void push(const std::shared_ptr<LocalMemorySlot> &sourceSlot, const size_t n = 1)
144 {
145 if (n != 1) HICR_THROW_RUNTIME("HiCR currently has no implementation for n != 1 with push(sourceSlot, n) for variable size version.");
146
147 // Make sure source slot is beg enough to satisfy the operation
148 size_t requiredBufferSize = sourceSlot->getSize();
149 size_t providedBufferCapacity = getPayloadCapacity();
150
151 // Updating depth of token (message sizes) and payload buffers
152 updateDepth();
153 auto currentCountsDepth = getCircularBufferForCounts()->getDepth();
154 auto currentPayloadDepth = getCircularBufferForPayloads()->getDepth();
155 auto currentDepth = getDepth();
156
157 /*
158 * Part 1: Copy the payload data
159 */
160 if (currentPayloadDepth + requiredBufferSize > providedBufferCapacity)
161 HICR_THROW_RUNTIME("Attempting to push (%lu) bytes while the channel currently has payload depth (%lu). This would exceed capacity (%lu).\n",
162 requiredBufferSize,
163 currentPayloadDepth,
164 providedBufferCapacity);
165
166 /*
167 * Payload copy:
168 * - We have checked (requiredBufferSize <= depth)
169 * that the payload fits into available circular buffer,
170 * but it is possible it spills over the end into the
171 * beginning. Cover this corner case below
172 *
173 */
174 getCircularBufferForPayloads()->setCachedDepth(currentPayloadDepth);
175 if (requiredBufferSize + getPayloadHeadPosition() > getPayloadCapacity())
176 {
177 size_t first_chunk = getPayloadCapacity() - getPayloadHeadPosition();
178 size_t second_chunk = requiredBufferSize - first_chunk;
179
180 // copy first part to end of buffer
181 getCommunicationManager()->memcpy(_payloadBuffer, /* destination */
182 getPayloadHeadPosition(), /* dst_offset */
183 sourceSlot, /* source */
184 0, /* src_offset */
185 first_chunk); /* size */
186 // copy second part to beginning of buffer
187 getCommunicationManager()->memcpy(_payloadBuffer, /* destination */
188 0, /* dst_offset */
189 sourceSlot, /* source */
190 first_chunk, /* src_offset */
191 second_chunk); /* size */
192 getCommunicationManager()->fence(sourceSlot, 2, 0);
193 }
194 else
195 {
196 getCommunicationManager()->memcpy(_payloadBuffer, getPayloadHeadPosition(), sourceSlot, 0, requiredBufferSize);
197 getCommunicationManager()->fence(sourceSlot, 1, 0);
198 }
199
200 getCircularBufferForPayloads()->advanceHead(requiredBufferSize, true);
201
202 // update the consumer coordination buffers (consumer does not update
203 // its own coordination head positions)
204 getCommunicationManager()->memcpy(_consumerCoordinationBufferForPayloads,
205 _HICR_CHANNEL_HEAD_ADVANCE_COUNT_IDX * sizeof(size_t),
207 _HICR_CHANNEL_HEAD_ADVANCE_COUNT_IDX * sizeof(size_t),
208 sizeof(size_t));
210
211 /*
212 * Part 2: Copy the message size
213 */
214
215 auto *sizeInfoBufferPtr = static_cast<size_t *>(_sizeInfoBuffer->getPointer());
216 sizeInfoBufferPtr[0] = requiredBufferSize;
217
218 // If the exchange buffer does not have n free slots, reject the operation
219 if (currentDepth + 1 > getCircularBufferForCounts()->getCapacity())
220 HICR_THROW_RUNTIME("Attempting to push with (%lu) tokens while the channel has (%lu) tokens and this would exceed capacity (%lu).\n",
221 1,
222 getDepth(),
223 getCircularBufferForCounts()->getCapacity());
224
225 getCircularBufferForCounts()->setCachedDepth(currentCountsDepth);
226 /*
227 * Advance head, as we have added new elements.
228 * It is important to do the advanceHead and copy together,
229 * or else issues such as advancing head index too early or too late might occur!
230 */
231
232 getCommunicationManager()->memcpy(_tokenBuffer, /* destination */
233 getTokenSize() * getCircularBufferForCounts()->getHeadPosition(), /* dst_offset */
234 _sizeInfoBuffer, /* source */
235 0, /* src_offset */
236 getTokenSize()); /* size */
237 getCommunicationManager()->fence(_sizeInfoBuffer, 1, 0);
238 getCircularBufferForCounts()->advanceHead(1, true);
239
240 getCommunicationManager()->memcpy(_consumerCoordinationBufferForCounts,
241 _HICR_CHANNEL_HEAD_ADVANCE_COUNT_IDX * sizeof(size_t),
243 _HICR_CHANNEL_HEAD_ADVANCE_COUNT_IDX * sizeof(size_t),
244 sizeof(size_t));
246 }
247
256 size_t getDepth() { return getCircularBufferForCounts()->getDepth(); }
257
266 bool isEmpty() { return getDepth() == 0; }
267
268 private:
269
273 std::shared_ptr<GlobalMemorySlot> _payloadBuffer;
277 std::shared_ptr<LocalMemorySlot> _sizeInfoBuffer;
281 size_t _payloadSize;
282
286 const std::shared_ptr<GlobalMemorySlot> _tokenBuffer;
287
291 const std::shared_ptr<GlobalMemorySlot> _consumerCoordinationBufferForCounts;
292
296 const std::shared_ptr<GlobalMemorySlot> _consumerCoordinationBufferForPayloads;
297};
298
299} // namespace HiCR::channel::variableSize::SPSC
Definition communicationManager.hpp:54
__INLINE__ void memcpy(const std::shared_ptr< LocalMemorySlot > &destination, size_t dst_offset, const std::shared_ptr< LocalMemorySlot > &source, size_t src_offset, size_t size)
Definition communicationManager.hpp:267
__INLINE__ void fence(GlobalMemorySlot::tag_t tag)
Definition communicationManager.hpp:377
__INLINE__ CommunicationManager * getCommunicationManager() const
Definition base.hpp:217
__INLINE__ size_t getTokenSize() const noexcept
Definition base.hpp:84
Definition base.hpp:41
__INLINE__ auto getCircularBufferForPayloads() const
Definition base.hpp:99
__INLINE__ auto getCoordinationBufferForPayloads() const
Definition base.hpp:111
__INLINE__ auto getCoordinationBufferForCounts() const
Definition base.hpp:105
__INLINE__ auto getCircularBufferForCounts() const
Definition base.hpp:93
__INLINE__ size_t getPayloadDepth()
Definition producer.hpp:112
size_t getDepth()
Definition producer.hpp:256
__INLINE__ void push(const std::shared_ptr< LocalMemorySlot > &sourceSlot, const size_t n=1)
Definition producer.hpp:143
__INLINE__ void updateDepth()
Definition producer.hpp:88
__INLINE__ size_t getPayloadCapacity()
Definition producer.hpp:118
bool isEmpty()
Definition producer.hpp:266
__INLINE__ size_t getPayloadSize()
Definition producer.hpp:106
__INLINE__ size_t getPayloadHeadPosition() const noexcept
Definition producer.hpp:100
__INLINE__ void advancePayloadTail(const size_t n=1)
Definition producer.hpp:94
Producer(CommunicationManager &communicationManager, std::shared_ptr< LocalMemorySlot > sizeInfoBuffer, std::shared_ptr< GlobalMemorySlot > payloadBuffer, std::shared_ptr< GlobalMemorySlot > tokenBuffer, const std::shared_ptr< LocalMemorySlot > &internalCoordinationBufferForCounts, const std::shared_ptr< LocalMemorySlot > &internalCoordinationBufferForPayloads, std::shared_ptr< GlobalMemorySlot > consumerCoordinationBufferForCounts, std::shared_ptr< GlobalMemorySlot > consumerCoordinationBufferForPayloads, const size_t payloadCapacity, const size_t payloadSize, const size_t capacity)
Definition producer.hpp:62
#define HICR_THROW_RUNTIME(...)
Definition exceptions.hpp:74
extends channel::Base into a base enabling var-size messages