/home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/fixedSize/spsc/producer.hpp Source File

HiCR: /home/runner/work/HiCR/HiCR/include/hicr/frontends/channel/fixedSize/spsc/producer.hpp Source File
HiCR
producer.hpp
Go to the documentation of this file.
1/*
2 * Copyright 2025 Huawei Technologies Co., Ltd.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
24#pragma once
25
26#include <hicr/core/definitions.hpp>
29#include <utility>
30
31namespace HiCR::channel::fixedSize::SPSC
32{
33
41{
42 private:
43
// Global memory slot that push() hands to the communication manager's memcpy as the
// destination for token data. It is set once in the constructor and never reassigned (const).
47 const std::shared_ptr<GlobalMemorySlot> _tokenBuffer;
48
49 /*
50 * Global Memory slot pointing to the consumer coordination buffer
51 */
// push() uses this slot as the destination when it copies the size_t entry located at index
// _HICR_CHANNEL_HEAD_ADVANCE_COUNT_IDX over to the consumer side.
52 const std::shared_ptr<GlobalMemorySlot> _consumerCoordinationBuffer;
53
54 public:
55
/**
 * Builds the producer side of the channel.
 *
 * communicationManager, internalCoordinationBuffer, tokenSize and capacity are forwarded
 * unchanged to the fixedSize::Base constructor. The two global memory slots are received
 * by value and moved into the corresponding const members. The constructor body is empty.
 *
 * \param[in] communicationManager Communication manager handed to fixedSize::Base; push() later issues
 *            its memcpy and fence calls through getCommunicationManager().
 * \param[in] tokenBuffer Global memory slot that push() writes tokens into.
 * \param[in] internalCoordinationBuffer Local memory slot handed to fixedSize::Base
 *            (presumably the slot returned by getCoordinationBuffer() -- TODO confirm in base.hpp).
 * \param[in] consumerCoordinationBuffer Global memory slot pointing to the consumer coordination buffer.
 * \param[in] tokenSize Handed to fixedSize::Base (presumably the value reported by getTokenSize() -- TODO confirm).
 * \param[in] capacity Handed to fixedSize::Base (presumably the capacity of the circular buffer -- TODO confirm).
 */
70 Producer(CommunicationManager &communicationManager,
71 std::shared_ptr<GlobalMemorySlot> tokenBuffer,
72 const std::shared_ptr<LocalMemorySlot> &internalCoordinationBuffer,
73 std::shared_ptr<GlobalMemorySlot> consumerCoordinationBuffer,
74 const size_t tokenSize,
75 const size_t capacity)
76 : fixedSize::Base(communicationManager, internalCoordinationBuffer, tokenSize, capacity),
// The slots were taken by value, so moving them into the members avoids a second shared_ptr copy
77 _tokenBuffer(std::move(tokenBuffer)),
78 _consumerCoordinationBuffer(std::move(consumerCoordinationBuffer))
79 {}
80
// Defaulted destructor: this class adds no cleanup of its own beyond destroying its members
81 ~Producer() = default;
82
/**
 * Pushes n tokens, read from sourceSlot, into the channel's token buffer and then copies the
 * head-advance counter entry to the consumer coordination buffer.
 *
 * Steps, in order: validate the size of sourceSlot, call updateDepth(), check that n more tokens
 * fit within the circular buffer capacity, copy the tokens one by one, fence on sourceSlot,
 * advance the circular buffer head by n, and finally copy one size_t to the consumer side.
 *
 * \param[in] sourceSlot Local memory slot holding the tokens to send. Its getSize() must be at least
 *            getTokenSize() * n, otherwise HICR_THROW_LOGIC is raised.
 * \param[in] n Number of tokens to push (defaults to 1). Token i is read from sourceSlot at
 *            offset i * getTokenSize().
 *
 * NOTE(review): this listing is missing some original lines (listing lines 120, 123-128, 152 and 155),
 * so a few statements below appear truncated. Check the real header before relying on them.
 */
100 __INLINE__ void push(const std::shared_ptr<LocalMemorySlot> &sourceSlot, const size_t n = 1)
101 {
102 // Make sure source slot is big enough to satisfy the operation
103 auto requiredBufferSize = getTokenSize() * n;
104 auto providedBufferSize = sourceSlot->getSize();
105 if (providedBufferSize < requiredBufferSize)
106 HICR_THROW_LOGIC("Attempting to push with a source buffer size (%lu) smaller than the required size (Token Size (%lu) x n (%lu) = %lu).\n",
107 providedBufferSize,
108 getTokenSize(),
109 n,
110 requiredBufferSize);
111
112 // Updating channel depth
113 updateDepth();
114
115 // Calculating current channel depth
116 auto curDepth = getCircularBuffer()->getDepth();
117
118 // If the exchange buffer does not have n free slots, reject the operation
119 if (curDepth + n > getCircularBuffer()->getCapacity())
// NOTE(review): the call that consumes the message below (listing line 120) is not visible here.
// It is presumably a HICR_THROW_RUNTIME( invocation -- confirm against the real header.
121 "Attempting to push with (%lu) tokens while the channel has (%lu) tokens and this would exceed capacity (%lu).\n", n, curDepth, getCircularBuffer()->getCapacity());
122
// Store the depth value that was just checked against the capacity as the circular buffer's cached depth
129 getCircularBuffer()->setCachedDepth(curDepth);
130 for (size_t i = 0; i < n; i++)
131 {
132 // Copying with source increasing offset per token
// NOTE(review): the destination offset expression does not depend on i, and nothing visible inside
// this loop advances the head (advanceHead is only called after the loop). Unless getHeadPosition()
// or memcpy changes the head as a side effect, all n tokens are written to the same destination
// offset when n > 1 -- confirm the intended behavior.
133 getCommunicationManager()->memcpy(_tokenBuffer, /* destination */
134 getTokenSize() * getCircularBuffer()->getHeadPosition(), /* dst_offset */
135 sourceSlot, /* source */
136 i * getTokenSize(), /* src_offset */
137 getTokenSize()); /* size */
138 }
// Fence on the source slot with the arguments (n, 0); presumably this waits for the n copies issued
// above to complete before the head is advanced -- TODO confirm the fence overload's semantics
139 getCommunicationManager()->fence(sourceSlot, n, 0);
140
141 // read possibly slightly outdated depth here (will be updated next round)
142 getCircularBuffer()->advanceHead(n, true);
143
144 /*
145 * In this implementation of producer-consumer, the producer
146 * actively and in a one-sided manner updates the depth at the consumer.
147 * This implementation has some advantages for MPSC implementations
148 * on top of SPSC
149 */
// Copies a single size_t, at the same offset on both sides (index _HICR_CHANNEL_HEAD_ADVANCE_COUNT_IDX),
// into the consumer coordination buffer.
// NOTE(review): the source argument (listing line 152) and the statement on listing line 155 are not
// visible here. The source is presumably getCoordinationBuffer(), followed by a fence -- confirm.
150 getCommunicationManager()->memcpy(_consumerCoordinationBuffer,
151 _HICR_CHANNEL_HEAD_ADVANCE_COUNT_IDX * sizeof(size_t),
153 _HICR_CHANNEL_HEAD_ADVANCE_COUNT_IDX * sizeof(size_t),
154 sizeof(size_t));
156 }
157
/**
 * Refresh step that push() runs before it reads the circular buffer depth.
 *
 * Takes no arguments and returns nothing.
 *
 * NOTE(review): the statement belonging to this body (listing line 164) is not visible in this
 * listing. Judging from the comment inside, it presumably asks the communication manager to query
 * memory slot updates -- confirm against the real header.
 */
161 __INLINE__ void updateDepth()
162 {
163 // Perform a non-blocking check of the coordination and token buffers, to see and/or notify if there are new messages
165 }
166};
167
168} // namespace HiCR::channel::fixedSize::SPSC
Definition communicationManager.hpp:54
__INLINE__ void memcpy(const std::shared_ptr< LocalMemorySlot > &destination, size_t dst_offset, const std::shared_ptr< LocalMemorySlot > &source, size_t src_offset, size_t size)
Definition communicationManager.hpp:267
__INLINE__ void fence(GlobalMemorySlot::tag_t tag)
Definition communicationManager.hpp:377
__INLINE__ void queryMemorySlotUpdates(std::shared_ptr< LocalMemorySlot > memorySlot)
Definition communicationManager.hpp:248
__INLINE__ CommunicationManager * getCommunicationManager() const
Definition base.hpp:217
__INLINE__ auto getCoordinationBuffer() const
Definition base.hpp:223
__INLINE__ size_t getTokenSize() const noexcept
Definition base.hpp:84
__INLINE__ auto getCircularBuffer() const noexcept
Definition base.hpp:167
Definition base.hpp:42
__INLINE__ void push(const std::shared_ptr< LocalMemorySlot > &sourceSlot, const size_t n=1)
Definition producer.hpp:100
__INLINE__ void updateDepth()
Definition producer.hpp:161
Producer(CommunicationManager &communicationManager, std::shared_ptr< GlobalMemorySlot > tokenBuffer, const std::shared_ptr< LocalMemorySlot > &internalCoordinationBuffer, std::shared_ptr< GlobalMemorySlot > consumerCoordinationBuffer, const size_t tokenSize, const size_t capacity)
Definition producer.hpp:70
Provides a failure model and corresponding exception classes.
#define HICR_THROW_RUNTIME(...)
Definition exceptions.hpp:74
#define HICR_THROW_LOGIC(...)
Definition exceptions.hpp:67
Provides base functionality for a fixed-size MPSC channel over HiCR.