/home/runner/work/HiCR/HiCR/include/hicr/core/communicationManager.hpp Source File

HiCR: /home/runner/work/HiCR/HiCR/include/hicr/core/communicationManager.hpp Source File
HiCR
communicationManager.hpp
Go to the documentation of this file.
1/*
2 * Copyright 2025 Huawei Technologies Co., Ltd.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17/*
18 * Copyright Huawei Technologies Switzerland AG
19 * All rights reserved.
20 */
21
29#pragma once
30
31#include <map>
32#include <memory>
33#include <mutex>
34#include <utility>
35#include <vector>
39#include <hicr/core/definitions.hpp>
41
42namespace HiCR
43{
44
54{
55 public:
56
60 using globalKeyMemorySlotPair_t = std::pair<GlobalMemorySlot::globalKey_t, std::shared_ptr<LocalMemorySlot>>;
61
65 using globalKeyToMemorySlotMap_t = std::map<GlobalMemorySlot::globalKey_t, std::shared_ptr<GlobalMemorySlot>>;
66
70 using globalMemorySlotTagKeyMap_t = std::map<GlobalMemorySlot::tag_t, globalKeyToMemorySlotMap_t>;
71
75 virtual ~CommunicationManager() = default;
76
86 virtual void lock() { _mutex.lock(); };
87
92 virtual void unlock() { _mutex.unlock(); };
93
100 __INLINE__ void exchangeGlobalMemorySlots(GlobalMemorySlot::tag_t tag, const std::vector<globalKeyMemorySlotPair_t> &memorySlots)
101 {
102 // Calling internal implementation of this function
103 exchangeGlobalMemorySlotsImpl(tag, memorySlots);
104 }
105
115 __INLINE__ std::shared_ptr<GlobalMemorySlot> getGlobalMemorySlot(GlobalMemorySlot::tag_t tag, GlobalMemorySlot::globalKey_t globalKey)
116 {
117 auto globalSlotImpl = getGlobalMemorySlotImpl(tag, globalKey);
118
119 if (globalSlotImpl != nullptr) { return globalSlotImpl; }
120 else
121 {
122 // If the requested tag and key are not found, return empty storage
123 if (_globalMemorySlotTagKeyMap.contains(tag) == false) { HICR_THROW_LOGIC("Requesting a global memory slot for a tag (%lu) that has not been registered.", tag); }
124 if (_globalMemorySlotTagKeyMap.at(tag).contains(globalKey) == false)
125 {
126 for (const auto &elem : _globalMemorySlotTagKeyMap.at(tag)) { printf("For Tag %lu: Key %lu\n", tag, elem.first); }
127 printf("But tag %lu does not contain globalKey = %lu\n", tag, globalKey);
128 HICR_THROW_LOGIC("Requesting a global memory slot for a global key (%lu) not registered within the tag (%lu).", globalKey, tag);
129 }
130
131 // Getting requested memory slot
132 auto value = _globalMemorySlotTagKeyMap.at(tag).at(globalKey);
133
134 // Returning value
135 return value;
136 }
137 }
138
148 virtual uint8_t *serializeGlobalMemorySlot(const std::shared_ptr<HiCR::GlobalMemorySlot> &globalSlot) const
149 {
150 HICR_THROW_LOGIC("Trying to serialize a global memory slot; this is not supported in this backend\n");
151 return nullptr;
152 }
153
161 virtual std::shared_ptr<GlobalMemorySlot> deserializeGlobalMemorySlot(uint8_t *buffer, GlobalMemorySlot::tag_t tag)
162 {
163 HICR_THROW_LOGIC("Trying to deserialize a global memory slot; this is not supported in this backend\n");
164 return nullptr;
165 }
166
177 virtual std::shared_ptr<GlobalMemorySlot> promoteLocalMemorySlot(const std::shared_ptr<LocalMemorySlot> &localMemorySlot, GlobalMemorySlot::tag_t tag)
178 {
179 HICR_THROW_LOGIC("This backend does not support one-sided promotion of local memory slots to global");
180 return nullptr;
181 }
182
192 __INLINE__ void deregisterGlobalMemorySlot(const std::shared_ptr<GlobalMemorySlot> &memorySlot)
193 {
194 // Getting memory slot global information
195 const auto memorySlotTag = memorySlot->getGlobalTag();
196 const auto memorySlotGlobalKey = memorySlot->getGlobalKey();
197
198 // Checking whether the memory slot is correctly registered as global
199 if (_globalMemorySlotTagKeyMap.contains(memorySlotTag) == false)
200 {
201 HICR_THROW_LOGIC("Attempting to de-register a global memory slot but its tag/key pair is not registered in this backend");
202 }
203
204 // Removing memory slot from the global memory slot map
205 _globalMemorySlotTagKeyMap.at(memorySlotTag).erase(memorySlotGlobalKey);
207 }
208
221 __INLINE__ void destroyGlobalMemorySlot(const std::shared_ptr<GlobalMemorySlot> &memorySlot)
222 {
223 auto tag = memorySlot->getGlobalTag();
224 // Implicit creation of the tag entry if it doesn't exist is desired here
225 _globalMemorySlotsToDestroyPerTag[tag].push_back(memorySlot);
226 }
227
236 virtual void destroyPromotedGlobalMemorySlot(const std::shared_ptr<GlobalMemorySlot> &memorySlot)
237 {
238 HICR_THROW_LOGIC("This backend does not support promoted global memory slots.");
239 }
240
248 __INLINE__ void queryMemorySlotUpdates(std::shared_ptr<LocalMemorySlot> memorySlot)
249 {
250 // Getting value by copy
251 queryMemorySlotUpdatesImpl(std::move(memorySlot));
252 }
253
267 __INLINE__ void memcpy(const std::shared_ptr<LocalMemorySlot> &destination, size_t dst_offset, const std::shared_ptr<LocalMemorySlot> &source, size_t src_offset, size_t size)
268 {
269 // Getting slot sizes. This operation is thread-safe
270 const auto srcSize = source->getSize();
271 const auto dstSize = destination->getSize();
272
273 // Making sure the memory slots exist and is not null
274 const auto actualSrcSize = size + src_offset;
275 const auto actualDstSize = size + dst_offset;
276
277 // Checking size doesn't exceed slot size
278 if (actualSrcSize > srcSize)
279 HICR_THROW_RUNTIME("Memcpy size (%lu) + offset (%lu) = (%lu) exceeds source slot (%p) capacity (%lu).", size, src_offset, actualSrcSize, source->getPointer(), srcSize);
280
281 // Checking size doesn't exceed slot size
282 if (actualDstSize > dstSize)
284 "Memcpy size (%lu) + offset (%lu) = (%lu) exceeds destination slot (%p) capacity (%lu).", size, dst_offset, actualDstSize, destination->getPointer(), dstSize);
285
286 // To enable concurrent memcpy operations, the implementation is executed outside the mutex zone
287 // This means that the developer needs to make sure that the implementation is concurrency-safe,
288 // and try not to access any of the internal Backend class fields without proper mutex locking
289
290 // Now calling internal memcpy function to give us a function that satisfies the operation
291 memcpyImpl(destination, dst_offset, source, src_offset, size);
292 }
293
307 __INLINE__ void memcpy(const std::shared_ptr<GlobalMemorySlot> &destination, size_t dst_offset, const std::shared_ptr<LocalMemorySlot> &source, size_t src_offset, size_t size)
308 {
309 // Getting slot sizes. This operation is thread-safe
310 const auto srcSize = source->getSize();
311
312 // Making sure the memory slots exist and is not null
313 const auto actualSrcSize = size + src_offset;
314
315 // Checking size doesn't exceed slot size
316 if (actualSrcSize > srcSize)
317 HICR_THROW_RUNTIME("Memcpy size (%lu) + offset (%lu) = (%lu) exceeds source slot (%p) capacity (%lu).", size, src_offset, actualSrcSize, source->getPointer(), srcSize);
318
319 // Now calling internal memcpy function to give us a function that satisfies the operation
320 memcpyImpl(destination, dst_offset, source, src_offset, size);
321 }
322
336 __INLINE__ void memcpy(const std::shared_ptr<LocalMemorySlot> &destination, size_t dst_offset, const std::shared_ptr<GlobalMemorySlot> &source, size_t src_offset, size_t size)
337 {
338 // Getting slot sizes. This operation is thread-safe
339 const auto dstSize = destination->getSize();
340
341 // Making sure the memory slots exist and is not null
342 const auto actualDstSize = size + dst_offset;
343
344 // Checking size doesn't exceed slot size
345 if (actualDstSize > dstSize)
347 "Memcpy size (%lu) + offset (%lu) = (%lu) exceeds destination slot (%p) capacity (%lu).", size, dst_offset, actualDstSize, destination->getPointer(), dstSize);
348
349 // Now calling internal memcpy function to give us a function that satisfies the operation
350 memcpyImpl(destination, dst_offset, source, src_offset, size);
351 }
352
377 __INLINE__ void fence(GlobalMemorySlot::tag_t tag)
378 {
379 lock();
380 // Now call the proper fence, as implemented by the backend
381 fenceImpl(tag);
382
383 // Clear the memory slots to destroy
384 _globalMemorySlotsToDestroyPerTag[tag].clear();
385 _globalMemorySlotsToDestroyPerTag.erase(tag);
386 unlock();
387 }
388
400 __INLINE__ void fence(const std::shared_ptr<LocalMemorySlot> &slot, size_t expectedSent, size_t expectedRecvd)
401 {
402 // To enable concurrent fence operations, the implementation is executed outside the mutex zone
403 // This means that the developer needs to make sure that the implementation is concurrency-safe,
404 // and try not to access any of the internal Backend class fields without proper mutex locking
405
406 // Now call the proper fence, as implemented by the backend
407 fenceImpl(slot, expectedSent, expectedRecvd);
408 }
409
421 __INLINE__ void fence(const std::shared_ptr<GlobalMemorySlot> &slot, size_t expectedSent, size_t expectedRecvd)
422 {
423 // To enable concurrent fence operations, the implementation is executed outside the mutex zone
424 // This means that the developer needs to make sure that the implementation is concurrency-safe,
425 // and try not to access any of the internal Backend class fields without proper mutex locking
426
427 // Now call the proper fence, as implemented by the backend
428 fenceImpl(slot, expectedSent, expectedRecvd);
429 }
430
439 __INLINE__ bool acquireGlobalLock(const std::shared_ptr<GlobalMemorySlot> &memorySlot)
440 {
441 // Getting memory slot global information
442 const auto memorySlotTag = memorySlot->getGlobalTag();
443 const auto memorySlotGlobalKey = memorySlot->getGlobalKey();
444
445 // Checking whether the memory slot is correctly registered as global
446 if (_globalMemorySlotTagKeyMap.contains(memorySlotTag) == false)
447 HICR_THROW_LOGIC("Attempting to lock a global memory slot but its tag/key pair is not registered in this backend");
448
449 // Checking whether the memory slot is correctly registered as global
450 if (_globalMemorySlotTagKeyMap.at(memorySlotTag).contains(memorySlotGlobalKey) == false)
451 HICR_THROW_LOGIC("Attempting to lock a global memory slot but its tag/key pair is not registered in this backend");
452
453 // Calling internal implementation
454 return acquireGlobalLockImpl(memorySlot);
455 }
456
462 __INLINE__ void releaseGlobalLock(const std::shared_ptr<GlobalMemorySlot> &memorySlot)
463 {
464 // Getting memory slot global information
465 const auto memorySlotTag = memorySlot->getGlobalTag();
466 const auto memorySlotGlobalKey = memorySlot->getGlobalKey();
467
468 // Checking whether the memory slot is correctly registered as global
469 if (_globalMemorySlotTagKeyMap.contains(memorySlotTag) == false)
470 {
471 HICR_THROW_LOGIC("Attempting to release a global memory slot but its tag/key pair is not registered in this backend");
472 }
473
474 // Checking whether the memory slot is correctly registered as global
475 if (_globalMemorySlotTagKeyMap.at(memorySlotTag).contains(memorySlotGlobalKey) == false)
476 {
477 HICR_THROW_LOGIC("Attempting to release a global memory slot but its tag/key pair is not registered in this backend");
478 }
479
480 // Calling internal implementation
481 releaseGlobalLockImpl(memorySlot);
482 }
483
487 __INLINE__ virtual void flushSent() {}
488
492 __INLINE__ virtual void flushReceived() {}
493
494 protected:
495
503 __INLINE__ void registerGlobalMemorySlot(const std::shared_ptr<GlobalMemorySlot> &memorySlot)
504 {
505 // Getting memory slot information
506 const auto tag = memorySlot->getGlobalTag();
507 const auto globalKey = memorySlot->getGlobalKey();
508
509 // Adding memory slot to the global map (based on tag and key)
510 _globalMemorySlotTagKeyMap[tag][globalKey] = memorySlot;
511 }
512
522 virtual std::shared_ptr<GlobalMemorySlot> getGlobalMemorySlotImpl(GlobalMemorySlot::tag_t tag, GlobalMemorySlot::globalKey_t globalKey) = 0;
523
530 virtual void exchangeGlobalMemorySlotsImpl(GlobalMemorySlot::tag_t tag, const std::vector<globalKeyMemorySlotPair_t> &memorySlots) = 0;
531
537 virtual void queryMemorySlotUpdatesImpl(std::shared_ptr<LocalMemorySlot> memorySlot) = 0;
538
544 virtual void deregisterGlobalMemorySlotImpl(const std::shared_ptr<GlobalMemorySlot> &memorySlot) {};
545
552 virtual void destroyGlobalMemorySlotImpl(std::shared_ptr<GlobalMemorySlot> memorySlot) = 0;
553
563 virtual void memcpyImpl(const std::shared_ptr<LocalMemorySlot> &destination, size_t dst_offset, const std::shared_ptr<LocalMemorySlot> &source, size_t src_offset, size_t size)
564 {
565 HICR_THROW_LOGIC("Local->Local memcpy operations are unsupported by the given backend");
566 };
567
577 virtual void memcpyImpl(const std::shared_ptr<GlobalMemorySlot> &destination, size_t dst_offset, const std::shared_ptr<LocalMemorySlot> &source, size_t src_offset, size_t size)
578 {
579 HICR_THROW_LOGIC("Local->Global memcpy operations are unsupported by the given backend");
580 };
581
591 virtual void memcpyImpl(const std::shared_ptr<LocalMemorySlot> &destination, size_t dst_offset, const std::shared_ptr<GlobalMemorySlot> &source, size_t src_offset, size_t size)
592 {
593 HICR_THROW_LOGIC("Global->Local memcpy operations are unsupported by the given backend");
594 };
595
603 virtual void fenceImpl(const std::shared_ptr<LocalMemorySlot> &slot, size_t expectedSent, size_t expectedRcvd) {}
611 virtual void fenceImpl(const std::shared_ptr<GlobalMemorySlot> &slot, size_t expectedSent, size_t expectedRcvd) {}
618 virtual void fenceImpl(GlobalMemorySlot::tag_t tag) = 0;
619
625 virtual bool acquireGlobalLockImpl(std::shared_ptr<GlobalMemorySlot> memorySlot) = 0;
626
631 virtual void releaseGlobalLockImpl(std::shared_ptr<GlobalMemorySlot> memorySlot) = 0;
632
637 [[nodiscard]] __INLINE__ auto &getGlobalMemorySlotsToDestroyPerTag() { return _globalMemorySlotsToDestroyPerTag; }
638
643 [[nodiscard]] __INLINE__ auto &getGlobalMemorySlotTagKeyMap() { return _globalMemorySlotTagKeyMap; }
644
645 protected:
646
652 __INLINE__ void increaseMessageRecvCounter(HiCR::LocalMemorySlot &memorySlot) noexcept { memorySlot.increaseMessagesRecv(); }
653
659 __INLINE__ void increaseMessageSentCounter(HiCR::LocalMemorySlot &memorySlot) noexcept { memorySlot.increaseMessagesSent(); }
660
667 __INLINE__ void setMessagesRecv(HiCR::LocalMemorySlot &memorySlot, const size_t count) noexcept { memorySlot.setMessagesRecv(count); }
668
675 __INLINE__ void setMessagesSent(HiCR::LocalMemorySlot &memorySlot, const size_t count) noexcept { memorySlot.setMessagesSent(count); }
676
677 private:
678
683 std::mutex _mutex;
684
688 globalMemorySlotTagKeyMap_t _globalMemorySlotTagKeyMap;
689
693 std::map<GlobalMemorySlot::tag_t, std::vector<std::shared_ptr<GlobalMemorySlot>>> _globalMemorySlotsToDestroyPerTag;
694};
695
696} // namespace HiCR
Definition communicationManager.hpp:54
std::map< GlobalMemorySlot::tag_t, globalKeyToMemorySlotMap_t > globalMemorySlotTagKeyMap_t
Definition communicationManager.hpp:70
virtual std::shared_ptr< GlobalMemorySlot > promoteLocalMemorySlot(const std::shared_ptr< LocalMemorySlot > &localMemorySlot, GlobalMemorySlot::tag_t tag)
Definition communicationManager.hpp:177
virtual uint8_t * serializeGlobalMemorySlot(const std::shared_ptr< HiCR::GlobalMemorySlot > &globalSlot) const
Definition communicationManager.hpp:148
__INLINE__ auto & getGlobalMemorySlotTagKeyMap()
Definition communicationManager.hpp:643
__INLINE__ void setMessagesSent(HiCR::LocalMemorySlot &memorySlot, const size_t count) noexcept
Definition communicationManager.hpp:675
__INLINE__ std::shared_ptr< GlobalMemorySlot > getGlobalMemorySlot(GlobalMemorySlot::tag_t tag, GlobalMemorySlot::globalKey_t globalKey)
Definition communicationManager.hpp:115
virtual void memcpyImpl(const std::shared_ptr< LocalMemorySlot > &destination, size_t dst_offset, const std::shared_ptr< LocalMemorySlot > &source, size_t src_offset, size_t size)
Definition communicationManager.hpp:563
virtual __INLINE__ void flushReceived()
Definition communicationManager.hpp:492
__INLINE__ void memcpy(const std::shared_ptr< LocalMemorySlot > &destination, size_t dst_offset, const std::shared_ptr< LocalMemorySlot > &source, size_t src_offset, size_t size)
Definition communicationManager.hpp:267
virtual void fenceImpl(const std::shared_ptr< LocalMemorySlot > &slot, size_t expectedSent, size_t expectedRcvd)
Definition communicationManager.hpp:603
virtual ~CommunicationManager()=default
__INLINE__ void releaseGlobalLock(const std::shared_ptr< GlobalMemorySlot > &memorySlot)
Definition communicationManager.hpp:462
virtual void fenceImpl(const std::shared_ptr< GlobalMemorySlot > &slot, size_t expectedSent, size_t expectedRcvd)
Definition communicationManager.hpp:611
virtual void releaseGlobalLockImpl(std::shared_ptr< GlobalMemorySlot > memorySlot)=0
virtual std::shared_ptr< GlobalMemorySlot > getGlobalMemorySlotImpl(GlobalMemorySlot::tag_t tag, GlobalMemorySlot::globalKey_t globalKey)=0
__INLINE__ void fence(GlobalMemorySlot::tag_t tag)
Definition communicationManager.hpp:377
virtual void unlock()
Definition communicationManager.hpp:92
virtual __INLINE__ void flushSent()
Definition communicationManager.hpp:487
__INLINE__ void deregisterGlobalMemorySlot(const std::shared_ptr< GlobalMemorySlot > &memorySlot)
Definition communicationManager.hpp:192
__INLINE__ void memcpy(const std::shared_ptr< GlobalMemorySlot > &destination, size_t dst_offset, const std::shared_ptr< LocalMemorySlot > &source, size_t src_offset, size_t size)
Definition communicationManager.hpp:307
virtual void lock()
Definition communicationManager.hpp:86
__INLINE__ void registerGlobalMemorySlot(const std::shared_ptr< GlobalMemorySlot > &memorySlot)
Definition communicationManager.hpp:503
__INLINE__ bool acquireGlobalLock(const std::shared_ptr< GlobalMemorySlot > &memorySlot)
Definition communicationManager.hpp:439
virtual bool acquireGlobalLockImpl(std::shared_ptr< GlobalMemorySlot > memorySlot)=0
virtual void memcpyImpl(const std::shared_ptr< LocalMemorySlot > &destination, size_t dst_offset, const std::shared_ptr< GlobalMemorySlot > &source, size_t src_offset, size_t size)
Definition communicationManager.hpp:591
__INLINE__ void destroyGlobalMemorySlot(const std::shared_ptr< GlobalMemorySlot > &memorySlot)
Definition communicationManager.hpp:221
virtual std::shared_ptr< GlobalMemorySlot > deserializeGlobalMemorySlot(uint8_t *buffer, GlobalMemorySlot::tag_t tag)
Definition communicationManager.hpp:161
virtual void fenceImpl(GlobalMemorySlot::tag_t tag)=0
virtual void queryMemorySlotUpdatesImpl(std::shared_ptr< LocalMemorySlot > memorySlot)=0
__INLINE__ void queryMemorySlotUpdates(std::shared_ptr< LocalMemorySlot > memorySlot)
Definition communicationManager.hpp:248
virtual void deregisterGlobalMemorySlotImpl(const std::shared_ptr< GlobalMemorySlot > &memorySlot)
Definition communicationManager.hpp:544
virtual void destroyGlobalMemorySlotImpl(std::shared_ptr< GlobalMemorySlot > memorySlot)=0
std::pair< GlobalMemorySlot::globalKey_t, std::shared_ptr< LocalMemorySlot > > globalKeyMemorySlotPair_t
Definition communicationManager.hpp:60
virtual void exchangeGlobalMemorySlotsImpl(GlobalMemorySlot::tag_t tag, const std::vector< globalKeyMemorySlotPair_t > &memorySlots)=0
__INLINE__ void fence(const std::shared_ptr< GlobalMemorySlot > &slot, size_t expectedSent, size_t expectedRecvd)
Definition communicationManager.hpp:421
std::map< GlobalMemorySlot::globalKey_t, std::shared_ptr< GlobalMemorySlot > > globalKeyToMemorySlotMap_t
Definition communicationManager.hpp:65
__INLINE__ void increaseMessageRecvCounter(HiCR::LocalMemorySlot &memorySlot) noexcept
Definition communicationManager.hpp:652
__INLINE__ auto & getGlobalMemorySlotsToDestroyPerTag()
Definition communicationManager.hpp:637
__INLINE__ void memcpy(const std::shared_ptr< LocalMemorySlot > &destination, size_t dst_offset, const std::shared_ptr< GlobalMemorySlot > &source, size_t src_offset, size_t size)
Definition communicationManager.hpp:336
__INLINE__ void setMessagesRecv(HiCR::LocalMemorySlot &memorySlot, const size_t count) noexcept
Definition communicationManager.hpp:667
virtual void destroyPromotedGlobalMemorySlot(const std::shared_ptr< GlobalMemorySlot > &memorySlot)
Definition communicationManager.hpp:236
virtual void memcpyImpl(const std::shared_ptr< GlobalMemorySlot > &destination, size_t dst_offset, const std::shared_ptr< LocalMemorySlot > &source, size_t src_offset, size_t size)
Definition communicationManager.hpp:577
__INLINE__ void increaseMessageSentCounter(HiCR::LocalMemorySlot &memorySlot) noexcept
Definition communicationManager.hpp:659
__INLINE__ void exchangeGlobalMemorySlots(GlobalMemorySlot::tag_t tag, const std::vector< globalKeyMemorySlotPair_t > &memorySlots)
Definition communicationManager.hpp:100
__INLINE__ void fence(const std::shared_ptr< LocalMemorySlot > &slot, size_t expectedSent, size_t expectedRecvd)
Definition communicationManager.hpp:400
uint64_t tag_t
Definition globalMemorySlot.hpp:49
uint64_t globalKey_t
Definition globalMemorySlot.hpp:44
Definition localMemorySlot.hpp:44
Provides a definition for a HiCR Global Memory Slot class.
Provides a definition for a HiCR Local Memory Slot class.
Provides a base definition for a HiCR MemorySpace class.
Provides a failure model and corresponding exception classes.
#define HICR_THROW_RUNTIME(...)
Definition exceptions.hpp:74
#define HICR_THROW_LOGIC(...)
Definition exceptions.hpp:67