/home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/fixedSize/mpsc/locking/producer.hpp Source File

HiCR: /home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/fixedSize/mpsc/locking/producer.hpp Source File
HiCR
producer.hpp
Go to the documentation of this file.
1/*
2 * Copyright 2025 Huawei Technologies Co., Ltd.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
24#pragma once
25
26#include <hicr/core/definitions.hpp>
29#include <utility>
30
31namespace HiCR::channel::fixedSize::MPSC::locking
32{
33
// Producer side of the fixed-size, locking MPSC channel; derives from fixedSize::Base.
// It stores two global memory slots (the token buffer and the consumer's coordination buffer)
// and exposes push(), which copies tokens into the token buffer while holding the global lock
// acquired on the consumer's coordination buffer.
// NOTE(review): the line numbering of this listing has gaps (e.g. 44-46, 56-68, 81-98, 121, 146);
// whatever those lines contain is not visible here.
40class Producer final : public fixedSize::Base
41{
42 private:
43
 // Global memory slot used as the destination of the token copies performed by push()
47 const std::shared_ptr<GlobalMemorySlot> _tokenBuffer;
48
49 /*
50 * Global memory slot of the consumer's coordination buffer. push() acquires the global lock on it,
 * copies it into the local coordination buffer, and writes the local coordination buffer back to it
 * after the tokens have been copied.
51 */
52 const std::shared_ptr<HiCR::GlobalMemorySlot> _consumerCoordinationBuffer;
53
54 public:
55
 // Constructs the producer.
 //   communicationManager, internalCoordinationBuffer, tokenSize and capacity are forwarded to fixedSize::Base.
 //   tokenBuffer and consumerCoordinationBuffer are taken by value and moved into the members above.
69 Producer(CommunicationManager &communicationManager,
70 std::shared_ptr<GlobalMemorySlot> tokenBuffer,
71 const std::shared_ptr<LocalMemorySlot> &internalCoordinationBuffer,
72 std::shared_ptr<GlobalMemorySlot> consumerCoordinationBuffer,
73 const size_t tokenSize,
74 const size_t capacity)
75 : fixedSize::Base(communicationManager, internalCoordinationBuffer, tokenSize, capacity),
76 _tokenBuffer(std::move(tokenBuffer)),
77 _consumerCoordinationBuffer(std::move(consumerCoordinationBuffer))
78 {}
79 ~Producer() = default;
80
 // Attempts to push n tokens from sourceSlot into the channel.
 //   sourceSlot: local memory slot holding the tokens back to back; token i is read at offset i * getTokenSize().
 //   n: number of tokens to push (defaults to 1).
 // Returns true if the tokens were copied and the consumer's coordination buffer was written back;
 // returns false if the global lock could not be acquired or the channel has no room for n more tokens.
 // Raises a logic error through HICR_THROW_LOGIC if sourceSlot is smaller than getTokenSize() * n.
 // NOTE(review): sourceSlot is dereferenced without a null check — callers must pass a valid slot.
99 __INLINE__ bool push(const std::shared_ptr<LocalMemorySlot> &sourceSlot, const size_t n = 1)
100 {
101 // Make sure source slot is big enough to satisfy the operation
102 auto requiredBufferSize = getTokenSize() * n;
103 auto providedBufferSize = sourceSlot->getSize();
104 if (providedBufferSize < requiredBufferSize)
105 HICR_THROW_LOGIC("Attempting to push with a source buffer size (%lu) smaller than the required size (Token Size (%lu) x n (%lu) = %lu).\n",
106 providedBufferSize,
107 getTokenSize(),
108 n,
109 requiredBufferSize);
110
111 // Flag to record whether the operation was successful or not (it simplifies code by releasing locks only once)
112 bool successFlag = false;
113
114 // Acquire the global lock on the consumer's coordination buffer; if it cannot be obtained, return false right away
 // NOTE(review): nothing below releases the lock if memcpy/fence throws — confirm whether those calls can throw
115 if (getCommunicationManager()->acquireGlobalLock(_consumerCoordinationBuffer) == false) return successFlag;
116
117 // Refresh the local coordination buffer with the contents of the consumer's coordination buffer
118 getCommunicationManager()->memcpy(getCoordinationBuffer(), 0, _consumerCoordinationBuffer, 0, getCoordinationBufferSize());
119
120 // Adding fence operation to ensure buffers are ready for re-use
 // NOTE(review): no fence call is visible after this comment (original line 121 is not shown in this listing) — confirm against the real header
122
123 // Calculating current channel depth
124 const auto depth = getDepth();
125
126 // Proceed only if the channel has room for n more tokens (depth + n within capacity); otherwise nothing is copied and the flag stays false
127 if (depth + n <= getCircularBuffer()->getCapacity())
128 {
129 // Copying with source increasing offset per token
130 for (size_t i = 0; i < n; i++)
131 {
132 getCommunicationManager()->memcpy(_tokenBuffer, /* destination */
133 getTokenSize() * getCircularBuffer()->getHeadPosition(), /* dst_offset */
134 sourceSlot, /* source */
135 i * getTokenSize(), /* src_offset */
136 getTokenSize()); /* size*/
137 // Advance head here, since the memcpy relies on the up-to-date head position
138 getCircularBuffer()->advanceHead(1);
139 }
140
 // Fence on the source slot with arguments (n, 0); presumably waits for the n token copies issued above — TODO confirm fence() argument semantics
141 getCommunicationManager()->fence(sourceSlot, n, 0);
142
143 // Write the updated local coordination buffer back to the consumer's coordination buffer
144 getCommunicationManager()->memcpy(_consumerCoordinationBuffer, 0, getCoordinationBuffer(), 0, getCoordinationBufferSize());
145 // Adding fence operation to ensure buffers are ready for re-use
 // NOTE(review): no fence call is visible after this comment (original line 146 is not shown in this listing) — confirm against the real header
147
148 // Mark operation as successful
149 successFlag = true;
150 }
151
152 // Release the global lock acquired above (reached on both the success and the rejection path)
153 getCommunicationManager()->releaseGlobalLock(_consumerCoordinationBuffer);
154
155 // Report whether the push took place
156 return successFlag;
157 }
158};
159
160} // namespace HiCR::channel::fixedSize::MPSC::locking
Definition communicationManager.hpp:54
__INLINE__ void memcpy(const std::shared_ptr< LocalMemorySlot > &destination, size_t dst_offset, const std::shared_ptr< LocalMemorySlot > &source, size_t src_offset, size_t size)
Definition communicationManager.hpp:267
__INLINE__ void releaseGlobalLock(const std::shared_ptr< GlobalMemorySlot > &memorySlot)
Definition communicationManager.hpp:462
__INLINE__ void fence(GlobalMemorySlot::tag_t tag)
Definition communicationManager.hpp:377
__INLINE__ CommunicationManager * getCommunicationManager() const
Definition base.hpp:217
__INLINE__ auto getCoordinationBuffer() const
Definition base.hpp:223
__INLINE__ size_t getDepth() const noexcept
Definition base.hpp:141
__INLINE__ size_t getTokenSize() const noexcept
Definition base.hpp:84
static __INLINE__ size_t getCoordinationBufferSize() noexcept
Definition base.hpp:92
__INLINE__ auto getCircularBuffer() const noexcept
Definition base.hpp:167
Definition base.hpp:42
__INLINE__ bool push(const std::shared_ptr< LocalMemorySlot > &sourceSlot, const size_t n=1)
Definition producer.hpp:99
Producer(CommunicationManager &communicationManager, std::shared_ptr< GlobalMemorySlot > tokenBuffer, const std::shared_ptr< LocalMemorySlot > &internalCoordinationBuffer, std::shared_ptr< GlobalMemorySlot > consumerCoordinationBuffer, const size_t tokenSize, const size_t capacity)
Definition producer.hpp:69
Provides a failure model and corresponding exception classes.
#define HICR_THROW_LOGIC(...)
Definition exceptions.hpp:67
Provides base functionality for a fixed-size MPSC channel over HiCR.