/home/runner/work/HiCR/HiCR/include/hicr/backends/mpi/communicationManager.hpp Source File
HiCR
|
communicationManager.hpp
Go to the documentation of this file.
102 HiCR::CommunicationManager::globalMemorySlotTagKeyMap_t _deregisteredGlobalMemorySlotsTagKeyMap{};
130 // This operation should be possible to do in one go with MPI_Accumulate or MPI_Fetch_and_op. However, the current implementation of openMPI deadlocks
147 if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("Failed to increase remote message counter (on operation: MPI_Put) for rank %d, MPI Window pointer %p", rank, window);
185 if (source == nullptr) HICR_THROW_LOGIC("The passed source memory slot is not supported by this backend\n");
194 auto destinationPointer = (void *)(static_cast<uint8_t *>(destinationSlot->getPointer()) + dst_offset);
202 // Locking MPI window to ensure the messages arrive before returning. This will not exclude other processes from accessing the data (MPI_LOCK_SHARED)
203 if (isSourceSlotLockAcquired == false) lockMPIWindow(sourceRank, sourceDataWindow, MPI_LOCK_SHARED, MPI_MODE_NOCHECK);
208 auto status = MPI_Get(destinationPointer, (int)size, MPI_BYTE, sourceRank, (int)sourceOffset, (int)size, MPI_BYTE, *sourceDataWindow);
239 if (destination == nullptr) HICR_THROW_LOGIC("The passed destination memory slot is not supported by this backend\n");
256 // Locking MPI window to ensure the messages arrive before returning. This will not exclude other processes from accessing the data (MPI_LOCK_SHARED)
257 if (isDestinationSlotLockAcquired == false) lockMPIWindow(destinationRank, destinationDataWindow, MPI_LOCK_SHARED, MPI_MODE_NOCHECK);
262 auto status = MPI_Put(sourcePointer, (int)size, MPI_BYTE, destinationRank, (int)dst_offset, (int)size, MPI_BYTE, *destinationDataWindow);
276 if (isDestinationSlotLockAcquired == false) unlockMPIWindow(destinationRank, destinationDataWindow);
290 __INLINE__ void queryMemorySlotUpdatesImpl(std::shared_ptr<HiCR::LocalMemorySlot> memorySlot) override {}
305 __INLINE__ void deregisterGlobalMemorySlotImpl(const std::shared_ptr<HiCR::GlobalMemorySlot> &memorySlot) override
339 // note: MPI expects int, not size_t as the parameter for allgather which we use here, so we have to work with int
344 MPI_Allgather(&localDestroySlotsCount, 1, MPI_INT, perProcessDestroySlotCount.data(), 1, MPI_INT, _comm);
346 // Calculating respective offsets; TODO fix offset types for both this method and exchangeGlobalMemorySlotsImpl
359 // If there are no slots to destroy from any instance, return to avoid a second round of collectives
364 std::vector<HiCR::GlobalMemorySlot::globalKey_t> globalDestroySlotKeys(globalDestroySlotsCount);
385 std::set<HiCR::GlobalMemorySlot::globalKey_t> globalDestroySlotKeysSet(globalDestroySlotKeys.begin(), globalDestroySlotKeys.end());
406 HICR_THROW_FATAL("Could not find memory slot to destroy in this backend. Tag: %d, Key: %lu", tag, key);
408 // Destroying the memory slot collectively; there might be a case where the slot is not found, due to double calls to destroy
413 __INLINE__ void exchangeGlobalMemorySlotsImpl(HiCR::GlobalMemorySlot::tag_t tag, const std::vector<globalKeyMemorySlotPair_t> &memorySlots) override
449 const auto memorySlot = std::dynamic_pointer_cast<HiCR::backend::mpi::LocalMemorySlot>(memorySlots[i].second);
450 if (memorySlot.get() == nullptr) HICR_THROW_LOGIC("Trying to use MPI to promote a non-MPI local memory slot.");
459 localSlotSizes.data(), localSlotCount, MPI_UNSIGNED_LONG, globalSlotSizes.data(), perProcessSlotCount.data(), perProcessSlotOffsets.data(), MPI_UNSIGNED_LONG, _comm);
461 localSlotKeys.data(), localSlotCount, MPI_UNSIGNED_LONG, globalSlotKeys.data(), perProcessSlotCount.data(), perProcessSlotOffsets.data(), MPI_UNSIGNED_LONG, _comm);
462 MPI_Allgatherv(localSlotProcessId.data(), localSlotCount, MPI_INT, globalSlotProcessId.data(), perProcessSlotCount.data(), perProcessSlotOffsets.data(), MPI_INT, _comm);
471 // If the rank associated with this slot is remote, don't store the pointer, otherwise store it.
489 auto memorySlot = std::make_shared<mpi::GlobalMemorySlot>(globalSlotProcessId[i], tag, globalSlotKeys[i], globalSourceSlots[i]);
496 // Temporary storage for the pointer returned by MPI_Win_allocate. We will assign it as the new internal storage of the local memory slot
501 auto status = MPI_Win_allocate(globalSlotProcessId[i] == _rank ? (int)globalSlotSizes[i] : 0, 1, MPI_INFO_NULL, _comm, &ptr, memorySlot->getDataWindow().get());
505 // Unfortunately, we need to do an effective duplication of the original local memory slot storage
521 if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("Failed to create MPI data window on exchange global memory slots.");
525 status = MPI_Win_allocate(globalSlotProcessId[i] == _rank ? sizeof(size_t) : 0, 1, MPI_INFO_NULL, _comm, &ptr, memorySlot->getRecvMessageCountWindow().get());
529 if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("Failed to create MPI received message count window on exchange global memory slots.");
533 status = MPI_Win_allocate(globalSlotProcessId[i] == _rank ? sizeof(size_t) : 0, 1, MPI_INFO_NULL, _comm, &ptr, memorySlot->getSentMessageCountWindow().get());
537 if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("Failed to create MPI sent message count window on exchange global memory slots.");
552 __INLINE__ void destroyGlobalMemorySlotImpl(std::shared_ptr<HiCR::GlobalMemorySlot> memorySlotPtr) override
558 if (memorySlot == nullptr) HICR_THROW_LOGIC("The memory slot is not supported by this backend\n");
561 if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("On deregister global memory slot, could not free MPI data window");
564 if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("On deregister global memory slot, could not free MPI recv message count window");
567 if (status != MPI_SUCCESS) HICR_THROW_RUNTIME("On deregister global memory slot, could not free MPI sent message count window");
570 __INLINE__ bool acquireGlobalLockImpl(std::shared_ptr<HiCR::GlobalMemorySlot> memorySlot) override
576 if (m == nullptr) HICR_THROW_LOGIC("The passed memory slot is not supported by this backend\n");
588 __INLINE__ void releaseGlobalLockImpl(std::shared_ptr<HiCR::GlobalMemorySlot> memorySlot) override
594 if (m == nullptr) HICR_THROW_LOGIC("The passed memory slot is not supported by this backend\n");
603 std::shared_ptr<HiCR::GlobalMemorySlot> getGlobalMemorySlotImpl(HiCR::GlobalMemorySlot::tag_t tag, HiCR::GlobalMemorySlot::globalKey_t globalKey) override { return nullptr; }
Definition communicationManager.hpp:54
std::map< GlobalMemorySlot::tag_t, globalKeyToMemorySlotMap_t > globalMemorySlotTagKeyMap_t
Definition communicationManager.hpp:70
__INLINE__ auto & getGlobalMemorySlotTagKeyMap()
Definition communicationManager.hpp:643
__INLINE__ void registerGlobalMemorySlot(const std::shared_ptr< GlobalMemorySlot > &memorySlot)
Definition communicationManager.hpp:503
__INLINE__ void increaseMessageRecvCounter(HiCR::LocalMemorySlot &memorySlot) noexcept
Definition communicationManager.hpp:652
__INLINE__ auto & getGlobalMemorySlotsToDestroyPerTag()
Definition communicationManager.hpp:637
__INLINE__ void increaseMessageSentCounter(HiCR::LocalMemorySlot &memorySlot) noexcept
Definition communicationManager.hpp:659
Definition communicationManager.hpp:43
const int getRank() const
Definition communicationManager.hpp:78
const MPI_Comm getComm() const
Definition communicationManager.hpp:66
~CommunicationManager() override=default
const int getSize() const
Definition communicationManager.hpp:72
CommunicationManager(MPI_Comm comm=MPI_COMM_WORLD)
Definition communicationManager.hpp:52
Provides a definition for the base backend's communication manager class.
Provides a definition for a HiCR Global Memory Slot class.
Provides a definition for a HiCR Local Memory Slot class.
Generated by