/home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/fixedSize/mpsc/locking/consumer.hpp Source File

HiCR: /home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/fixedSize/mpsc/locking/consumer.hpp Source File
HiCR
consumer.hpp
Go to the documentation of this file.
1/*
2 * Copyright 2025 Huawei Technologies Co., Ltd.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
24#pragma once
25
26#include <hicr/core/definitions.hpp>
29#include <utility>
30
31namespace HiCR::channel::fixedSize::MPSC::locking
32{
33
41{
42 private:
43
48 const std::shared_ptr<HiCR::GlobalMemorySlot> _tokenBuffer;
49
50 /*
51 * Global Memory slot pointing to the consumer's coordination buffer for acquiring a lock and updating
52 */
53 const std::shared_ptr<HiCR::GlobalMemorySlot> _consumerCoordinationBuffer;
54
55 public:
56
72 Consumer(CommunicationManager &coordinationCommunicationManager,
73 CommunicationManager &payloadCommunicationManager,
74 std::shared_ptr<GlobalMemorySlot> tokenBuffer,
75 const std::shared_ptr<LocalMemorySlot> &internalCoordinationBuffer,
76 std::shared_ptr<GlobalMemorySlot> consumerCoordinationBuffer,
77 const size_t tokenSize,
78 const size_t capacity)
79 : channel::fixedSize::Base(coordinationCommunicationManager, payloadCommunicationManager, internalCoordinationBuffer, tokenSize, capacity),
80 _tokenBuffer(std::move(tokenBuffer)),
81 _consumerCoordinationBuffer(std::move(consumerCoordinationBuffer))
82 {}
83 ~Consumer() = default;
84
108 __INLINE__ ssize_t peek(const size_t pos = 0)
109 {
110 // Check if the requested position exceeds the capacity of the channel
111 if (pos >= getCircularBuffer()->getCapacity())
112 HICR_THROW_LOGIC("Attempting to peek for a token with position %lu (token number %lu when starting from zero), which is beyond than the channel capacity (%lu)",
113 pos,
114 pos + 1,
115 getCircularBuffer()->getCapacity());
116
117 // Value to return, initially set as -1 as default (not able to find the requested value)
118 ssize_t bufferPos = -1;
119
120 auto coordinationCommunicationManager = getCoordinationCommunicationManager();
121
122 // Obtaining coordination buffer slot lock
123 if (coordinationCommunicationManager->acquireGlobalLock(_consumerCoordinationBuffer) == false) return bufferPos;
124
125 coordinationCommunicationManager->flushReceived();
127 // Calculating current channel depth
128 const auto curDepth = getDepth();
129
130 // Calculating buffer position, if there are enough tokens in the buffer to satisfy the request
131 if (pos < curDepth) bufferPos = (ssize_t)((getCircularBuffer()->getTailPosition() + pos) % getCircularBuffer()->getCapacity());
132
133 // Releasing coordination buffer slot lock
134 coordinationCommunicationManager->releaseGlobalLock(_consumerCoordinationBuffer);
135
136 // Succeeded in pushing the token(s)
137 return bufferPos;
138 }
139
154 __INLINE__ bool pop(const size_t n = 1)
155 {
156 if (n > getCircularBuffer()->getCapacity())
157 HICR_THROW_LOGIC("Attempting to pop %lu tokens, which is larger than the channel capacity (%lu)", n, getCircularBuffer()->getCapacity());
158
159 // Flag to indicate whether the operaton was successful
160 bool successFlag = false;
161
162 auto coordinationCommunicationManager = getCoordinationCommunicationManager();
163
164 // Obtaining coordination buffer slot lock
165 if (coordinationCommunicationManager->acquireGlobalLock(_consumerCoordinationBuffer) == false) return successFlag;
166
167 // If the exchange buffer does not have n tokens pushed, reject operation, otherwise succeed
168 if (n <= getDepth())
169 {
170 // Advancing tail (removes elements from the circular buffer)
171 getCircularBuffer()->advanceTail(n);
172
173 // Setting success flag
174 successFlag = true;
175 }
176
177 // Releasing coordination buffer slot lock
178 coordinationCommunicationManager->releaseGlobalLock(_consumerCoordinationBuffer);
179
180 // Operation was successful
181 return successFlag;
182 }
183
190 [[nodiscard]] __INLINE__ std::shared_ptr<GlobalMemorySlot> getTokenBuffer() const { return _tokenBuffer; }
191};
192
193} // namespace HiCR::channel::fixedSize::MPSC::locking
Definition communicationManager.hpp:54
virtual __INLINE__ void flushReceived()
Definition communicationManager.hpp:469
__INLINE__ size_t getDepth() const noexcept
Definition base.hpp:141
__INLINE__ CommunicationManager * getPayloadCommunicationManager() const
Definition base.hpp:223
__INLINE__ CommunicationManager * getCoordinationCommunicationManager() const
Definition base.hpp:229
__INLINE__ auto getCircularBuffer() const noexcept
Definition base.hpp:167
Definition base.hpp:42
__INLINE__ ssize_t peek(const size_t pos=0)
Definition consumer.hpp:108
__INLINE__ bool pop(const size_t n=1)
Definition consumer.hpp:154
Consumer(CommunicationManager &coordinationCommunicationManager, CommunicationManager &payloadCommunicationManager, std::shared_ptr< GlobalMemorySlot > tokenBuffer, const std::shared_ptr< LocalMemorySlot > &internalCoordinationBuffer, std::shared_ptr< GlobalMemorySlot > consumerCoordinationBuffer, const size_t tokenSize, const size_t capacity)
Definition consumer.hpp:72
__INLINE__ std::shared_ptr< GlobalMemorySlot > getTokenBuffer() const
Definition consumer.hpp:190
Provides a failure model and corresponding exception classes.
#define HICR_THROW_LOGIC(...)
Definition exceptions.hpp:67
Provides base functionality for a fixed-size MPSC channel over HiCR.