/home/runner/work/HiCR/HiCR/include/hicr/backends/lpf/communicationManager.hpp Source File

HiCR: /home/runner/work/HiCR/HiCR/include/hicr/backends/lpf/communicationManager.hpp Source File
HiCR
communicationManager.hpp
Go to the documentation of this file.
1/*
2 * Copyright 2025 Huawei Technologies Co., Ltd.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
25#pragma once
26
27#include <cstring>
28#include <set>
29#include <lpf/collectives.h>
30#include <lpf/core.h>
31#include <lpf/noc.h>
34#include "globalMemorySlot.hpp"
35#include "localMemorySlot.hpp"
36
37namespace HiCR::backend::lpf
38{
39
46{
47 public:
48
60 CommunicationManager(size_t size, size_t rank, lpf_t lpf)
61 : HiCR::CommunicationManager(),
62 _size(size),
63 _rank((lpf_pid_t)rank),
64 _lpf(lpf),
65 _localSwapSlot(LPF_INVALID_MEMSLOT)
66 {}
67
77 __INLINE__ uint8_t *serializeGlobalMemorySlot(const std::shared_ptr<HiCR::GlobalMemorySlot> &globalSlot) const override
78 {
79 char *serialized;
80 size_t size = 0;
81 auto lpfSlot = dynamic_pointer_cast<lpf::GlobalMemorySlot>(globalSlot);
82 if (lpfSlot == nullptr) HICR_THROW_LOGIC("The memory slot is not compatible with this backend\n");
83
84 lpf_memslot_t lpfMemSlot = lpfSlot->getLPFSlot();
85
86 // This produces an allocated buffer that needs to be freed
87 CHECK(lpf_noc_serialize_slot(_lpf, lpfMemSlot, &serialized, &size));
88
89 uint8_t *ret = new uint8_t[sizeof(lpf_pid_t) + size + sizeof(size_t)];
90 std::memcpy(ret, &_rank, sizeof(lpf_pid_t));
91 std::memcpy(ret + sizeof(lpf_pid_t), &size, sizeof(size_t));
92 std::memcpy(ret + sizeof(lpf_pid_t) + sizeof(size_t), serialized, size);
93
94 // TODO: this is a temporary fix, the LPF API should be fixed to free the buffer
95 free(serialized);
96
97 return ret;
98 }
99
109 __INLINE__ std::shared_ptr<HiCR::GlobalMemorySlot> deserializeGlobalMemorySlot(uint8_t *buffer, GlobalMemorySlot::tag_t tag) override
110 {
111 // first <sizeof(lpf_pid_t)> bytes are the rank, followed by the size of the lpf slot buffer, the rest is the lpf slot buffer
112 size_t size;
113 lpf_pid_t rank;
114 lpf_memslot_t slot = LPF_INVALID_MEMSLOT;
115
116 std::memcpy(&rank, buffer, sizeof(lpf_pid_t));
117 std::memcpy(&size, buffer + sizeof(lpf_pid_t), sizeof(size_t));
118 uint8_t *serialized = buffer + sizeof(lpf_pid_t) + sizeof(size_t);
119
120 CHECK(lpf_noc_register(_lpf, nullptr, 0, &slot));
121
122 CHECK(lpf_noc_deserialize_slot(_lpf, (char *)serialized, slot));
123
124 return std::make_shared<HiCR::backend::lpf::GlobalMemorySlot>(rank, slot, LPF_INVALID_MEMSLOT, tag);
125 }
126
134 __INLINE__ std::shared_ptr<HiCR::GlobalMemorySlot> promoteLocalMemorySlot(const std::shared_ptr<HiCR::LocalMemorySlot> &memorySlot, HiCR::GlobalMemorySlot::tag_t tag) override
135 {
136 auto lpfSlot = dynamic_pointer_cast<lpf::LocalMemorySlot>(memorySlot);
137 if (lpfSlot == nullptr) HICR_THROW_LOGIC("The memory slot is not supported by this backend\n");
138
139 lpf_memslot_t promotedSlot = LPF_INVALID_MEMSLOT;
140 lpf_memslot_t lpfSwapSlot = LPF_INVALID_MEMSLOT;
141
142 void *ptr = lpfSlot->getPointer();
143 size_t size = lpfSlot->getSize();
144 void *lpfSwapSlotPtr = lpfSlot->getLPFSwapPointer();
145
146 CHECK(lpf_noc_register(_lpf, ptr, size, &promotedSlot));
147 CHECK(lpf_noc_register(_lpf, lpfSwapSlotPtr, sizeof(uint64_t), &lpfSwapSlot)); // TODO: check if sizeof(uint64_t) is correct
148
149 return std::make_shared<HiCR::backend::lpf::GlobalMemorySlot>(_rank, promotedSlot, lpfSwapSlot, tag, 0 /* global key */, memorySlot);
150 }
151
159 __INLINE__ void destroyPromotedGlobalMemorySlot(const std::shared_ptr<HiCR::GlobalMemorySlot> &memorySlot) override
160 {
161 // Getting up-casted pointer for the global memory slot
162 auto lpfSlot = dynamic_pointer_cast<lpf::GlobalMemorySlot>(memorySlot);
163
164 // Checking whether the memory slot passed is compatible with this backend
165 if (lpfSlot == nullptr) HICR_THROW_LOGIC("The memory slot is not supported by this backend\n");
166
167 CHECK(lpf_noc_deregister(_lpf, lpfSlot->getLPFSlot()));
168
169 // If the swap slot is not LPF_INVALID_MEMSLOT, deregister it
170 if (lpfSlot->getLPFSwapSlot() != LPF_INVALID_MEMSLOT) CHECK(lpf_noc_deregister(_lpf, lpfSlot->getLPFSwapSlot()));
171 }
172
173 private:
174
175 const size_t _size;
176 const lpf_pid_t _rank;
177 const lpf_t _lpf;
178
183 uint64_t _localSwap{0ULL};
184
188 lpf_memslot_t _localSwapSlot;
189
190 std::shared_ptr<HiCR::GlobalMemorySlot> getGlobalMemorySlotImpl(HiCR::GlobalMemorySlot::tag_t tag, HiCR::GlobalMemorySlot::globalKey_t globalKey) override { return nullptr; }
191
192 __INLINE__ void exchangeGlobalMemorySlotsImpl(HiCR::GlobalMemorySlot::tag_t tag, const std::vector<globalKeyMemorySlotPair_t> &memorySlots) override
193 {
194 // Obtaining local slots to exchange
195 size_t localSlotCount = memorySlots.size();
201 lpf_coll_t coll;
202 auto globalSlotCounts = std::vector<size_t>(_size);
203 lpf_memslot_t src_slot = LPF_INVALID_MEMSLOT;
204 lpf_memslot_t dst_slot = LPF_INVALID_MEMSLOT;
205 lpf_memslot_t slot_local_sizes = LPF_INVALID_MEMSLOT;
206 lpf_memslot_t slot_local_keys = LPF_INVALID_MEMSLOT;
207 lpf_memslot_t slot_global_sizes = LPF_INVALID_MEMSLOT;
208 lpf_memslot_t slot_global_keys = LPF_INVALID_MEMSLOT;
209 lpf_memslot_t slot_local_process_ids = LPF_INVALID_MEMSLOT;
210 lpf_memslot_t slot_global_process_ids = LPF_INVALID_MEMSLOT;
211
212 CHECK(lpf_register_global(_lpf, &localSlotCount, sizeof(size_t), &src_slot));
213 CHECK(lpf_register_global(_lpf, globalSlotCounts.data(), sizeof(size_t) * _size, &dst_slot));
214 CHECK(lpf_collectives_init(_lpf, _rank, _size, 1, 0, sizeof(size_t) * _size, &coll));
215 CHECK(lpf_allgather(coll, src_slot, dst_slot, sizeof(size_t), false));
216
217 CHECK(lpf_sync(_lpf, LPF_SYNC_DEFAULT));
218 CHECK(lpf_collectives_destroy(coll));
219 CHECK(lpf_deregister(_lpf, src_slot));
220 CHECK(lpf_deregister(_lpf, dst_slot));
221 // end allgather block
222
223 size_t globalSlotCount = 0;
224 for (size_t i = 0; i < _size; i++) globalSlotCount += globalSlotCounts[i];
225
226 auto globalSlotCountsInBytes = std::vector<size_t>(_size);
227 for (size_t i = 0; i < _size; i++) globalSlotCountsInBytes[i] = globalSlotCounts[i] * sizeof(size_t);
228
229 auto globalSlotPidCountsInBytes = std::vector<size_t>(_size);
230 for (size_t i = 0; i < _size; i++) globalSlotPidCountsInBytes[i] = globalSlotCounts[i] * sizeof(lpf_pid_t);
231
232 // globalSlotSizes will hold exactly the union of all slot sizes at
233 // each process (zero or more) to become global.
234
235 auto localSlotSizes = std::vector<size_t>(localSlotCount);
236 auto localSlotKeys = std::vector<HiCR::GlobalMemorySlot::globalKey_t>(localSlotCount);
237 auto localSlotProcessId = std::vector<lpf_pid_t>(localSlotCount);
238 auto globalSlotSizes = std::vector<size_t>(globalSlotCount);
239 auto globalSwapSlotSizes = std::vector<size_t>(globalSlotCount);
240 auto globalSlotKeys = std::vector<HiCR::GlobalMemorySlot::globalKey_t>(globalSlotCount);
241 auto globalSlotProcessId = std::vector<lpf_pid_t>(globalSlotCount);
242
243 for (size_t i = 0; i < localSlotCount; i++)
244 {
245 const auto key = memorySlots[i].first;
246 const auto memorySlot = memorySlots[i].second;
247 localSlotSizes[i] = memorySlot->getSize();
248 localSlotKeys[i] = key;
249 localSlotProcessId[i] = _rank;
250 }
251
252 CHECK(lpf_register_local(_lpf, localSlotSizes.data(), localSlotCount * sizeof(size_t), &slot_local_sizes));
253 CHECK(lpf_register_global(_lpf, globalSlotSizes.data(), globalSlotCount * sizeof(size_t), &slot_global_sizes));
254 CHECK(lpf_sync(_lpf, LPF_SYNC_DEFAULT));
255
256 // start allgatherv for global slot counts in bytes
257 CHECK(lpf_collectives_init(_lpf, _rank, _size, 2 /* will call gatherv 2 times */, 0, sizeof(size_t) * globalSlotCount, &coll));
258 CHECK(lpf_allgatherv(coll, slot_local_sizes, slot_global_sizes, globalSlotCountsInBytes.data(), false /*exclude myself*/));
259 CHECK(lpf_sync(_lpf, LPF_SYNC_DEFAULT));
260 CHECK(lpf_register_local(_lpf, localSlotProcessId.data(), localSlotCount * sizeof(lpf_pid_t), &slot_local_process_ids));
261 CHECK(lpf_register_global(_lpf, globalSlotProcessId.data(), globalSlotCount * sizeof(lpf_pid_t), &slot_global_process_ids));
262 CHECK(lpf_sync(_lpf, LPF_SYNC_DEFAULT));
263 // start allgatherv for process IDs assigned to each global slot
264 CHECK(lpf_allgatherv(coll, slot_local_process_ids, slot_global_process_ids, globalSlotPidCountsInBytes.data(), false /*exclude myself*/));
265 CHECK(lpf_sync(_lpf, LPF_SYNC_DEFAULT));
266 CHECK(lpf_collectives_destroy(coll));
267 CHECK(lpf_deregister(_lpf, slot_local_sizes));
268 CHECK(lpf_deregister(_lpf, slot_global_sizes));
269 CHECK(lpf_deregister(_lpf, slot_local_process_ids));
270 CHECK(lpf_deregister(_lpf, slot_global_process_ids));
271
272 CHECK(lpf_register_local(_lpf, localSlotKeys.data(), localSlotCount * sizeof(size_t), &slot_local_keys));
273 CHECK(lpf_register_global(_lpf, globalSlotKeys.data(), globalSlotCount * sizeof(size_t), &slot_global_keys));
274 CHECK(lpf_sync(_lpf, LPF_SYNC_DEFAULT));
275
276 CHECK(lpf_collectives_init(_lpf, _rank, _size, 1, 0, sizeof(size_t) * globalSlotCount, &coll));
277 CHECK(lpf_allgatherv(coll, slot_local_keys, slot_global_keys, globalSlotCountsInBytes.data(), false /*exclude myself*/));
278 CHECK(lpf_sync(_lpf, LPF_SYNC_DEFAULT));
279 CHECK(lpf_collectives_destroy(coll));
280 CHECK(lpf_deregister(_lpf, slot_local_keys));
281 CHECK(lpf_deregister(_lpf, slot_global_keys));
282
283 CHECK(lpf_register_local(_lpf, &_localSwap, sizeof(uint64_t), &_localSwapSlot));
284
285 size_t localPointerPos = 0;
286 for (size_t i = 0; i < globalSlotCount; i++)
287 {
288 // If the rank associated with this slot is remote, don't store the pointer, otherwise store it.
289 void *globalSlotPointer = nullptr;
290 void *globalSwapSlotPointer = nullptr;
291 std::shared_ptr<HiCR::LocalMemorySlot> globalSourceSlot = nullptr;
292
293 globalSwapSlotSizes[i] = sizeof(uint64_t);
294 // If the slot is remote, do not specify any local size assigned to it
295 if (globalSlotProcessId[i] != _rank)
296 {
297 globalSlotSizes[i] = 0;
298 globalSwapSlotSizes[i] = 0;
299 }
300
301 lpf_memslot_t newSlot = LPF_INVALID_MEMSLOT;
302 lpf_memslot_t swapValueSlot = LPF_INVALID_MEMSLOT;
303 std::shared_ptr<lpf::LocalMemorySlot> localSlot;
304 // If it's local, then assign the local information to it
305 if (globalSlotProcessId[i] == _rank)
306 {
307 auto memorySlot = memorySlots[localPointerPos++].second;
308 globalSlotPointer = memorySlot->getPointer();
309 localSlot = (dynamic_pointer_cast<lpf::LocalMemorySlot>(memorySlot));
310 globalSwapSlotPointer = localSlot->getLPFSwapPointer();
311 globalSourceSlot = memorySlot;
312 }
313 // Registering with the LPF library
314 CHECK(lpf_register_global(_lpf, globalSlotPointer, globalSlotSizes[i], &newSlot));
315 CHECK(lpf_register_global(_lpf, globalSwapSlotPointer, globalSwapSlotSizes[i], &swapValueSlot));
316 // Synchronizing with others
317 CHECK(lpf_sync(_lpf, LPF_SYNC_DEFAULT));
318
319 // Make sure the newly promoted slot points to the new
320 // lpf_memslot_t reference -- otherwise querying the local slot
321 // would yield incorrect results
322 if (globalSlotProcessId[i] == _rank) { localSlot->setLPFSlot(newSlot); }
323
324 // Creating new memory slot object
325 auto memorySlot = std::make_shared<lpf::GlobalMemorySlot>(globalSlotProcessId[i], newSlot, swapValueSlot, tag, globalSlotKeys[i], globalSourceSlot);
326
327 // Finally, registering the new global memory slot
328 registerGlobalMemorySlot(memorySlot);
329 }
330 }
331
332 __INLINE__ void destroyGlobalMemorySlotsCollectiveImpl(HiCR::GlobalMemorySlot::tag_t tag)
333 {
334 size_t localDestroySlotsCount = getGlobalMemorySlotsToDestroyPerTag()[tag].size();
335
336 // Allgather the global slot to destroy counts
337 auto globalDestroySlotCounts = std::vector<size_t>(_size);
338 for (size_t i = 0; i < _size; i++) globalDestroySlotCounts[i] = 0;
339 lpf_coll_t coll;
340 lpf_memslot_t src_slot = LPF_INVALID_MEMSLOT;
341 lpf_memslot_t dst_slot = LPF_INVALID_MEMSLOT;
342
343 CHECK(lpf_register_global(_lpf, &localDestroySlotsCount, sizeof(size_t), &src_slot));
344 CHECK(lpf_register_global(_lpf, globalDestroySlotCounts.data(), sizeof(size_t) * _size, &dst_slot));
345 CHECK(lpf_sync(_lpf, LPF_SYNC_DEFAULT));
346
347 CHECK(lpf_collectives_init(_lpf, _rank, _size, 1, 0, sizeof(size_t) * _size, &coll));
348 CHECK(lpf_allgather(coll, src_slot, dst_slot, sizeof(size_t), false /* exclude myself */));
349 CHECK(lpf_sync(_lpf, LPF_SYNC_DEFAULT));
350 CHECK(lpf_collectives_destroy(coll));
351 CHECK(lpf_deregister(_lpf, src_slot));
352 CHECK(lpf_deregister(_lpf, dst_slot));
353 // End allgather slots to destroy counts block
354
355 size_t globalDestroySlotTotalCount = 0;
356 for (size_t i = 0; i < _size; i++) globalDestroySlotTotalCount += globalDestroySlotCounts[i];
357
358 // If there are no slots to destroy from any instance, return to avoid a second round of collectives
359 if (globalDestroySlotTotalCount == 0) return;
360
361 // We need to destroy both the slot and the swap slot
362 localDestroySlotsCount *= 2lu;
363 globalDestroySlotTotalCount *= 2lu;
364
365 auto globalDestroySlotCountsInBytes = std::vector<size_t>(_size);
366 for (size_t i = 0; i < _size; i++) globalDestroySlotCountsInBytes[i] = globalDestroySlotCounts[i] * 2 * sizeof(size_t);
367
368 // Allgather the global slot keys: Here we use the actual slot_t as the key! (contrary to MPI)
369 // We need both the slot and the swap slot IDs
370 auto localDestroySlotIds = std::vector<lpf_memslot_t>(localDestroySlotsCount);
371 auto globalDestroySlotIds = std::vector<lpf_memslot_t>(globalDestroySlotTotalCount);
372
373 // Filling in the local IDs storage
374 size_t i = 0;
375 for (const auto &slot : getGlobalMemorySlotsToDestroyPerTag()[tag])
376 {
377 const auto memorySlot = std::dynamic_pointer_cast<HiCR::backend::lpf::GlobalMemorySlot>(slot);
378 if (memorySlot.get() == nullptr) HICR_THROW_FATAL("Trying to use LPF to destroy a non-LPF global slot");
379 localDestroySlotIds[i++] = memorySlot->getLPFSlot();
380 localDestroySlotIds[i++] = memorySlot->getLPFSwapSlot();
381 }
382
383 lpf_memslot_t slot_local_ids = LPF_INVALID_MEMSLOT;
384 lpf_memslot_t slot_global_ids = LPF_INVALID_MEMSLOT;
385
386 // Prepare the relevant slots
387 CHECK(lpf_register_local(_lpf, localDestroySlotIds.data(), localDestroySlotsCount * sizeof(lpf_memslot_t), &slot_local_ids));
388 CHECK(lpf_register_global(_lpf, globalDestroySlotIds.data(), globalDestroySlotTotalCount * sizeof(lpf_memslot_t), &slot_global_ids));
389 CHECK(lpf_sync(_lpf, LPF_SYNC_DEFAULT));
390
391 // Execute the allgatherv
392 CHECK(lpf_collectives_init(_lpf, _rank, _size, 1, 0, sizeof(lpf_memslot_t) * globalDestroySlotTotalCount, &coll));
393 CHECK(lpf_allgatherv(coll, slot_local_ids, slot_global_ids, globalDestroySlotCountsInBytes.data(), false /* exclude myself */));
394 CHECK(lpf_sync(_lpf, LPF_SYNC_DEFAULT));
395 CHECK(lpf_collectives_destroy(coll));
396 CHECK(lpf_deregister(_lpf, slot_local_ids));
397 CHECK(lpf_deregister(_lpf, slot_global_ids));
398 // End allgather global slot IDs block
399
400 // Deduplicate the global slot keys
401 auto globalDestroySlotIdsSet = std::set<lpf_memslot_t>(globalDestroySlotIds.begin(), globalDestroySlotIds.end());
402
403 // Now we can iterate over the global slots to destroy one by one
404 for (auto id : globalDestroySlotIdsSet) lpf_deregister(_lpf, id);
405 }
406
413 __INLINE__ void destroyGlobalMemorySlotImpl(std::shared_ptr<HiCR::GlobalMemorySlot> memorySlotPtr) override
414 {
415 // Getting up-casted pointer for the global memory slot
416 auto memorySlot = dynamic_pointer_cast<lpf::GlobalMemorySlot>(memorySlotPtr);
417
418 // Checking whether the memory slot passed is compatible with this backend
419 if (memorySlot == nullptr) HICR_THROW_LOGIC("The memory slot is not supported by this backend\n");
420
421 // Deregistering from LPF
422 CHECK(lpf_deregister(_lpf, memorySlot->getLPFSlot()));
423 CHECK(lpf_deregister(_lpf, memorySlot->getLPFSwapSlot()));
424 }
425
426 __INLINE__ void memcpyImpl(const std::shared_ptr<HiCR::LocalMemorySlot> &destination,
427 size_t dst_offset,
428 const std::shared_ptr<HiCR::GlobalMemorySlot> &source,
429 size_t src_offset,
430 size_t size) override
431 {
432 // Getting up-casted pointer
433 auto src = dynamic_pointer_cast<lpf::GlobalMemorySlot>(source);
434
435 // Checking whether the execution unit passed is compatible with this backend
436 if (source == nullptr) HICR_THROW_LOGIC("The passed source memory slot is not supported by this backend\n");
437
438 // Getting up-casted pointer
439 auto dest = dynamic_pointer_cast<lpf::LocalMemorySlot>(destination);
440
441 // Checking whether the execution unit passed is compatible with this backend
442 if (dest == nullptr) HICR_THROW_LOGIC("The passed destination memory slot is not supported by this backend\n");
443
444 // Getting internal lpf slots
445 lpf_memslot_t srcSlot = src->getLPFSlot();
446 lpf_memslot_t dstSlot = dest->getLPFSlot();
447
448 // Getting remote rank
449 auto remoteRank = src->getRank();
450
451 // Perform the get operation
452 lpf_get(_lpf, remoteRank, srcSlot, src_offset, dstSlot, dst_offset, size, LPF_MSG_DEFAULT);
453 }
454
455 __INLINE__ void memcpyImpl(const std::shared_ptr<HiCR::GlobalMemorySlot> &destination,
456 size_t dst_offset,
457 const std::shared_ptr<HiCR::LocalMemorySlot> &source,
458 size_t src_offset,
459 size_t size) override
460 {
461 // Getting up-casted pointer
462 auto src = dynamic_pointer_cast<lpf::LocalMemorySlot>(source);
463
464 // Checking whether the execution unit passed is compatible with this backend
465 if (src == nullptr) HICR_THROW_LOGIC("The passed source memory slot is not supported by this backend\n");
466
467 // Getting up-casted pointer
468 auto dest = dynamic_pointer_cast<lpf::GlobalMemorySlot>(destination);
469
470 // Checking whether the execution unit passed is compatible with this backend
471 if (dest == nullptr) HICR_THROW_LOGIC("The passed destination memory slot is not supported by this backend\n");
472
473 // Calculating pointers
474 lpf_memslot_t dstSlot = dest->getLPFSlot();
475 lpf_memslot_t srcSlot = src->getLPFSlot();
476
477 // Getting remote rank
478 auto remoteRank = dest->getRank();
479
480 // Perform the put operation
481 lpf_put(_lpf, srcSlot, src_offset, remoteRank, dstSlot, dst_offset, size, LPF_MSG_DEFAULT);
482 }
483
490 __INLINE__ void fenceImpl(HiCR::GlobalMemorySlot::tag_t tag) override
491 {
492 CHECK(lpf_sync(_lpf, LPF_SYNC_DEFAULT));
493
494 // Call the slot destrction collective routine
495 destroyGlobalMemorySlotsCollectiveImpl(tag);
496 }
497
503 __INLINE__ globalKeyToMemorySlotMap_t getGlobalMemorySlots(GlobalMemorySlot::tag_t tag)
504 {
505 // If the requested tag and key are not found, return empty storage
506 if (getGlobalMemorySlotTagKeyMap().contains(tag) == false)
507 HICR_THROW_LOGIC("getGlobalMemorySlots: Requesting a global memory slot for a tag (%lu) that has not been registered.", tag);
509 return slotsForTag;
510 }
511
518 __INLINE__ void fenceImpl(const std::shared_ptr<HiCR::LocalMemorySlot> &slot, size_t expectedSent, size_t expectedRcvd) override
519 {
520 auto memSlot = dynamic_pointer_cast<lpf::LocalMemorySlot>(slot);
521 lpf_memslot_t lpfSlot = memSlot->getLPFSlot();
522 if (lpfSlot == LPF_INVALID_MEMSLOT) { HICR_THROW_LOGIC("This slot is not registered with LPF!"); }
523 CHECK(lpf_counting_sync_per_slot(_lpf, LPF_SYNC_DEFAULT, lpfSlot, memSlot->getMessagesSent() + expectedSent, memSlot->getMessagesRecv() + expectedRcvd));
524
525 setMessagesSent(*memSlot, memSlot->getMessagesSent() + expectedSent);
526 setMessagesRecv(*memSlot, memSlot->getMessagesRecv() + expectedRcvd);
527 }
528
535 __INLINE__ void updateMessagesRecv(const std::shared_ptr<HiCR::LocalMemorySlot> &memorySlot)
536 {
537 size_t msg_cnt = 0;
538 auto memSlot = dynamic_pointer_cast<lpf::LocalMemorySlot>(memorySlot);
539 lpf_memslot_t lpfSlot = memSlot->getLPFSlot();
540 lpf_get_rcvd_msg_count_per_slot(_lpf, &msg_cnt, lpfSlot);
541 setMessagesRecv(*memSlot, msg_cnt);
542 }
543
550 __INLINE__ void updateMessagesSent(const std::shared_ptr<HiCR::LocalMemorySlot> &memorySlot)
551 {
552 size_t msg_cnt = 0;
553 auto memSlot = dynamic_pointer_cast<lpf::LocalMemorySlot>(memorySlot);
554 lpf_memslot_t lpfSlot = memSlot->getLPFSlot();
555 lpf_get_sent_msg_count_per_slot(_lpf, &msg_cnt, lpfSlot);
556 setMessagesSent(*memSlot, msg_cnt);
557 }
558
559 __INLINE__ void queryMemorySlotUpdatesImpl(std::shared_ptr<HiCR::LocalMemorySlot> memorySlot) override { fenceImplSingle(memorySlot); }
560
561 __INLINE__ void flushSent() override { lpf_flush_sent(_lpf); }
562 __INLINE__ void flushReceived() override { lpf_flush_received(_lpf); }
563
564 __INLINE__ bool acquireGlobalLockImpl(std::shared_ptr<HiCR::GlobalMemorySlot> memorySlot) override
565 {
566 auto hicrSlot = dynamic_pointer_cast<lpf::GlobalMemorySlot>(memorySlot);
567 lpf_memslot_t lpfSwapSlot = hicrSlot->getLPFSwapSlot();
568 auto slotRank = hicrSlot->getRank();
569 CHECK(lpf_lock_slot(_lpf, _localSwapSlot, 0, slotRank, lpfSwapSlot, 0, sizeof(uint64_t), LPF_MSG_DEFAULT));
570 return true;
571 }
572
573 __INLINE__ void releaseGlobalLockImpl(std::shared_ptr<HiCR::GlobalMemorySlot> memorySlot) override
574 {
575 auto hicrSlot = dynamic_pointer_cast<lpf::GlobalMemorySlot>(memorySlot);
576 auto slotRank = hicrSlot->getRank();
577 lpf_memslot_t lpfSwapSlot = hicrSlot->getLPFSwapSlot();
578 CHECK(lpf_unlock_slot(_lpf, _localSwapSlot, 0, slotRank, lpfSwapSlot, 0, sizeof(uint64_t), LPF_MSG_DEFAULT));
579 }
580
581 private:
582
587 __INLINE__ void fenceImplSingle(const std::shared_ptr<HiCR::LocalMemorySlot> &hicrSlot)
588 {
589 auto memorySlot = dynamic_pointer_cast<lpf::LocalMemorySlot>(hicrSlot);
590 lpf_memslot_t slot = memorySlot->getLPFSlot();
591 CHECK(lpf_sync_per_slot(_lpf, LPF_SYNC_DEFAULT, slot));
592 updateMessagesRecv(memorySlot);
593 updateMessagesSent(memorySlot);
594 }
595};
596
597} // namespace HiCR::backend::lpf
#define CHECK(f...)
Definition common.hpp:29
Definition communicationManager.hpp:54
__INLINE__ auto & getGlobalMemorySlotTagKeyMap()
Definition communicationManager.hpp:643
__INLINE__ void setMessagesSent(HiCR::LocalMemorySlot &memorySlot, const size_t count) noexcept
Definition communicationManager.hpp:675
__INLINE__ void registerGlobalMemorySlot(const std::shared_ptr< GlobalMemorySlot > &memorySlot)
Definition communicationManager.hpp:503
std::map< GlobalMemorySlot::globalKey_t, std::shared_ptr< GlobalMemorySlot > > globalKeyToMemorySlotMap_t
Definition communicationManager.hpp:65
__INLINE__ auto & getGlobalMemorySlotsToDestroyPerTag()
Definition communicationManager.hpp:637
__INLINE__ void setMessagesRecv(HiCR::LocalMemorySlot &memorySlot, const size_t count) noexcept
Definition communicationManager.hpp:667
uint64_t tag_t
Definition globalMemorySlot.hpp:49
uint64_t globalKey_t
Definition globalMemorySlot.hpp:44
Definition communicationManager.hpp:46
__INLINE__ uint8_t * serializeGlobalMemorySlot(const std::shared_ptr< HiCR::GlobalMemorySlot > &globalSlot) const override
Definition communicationManager.hpp:77
__INLINE__ void destroyPromotedGlobalMemorySlot(const std::shared_ptr< HiCR::GlobalMemorySlot > &memorySlot) override
Definition communicationManager.hpp:159
__INLINE__ std::shared_ptr< HiCR::GlobalMemorySlot > promoteLocalMemorySlot(const std::shared_ptr< HiCR::LocalMemorySlot > &memorySlot, HiCR::GlobalMemorySlot::tag_t tag) override
Definition communicationManager.hpp:134
__INLINE__ std::shared_ptr< HiCR::GlobalMemorySlot > deserializeGlobalMemorySlot(uint8_t *buffer, GlobalMemorySlot::tag_t tag) override
Definition communicationManager.hpp:109
CommunicationManager(size_t size, size_t rank, lpf_t lpf)
Definition communicationManager.hpp:60
Provides a definition for the base backend's communication manager class.
Provides a definition for a HiCR Global Memory Slot class.
Provides a definition for a HiCR Local Memory Slot class.
#define HICR_THROW_LOGIC(...)
Definition exceptions.hpp:67
#define HICR_THROW_FATAL(...)
Definition exceptions.hpp:81