/home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/fixedSize/spsc/consumer.hpp Source File

HiCR: /home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/fixedSize/spsc/consumer.hpp Source File
HiCR
consumer.hpp
Go to the documentation of this file.
1/*
2 * Copyright 2025 Huawei Technologies Co., Ltd.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
24#pragma once
25
26#include <hicr/core/definitions.hpp>
29#include <utility>
30
31namespace HiCR::channel::fixedSize::SPSC
32{
33
41{
42 private:
43
// Token buffer slot: checked in the constructor (must have a local source slot of sufficient size)
// and handed back unchanged by getTokenBuffer()
48 const std::shared_ptr<HiCR::GlobalMemorySlot> _tokenBuffer;
49
// Producer-side coordination slot: used as the destination of the memcpy issued by pop()
54 const std::shared_ptr<HiCR::GlobalMemorySlot> _producerCoordinationBuffer;
55
56 public:
57
72 Consumer(CommunicationManager &communicationManager,
73 const std::shared_ptr<GlobalMemorySlot> &tokenBuffer,
74 const std::shared_ptr<LocalMemorySlot> &internalCoordinationBuffer,
75 std::shared_ptr<GlobalMemorySlot> producerCoordinationBuffer,
76 const size_t tokenSize,
77 const size_t capacity)
78 : channel::fixedSize::Base(communicationManager, internalCoordinationBuffer, tokenSize, capacity),
79 _tokenBuffer(tokenBuffer),
80 _producerCoordinationBuffer(std::move(producerCoordinationBuffer))
81
82 {
83 // Checking whether the memory slot is local. This backend only supports local data transfers
84 if (tokenBuffer->getSourceLocalMemorySlot() == nullptr)
85 HICR_THROW_LOGIC("The passed token buffer slot was not created locally (it must be to be used internally by the channel implementation)\n");
86
87 // Checking that the provided token exchange buffer has the right size
88 auto requiredTokenBufferSize = getTokenBufferSize(getTokenSize(), capacity);
89 auto providedTokenBufferSize = tokenBuffer->getSourceLocalMemorySlot()->getSize();
90 if (providedTokenBufferSize < requiredTokenBufferSize)
92 "Attempting to create a channel with a token data buffer size (%lu) smaller than the required size (%lu).\n", providedTokenBufferSize, requiredTokenBufferSize);
93 }
// Defaulted destructor: this class performs no explicit cleanup of its own
94 ~Consumer() = default;
95
119 __INLINE__ size_t peek(const size_t pos = 0)
120 {
121 // Check if the requested position exceeds the capacity of the channel
122 if (pos >= getCircularBuffer()->getCapacity())
123 HICR_THROW_LOGIC("Attempting to peek for a token with position (%lu), which is beyond than the channel capacity (%lu)", pos, getCircularBuffer()->getCapacity());
124
125 // Make sure receiver queues are occasionally processed
127
128 // Updating channel depth
129 updateDepth();
130
131 // Check if there are enough tokens in the buffer to satisfy the request
132 if (pos >= getCircularBuffer()->getDepth())
133 HICR_THROW_RUNTIME("Attempting to peek position (%lu) but not enough tokens (%lu) are in the buffer", pos, getCircularBuffer()->getDepth());
134
135 // Calculating buffer position
136 const size_t bufferPos = (getCircularBuffer()->getTailPosition() + pos) % getCircularBuffer()->getCapacity();
137
138 // Succeeded in pushing the token(s)
139 return bufferPos;
140 }
141
155 __INLINE__ void pop(const size_t n = 1)
156 {
157 if (n > getCircularBuffer()->getCapacity())
158 HICR_THROW_LOGIC("Attempting to pop (%lu) tokens, which is larger than the channel capacity (%lu)", n, getCircularBuffer()->getCapacity());
159
160 // Updating channel depth
161 updateDepth();
162
163 // If the exchange buffer does not have n tokens pushed, reject operation
164 if (n > getCircularBuffer()->getDepth())
165 HICR_THROW_RUNTIME("Attempting to pop (%lu) tokens, which is more than the number of current tokens in the channel (%lu)", n, getCircularBuffer()->getDepth());
166
167 // Advancing tail (removes elements from the circular buffer)
168 getCircularBuffer()->advanceTail(n);
169
170 const auto coordBuffElemSize = sizeof(_HICR_CHANNEL_COORDINATION_BUFFER_ELEMENT_TYPE);
171 // Notifying producer(s) of buffer liberation
172 getCommunicationManager()->memcpy(_producerCoordinationBuffer,
173 _HICR_CHANNEL_TAIL_ADVANCE_COUNT_IDX * coordBuffElemSize,
175 _HICR_CHANNEL_TAIL_ADVANCE_COUNT_IDX * coordBuffElemSize,
176 coordBuffElemSize);
178 }
179
// Currently a no-op (empty body). peek() and pop() call it before reading the circular buffer's depth.
185 __INLINE__ void updateDepth() {}
186
// Returns the token buffer slot stored at construction time (a copy of the shared pointer)
193 [[nodiscard]] __INLINE__ std::shared_ptr<GlobalMemorySlot> getTokenBuffer() const { return _tokenBuffer; }
194};
195
196} // namespace HiCR::channel::fixedSize::SPSC
Definition communicationManager.hpp:54
virtual __INLINE__ void flushReceived()
Definition communicationManager.hpp:492
__INLINE__ void memcpy(const std::shared_ptr< LocalMemorySlot > &destination, size_t dst_offset, const std::shared_ptr< LocalMemorySlot > &source, size_t src_offset, size_t size)
Definition communicationManager.hpp:267
__INLINE__ void fence(GlobalMemorySlot::tag_t tag)
Definition communicationManager.hpp:377
__INLINE__ CommunicationManager * getCommunicationManager() const
Definition base.hpp:217
__INLINE__ auto getCoordinationBuffer() const
Definition base.hpp:223
__INLINE__ size_t getDepth() const noexcept
Definition base.hpp:141
__INLINE__ size_t getTokenSize() const noexcept
Definition base.hpp:84
static __INLINE__ size_t getTokenBufferSize(const size_t tokenSize, const size_t capacity) noexcept
Definition base.hpp:123
__INLINE__ auto getCircularBuffer() const noexcept
Definition base.hpp:167
Definition base.hpp:42
Consumer(CommunicationManager &communicationManager, const std::shared_ptr< GlobalMemorySlot > &tokenBuffer, const std::shared_ptr< LocalMemorySlot > &internalCoordinationBuffer, std::shared_ptr< GlobalMemorySlot > producerCoordinationBuffer, const size_t tokenSize, const size_t capacity)
Definition consumer.hpp:72
__INLINE__ size_t peek(const size_t pos=0)
Definition consumer.hpp:119
__INLINE__ void pop(const size_t n=1)
Definition consumer.hpp:155
__INLINE__ void updateDepth()
Definition consumer.hpp:185
__INLINE__ std::shared_ptr< GlobalMemorySlot > getTokenBuffer() const
Definition consumer.hpp:193
Provides a failure model and corresponding exception classes.
#define HICR_THROW_RUNTIME(...)
Definition exceptions.hpp:74
#define HICR_THROW_LOGIC(...)
Definition exceptions.hpp:67
Provides base functionality for a fixed-size MPSC channel over HiCR.