/home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/variableSize/mpsc/locking/producer.hpp Source File

HiCR: /home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/variableSize/mpsc/locking/producer.hpp Source File
HiCR
producer.hpp
Go to the documentation of this file.
1/*
2 * Copyright 2025 Huawei Technologies Co., Ltd.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
24#pragma once
25
27#include <utility>
28
29namespace HiCR::channel::variableSize::MPSC::locking
30{
31
38class Producer final : public variableSize::Base
39{
40 public:
41
61 Producer(CommunicationManager &coordinationCommunicationManager,
62 CommunicationManager &payloadCommunicationManager,
63 std::shared_ptr<LocalMemorySlot> sizeInfoBuffer,
64 std::shared_ptr<GlobalMemorySlot> payloadBuffer,
65 std::shared_ptr<GlobalMemorySlot> tokenBuffer,
66 const std::shared_ptr<LocalMemorySlot> &internalCoordinationBufferForCounts,
67 const std::shared_ptr<LocalMemorySlot> &internalCoordinationBufferForPayloads,
68 std::shared_ptr<GlobalMemorySlot> consumerCoordinationBufferForCounts,
69 std::shared_ptr<GlobalMemorySlot> consumerCoordinationBufferForPayloads,
70 const size_t payloadCapacity,
71 const size_t payloadSize,
72 const size_t capacity)
73 : variableSize::Base(coordinationCommunicationManager,
74 payloadCommunicationManager,
75 internalCoordinationBufferForCounts,
76 internalCoordinationBufferForPayloads,
77 capacity,
78 payloadCapacity),
79 _payloadBuffer(std::move(payloadBuffer)),
80 _sizeInfoBuffer(std::move(sizeInfoBuffer)),
81 _payloadSize(payloadSize),
82 _tokenSizeBuffer(std::move(tokenBuffer)),
83 _consumerCoordinationBufferForCounts(std::move(consumerCoordinationBufferForCounts)),
84 _consumerCoordinationBufferForPayloads(std::move(consumerCoordinationBufferForPayloads))
85 {}
86
87 ~Producer() = default;
88
93 __INLINE__ void updateDepth() // NOTE: we DO know we have the lock!!!!
94 {
95 auto coordinationCommunicationManager = getCoordinationCommunicationManager();
96 coordinationCommunicationManager->memcpy(getCoordinationBufferForCounts(), /* destination */
97 0, /* dst_offset */
98 _consumerCoordinationBufferForCounts, /* source */
99 0, /* src_offset */
100 2 * sizeof(_HICR_CHANNEL_COORDINATION_BUFFER_ELEMENT_TYPE)); /* size */
101
102 coordinationCommunicationManager->memcpy(getCoordinationBufferForPayloads(), /* destination */
103 0, /* dst_offset */
104 _consumerCoordinationBufferForPayloads, /* source */
105 0, /* src_offset */
106 2 * sizeof(_HICR_CHANNEL_COORDINATION_BUFFER_ELEMENT_TYPE)); /* size */
107
108 coordinationCommunicationManager->fence(getCoordinationBufferForCounts(), 0, 1);
109 coordinationCommunicationManager->fence(getCoordinationBufferForPayloads(), 0, 1);
113 }
114
119 inline size_t getPayloadSize() { return _payloadSize; }
120
125 inline size_t getPayloadDepth() { return getCircularBufferForPayloads()->getDepth(); }
126
145 __INLINE__ bool push(const std::shared_ptr<LocalMemorySlot> &sourceSlot, const size_t n = 1)
146 {
147 if (n != 1) HICR_THROW_RUNTIME("HiCR currently has no implementation for n != 1 with push(sourceSlot, n) for variable size version.");
148
149 // Make sure source slot is big enough to satisfy the operation
150 size_t requiredBufferSize = sourceSlot->getSize();
151
152 // Flag to record whether the operation was successful or not (it simplifies code by releasing locks only once)
153 bool successFlag = false;
154
155 auto coordinationCommunicationManager = getCoordinationCommunicationManager();
156
157 // Locking remote token and coordination buffer slots
158 if (coordinationCommunicationManager->acquireGlobalLock(_consumerCoordinationBufferForCounts) == false) return successFlag;
159
160 // Updating depth of token (message sizes) and payload buffers from the consumer
161 updateDepth();
162
163 // Check if the amount of data we wish to write (requiredBufferSize)
164 // would fit in the consumer payload buffer in its current state.
165 // If not, reject the operation
166 if (getCircularBufferForPayloads()->getDepth() + requiredBufferSize > getCircularBufferForPayloads()->getCapacity())
167 {
168 coordinationCommunicationManager->releaseGlobalLock(_consumerCoordinationBufferForCounts);
169 return successFlag;
170 }
171
172 auto *sizeInfoBufferPtr = static_cast<size_t *>(_sizeInfoBuffer->getPointer());
173 sizeInfoBufferPtr[0] = requiredBufferSize;
174
175 // Check if the consumer buffer has n free slots. If not, reject the operation
176 if (getCircularBufferForCounts()->getDepth() + 1 > getCircularBufferForCounts()->getCapacity())
177 {
178 coordinationCommunicationManager->releaseGlobalLock(_consumerCoordinationBufferForCounts);
179 return successFlag;
180 }
181
186 coordinationCommunicationManager->memcpy(_tokenSizeBuffer, /* destination */
187 getTokenSize() * getCircularBufferForCounts()->getHeadPosition(), /* dst_offset */
188 _sizeInfoBuffer, /* source */
189 0, /* src_offset */
190 getTokenSize()); /* size */
191 coordinationCommunicationManager->fence(_sizeInfoBuffer, 1, 0);
192 successFlag = true;
193
194 auto payloadCommunicationManager = getPayloadCommunicationManager();
195
204 if (requiredBufferSize + getCircularBufferForPayloads()->getHeadPosition() > getCircularBufferForPayloads()->getCapacity())
205 {
206 size_t first_chunk = getCircularBufferForPayloads()->getCapacity() - getCircularBufferForPayloads()->getHeadPosition();
207 size_t second_chunk = requiredBufferSize - first_chunk;
208 // copy first part to end of buffer
209 payloadCommunicationManager->memcpy(_payloadBuffer, /* destination */
210 getCircularBufferForPayloads()->getHeadPosition(), /* dst_offset */
211 sourceSlot, /* source */
212 0, /* src_offset */
213 first_chunk); /* size */
214 // copy second part to beginning of buffer
215 payloadCommunicationManager->memcpy(_payloadBuffer, /* destination */
216 0, /* dst_offset */
217 sourceSlot, /* source */
218 first_chunk, /* src_offset */
219 second_chunk); /* size */
220
221 payloadCommunicationManager->fence(sourceSlot, 2, 0);
222 }
223 else
224 {
225 payloadCommunicationManager->memcpy(_payloadBuffer, getCircularBufferForPayloads()->getHeadPosition(), sourceSlot, 0, requiredBufferSize);
226 payloadCommunicationManager->fence(sourceSlot, 1, 0);
227 }
228
229 // Remotely push an element into consumer side, updating consumer head indices
230 getCircularBufferForCounts()->advanceHead(1);
231 getCircularBufferForPayloads()->advanceHead(requiredBufferSize);
232
233 // only update head index at consumer (byte size = one buffer element)
234 coordinationCommunicationManager->memcpy(_consumerCoordinationBufferForCounts, 0, getCoordinationBufferForCounts(), 0, sizeof(_HICR_CHANNEL_COORDINATION_BUFFER_ELEMENT_TYPE));
235 // only update head index at consumer (byte size = one buffer element)
236 coordinationCommunicationManager->memcpy(
237 _consumerCoordinationBufferForPayloads, 0, getCoordinationBufferForPayloads(), 0, sizeof(_HICR_CHANNEL_COORDINATION_BUFFER_ELEMENT_TYPE));
238 // backend LPF needs this to complete
239 coordinationCommunicationManager->fence(getCoordinationBufferForCounts(), 1, 0);
240 coordinationCommunicationManager->fence(getCoordinationBufferForPayloads(), 1, 0);
241
242 coordinationCommunicationManager->releaseGlobalLock(_consumerCoordinationBufferForCounts);
243
244 return successFlag;
245 }
246
251 __INLINE__ size_t getDepth()
252 {
253 // Because the current implementation first receives the message size in the token buffer, followed
254 // by the message payload, it is possible for the token buffer to have a larged depth (by 1) than the payload buffer.
255 // Therefore, we need to return the minimum of the two depths
257 }
258
267 __INLINE__ bool isEmpty() { return (getCircularBufferForCounts()->getDepth() == 0) && (getCircularBufferForPayloads()->getDepth() == 0); }
268
269 private:
270
274 std::shared_ptr<GlobalMemorySlot> _payloadBuffer;
275
279 std::shared_ptr<LocalMemorySlot> _sizeInfoBuffer;
280
284 size_t _payloadSize;
285
289 const std::shared_ptr<GlobalMemorySlot> _tokenSizeBuffer;
290
294 const std::shared_ptr<GlobalMemorySlot> _consumerCoordinationBufferForCounts;
295
299 const std::shared_ptr<GlobalMemorySlot> _consumerCoordinationBufferForPayloads;
300};
301
302} // namespace HiCR::channel::variableSize::MPSC::locking
Definition communicationManager.hpp:54
__INLINE__ size_t getTokenSize() const noexcept
Definition base.hpp:84
__INLINE__ CommunicationManager * getPayloadCommunicationManager() const
Definition base.hpp:223
__INLINE__ CommunicationManager * getCoordinationCommunicationManager() const
Definition base.hpp:229
Definition base.hpp:41
__INLINE__ auto getCircularBufferForPayloads() const
Definition base.hpp:101
__INLINE__ auto getCoordinationBufferForPayloads() const
Definition base.hpp:113
__INLINE__ auto getCoordinationBufferForCounts() const
Definition base.hpp:107
__INLINE__ auto getCircularBufferForCounts() const
Definition base.hpp:95
__INLINE__ void updateDepth()
Definition producer.hpp:93
size_t getPayloadDepth()
Definition producer.hpp:125
__INLINE__ bool isEmpty()
Definition producer.hpp:267
Producer(CommunicationManager &coordinationCommunicationManager, CommunicationManager &payloadCommunicationManager, std::shared_ptr< LocalMemorySlot > sizeInfoBuffer, std::shared_ptr< GlobalMemorySlot > payloadBuffer, std::shared_ptr< GlobalMemorySlot > tokenBuffer, const std::shared_ptr< LocalMemorySlot > &internalCoordinationBufferForCounts, const std::shared_ptr< LocalMemorySlot > &internalCoordinationBufferForPayloads, std::shared_ptr< GlobalMemorySlot > consumerCoordinationBufferForCounts, std::shared_ptr< GlobalMemorySlot > consumerCoordinationBufferForPayloads, const size_t payloadCapacity, const size_t payloadSize, const size_t capacity)
Definition producer.hpp:61
__INLINE__ size_t getDepth()
Definition producer.hpp:251
__INLINE__ bool push(const std::shared_ptr< LocalMemorySlot > &sourceSlot, const size_t n=1)
Definition producer.hpp:145
size_t getPayloadSize()
Definition producer.hpp:119
#define HICR_THROW_RUNTIME(...)
Definition exceptions.hpp:74
extends channel::Base into a base enabling var-size messages