/*
 * Copyright 2025 Huawei Technologies Co., Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

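/**
 * @file communicationManager.hpp
 * @brief Provides a definition for the communication manager class of the MPI backend.
 */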
#pragma once

#include <mpi.h>
#include <set>
#include <cstring>
#include <hicr/core/definitions.hpp>
#include <hicr/core/communicationManager.hpp>
#include "localMemorySlot.hpp"
#include "globalMemorySlot.hpp"

namespace HiCR::backend::mpi
{

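/**
 * Implementation of the HiCR communication manager for the MPI backend.
 *
 * Global memory slot exchange, remote memory copies, message counters and global locks are all
 * built on one-sided MPI communication (passive-target RMA over MPI windows).
 *
 * A minimal usage sketch follows. It assumes the public front-end methods inherited from
 * HiCR::CommunicationManager (exchangeGlobalMemorySlots, fence, getGlobalMemorySlot) and a
 * previously registered local memory slot `localSlot`; adapt the tag and key values to your setup.
 *
 * \code{.cpp}
 * HiCR::backend::mpi::CommunicationManager cm(MPI_COMM_WORLD);
 *
 * // Each rank promotes its local slot under a global key (here, its own rank number)
 * const HiCR::GlobalMemorySlot::tag_t tag = 0;
 * cm.exchangeGlobalMemorySlots(tag, {{(HiCR::GlobalMemorySlot::globalKey_t)cm.getRank(), localSlot}});
 * cm.fence(tag);
 *
 * // Any rank may now look up the promoted slot of rank 0 and copy data from or to it
 * auto remoteSlot = cm.getGlobalMemorySlot(tag, 0);
 * \endcode
 */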
class CommunicationManager : public HiCR::CommunicationManager
{
  public:

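  /**
   * Constructor for the MPI backend communication manager.
   *
   * \param[in] comm The MPI communicator to use (defaults to MPI_COMM_WORLD). Its size and this
   *                 process's rank within it are cached at construction time.
   */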
  CommunicationManager(MPI_Comm comm = MPI_COMM_WORLD)
    : HiCR::CommunicationManager(),
      _comm(comm)
  {
    MPI_Comm_size(_comm, &_size);
    MPI_Comm_rank(_comm, &_rank);
  }

  ~CommunicationManager() override = default;

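  /**
   * \return The MPI communicator used by this communication manager
   */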
  [[nodiscard]] const MPI_Comm getComm() const { return _comm; }

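  /**
   * \return The number of ranks in the underlying MPI communicator
   */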
  [[nodiscard]] const int getSize() const { return _size; }

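  /**
   * \return The rank of the calling process within the underlying MPI communicator
   */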
  [[nodiscard]] const int getRank() const { return _rank; }

  protected:

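  /**
   * Implementation of the fence operation for the MPI backend: a barrier over the communicator,
   * followed by the collective destruction of any global memory slots marked for destruction.
   *
   * \param[in] tag Tag identifying the group of global memory slots to fence on
   */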
  virtual __INLINE__ void fenceImpl(HiCR::GlobalMemorySlot::tag_t tag) override
  {
    MPI_Barrier(_comm);

    // Call the slot destruction collective routine
    destroyGlobalMemorySlotsCollectiveImpl(tag);
  }

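  /**
   * Collective exchange (promotion) of local memory slots into global memory slots.
   *
   * All processes gather each other's slot sizes, global keys and owner ranks, and then create the
   * MPI windows (data, received-message count and sent-message count) backing every global slot.
   *
   * \param[in] tag Tag under which the exchanged slots are registered
   * \param[in] memorySlots Key/local-slot pairs this process contributes to the exchange
   */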
  virtual __INLINE__ void exchangeGlobalMemorySlotsImpl(HiCR::GlobalMemorySlot::tag_t tag, const std::vector<globalKeyMemorySlotPair_t> &memorySlots) override
  {
    // Obtaining local slots to exchange
    int localSlotCount = (int)memorySlots.size();

    // Obtaining the local slots to exchange per process in the communicator
    std::vector<int> perProcessSlotCount(_size);

    MPI_Allgather(&localSlotCount, 1, MPI_INT, perProcessSlotCount.data(), 1, MPI_INT, _comm);

    // Calculating respective offsets
    std::vector<int> perProcessSlotOffsets(_size);
    int currentOffset = 0;
    for (int i = 0; i < _size; i++)
    {
      perProcessSlotOffsets[i] = currentOffset;
      currentOffset += perProcessSlotCount[i];
    }

    // Calculating number of global slots
    int globalSlotCount = 0;
    for (const auto count : perProcessSlotCount) globalSlotCount += count;

    // Allocating storage for local and global memory slot sizes, keys and process id
    std::vector<size_t> localSlotSizes(localSlotCount);
    std::vector<size_t> globalSlotSizes(globalSlotCount);
    std::vector<HiCR::GlobalMemorySlot::globalKey_t> localSlotKeys(localSlotCount);
    std::vector<HiCR::GlobalMemorySlot::globalKey_t> globalSlotKeys(globalSlotCount);
    std::vector<int> localSlotProcessId(localSlotCount);
    std::vector<int> globalSlotProcessId(globalSlotCount);

    // Filling in the local size and keys storage
    for (size_t i = 0; i < memorySlots.size(); i++)
    {
      const auto key        = memorySlots[i].first;
      const auto memorySlot = std::dynamic_pointer_cast<HiCR::backend::mpi::LocalMemorySlot>(memorySlots[i].second);
      if (memorySlot.get() == nullptr) HICR_THROW_LOGIC("Trying to use MPI to promote a non-MPI local memory slot.");
      localSlotSizes[i]     = memorySlot->getSize();
      localSlotKeys[i]      = key;
      localSlotProcessId[i] = _rank;
    }

    // Exchanging global sizes, keys and process ids
    MPI_Allgatherv(
      localSlotSizes.data(), localSlotCount, MPI_UNSIGNED_LONG, globalSlotSizes.data(), perProcessSlotCount.data(), perProcessSlotOffsets.data(), MPI_UNSIGNED_LONG, _comm);
    MPI_Allgatherv(
      localSlotKeys.data(), localSlotCount, MPI_UNSIGNED_LONG, globalSlotKeys.data(), perProcessSlotCount.data(), perProcessSlotOffsets.data(), MPI_UNSIGNED_LONG, _comm);
    MPI_Allgatherv(localSlotProcessId.data(), localSlotCount, MPI_INT, globalSlotProcessId.data(), perProcessSlotCount.data(), perProcessSlotOffsets.data(), MPI_INT, _comm);

    // Also creating a pointer vector to remember local pointers, needed later for memcpy operations
    std::vector<void **> globalSlotPointers(globalSlotCount);
    std::vector<std::shared_ptr<HiCR::LocalMemorySlot>> globalSourceSlots(globalSlotCount);
    std::vector<std::shared_ptr<HiCR::MemorySpace>> globalMemorySpaces(globalSlotCount);
    size_t localPointerPos = 0;
    for (size_t i = 0; i < globalSlotPointers.size(); i++)
    {
      // If the rank associated with this slot is remote, don't store the pointer; otherwise, store it
      if (globalSlotProcessId[i] != _rank)
      {
        globalSlotPointers[i] = nullptr;
        globalSourceSlots[i]  = nullptr;
        globalMemorySpaces[i] = nullptr;
      }
      else
      {
        const auto memorySlot = memorySlots[localPointerPos++].second;
        globalSlotPointers[i] = &memorySlot->getPointer();
        globalSourceSlots[i]  = memorySlot;
        globalMemorySpaces[i] = memorySlot->getMemorySpace();
      }
    }

    // Now creating global slots and their MPI windows
    for (size_t i = 0; i < globalSlotProcessId.size(); i++)
    {
      // Creating new memory slot object
      auto memorySlot = std::make_shared<mpi::GlobalMemorySlot>(globalSlotProcessId[i], tag, globalSlotKeys[i], globalSourceSlots[i]);

      // Allocating MPI windows
      memorySlot->getDataWindow()             = std::make_unique<MPI_Win>();
      memorySlot->getRecvMessageCountWindow() = std::make_unique<MPI_Win>();
      memorySlot->getSentMessageCountWindow() = std::make_unique<MPI_Win>();

      // Temporary storage for the pointer returned by MPI_Win_allocate. We will assign it as the new internal storage of the local memory slot
      void *ptr = nullptr;

      // Creating MPI window for data transfers
      auto status = MPI_Win_allocate(globalSlotProcessId[i] == _rank ? (int)globalSlotSizes[i] : 0, 1, MPI_INFO_NULL, _comm, &ptr, memorySlot->getDataWindow().get());
      MPI_Win_set_errhandler(*memorySlot->getDataWindow(), MPI_ERRORS_RETURN);

      // Unfortunately, we need to perform an effective duplication of the original local memory slot storage,
      // since no modern MPI library supports MPI_Win_create over user-allocated storage anymore
      if (globalSlotProcessId[i] == _rank)
      {
        // Copying existing data over to the new storage
        std::memcpy(ptr, *(globalSlotPointers[i]), globalSlotSizes[i]);

        // Freeing up memory of the old local memory slot
        // Commented out: it is the user who must free up this local memory slot
        // MPI_Free_mem(*(globalSlotPointers[i]));

        // Swapping pointers
        *(globalSlotPointers[i]) = ptr;

        // Registering the pointer as a local memory slot
        auto newLocalMemorySlot = std::make_shared<HiCR::backend::mpi::LocalMemorySlot>(ptr, globalSlotSizes[i], globalMemorySpaces[i]);
        memorySlot->setSourceLocalMemorySlot(newLocalMemorySlot);
      }

      if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("Failed to create MPI data window on exchange global memory slots.");

      // Creating MPI window for received message count transfers
      status = MPI_Win_allocate(globalSlotProcessId[i] == _rank ? sizeof(size_t) : 0, 1, MPI_INFO_NULL, _comm, &ptr, memorySlot->getRecvMessageCountWindow().get());
      MPI_Win_set_errhandler(*memorySlot->getRecvMessageCountWindow(), MPI_ERRORS_RETURN);

      if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("Failed to create MPI received message count window on exchange global memory slots.");

      // Creating MPI window for sent message count transfers
      status = MPI_Win_allocate(globalSlotProcessId[i] == _rank ? sizeof(size_t) : 0, 1, MPI_INFO_NULL, _comm, &ptr, memorySlot->getSentMessageCountWindow().get());
      MPI_Win_set_errhandler(*memorySlot->getSentMessageCountWindow(), MPI_ERRORS_RETURN);

      if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("Failed to create MPI sent message count window on exchange global memory slots.");

      // Registering global slot
      registerGlobalMemorySlot(memorySlot);
    }
  }

  private:

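  /**
   * Destroys a (locally registered) global memory slot by freeing its three MPI windows.
   * MPI_Win_free is collective, so this must be called by all processes of the communicator.
   *
   * \param[in] memorySlotPtr The global memory slot to destroy; must belong to this backend
   */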
  __INLINE__ void destroyGlobalMemorySlotImpl(std::shared_ptr<HiCR::GlobalMemorySlot> memorySlotPtr) override
  {
    // Getting the down-casted pointer for the memory slot
    auto memorySlot = dynamic_pointer_cast<mpi::GlobalMemorySlot>(memorySlotPtr);

    // Checking whether the memory slot passed is compatible with this backend
    if (memorySlot == nullptr) HICR_THROW_LOGIC("The memory slot is not supported by this backend\n");

    auto status = MPI_Win_free(memorySlot->getDataWindow().get());
    if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("On deregister global memory slot, could not free MPI data window");

    status = MPI_Win_free(memorySlot->getRecvMessageCountWindow().get());
    if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("On deregister global memory slot, could not free MPI recv message count window");

    status = MPI_Win_free(memorySlot->getSentMessageCountWindow().get());
    if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("On deregister global memory slot, could not free MPI sent message count window");
  }

  __INLINE__ bool acquireGlobalLockImpl(std::shared_ptr<HiCR::GlobalMemorySlot> memorySlot) override
  {
    // Getting the down-casted pointer for the memory slot
    auto m = dynamic_pointer_cast<mpi::GlobalMemorySlot>(memorySlot);

    // Checking whether the memory slot passed is compatible with this backend
    if (m == nullptr) HICR_THROW_LOGIC("The passed memory slot is not supported by this backend\n");

    // Locking access to all relevant memory slot windows
    lockMPIWindow(m->getRank(), m->getDataWindow().get(), MPI_LOCK_EXCLUSIVE, 0);

    // Setting memory slot lock as acquired
    m->setLockAcquiredValue(true);

    // This function is assumed to always succeed
    return true;
  }

  __INLINE__ void releaseGlobalLockImpl(std::shared_ptr<HiCR::GlobalMemorySlot> memorySlot) override
  {
    // Getting the down-casted pointer for the memory slot
    auto m = dynamic_pointer_cast<mpi::GlobalMemorySlot>(memorySlot);

    // Checking whether the memory slot passed is compatible with this backend
    if (m == nullptr) HICR_THROW_LOGIC("The passed memory slot is not supported by this backend\n");

    // Releasing access to all relevant memory slot windows
    unlockMPIWindow(m->getRank(), m->getDataWindow().get());

    // Setting memory slot lock as released
    m->setLockAcquiredValue(false);
  }

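  /**
   * Backend-specific lookup of a global memory slot by tag and key.
   * Not supported by the MPI backend, hence it always returns a null pointer.
   */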
  std::shared_ptr<HiCR::GlobalMemorySlot> getGlobalMemorySlotImpl(HiCR::GlobalMemorySlot::tag_t tag, HiCR::GlobalMemorySlot::globalKey_t globalKey) override { return nullptr; }

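  /**
   * Collective routine, invoked during fenceImpl(), that destroys all global memory slots marked for
   * destruction under the given tag. The keys to destroy are gathered from all processes and
   * deduplicated, so that every process frees the corresponding MPI windows exactly once.
   *
   * \param[in] tag Tag identifying the group of global memory slots to destroy
   */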
  __INLINE__ void destroyGlobalMemorySlotsCollectiveImpl(HiCR::GlobalMemorySlot::tag_t tag)
  {
    // Destruction of global memory slots marked for destruction
    // Note: MPI_Allgather expects int rather than size_t counts, so we have to work with int here
    int localDestroySlotsCount = (int)getGlobalMemorySlotsToDestroyPerTag()[tag].size();
    std::vector<int> perProcessDestroySlotCount(_size);

    // Obtaining the number of slots to destroy per process in the communicator
    MPI_Allgather(&localDestroySlotsCount, 1, MPI_INT, perProcessDestroySlotCount.data(), 1, MPI_INT, _comm);

    // Calculating respective offsets; TODO fix offset types for both this method and exchangeGlobalMemorySlotsImpl
    std::vector<int> perProcessSlotOffsets(_size);
    int currentOffset = 0;
    for (int i = 0; i < _size; i++)
    {
      perProcessSlotOffsets[i] = currentOffset;
      currentOffset += perProcessDestroySlotCount[i];
    }

    // Calculating number of global slots to destroy
    int globalDestroySlotsCount = 0;
    for (const auto count : perProcessDestroySlotCount) globalDestroySlotsCount += count;

    // If there are no slots to destroy from any instance, return to avoid a second round of collectives
    if (globalDestroySlotsCount == 0) return;

    // Allocating storage for global memory slot keys
    std::vector<HiCR::GlobalMemorySlot::globalKey_t> localDestroySlotKeys(localDestroySlotsCount);
    std::vector<HiCR::GlobalMemorySlot::globalKey_t> globalDestroySlotKeys(globalDestroySlotsCount);

    // Filling in the local keys storage
    for (auto i = 0; i < localDestroySlotsCount; i++)
    {
      const auto memorySlot    = getGlobalMemorySlotsToDestroyPerTag()[tag][i];
      const auto key           = memorySlot->getGlobalKey();
      localDestroySlotKeys[i]  = key;
    }

    // Exchanging global keys
    MPI_Allgatherv(localDestroySlotKeys.data(),
                   localDestroySlotsCount,
                   MPI_UNSIGNED_LONG,
                   globalDestroySlotKeys.data(),
                   perProcessDestroySlotCount.data(),
                   perProcessSlotOffsets.data(),
                   MPI_UNSIGNED_LONG,
                   _comm);

    // Deduplicating the global keys, as more than one process might want to destroy the same key
    std::set<HiCR::GlobalMemorySlot::globalKey_t> globalDestroySlotKeysSet(globalDestroySlotKeys.begin(), globalDestroySlotKeys.end());

    // Now we can iterate over the global slots to destroy one by one
    for (auto key : globalDestroySlotKeysSet)
    {
      std::shared_ptr<HiCR::GlobalMemorySlot> memorySlot = nullptr;
      // Getting the memory slot to destroy
      // First check the standard map
      if (getGlobalMemorySlotTagKeyMap()[tag].contains(key))
      {
        memorySlot = getGlobalMemorySlotTagKeyMap()[tag].at(key);
        // Deregister it here, so that a later destroy call does not attempt (and fail) to destroy it again
        getGlobalMemorySlotTagKeyMap()[tag].erase(key);
      }
      // If not found, check the deregistered map
      else if (_deregisteredGlobalMemorySlotsTagKeyMap[tag].contains(key))
      {
        memorySlot = _deregisteredGlobalMemorySlotsTagKeyMap[tag].at(key);
        _deregisteredGlobalMemorySlotsTagKeyMap[tag].erase(key);
      }
      else
        HICR_THROW_FATAL("Could not find memory slot to destroy in this backend. Tag: %lu, Key: %lu", tag, key);

      // Destroying the memory slot collectively; there might be a case where the slot is not found, due to double calls to destroy
      destroyGlobalMemorySlotImpl(memorySlot);
    }
  }

  __INLINE__ void memcpyImpl(const std::shared_ptr<HiCR::LocalMemorySlot> &destinationSlot,
                             size_t dst_offset,
                             const std::shared_ptr<HiCR::GlobalMemorySlot> &sourceSlotPtr,
                             size_t sourceOffset,
                             size_t size) override
  {
    // Getting the down-casted pointer for the memory slot
    auto source = dynamic_pointer_cast<mpi::GlobalMemorySlot>(sourceSlotPtr);

    // Checking whether the memory slot passed is compatible with this backend
    if (source == nullptr) HICR_THROW_LOGIC("The passed source memory slot is not supported by this backend\n");

    // Getting ranks for the involved processes
    const auto sourceRank = source->getRank();

    // Check if we already acquired a lock on the memory slots
    bool isSourceSlotLockAcquired = source->getLockAcquiredValue();

    // Calculating pointer
    auto destinationPointer = (void *)(static_cast<uint8_t *>(destinationSlot->getPointer()) + dst_offset);

    // Getting data window for the involved processes
    auto sourceDataWindow = source->getDataWindow().get();

    // Getting sent message count window for the involved processes
    auto sourceSentMessageWindow = source->getSentMessageCountWindow().get();

    // Locking MPI window to ensure the message arrives before returning. This will not exclude other processes from accessing the data (MPI_LOCK_SHARED)
    if (isSourceSlotLockAcquired == false) lockMPIWindow(sourceRank, sourceDataWindow, MPI_LOCK_SHARED, MPI_MODE_NOCHECK);

    // Executing the get operation
    {
      auto status = MPI_Get(destinationPointer, (int)size, MPI_BYTE, sourceRank, (int)sourceOffset, (int)size, MPI_BYTE, *sourceDataWindow);

      if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("Failed to run MPI_Get");
    }

    // Making sure the operation finished
    {
      auto status = MPI_Win_flush(sourceRank, *sourceDataWindow);

      if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("Failed to run MPI_Win_flush");
    }

    // Unlocking window, if taken, after copy is completed
    if (isSourceSlotLockAcquired == false) unlockMPIWindow(sourceRank, sourceDataWindow);

    // Increasing the remote sent message counter and the local destination received message counter
    increaseWindowCounter(sourceRank, sourceSentMessageWindow);
    increaseMessageRecvCounter(*destinationSlot);
  }

  __INLINE__ void memcpyImpl(const std::shared_ptr<HiCR::GlobalMemorySlot> &destinationSlotPtr,
                             size_t dst_offset,
                             const std::shared_ptr<HiCR::LocalMemorySlot> &sourceSlot,
                             size_t sourceOffset,
                             size_t size) override
  {
    // Getting the down-casted pointer for the memory slot
    auto destination = dynamic_pointer_cast<mpi::GlobalMemorySlot>(destinationSlotPtr);

    // Checking whether the memory slot passed is compatible with this backend
    if (destination == nullptr) HICR_THROW_LOGIC("The passed destination memory slot is not supported by this backend\n");

    // Getting ranks for the involved processes
    const auto destinationRank = destination->getRank();

    // Check if we already acquired a lock on the memory slots
    bool isDestinationSlotLockAcquired = destination->getLockAcquiredValue();

    // Calculating pointers
    auto sourcePointer = (void *)(static_cast<uint8_t *>(sourceSlot->getPointer()) + sourceOffset);

    // Getting data window for the involved processes
    auto destinationDataWindow = destination->getDataWindow().get();

    // Getting recv message count window for the involved process
    auto destinationRecvMessageWindow = destination->getRecvMessageCountWindow().get();

    // Locking MPI window to ensure the message arrives before returning. This will not exclude other processes from accessing the data (MPI_LOCK_SHARED)
    if (isDestinationSlotLockAcquired == false) lockMPIWindow(destinationRank, destinationDataWindow, MPI_LOCK_SHARED, MPI_MODE_NOCHECK);

    // Executing the put operation
    {
      auto status = MPI_Put(sourcePointer, (int)size, MPI_BYTE, destinationRank, (int)dst_offset, (int)size, MPI_BYTE, *destinationDataWindow);

      if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("Failed to run data MPI_Put");
    }

    // Making sure the operation finished
    {
      auto status = MPI_Win_flush(destinationRank, *destinationDataWindow);

      if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("Failed to run data MPI_Win_flush");
    }

    // Unlocking window, if taken, after copy is completed
    if (isDestinationSlotLockAcquired == false) unlockMPIWindow(destinationRank, destinationDataWindow);

    // Increasing the local sent message counter and the remote destination received message counter
    increaseMessageSentCounter(*sourceSlot);
    increaseWindowCounter(destinationRank, destinationRecvMessageWindow);
  }

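  /**
   * Implementation of the memory slot update query. Message counters in this backend are updated
   * eagerly at memcpy time through the counter windows, so there is nothing to do here.
   *
   * \param[in] memorySlot Memory slot to query for updates (unused)
   */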
  __INLINE__ void queryMemorySlotUpdatesImpl(std::shared_ptr<HiCR::LocalMemorySlot> memorySlot) override {}

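  /**
   * Backend-specific deregistration of a global memory slot.
   *
   * The slot is kept in an internal (deregistered) map so that its MPI windows can still be freed
   * collectively when the slot is later destroyed during a fence.
   *
   * \param[in] memorySlot The global memory slot to deregister; must belong to this backend
   */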
  __INLINE__ void deregisterGlobalMemorySlotImpl(const std::shared_ptr<HiCR::GlobalMemorySlot> &memorySlot) override
  {
    // Getting the down-casted pointer for the slot
    auto slot = dynamic_pointer_cast<mpi::GlobalMemorySlot>(memorySlot);

    // Checking whether the slot passed is compatible with this backend
    if (slot == nullptr) HICR_THROW_LOGIC("The memory slot is not supported by this backend\n");

    // Getting the slot information
    const auto tag = slot->getGlobalTag();
    const auto key = slot->getGlobalKey();

    // Storing the deregistered slot; at this point its (MPI) type is guaranteed to be correct
    _deregisteredGlobalMemorySlotsTagKeyMap[tag][key] = slot;
  }

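  /**
   * The MPI communicator this communication manager operates on
   */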
  const MPI_Comm _comm;

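  /**
   * Number of ranks in the communicator, cached at construction time
   */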
  int _size{};

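  /**
   * Rank of this process within the communicator, cached at construction time
   */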
  int _rank{};

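  /**
   * Tag/key map of global memory slots that have been deregistered but not yet destroyed,
   * kept so that their MPI windows can still be freed during the collective destruction phase
   */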
  HiCR::CommunicationManager::globalMemorySlotTagKeyMap_t _deregisteredGlobalMemorySlotsTagKeyMap{};

  __INLINE__ void lockMPIWindow(int rank, MPI_Win *window, int MPILockType, int MPIAssert)
  {
    // Locking MPI window to ensure the message arrives before returning
    int mpiStatus = MPI_Win_lock(MPILockType, rank, MPIAssert, *window);
    if (mpiStatus != MPI_SUCCESS)
    {
      char err_string[MPI_MAX_ERROR_STRING];
      int len;
      MPI_Error_string(mpiStatus, err_string, &len);
      HICR_THROW_LOGIC("MPI_Win_lock failed for rank %d: %s", rank, err_string);
    }
  }

  __INLINE__ void unlockMPIWindow(int rank, MPI_Win *window)
  {
    // Unlocking window after copy is completed
    int mpiStatus = MPI_Win_unlock(rank, *window);
    if (mpiStatus != MPI_SUCCESS)
    {
      char err_string[MPI_MAX_ERROR_STRING];
      int len;
      MPI_Error_string(mpiStatus, err_string, &len);
      HICR_THROW_LOGIC("MPI_Win_unlock failed for rank %d: %s", rank, err_string);
    }
  }

  __INLINE__ void increaseWindowCounter(int rank, MPI_Win *window)
  {
    // This operation should be possible to do in one go with MPI_Accumulate or MPI_Fetch_and_op. However, the current
    // implementation of Open MPI deadlocks on these operations, so we do the whole thing manually instead.

    // Locking MPI window to ensure the message arrives before returning
    lockMPIWindow(rank, window, MPI_LOCK_EXCLUSIVE, 0);

    // Use atomic MPI operation to increment counter
    const size_t one = 1;
    size_t value = 0;

    // There is no datatype in MPI for size_t (the counters), but
    // MPI_AINT is supposed to be large enough and portable
    auto status = MPI_Fetch_and_op(&one, &value, MPI_AINT, rank, 0, MPI_SUM, *window);

    // Checking execution status
    if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("Failed to increase remote message counter (on operation: MPI_Fetch_and_op) for rank %d, MPI Window pointer %p", rank, window);

    // Unlocking window after copy is completed
    unlockMPIWindow(rank, window);
  }

  __INLINE__ void memcpyImpl(const std::shared_ptr<HiCR::LocalMemorySlot> &destination,
                             const size_t dst_offset,
                             const std::shared_ptr<HiCR::LocalMemorySlot> &source,
                             const size_t src_offset,
                             const size_t size) override
  {
    // Getting slot pointers
    const auto srcPtr = source->getPointer();
    const auto dstPtr = destination->getPointer();

    // Calculating actual offsets
    const auto actualSrcPtr = (void *)(static_cast<uint8_t *>(srcPtr) + src_offset);
    const auto actualDstPtr = (void *)(static_cast<uint8_t *>(dstPtr) + dst_offset);

    // Running memcpy now
    std::memcpy(actualDstPtr, actualSrcPtr, size);

    // Increasing recv/send counters
    increaseMessageRecvCounter(*destination);
    increaseMessageSentCounter(*source);
  }
};

} // namespace HiCR::backend::mpi