/home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/fixedSize/mpsc/locking/producer.hpp Source File

HiCR: /home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/fixedSize/mpsc/locking/producer.hpp Source File
HiCR
producer.hpp
Go to the documentation of this file.
1/*
2 * Copyright 2025 Huawei Technologies Co., Ltd.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
24#pragma once
25
26#include <hicr/core/definitions.hpp>
29#include <utility>
30
31namespace HiCR::channel::fixedSize::MPSC::locking
32{
33
40class Producer final : public fixedSize::Base
41{
42 private:
43
47 const std::shared_ptr<GlobalMemorySlot> _tokenBuffer;
48
49 /*
50 * Global Memory slot pointing to the consumer's coordination buffer for acquiring a lock and updating
51 */
52 const std::shared_ptr<HiCR::GlobalMemorySlot> _consumerCoordinationBuffer;
53
54 public:
55
70 Producer(CommunicationManager &coordinationCommunicationManager,
71 CommunicationManager &payloadCommunicationManager,
72 std::shared_ptr<GlobalMemorySlot> tokenBuffer,
73 const std::shared_ptr<LocalMemorySlot> &internalCoordinationBuffer,
74 std::shared_ptr<GlobalMemorySlot> consumerCoordinationBuffer,
75 const size_t tokenSize,
76 const size_t capacity)
77 : fixedSize::Base(coordinationCommunicationManager, payloadCommunicationManager, internalCoordinationBuffer, tokenSize, capacity),
78 _tokenBuffer(std::move(tokenBuffer)),
79 _consumerCoordinationBuffer(std::move(consumerCoordinationBuffer))
80 {}
81 ~Producer() = default;
82
101 __INLINE__ bool push(const std::shared_ptr<LocalMemorySlot> &sourceSlot, const size_t n = 1)
102 {
103 // Make sure source slot is big enough to satisfy the operation
104 auto requiredBufferSize = getTokenSize() * n;
105 auto providedBufferSize = sourceSlot->getSize();
106 if (providedBufferSize < requiredBufferSize)
107 HICR_THROW_LOGIC("Attempting to push with a source buffer size (%lu) smaller than the required size (Token Size (%lu) x n (%lu) = %lu).\n",
108 providedBufferSize,
109 getTokenSize(),
110 n,
111 requiredBufferSize);
112
113 // Flag to record whether the operation was successful or not (it simplifies code by releasing locks only once)
114 bool successFlag = false;
115
116 auto coordinationCommunicationManager = getCoordinationCommunicationManager();
117
118 // Locking remote token and coordination buffer slots
119 if (coordinationCommunicationManager->acquireGlobalLock(_consumerCoordinationBuffer) == false) return successFlag;
120
121 // Updating local coordination buffer
122 coordinationCommunicationManager->memcpy(getCoordinationBuffer(), 0, _consumerCoordinationBuffer, 0, getCoordinationBufferSize());
123
124 // Adding fence operation to ensure buffers are ready for re-use
125 coordinationCommunicationManager->fence(getCoordinationBuffer(), 0, 1);
126
127 // Calculating current channel depth
128 const auto depth = getDepth();
129
130 auto payloadCommunicationManager = getPayloadCommunicationManager();
131
132 // If the exchange buffer does not have n free slots, reject the operation
133 if (depth + n <= getCircularBuffer()->getCapacity())
134 {
135 // Copying with source increasing offset per token
136 for (size_t i = 0; i < n; i++)
137 {
138 payloadCommunicationManager->memcpy(_tokenBuffer, /* destination */
139 getTokenSize() * getCircularBuffer()->getHeadPosition(), /* dst_offset */
140 sourceSlot, /* source */
141 i * getTokenSize(), /* src_offset */
142 getTokenSize()); /* size*/
143 // Advance head here, since the memcpy relies on the up-to-date head position
144 getCircularBuffer()->advanceHead(1);
145 }
146
147 payloadCommunicationManager->fence(sourceSlot, n, 0);
148
149 // Updating global coordination buffer
150 coordinationCommunicationManager->memcpy(_consumerCoordinationBuffer, 0, getCoordinationBuffer(), 0, getCoordinationBufferSize());
151 // Adding fence operation to ensure buffers are ready for re-use
152 coordinationCommunicationManager->fence(getCoordinationBuffer(), 1, 0);
153
154 // Mark operation as successful
155 successFlag = true;
156 }
157
158 // Releasing remote token and coordination buffer slots
159 coordinationCommunicationManager->releaseGlobalLock(_consumerCoordinationBuffer);
160
161 // Succeeded
162 return successFlag;
163 }
164};
165
166} // namespace HiCR::channel::fixedSize::MPSC::locking
Definition communicationManager.hpp:54
__INLINE__ auto getCoordinationBuffer() const
Definition base.hpp:235
__INLINE__ size_t getDepth() const noexcept
Definition base.hpp:141
__INLINE__ size_t getTokenSize() const noexcept
Definition base.hpp:84
__INLINE__ CommunicationManager * getPayloadCommunicationManager() const
Definition base.hpp:223
__INLINE__ CommunicationManager * getCoordinationCommunicationManager() const
Definition base.hpp:229
static __INLINE__ size_t getCoordinationBufferSize() noexcept
Definition base.hpp:92
__INLINE__ auto getCircularBuffer() const noexcept
Definition base.hpp:167
Definition base.hpp:42
Producer(CommunicationManager &coordinationCommunicationManager, CommunicationManager &payloadCommunicationManager, std::shared_ptr< GlobalMemorySlot > tokenBuffer, const std::shared_ptr< LocalMemorySlot > &internalCoordinationBuffer, std::shared_ptr< GlobalMemorySlot > consumerCoordinationBuffer, const size_t tokenSize, const size_t capacity)
Definition producer.hpp:70
__INLINE__ bool push(const std::shared_ptr< LocalMemorySlot > &sourceSlot, const size_t n=1)
Definition producer.hpp:101
Provides a failure model and corresponding exception classes.
#define HICR_THROW_LOGIC(...)
Definition exceptions.hpp:67
Provides base functionality for a fixed-size MPSC channel over HiCR.