/home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/variableSize/spsc/producer.hpp Source File

HiCR: /home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/variableSize/spsc/producer.hpp Source File
HiCR
producer.hpp
Go to the documentation of this file.
1/*
2 * Copyright 2025 Huawei Technologies Co., Ltd.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
24#pragma once
25
27#include <utility>
28
29namespace HiCR::channel::variableSize::SPSC
30{
31
39{
40 public:
41
64 Producer(CommunicationManager &coordinationCommunicationManager,
65 CommunicationManager &payloadCommunicationManager,
66 std::shared_ptr<LocalMemorySlot> sizeInfoBuffer,
67 std::shared_ptr<GlobalMemorySlot> payloadBuffer,
68 std::shared_ptr<GlobalMemorySlot> tokenBuffer,
69 const std::shared_ptr<LocalMemorySlot> &internalCoordinationBufferForCounts,
70 const std::shared_ptr<LocalMemorySlot> &internalCoordinationBufferForPayloads,
71 std::shared_ptr<GlobalMemorySlot> consumerCoordinationBufferForCounts,
72 std::shared_ptr<GlobalMemorySlot> consumerCoordinationBufferForPayloads,
73 const size_t payloadCapacity,
74 const size_t payloadSize,
75 const size_t capacity)
76 : variableSize::Base(coordinationCommunicationManager,
77 payloadCommunicationManager,
78 internalCoordinationBufferForCounts,
79 internalCoordinationBufferForPayloads,
80 capacity,
81 payloadCapacity),
82 _payloadBuffer(std::move(payloadBuffer)),
83 _sizeInfoBuffer(std::move(sizeInfoBuffer)),
84 _payloadSize(payloadSize),
85 _tokenBuffer(std::move(tokenBuffer)),
86 _consumerCoordinationBufferForCounts(std::move(consumerCoordinationBufferForCounts)),
87 _consumerCoordinationBufferForPayloads(std::move(consumerCoordinationBufferForPayloads))
88 {}
89
90 ~Producer() = default;
91
115 __INLINE__ void push(const std::shared_ptr<LocalMemorySlot> &sourceSlot, const size_t n = 1)
116 {
117 if (n != 1) HICR_THROW_RUNTIME("HiCR currently has no implementation for n != 1 with push(sourceSlot, n) for variable size version.");
118
119 // Updating depth of token (message sizes) and payload buffers
120 updateDepth();
121
152 // Get bytes required to push the token
153 size_t requiredPayloadBufferSize = sourceSlot->getSize();
154
155 // Throw exception if the token can not be pushed
156 if (isFull(requiredPayloadBufferSize) == true)
157 {
158 HICR_THROW_RUNTIME("Attempting to push a token while the channel is full.\nChannel depth: %lu capacity: %lu\nPayload depth: %lu capacity: %lu",
160 getCircularBufferForCounts()->getCapacity(),
162 getCircularBufferForPayloads()->getCapacity());
163 }
164
165 // Get communication managers
166 auto payloadCommunicationManager = getPayloadCommunicationManager();
167 auto coordinationCommunicationManager = getCoordinationCommunicationManager();
168
169 // Payload copy. Just push to the channel, we know there is enough space
170 payloadCommunicationManager->memcpy(_payloadBuffer, getPayloadHeadPosition(), sourceSlot, 0, requiredPayloadBufferSize);
171 payloadCommunicationManager->fence(sourceSlot, 1, 0);
172
173 // Advance the head in the payload buffer
174 getCircularBufferForPayloads()->advanceHead(requiredPayloadBufferSize);
175
176 // update the consumer coordination buffers (consumer does not update its own coordination head positions)
177 coordinationCommunicationManager->memcpy(_consumerCoordinationBufferForPayloads,
178 _HICR_CHANNEL_HEAD_ADVANCE_COUNT_IDX * sizeof(size_t),
180 _HICR_CHANNEL_HEAD_ADVANCE_COUNT_IDX * sizeof(size_t),
181 sizeof(size_t));
182 coordinationCommunicationManager->fence(getCoordinationBufferForPayloads(), 1, 0);
183
184 /*
185 * Part 2: Copy the message size
186 */
187 auto *sizeInfoBufferPtr = static_cast<size_t *>(_sizeInfoBuffer->getPointer());
188 sizeInfoBufferPtr[0] = requiredPayloadBufferSize;
189
190 coordinationCommunicationManager->memcpy(_tokenBuffer, /* destination */
191 getTokenSize() * getCircularBufferForCounts()->getHeadPosition(), /* dst_offset */
192 _sizeInfoBuffer, /* source */
193 0, /* src_offset */
194 getTokenSize()); /* size */
195 coordinationCommunicationManager->fence(_sizeInfoBuffer, 1, 0);
196 getCircularBufferForCounts()->advanceHead(1);
197
198 coordinationCommunicationManager->memcpy(_consumerCoordinationBufferForCounts,
199 _HICR_CHANNEL_HEAD_ADVANCE_COUNT_IDX * sizeof(size_t),
201 _HICR_CHANNEL_HEAD_ADVANCE_COUNT_IDX * sizeof(size_t),
202 sizeof(size_t));
203 coordinationCommunicationManager->fence(getCoordinationBufferForCounts(), 1, 0);
204 }
205
210 __INLINE__ void updateDepth() {}
211
216 [[nodiscard]] __INLINE__ size_t getPayloadHeadPosition() const noexcept { return getCircularBufferForPayloads()->getHeadPosition(); }
217
222 __INLINE__ size_t getPayloadSize() { return _payloadSize; }
223
231 __INLINE__ size_t getPayloadDepth() { return getCircularBufferForPayloads()->getDepth(); }
232
237 __INLINE__ size_t getPayloadCapacity() { return getCircularBufferForPayloads()->getCapacity(); }
238
250 size_t getCoordinationDepth() { return getCircularBufferForCounts()->getDepth(); }
251
261 bool isEmpty() { return getCoordinationDepth() == 0; }
262
274 bool isFull(const size_t requiredBufferSize)
275 {
276 // Check if we can push one more token
277 auto coordinationCircularBuffer = getCircularBufferForCounts();
278 if (coordinationCircularBuffer->getDepth() == coordinationCircularBuffer->getCapacity()) { return true; }
279
280 // Check if there is enough space in the payload buffer. If
281 auto payloadCircularBuffer = getCircularBufferForPayloads();
282 if (payloadCircularBuffer->getDepth() + requiredBufferSize > payloadCircularBuffer->getCapacity()) { return true; }
283
284 return false;
285 }
286
287 private:
288
292 std::shared_ptr<GlobalMemorySlot> _payloadBuffer;
296 std::shared_ptr<LocalMemorySlot> _sizeInfoBuffer;
300 size_t _payloadSize;
301
305 const std::shared_ptr<GlobalMemorySlot> _tokenBuffer;
306
310 const std::shared_ptr<GlobalMemorySlot> _consumerCoordinationBufferForCounts;
311
315 const std::shared_ptr<GlobalMemorySlot> _consumerCoordinationBufferForPayloads;
316};
317
318} // namespace HiCR::channel::variableSize::SPSC
Definition communicationManager.hpp:54
__INLINE__ size_t getDepth() const noexcept
Definition base.hpp:141
__INLINE__ size_t getTokenSize() const noexcept
Definition base.hpp:84
__INLINE__ CommunicationManager * getPayloadCommunicationManager() const
Definition base.hpp:223
__INLINE__ bool isFull() const noexcept
Definition base.hpp:151
__INLINE__ CommunicationManager * getCoordinationCommunicationManager() const
Definition base.hpp:229
Definition base.hpp:41
__INLINE__ auto getCircularBufferForPayloads() const
Definition base.hpp:101
__INLINE__ auto getCoordinationBufferForPayloads() const
Definition base.hpp:113
__INLINE__ auto getCoordinationBufferForCounts() const
Definition base.hpp:107
__INLINE__ auto getCircularBufferForCounts() const
Definition base.hpp:95
__INLINE__ size_t getPayloadDepth()
Definition producer.hpp:231
bool isFull(const size_t requiredBufferSize)
Definition producer.hpp:274
Producer(CommunicationManager &coordinationCommunicationManager, CommunicationManager &payloadCommunicationManager, std::shared_ptr< LocalMemorySlot > sizeInfoBuffer, std::shared_ptr< GlobalMemorySlot > payloadBuffer, std::shared_ptr< GlobalMemorySlot > tokenBuffer, const std::shared_ptr< LocalMemorySlot > &internalCoordinationBufferForCounts, const std::shared_ptr< LocalMemorySlot > &internalCoordinationBufferForPayloads, std::shared_ptr< GlobalMemorySlot > consumerCoordinationBufferForCounts, std::shared_ptr< GlobalMemorySlot > consumerCoordinationBufferForPayloads, const size_t payloadCapacity, const size_t payloadSize, const size_t capacity)
Definition producer.hpp:64
__INLINE__ void push(const std::shared_ptr< LocalMemorySlot > &sourceSlot, const size_t n=1)
Definition producer.hpp:115
__INLINE__ void updateDepth()
Definition producer.hpp:210
__INLINE__ size_t getPayloadCapacity()
Definition producer.hpp:237
size_t getCoordinationDepth()
Definition producer.hpp:250
bool isEmpty()
Definition producer.hpp:261
__INLINE__ size_t getPayloadSize()
Definition producer.hpp:222
__INLINE__ size_t getPayloadHeadPosition() const noexcept
Definition producer.hpp:216
#define HICR_THROW_RUNTIME(...)
Definition exceptions.hpp:74
extends channel::Base into a base enabling var-size messages