/home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/variableSize/mpsc/locking/producer.hpp Source File

HiCR: /home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/variableSize/mpsc/locking/producer.hpp Source File
HiCR
producer.hpp
Go to the documentation of this file.
1/*
2 * Copyright 2025 Huawei Technologies Co., Ltd.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
24#pragma once
25
27#include <utility>
28
29namespace HiCR::channel::variableSize::MPSC::locking
30{
31
38class Producer final : public variableSize::Base
39{
40 public:
41
62 Producer(CommunicationManager &communicationManager,
63 std::shared_ptr<LocalMemorySlot> sizeInfoBuffer,
64 std::shared_ptr<GlobalMemorySlot> payloadBuffer,
65 std::shared_ptr<GlobalMemorySlot> tokenBuffer,
66 const std::shared_ptr<LocalMemorySlot> &internalCoordinationBufferForCounts,
67 const std::shared_ptr<LocalMemorySlot> &internalCoordinationBufferForPayloads,
68 std::shared_ptr<GlobalMemorySlot> consumerCoordinationBufferForCounts,
69 std::shared_ptr<GlobalMemorySlot> consumerCoordinationBufferForPayloads,
70 const size_t payloadCapacity,
71 const size_t payloadSize,
72 const size_t capacity)
73 : variableSize::Base(communicationManager, internalCoordinationBufferForCounts, internalCoordinationBufferForPayloads, capacity, payloadCapacity),
74 _payloadBuffer(std::move(payloadBuffer)),
75 _sizeInfoBuffer(std::move(sizeInfoBuffer)),
76 _payloadSize(payloadSize),
77 _tokenSizeBuffer(std::move(tokenBuffer)),
78 _consumerCoordinationBufferForCounts(std::move(consumerCoordinationBufferForCounts)),
79 _consumerCoordinationBufferForPayloads(std::move(consumerCoordinationBufferForPayloads))
80 {}
81
82 ~Producer() = default;
83
/**
 * Refreshes this producer's copy of the consumer-side coordination state.
 *
 * Issues two transfers, each of 2 * sizeof(_HICR_CHANNEL_COORDINATION_BUFFER_ELEMENT_TYPE) bytes read
 * from offset 0: one sourced from _consumerCoordinationBufferForCounts and one sourced from
 * _consumerCoordinationBufferForPayloads.
 *
 * Precondition: the caller already holds the global lock on _consumerCoordinationBufferForCounts
 * (push() acquires it before calling this function), hence the NOTE on the signature line.
 *
 * NOTE(review): the first line of each call (original lines 90 and 96, which carry the destination
 * argument) and the closing lines 102-106 are missing from this listing; presumably the destinations
 * are the local coordination buffers and the tail completes the transfers -- confirm against the
 * real header.
 */
88 __INLINE__ void updateDepth() // NOTE: we DO know we have the lock!!!!
89 {
91 0, /* dst_offset */
92 _consumerCoordinationBufferForCounts, /* source */
93 0, /* src_offset */
94 2 * sizeof(_HICR_CHANNEL_COORDINATION_BUFFER_ELEMENT_TYPE)); /* size */
95
97 0, /* dst_offset */
98 _consumerCoordinationBufferForPayloads, /* source */
99 0, /* src_offset */
100 2 * sizeof(_HICR_CHANNEL_COORDINATION_BUFFER_ELEMENT_TYPE)); /* size */
101
107 }
108
113 inline size_t getPayloadSize() { return _payloadSize; }
114
119 inline size_t getPayloadDepth() { return getCircularBufferForPayloads()->getDepth(); }
120
/**
 * Tries to push one variable-size message -- the whole content of sourceSlot -- to the consumer.
 *
 * Sequence, performed while holding the global lock on _consumerCoordinationBufferForCounts:
 *   1. refresh the local depths from the consumer (updateDepth());
 *   2. reject if the payload buffer cannot take sourceSlot->getSize() more units, or if the
 *      counts buffer has no free position;
 *   3. stage the message size in _sizeInfoBuffer and copy getTokenSize() bytes of it into
 *      _tokenSizeBuffer at the counts head position;
 *   4. copy the payload into _payloadBuffer at the payload head position, split into two
 *      transfers when it would run past the end of the circular buffer;
 *   5. advance both local heads and copy the first element of each local coordination buffer
 *      (the head index, per the inline comments) to the consumer.
 *
 * @param[in] sourceSlot Local slot whose entire content (sourceSlot->getSize()) is sent as one message
 * @param[in] n Number of messages to push; only n == 1 is supported
 * @return true if the message was pushed; false if the global lock was not acquired or the
 *         consumer buffers (as last observed) lacked space
 * @note Raises a runtime error through HICR_THROW_RUNTIME when n != 1.
 *
 * NOTE(review): original lines 174-177, 186-193 and 228-229 are missing from this listing; the
 * latter follow the "backend LPF needs this to complete" comment and may contain code
 * (e.g. fences) -- confirm against the real header.
 */
139 __INLINE__ bool push(const std::shared_ptr<LocalMemorySlot> &sourceSlot, const size_t n = 1)
140 {
141 if (n != 1) HICR_THROW_RUNTIME("HiCR currently has no implementation for n != 1 with push(sourceSlot, n) for variable size version.");
142
143 // The entire source slot is sent, so its size is the space required in the consumer payload buffer
144 size_t requiredBufferSize = sourceSlot->getSize();
145
146 // Flag to record whether the operation was successful or not (it simplifies code by releasing locks only once)
147 bool successFlag = false;
148
149 // Acquire the global lock on the consumer's counts coordination buffer; give up (return false) if it is not obtained
150 if (getCommunicationManager()->acquireGlobalLock(_consumerCoordinationBufferForCounts) == false) return successFlag;
151
152 // Updating depth of token (message sizes) and payload buffers from the consumer
153 updateDepth();
154
155 // Check if the amount of data we wish to write (requiredBufferSize)
156 // would fit in the consumer payload buffer in its current state.
157 // If not, reject the operation
158 if (getCircularBufferForPayloads()->getDepth() + requiredBufferSize > getCircularBufferForPayloads()->getCapacity())
159 {
160 getCommunicationManager()->releaseGlobalLock(_consumerCoordinationBufferForCounts);
161 return successFlag;
162 }
163
// Stage the message size in the first size_t of the local size-info slot
164 auto *sizeInfoBufferPtr = static_cast<size_t *>(_sizeInfoBuffer->getPointer());
165 sizeInfoBufferPtr[0] = requiredBufferSize;
166
167 // Check that the consumer counts buffer has room for one more message (n is always 1 here). If not, reject the operation
168 if (getCircularBufferForCounts()->getDepth() + 1 > getCircularBufferForCounts()->getCapacity())
169 {
170 getCommunicationManager()->releaseGlobalLock(_consumerCoordinationBufferForCounts);
171 return successFlag;
172 }
173
// Send the message size into the token slot that corresponds to the current counts head position
178 getCommunicationManager()->memcpy(_tokenSizeBuffer, /* destination */
179 getTokenSize() * getCircularBufferForCounts()->getHeadPosition(), /* dst_offset */
180 _sizeInfoBuffer, /* source */
181 0, /* src_offset */
182 getTokenSize()); /* size */
183 getCommunicationManager()->fence(_sizeInfoBuffer, 1, 0);
184 successFlag = true; // no failure path remains past this point
185
// If the payload would run past the end of the circular buffer, split it into two transfers (wrap-around)
194 if (requiredBufferSize + getCircularBufferForPayloads()->getHeadPosition() > getCircularBufferForPayloads()->getCapacity())
195 {
196 size_t first_chunk = getCircularBufferForPayloads()->getCapacity() - getCircularBufferForPayloads()->getHeadPosition();
197 size_t second_chunk = requiredBufferSize - first_chunk;
198 // copy first part to end of buffer
199 getCommunicationManager()->memcpy(_payloadBuffer, /* destination */
200 getCircularBufferForPayloads()->getHeadPosition(), /* dst_offset */
201 sourceSlot, /* source */
202 0, /* src_offset */
203 first_chunk); /* size */
204 // copy second part to beginning of buffer
205 getCommunicationManager()->memcpy(_payloadBuffer, /* destination */
206 0, /* dst_offset */
207 sourceSlot, /* source */
208 first_chunk, /* src_offset */
209 second_chunk); /* size */
210
211 getCommunicationManager()->fence(sourceSlot, 2, 0);
212 }
213 else
214 {
215 getCommunicationManager()->memcpy(_payloadBuffer, getCircularBufferForPayloads()->getHeadPosition(), sourceSlot, 0, requiredBufferSize);
216 getCommunicationManager()->fence(sourceSlot, 1, 0);
217 }
218
219 // Advance the local heads (one message, requiredBufferSize payload units); the consumer is updated just below
220 getCircularBufferForCounts()->advanceHead(1);
221 getCircularBufferForPayloads()->advanceHead(requiredBufferSize);
222
223 // only update head index at consumer (byte size = one buffer element)
224 getCommunicationManager()->memcpy(_consumerCoordinationBufferForCounts, 0, getCoordinationBufferForCounts(), 0, sizeof(_HICR_CHANNEL_COORDINATION_BUFFER_ELEMENT_TYPE));
225 // only update head index at consumer (byte size = one buffer element)
226 getCommunicationManager()->memcpy(_consumerCoordinationBufferForPayloads, 0, getCoordinationBufferForPayloads(), 0, sizeof(_HICR_CHANNEL_COORDINATION_BUFFER_ELEMENT_TYPE));
227 // backend LPF needs this to complete
// NOTE(review): original lines 228-229 are not shown in this listing
230
231 getCommunicationManager()->releaseGlobalLock(_consumerCoordinationBufferForCounts);
232
233 return successFlag;
234 }
235
/**
 * Returns the depth this producer reports for the channel.
 *
 * NOTE(review): the return statement (original line 245) is missing from this listing. Per the
 * comment below, it returns the minimum of the counts-buffer and payload-buffer depths. push()
 * advances the counts head by 1 per message but the payload head by the message size, so the two
 * depths appear to be in different units -- confirm the minimum is what is intended.
 *
 * @return Current channel depth (see note above)
 */
240 __INLINE__ size_t getDepth()
241 {
242 // Because the current implementation first receives the message size in the token buffer, followed
243 // by the message payload, it is possible for the token buffer to have a larger depth (by 1) than the payload buffer.
244 // Therefore, we need to return the minimum of the two depths
246 }
247
256 __INLINE__ bool isEmpty() { return (getCircularBufferForCounts()->getDepth() == 0) && (getCircularBufferForPayloads()->getDepth() == 0); }
257
258 private:
259
263 std::shared_ptr<GlobalMemorySlot> _payloadBuffer;
264
268 std::shared_ptr<LocalMemorySlot> _sizeInfoBuffer;
269
273 size_t _payloadSize;
274
278 const std::shared_ptr<GlobalMemorySlot> _tokenSizeBuffer;
279
283 const std::shared_ptr<GlobalMemorySlot> _consumerCoordinationBufferForCounts;
284
288 const std::shared_ptr<GlobalMemorySlot> _consumerCoordinationBufferForPayloads;
289};
290
291} // namespace HiCR::channel::variableSize::MPSC::locking
Definition communicationManager.hpp:54
__INLINE__ void memcpy(const std::shared_ptr< LocalMemorySlot > &destination, size_t dst_offset, const std::shared_ptr< LocalMemorySlot > &source, size_t src_offset, size_t size)
Definition communicationManager.hpp:267
__INLINE__ void releaseGlobalLock(const std::shared_ptr< GlobalMemorySlot > &memorySlot)
Definition communicationManager.hpp:462
__INLINE__ void fence(GlobalMemorySlot::tag_t tag)
Definition communicationManager.hpp:377
__INLINE__ CommunicationManager * getCommunicationManager() const
Definition base.hpp:217
__INLINE__ size_t getTokenSize() const noexcept
Definition base.hpp:84
Definition base.hpp:41
__INLINE__ auto getCircularBufferForPayloads() const
Definition base.hpp:99
__INLINE__ auto getCoordinationBufferForPayloads() const
Definition base.hpp:111
__INLINE__ auto getCoordinationBufferForCounts() const
Definition base.hpp:105
__INLINE__ auto getCircularBufferForCounts() const
Definition base.hpp:93
__INLINE__ void updateDepth()
Definition producer.hpp:88
size_t getPayloadDepth()
Definition producer.hpp:119
__INLINE__ bool isEmpty()
Definition producer.hpp:256
__INLINE__ size_t getDepth()
Definition producer.hpp:240
Producer(CommunicationManager &communicationManager, std::shared_ptr< LocalMemorySlot > sizeInfoBuffer, std::shared_ptr< GlobalMemorySlot > payloadBuffer, std::shared_ptr< GlobalMemorySlot > tokenBuffer, const std::shared_ptr< LocalMemorySlot > &internalCoordinationBufferForCounts, const std::shared_ptr< LocalMemorySlot > &internalCoordinationBufferForPayloads, std::shared_ptr< GlobalMemorySlot > consumerCoordinationBufferForCounts, std::shared_ptr< GlobalMemorySlot > consumerCoordinationBufferForPayloads, const size_t payloadCapacity, const size_t payloadSize, const size_t capacity)
Definition producer.hpp:62
__INLINE__ bool push(const std::shared_ptr< LocalMemorySlot > &sourceSlot, const size_t n=1)
Definition producer.hpp:139
size_t getPayloadSize()
Definition producer.hpp:113
#define HICR_THROW_RUNTIME(...)
Definition exceptions.hpp:74
extends channel::Base into a base enabling var-size messages