communicationManager.hpp
/*
 * Copyright 2025 Huawei Technologies Co., Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <mpi.h>
#include <set>
#include <cstring>
#include <memory>
#include <vector>
#include <hicr/core/definitions.hpp>
#include <hicr/core/communicationManager.hpp> // base class: HiCR::CommunicationManager
#include "localMemorySlot.hpp"
#include "globalMemorySlot.hpp"

namespace HiCR::backend::mpi
{

/**
 * Implementation of the communication manager for the MPI backend
 */
class CommunicationManager : public HiCR::CommunicationManager
{
  public:

  CommunicationManager(MPI_Comm comm = MPI_COMM_WORLD)
    : HiCR::CommunicationManager(),
      _comm(comm)
  {
    MPI_Comm_size(_comm, &_size);
    MPI_Comm_rank(_comm, &_rank);
  }

  ~CommunicationManager() override = default;

  [[nodiscard]] const MPI_Comm getComm() const { return _comm; }

  [[nodiscard]] const int getSize() const { return _size; }

  [[nodiscard]] const int getRank() const { return _rank; }
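
  /*
   * Illustrative construction sketch (an assumption-laden example, not code from this file):
   * a process would typically initialize MPI first and then build the manager on top of an
   * existing communicator, e.g.
   *
   *   MPI_Init(&argc, &argv);
   *   HiCR::backend::mpi::CommunicationManager cm(MPI_COMM_WORLD);
   *   int rank = cm.getRank();   // this process' rank within the communicator
   *   int size = cm.getSize();   // number of processes in the communicator
   *   // ... perform slot exchanges and memcpys through the HiCR API ...
   *   MPI_Finalize();
   */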

  protected:

  virtual __INLINE__ void fenceImpl(HiCR::GlobalMemorySlot::tag_t tag) override
  {
    MPI_Barrier(_comm);

    // Call the slot destruction collective routine
    destroyGlobalMemorySlotsCollectiveImpl(tag);
  }
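
  // Note: fenceImpl is collective over _comm; every rank must call it, since it issues an MPI_Barrier
  // and then runs the collective global memory slot destruction routine below.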

  virtual __INLINE__ void exchangeGlobalMemorySlotsImpl(HiCR::GlobalMemorySlot::tag_t tag, const std::vector<globalKeyMemorySlotPair_t> &memorySlots) override
  {
    // Obtaining local slots to exchange
    int localSlotCount = (int)memorySlots.size();

    // Obtaining the local slots to exchange per process in the communicator
    std::vector<int> perProcessSlotCount(_size);
    lock();
    MPI_Allgather(&localSlotCount, 1, MPI_INT, perProcessSlotCount.data(), 1, MPI_INT, _comm);
    unlock();

    // Calculating respective offsets
    std::vector<int> perProcessSlotOffsets(_size);
    int currentOffset = 0;
    for (int i = 0; i < _size; i++)
    {
      perProcessSlotOffsets[i] += currentOffset;
      currentOffset += perProcessSlotCount[i];
    }

    // Calculating number of global slots
    int globalSlotCount = 0;
    for (const auto count : perProcessSlotCount) globalSlotCount += count;

    // Allocating storage for local and global memory slot sizes, keys and process id
    std::vector<size_t> localSlotSizes(localSlotCount);
    std::vector<size_t> globalSlotSizes(globalSlotCount);
    std::vector<HiCR::GlobalMemorySlot::globalKey_t> localSlotKeys(localSlotCount);
    std::vector<HiCR::GlobalMemorySlot::globalKey_t> globalSlotKeys(globalSlotCount);
    std::vector<int> localSlotProcessId(localSlotCount);
    std::vector<int> globalSlotProcessId(globalSlotCount);

    // Filling in the local size and keys storage
    for (size_t i = 0; i < memorySlots.size(); i++)
    {
      const auto key        = memorySlots[i].first;
      const auto memorySlot = std::dynamic_pointer_cast<HiCR::backend::mpi::LocalMemorySlot>(memorySlots[i].second);
      if (memorySlot.get() == nullptr) HICR_THROW_LOGIC("Trying to use MPI to promote a non-MPI local memory slot.");
      localSlotSizes[i]     = memorySlot->getSize();
      localSlotKeys[i]      = key;
      localSlotProcessId[i] = _rank;
    }

    // Exchanging global sizes, keys and process ids
    lock();
    MPI_Allgatherv(
      localSlotSizes.data(), localSlotCount, MPI_UNSIGNED_LONG, globalSlotSizes.data(), perProcessSlotCount.data(), perProcessSlotOffsets.data(), MPI_UNSIGNED_LONG, _comm);
    MPI_Allgatherv(
      localSlotKeys.data(), localSlotCount, MPI_UNSIGNED_LONG, globalSlotKeys.data(), perProcessSlotCount.data(), perProcessSlotOffsets.data(), MPI_UNSIGNED_LONG, _comm);
    MPI_Allgatherv(localSlotProcessId.data(), localSlotCount, MPI_INT, globalSlotProcessId.data(), perProcessSlotCount.data(), perProcessSlotOffsets.data(), MPI_INT, _comm);
    unlock();

    // Now also creating a pointer vector to remember local pointers, when required for memcpys
    std::vector<void **> globalSlotPointers(globalSlotCount);
    std::vector<std::shared_ptr<HiCR::LocalMemorySlot>> globalSourceSlots(globalSlotCount);
    size_t localPointerPos = 0;
    for (size_t i = 0; i < globalSlotPointers.size(); i++)
    {
      // If the rank associated with this slot is remote, don't store the pointer; otherwise store it.
      if (globalSlotProcessId[i] != _rank)
      {
        globalSlotPointers[i] = nullptr;
        globalSourceSlots[i]  = nullptr;
      }
      else
      {
        const auto memorySlot = memorySlots[localPointerPos++].second;
        globalSlotPointers[i] = &memorySlot->getPointer();
        globalSourceSlots[i]  = memorySlot;
      }
    }

    // Now creating global slots and their MPI windows
    for (size_t i = 0; i < globalSlotProcessId.size(); i++)
    {
      // Creating new memory slot object
      auto memorySlot = std::make_shared<mpi::GlobalMemorySlot>(globalSlotProcessId[i], tag, globalSlotKeys[i], globalSourceSlots[i]);

      // Allocating MPI windows
      memorySlot->getDataWindow()             = std::make_unique<MPI_Win>();
      memorySlot->getRecvMessageCountWindow() = std::make_unique<MPI_Win>();
      memorySlot->getSentMessageCountWindow() = std::make_unique<MPI_Win>();

      // Temporary storage for the pointer returned by MPI_Win_allocate. It becomes the new internal storage of the local memory slot
      void *ptr = nullptr;

      // Creating MPI window for data transferring
      lock();
      auto status = MPI_Win_allocate(globalSlotProcessId[i] == _rank ? (int)globalSlotSizes[i] : 0, 1, MPI_INFO_NULL, _comm, &ptr, memorySlot->getDataWindow().get());
      MPI_Win_set_errhandler(*memorySlot->getDataWindow(), MPI_ERRORS_RETURN);
      unlock();

      // Unfortunately, we need to effectively duplicate the original local memory slot storage,
      // since no modern MPI library supports MPI_Win_create over user-allocated storage anymore
      if (globalSlotProcessId[i] == _rank)
      {
        // Copying existing data over to the new storage
        std::memcpy(ptr, *(globalSlotPointers[i]), globalSlotSizes[i]);

        // Freeing up memory
        lock();
        MPI_Free_mem(*(globalSlotPointers[i]));
        unlock();

        // Swapping pointers
        *(globalSlotPointers[i]) = ptr;
      }

      if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("Failed to create MPI data window on exchange global memory slots.");

      // Creating MPI window for message received count transferring
      lock();
      status = MPI_Win_allocate(globalSlotProcessId[i] == _rank ? sizeof(size_t) : 0, 1, MPI_INFO_NULL, _comm, &ptr, memorySlot->getRecvMessageCountWindow().get());
      MPI_Win_set_errhandler(*memorySlot->getRecvMessageCountWindow(), MPI_ERRORS_RETURN);
      unlock();

      if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("Failed to create MPI received message count window on exchange global memory slots.");

      // Creating MPI window for message sent count transferring
      lock();
      status = MPI_Win_allocate(globalSlotProcessId[i] == _rank ? sizeof(size_t) : 0, 1, MPI_INFO_NULL, _comm, &ptr, memorySlot->getSentMessageCountWindow().get());
      MPI_Win_set_errhandler(*memorySlot->getSentMessageCountWindow(), MPI_ERRORS_RETURN);
      unlock();

      if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("Failed to create MPI sent message count window on exchange global memory slots.");

      // Registering global slot
      registerGlobalMemorySlot(memorySlot);
    }
  }
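
  /*
   * Illustrative exchange sketch: the method above is the backend implementation; in a typical
   * HiCR program it is reached through the public API of the HiCR::CommunicationManager base class.
   * The entry-point names below (exchangeGlobalMemorySlots, fence, getGlobalMemorySlot) and the
   * memory-manager call are assumptions for illustration, not definitions from this file:
   *
   *   auto localSlot = memoryManager.registerLocalMemorySlot(memorySpace, buffer, bufferSize);
   *   cm.exchangeGlobalMemorySlots(TAG, {{myGlobalKey, localSlot}});
   *   cm.fence(TAG); // runs fenceImpl above; all ranks must participate
   *   auto remoteSlot = cm.getGlobalMemorySlot(TAG, remoteGlobalKey);
   */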

  private:

  __INLINE__ void destroyGlobalMemorySlotImpl(std::shared_ptr<HiCR::GlobalMemorySlot> memorySlotPtr) override
  {
    // Getting up-casted pointer for the memory slot
    auto memorySlot = dynamic_pointer_cast<mpi::GlobalMemorySlot>(memorySlotPtr);

    // Checking whether the memory slot passed is compatible with this backend
    if (memorySlot == nullptr) HICR_THROW_LOGIC("The memory slot is not supported by this backend\n");

    auto status = MPI_Win_free(memorySlot->getDataWindow().get());
    if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("On deregister global memory slot, could not free MPI data window");

    status = MPI_Win_free(memorySlot->getRecvMessageCountWindow().get());
    if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("On deregister global memory slot, could not free MPI recv message count window");

    status = MPI_Win_free(memorySlot->getSentMessageCountWindow().get());
    if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("On deregister global memory slot, could not free MPI sent message count window");
  }

  __INLINE__ bool acquireGlobalLockImpl(std::shared_ptr<HiCR::GlobalMemorySlot> memorySlot) override
  {
    // Getting up-casted pointer for the memory slot
    auto m = dynamic_pointer_cast<mpi::GlobalMemorySlot>(memorySlot);

    // Checking whether the memory slot passed is compatible with this backend
    if (m == nullptr) HICR_THROW_LOGIC("The passed memory slot is not supported by this backend\n");

    // Locking access to all relevant memory slot windows
    lockMPIWindow(m->getRank(), m->getDataWindow().get(), MPI_LOCK_EXCLUSIVE, 0);

    // Setting memory slot lock as acquired
    m->setLockAcquiredValue(true);

    // This function is assumed to always succeed
    return true;
  }

  __INLINE__ void releaseGlobalLockImpl(std::shared_ptr<HiCR::GlobalMemorySlot> memorySlot) override
  {
    // Getting up-casted pointer for the memory slot
    auto m = dynamic_pointer_cast<mpi::GlobalMemorySlot>(memorySlot);

    // Checking whether the memory slot passed is compatible with this backend
    if (m == nullptr) HICR_THROW_LOGIC("The passed memory slot is not supported by this backend\n");

    // Releasing access to all relevant memory slot windows
    unlockMPIWindow(m->getRank(), m->getDataWindow().get());

    // Setting memory slot lock as released
    m->setLockAcquiredValue(false);
  }

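  /*
   * Illustrative locking sketch: the two methods above back the global mutual-exclusion lock on a
   * global memory slot. The public names acquireGlobalLock/releaseGlobalLock are assumed to be the
   * base-class entry points that forward here; the overall pattern would be:
   *
   *   cm.acquireGlobalLock(remoteSlot);   // takes an MPI_LOCK_EXCLUSIVE on the owner's data window
   *   // ... memcpys against remoteSlot skip the per-transfer shared lock while this lock is held ...
   *   cm.releaseGlobalLock(remoteSlot);
   */
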
  std::shared_ptr<HiCR::GlobalMemorySlot> getGlobalMemorySlotImpl(HiCR::GlobalMemorySlot::tag_t tag, HiCR::GlobalMemorySlot::globalKey_t globalKey) override { return nullptr; }

  __INLINE__ void destroyGlobalMemorySlotsCollectiveImpl(HiCR::GlobalMemorySlot::tag_t tag)
  {
    // Destruction of global memory slots marked for destruction
    // note: MPI expects int (not size_t) as the count parameter for allgather, so we have to work with int here
    int localDestroySlotsCount = (int)getGlobalMemorySlotsToDestroyPerTag()[tag].size();
    std::vector<int> perProcessDestroySlotCount(_size);

    // Obtaining the number of slots to destroy per process in the communicator
    MPI_Allgather(&localDestroySlotsCount, 1, MPI_INT, perProcessDestroySlotCount.data(), 1, MPI_INT, _comm);

    // Calculating respective offsets; TODO fix offset types for both this method and exchangeGlobalMemorySlotsImpl
    std::vector<int> perProcessSlotOffsets(_size);
    int currentOffset = 0;
    for (int i = 0; i < _size; i++)
    {
      perProcessSlotOffsets[i] += currentOffset;
      currentOffset += perProcessDestroySlotCount[i];
    }

    // Calculating number of global slots to destroy
    int globalDestroySlotsCount = 0;
    for (const auto count : perProcessDestroySlotCount) globalDestroySlotsCount += count;

    // If there are no slots to destroy from any instance, return to avoid a second round of collectives
    if (globalDestroySlotsCount == 0) return;

    // Allocating storage for global memory slot keys
    std::vector<HiCR::GlobalMemorySlot::globalKey_t> localDestroySlotKeys(localDestroySlotsCount);
    std::vector<HiCR::GlobalMemorySlot::globalKey_t> globalDestroySlotKeys(globalDestroySlotsCount);

    // Filling in the local keys storage
    for (auto i = 0; i < localDestroySlotsCount; i++)
    {
      const auto memorySlot   = getGlobalMemorySlotsToDestroyPerTag()[tag][i];
      const auto key          = memorySlot->getGlobalKey();
      localDestroySlotKeys[i] = key;
    }

    // Exchanging global keys
    MPI_Allgatherv(localDestroySlotKeys.data(),
                   localDestroySlotsCount,
                   MPI_UNSIGNED_LONG,
                   globalDestroySlotKeys.data(),
                   perProcessDestroySlotCount.data(),
                   perProcessSlotOffsets.data(),
                   MPI_UNSIGNED_LONG,
                   _comm);

    // Deduplicating the global keys, as more than one process might want to destroy the same key
    std::set<HiCR::GlobalMemorySlot::globalKey_t> globalDestroySlotKeysSet(globalDestroySlotKeys.begin(), globalDestroySlotKeys.end());

    // Now we can iterate over the global slots to destroy one by one
    for (auto key : globalDestroySlotKeysSet)
    {
      std::shared_ptr<HiCR::GlobalMemorySlot> memorySlot = nullptr;

      // Getting the memory slot to destroy
      // First check the standard map
      if (getGlobalMemorySlotTagKeyMap()[tag].contains(key))
      {
        memorySlot = getGlobalMemorySlotTagKeyMap()[tag].at(key);

        // Erase it from the map now; otherwise a later destroy call would look it up again and fail
        getGlobalMemorySlotTagKeyMap()[tag].erase(key);
      }
      // If not found, check the deregistered map
      else if (_deregisteredGlobalMemorySlotsTagKeyMap[tag].contains(key))
      {
        memorySlot = _deregisteredGlobalMemorySlotsTagKeyMap[tag].at(key);
        _deregisteredGlobalMemorySlotsTagKeyMap[tag].erase(key);
      }
      else
        HICR_THROW_FATAL("Could not find memory slot to destroy in this backend. Tag: %lu, Key: %lu", tag, key);

      // Destroying the memory slot collectively; there might be a case where the slot is not found, due to double calls to destroy
      destroyGlobalMemorySlotImpl(memorySlot);
    }
  }

  __INLINE__ void memcpyImpl(const std::shared_ptr<HiCR::LocalMemorySlot> &destinationSlot,
                             size_t dst_offset,
                             const std::shared_ptr<HiCR::GlobalMemorySlot> &sourceSlotPtr,
                             size_t sourceOffset,
                             size_t size) override
  {
    // Getting up-casted pointer for the source memory slot
    auto source = dynamic_pointer_cast<mpi::GlobalMemorySlot>(sourceSlotPtr);

    // Checking whether the memory slot passed is compatible with this backend
    if (source == nullptr) HICR_THROW_LOGIC("The passed source memory slot is not supported by this backend\n");

    // Getting rank of the source process
    const auto sourceRank = source->getRank();

    // Check if we already acquired a lock on the memory slot
    bool isSourceSlotLockAcquired = source->getLockAcquiredValue();

    // Calculating destination pointer
    auto destinationPointer = (void *)(static_cast<uint8_t *>(destinationSlot->getPointer()) + dst_offset);

    // Getting data window for the involved processes
    auto sourceDataWindow = source->getDataWindow().get();

    // Getting sent message count window for the source process
    auto sourceSentMessageWindow = source->getSentMessageCountWindow().get();

    // Locking the MPI window to ensure the message arrives before returning. This will not exclude other processes from accessing the data (MPI_LOCK_SHARED)
    if (isSourceSlotLockAcquired == false) lockMPIWindow(sourceRank, sourceDataWindow, MPI_LOCK_SHARED, MPI_MODE_NOCHECK);

    // Executing the get operation
    {
      lock();
      auto status = MPI_Get(destinationPointer, (int)size, MPI_BYTE, sourceRank, (int)sourceOffset, (int)size, MPI_BYTE, *sourceDataWindow);
      unlock();
      if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("Failed to run MPI_Get");
    }

    // Making sure the operation finished
    {
      lock();
      auto status = MPI_Win_flush(sourceRank, *sourceDataWindow);
      unlock();
      if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("Failed to run MPI_Win_flush");
    }

    // Unlocking window, if taken, after copy is completed
    if (isSourceSlotLockAcquired == false) unlockMPIWindow(sourceRank, sourceDataWindow);

    // Increasing the remote sent message counter and the local destination received message counter
    increaseWindowCounter(sourceRank, sourceSentMessageWindow);
    increaseMessageRecvCounter(*destinationSlot);
  }

  __INLINE__ void memcpyImpl(const std::shared_ptr<HiCR::GlobalMemorySlot> &destinationSlotPtr,
                             size_t dst_offset,
                             const std::shared_ptr<HiCR::LocalMemorySlot> &sourceSlot,
                             size_t sourceOffset,
                             size_t size) override
  {
    // Getting up-casted pointer for the destination memory slot
    auto destination = dynamic_pointer_cast<mpi::GlobalMemorySlot>(destinationSlotPtr);

    // Checking whether the memory slot passed is compatible with this backend
    if (destination == nullptr) HICR_THROW_LOGIC("The passed destination memory slot is not supported by this backend\n");

    // Getting rank of the destination process
    const auto destinationRank = destination->getRank();

    // Check if we already acquired a lock on the memory slot
    bool isDestinationSlotLockAcquired = destination->getLockAcquiredValue();

    // Calculating source pointer
    auto sourcePointer = (void *)(static_cast<uint8_t *>(sourceSlot->getPointer()) + sourceOffset);

    // Getting data window for the involved processes
    auto destinationDataWindow = destination->getDataWindow().get();

    // Getting recv message count window for the destination process
    auto destinationRecvMessageWindow = destination->getRecvMessageCountWindow().get();

    // Locking the MPI window to ensure the message arrives before returning. This will not exclude other processes from accessing the data (MPI_LOCK_SHARED)
    if (isDestinationSlotLockAcquired == false) lockMPIWindow(destinationRank, destinationDataWindow, MPI_LOCK_SHARED, MPI_MODE_NOCHECK);

    // Executing the put operation
    {
      lock();
      auto status = MPI_Put(sourcePointer, (int)size, MPI_BYTE, destinationRank, (int)dst_offset, (int)size, MPI_BYTE, *destinationDataWindow);
      unlock();
      if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("Failed to run data MPI_Put");
    }

    // Making sure the operation finished
    {
      lock();
      auto status = MPI_Win_flush(destinationRank, *destinationDataWindow);
      unlock();
      if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("Failed to run data MPI_Win_flush");
    }

    // Unlocking window, if taken, after copy is completed
    if (isDestinationSlotLockAcquired == false) unlockMPIWindow(destinationRank, destinationDataWindow);

    // Increasing the local sent message counter and the remote received message counter
    increaseMessageSentCounter(*sourceSlot);
    increaseWindowCounter(destinationRank, destinationRecvMessageWindow);
  }
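
  /*
   * Both memcpyImpl RMA variants above follow the same protocol: take a shared lock on the remote
   * data window (unless the slot already holds the exclusive global lock), issue a one-sided
   * MPI_Get/MPI_Put, flush the window so the transfer completes before returning, release the lock,
   * and finally update the sent/received message counters on both sides.
   */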

  __INLINE__ void queryMemorySlotUpdatesImpl(std::shared_ptr<HiCR::LocalMemorySlot> memorySlot) override {}

  __INLINE__ void deregisterGlobalMemorySlotImpl(const std::shared_ptr<HiCR::GlobalMemorySlot> &memorySlot) override
  {
    // Getting up-casted pointer for the slot
    auto slot = dynamic_pointer_cast<mpi::GlobalMemorySlot>(memorySlot);

    // Checking whether the slot passed is compatible with this backend
    if (slot == nullptr) HICR_THROW_LOGIC("The memory slot is not supported by this backend\n");

    // Getting the slot information
    const auto tag = slot->getGlobalTag();
    const auto key = slot->getGlobalKey();

    // Storing the deregistered slot; the cast above guarantees it is of the correct (MPI) type
    _deregisteredGlobalMemorySlotsTagKeyMap[tag][key] = slot;
  }

  const MPI_Comm _comm;

  int _size{};

  int _rank{};

  HiCR::CommunicationManager::globalMemorySlotTagKeyMap_t _deregisteredGlobalMemorySlotsTagKeyMap{};

  __INLINE__ void lockMPIWindow(int rank, MPI_Win *window, int MPILockType, int MPIAssert)
  {
    // Locking the MPI window, retrying until the lock is granted
    int mpiStatus = 0;
    do {
      lock();
      mpiStatus = MPI_Win_lock(MPILockType, rank, MPIAssert, *window);
      unlock();
    }
    while (mpiStatus != MPI_SUCCESS);
  }

  __INLINE__ void unlockMPIWindow(int rank, MPI_Win *window)
  {
    // Unlocking the window after the operation is completed, retrying until the unlock succeeds
    int mpiStatus = 0;
    do {
      lock();
      mpiStatus = MPI_Win_unlock(rank, *window);
      unlock();
    }
    while (mpiStatus != MPI_SUCCESS);
  }

  __INLINE__ void increaseWindowCounter(int rank, MPI_Win *window)
  {
    // This operation should be possible to do in one go with MPI_Accumulate or MPI_Fetch_and_op. However,
    // the current implementation of Open MPI deadlocks on these operations, so we do the whole thing manually.

    // Locking the MPI window to ensure the update completes before returning
    lockMPIWindow(rank, window, MPI_LOCK_EXCLUSIVE, 0);

    // Use atomic MPI operation to increment counter
    const size_t one = 1;
    size_t value = 0;

    // There is no MPI datatype for size_t (the counters), but
    // MPI_AINT is supposed to be large enough and portable
    lock();
    auto status = MPI_Fetch_and_op(&one, &value, MPI_AINT, rank, 0, MPI_SUM, *window);
    unlock();

    // Checking execution status
    if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("Failed to increase remote message counter (on operation: MPI_Fetch_and_op) for rank %d, MPI Window pointer %p", rank, window);

    // Unlocking window after the update is completed
    unlockMPIWindow(rank, window);
  }

  __INLINE__ void memcpyImpl(const std::shared_ptr<HiCR::LocalMemorySlot> &destination,
                             const size_t dst_offset,
                             const std::shared_ptr<HiCR::LocalMemorySlot> &source,
                             const size_t src_offset,
                             const size_t size) override
  {
    // Getting slot pointers
    const auto srcPtr = source->getPointer();
    const auto dstPtr = destination->getPointer();

    // Calculating actual offsets
    const auto actualSrcPtr = (void *)(static_cast<uint8_t *>(srcPtr) + src_offset);
    const auto actualDstPtr = (void *)(static_cast<uint8_t *>(dstPtr) + dst_offset);

    // Running memcpy now
    std::memcpy(actualDstPtr, actualSrcPtr, size);

    // Increasing recv/send counters
    increaseMessageRecvCounter(*destination);
    increaseMessageSentCounter(*source);
  }
};

} // namespace HiCR::backend::mpi