/home/runner/work/HiCR/HiCR/include/hicr/frontends/RPCEngine/RPCEngine.hpp Source File

HiCR: /home/runner/work/HiCR/HiCR/include/hicr/frontends/RPCEngine/RPCEngine.hpp Source File
HiCR
RPCEngine.hpp
Go to the documentation of this file.
1/*
2 * Copyright 2025 Huawei Technologies Co., Ltd.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
24#pragma once
25
34
// Size passed to allocateLocalMemorySlot() for the return-value payload buffer
// in initializeReturnValueChannels() (presumably a byte count -- confirm)
38#define _HICR_RPC_ENGINE_CHANNEL_PAYLOAD_CAPACITY 1048576
39
// NOTE(review): no use of this macro is visible in this listing; presumably the
// maximum number of in-flight tokens/messages per channel -- confirm against the full header
43#define _HICR_RPC_ENGINE_CHANNEL_COUNT_CAPACITY 1024
44
// Default value of the constructor's baseTag parameter; the channel setup routines
// derive their global memory slot tags as baseTag + 0 ... baseTag + 6
48#define _HICR_RPC_ENGINE_CHANNEL_BASE_TAG 0xF0000000
49
50namespace HiCR::frontend
51{
52
61{
62 public:
63
// Type of the key that identifies an RPC target; values are produced from the
// RPC name by getRPCTargetIndexFromString()
67 using RPCTargetIndex_t = uint64_t;
68
// Type of the single optional argument sent along with an RPC request
// (set by requestRPC(), read back with getRPCArgument())
72 using RPCArgument_t = uint64_t;
73
85
// Entry stored in _RPCTargetMap for each registered RPC.
// NOTE(review): the struct header line is missing from this listing; the type is
// referred to as RPCTarget_t by the _RPCTargetMap member -- confirm
90 {
// Name the RPC was registered under (first value stored by addRPCTarget())
92 std::string name;
93
// Execution unit that executeRPC() runs when this RPC is requested
95 std::shared_ptr<HiCR::ExecutionUnit> executionUnit;
96 };
97
109 RPCEngine(CommunicationManager &communicationManager,
110 InstanceManager &instanceManager,
111 MemoryManager &memoryManager,
112 ComputeManager &computeManager,
113 std::shared_ptr<MemorySpace> bufferMemorySpace,
114 std::shared_ptr<ComputeResource> computeResource,
115 const uint64_t baseTag = _HICR_RPC_ENGINE_CHANNEL_BASE_TAG)
116 : _communicationManager(communicationManager),
117 _instanceManager(instanceManager),
118 _memoryManager(memoryManager),
119 _computeManager(computeManager),
120 _bufferMemorySpace(bufferMemorySpace),
121 _computeResource(computeResource),
122 _baseTag(baseTag)
123 {}
124
130 __INLINE__ void initialize()
131 {
132 // Creating MPSC channels to receive RPC requests
133 initializeRPCChannels();
134 initializeReturnValueChannels();
135 }
136
140 ~RPCEngine() = default;
141
147 __INLINE__ void addRPCTarget(const std::string &RPCName, const std::shared_ptr<HiCR::ExecutionUnit> e)
148 {
149 // Obtaining hash from the RPC name
150 const auto idx = getRPCTargetIndexFromString(RPCName);
151
152 // Inserting the new entry
153 _RPCTargetMap[idx] = {RPCName, e};
154 }
155
161 __INLINE__ bool hasPendingRPCs()
162 {
163 // Updating depth
164 _RPCConsumerChannel->updateDepth();
165
166 // Checking if empty
167 return _RPCConsumerChannel->isEmpty() == false;
168 }
169
175 __INLINE__ void listen()
176 {
177 // Keep querying the channel until a new message arrives
178 while (_RPCConsumerChannel->getDepth() == 0) _RPCConsumerChannel->updateDepth();
179
180 // Execute the RPC
182 }
183
191 __INLINE__ bool tryListen()
192 {
193 // If there are messages to process early return
194 if (_RPCConsumerChannel->getDepth() > 0) return true;
195
196 // If the channel is empty, check if new messages arrived since last check
197 _RPCConsumerChannel->updateDepth();
198
199 // Return whether there are new messages
200 return _RPCConsumerChannel->getDepth() > 0;
201 }
202
206 __INLINE__ void parseAndExecuteRPC()
207 {
208 // Once a request has arrived, gather its value from the channel
209 auto request = _RPCConsumerChannel->peek();
210 auto requester = request[0];
211 RPCPayload_t *buffer = (RPCPayload_t *)(_RPCConsumerChannel->getTokenBuffers()[requester]->getSourceLocalMemorySlot()->getPointer());
212 RPCPayload_t payload = buffer[request[1]];
213 _RPCConsumerChannel->pop();
214
215 // Setting requester instance index
216 _requesterInstanceIdx = requester;
217
218 // Storing rpc argument
219 _currentRPCArgument = payload.argument;
220
221 // Execute RPC
222 executeRPC(payload.index);
223 }
224
230 __INLINE__ std::shared_ptr<HiCR::Instance> getRPCRequester() { return _instanceManager.getInstances()[_requesterInstanceIdx]; }
231
237 [[nodiscard]] __INLINE__ const RPCArgument_t getRPCArgument() { return _currentRPCArgument; }
238
245 virtual void requestRPC(HiCR::Instance::instanceId_t targetInstanceId, const std::string &RPCName, const HiCR::frontend::RPCEngine::RPCArgument_t argument = 0)
246 {
247 // Creating message payload
248 RPCPayload_t RPCPayload;
249 RPCPayload.index = getRPCTargetIndexFromString(RPCName);
250 RPCPayload.argument = argument;
251
252 // Registering source buffer
253 auto tempBufferSlot = _memoryManager.registerLocalMemorySlot(_bufferMemorySpace, (void *)&RPCPayload, sizeof(RPCPayload_t));
254
255 // Sending source buffer
256 _RPCProducerChannels.at(targetInstanceId)->push(tempBufferSlot);
257 }
258
264 __INLINE__ void submitReturnValue(void *pointer, const size_t size)
265 {
266 // Getting source buffers
267 auto tempBufferSlot = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, size);
268 auto sourceBufferSlot = _memoryManager.registerLocalMemorySlot(_bufferMemorySpace, pointer, size);
269
270 // Copying data
271 _communicationManager.memcpy(tempBufferSlot, 0, sourceBufferSlot, 0, size);
272
273 // Waiting for communication to end
274 _communicationManager.fence(tempBufferSlot, 1, 0);
275
276 // Sending return value data
277 _returnValueProducerChannels.at(_requesterInstanceIdx)->push(tempBufferSlot);
278
279 // Freeing up local memory slot
280 _memoryManager.freeLocalMemorySlot(tempBufferSlot);
281 }
282
287 __INLINE__ std::shared_ptr<HiCR::LocalMemorySlot> getReturnValue() const
288 {
289 // Calling the backend-specific implementation of the listen function
290 while (_returnValueConsumerChannel->isEmpty());
291
292 // Calling backend-specific implementation of this function
293 auto returnValue = _returnValueConsumerChannel->peek();
294
295 // Getting message info
296 auto msgOffset = returnValue[0];
297 auto msgSize = returnValue[1];
298
299 // Creating local buffer
300 auto tempBufferSlot = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, msgSize);
301
302 // Copying data
303 _communicationManager.memcpy(tempBufferSlot, 0, _returnValueConsumerChannel->getPayloadBufferMemorySlot(), msgOffset, msgSize);
304
305 // Waiting for communication to end
306 _communicationManager.fence(tempBufferSlot, 1, 0);
307
308 // Freeing up channel
309 _returnValueConsumerChannel->pop();
310
311 // Returning internal buffer
312 return tempBufferSlot;
313 }
314
320 [[nodiscard]] __INLINE__ HiCR::CommunicationManager *getCommunicationManager() const { return &_communicationManager; }
321
327 [[nodiscard]] __INLINE__ HiCR::InstanceManager *getInstanceManager() const { return &_instanceManager; }
328
334 [[nodiscard]] __INLINE__ HiCR::MemoryManager *getMemoryManager() const { return &_memoryManager; }
335
341 [[nodiscard]] __INLINE__ HiCR::ComputeManager *getComputeManager() const { return &_computeManager; }
342
343 private:
344
351 static RPCTargetIndex_t getRPCTargetIndexFromString(const std::string &name) { return std::hash<std::string>()(name); }
352
357 __INLINE__ void executeRPC(const RPCTargetIndex_t rpcIdx) const
358 {
359 // Getting RPC target from the index
360 if (_RPCTargetMap.contains(rpcIdx) == false) HICR_THROW_RUNTIME("Attempting to run an RPC target (Hash: %lu) that was not defined in this instance (0x%lX).\n", rpcIdx, this);
361 auto &target = _RPCTargetMap.at(rpcIdx);
362 // auto &targetName = target.name;
363 auto &targetExecutionUnit = target.executionUnit;
364
365 // printf("Running: %s\n", targetName.c_str());
366
367 // Creating new processing unit to execute the RPC
368 auto p = _computeManager.createProcessingUnit(_computeResource);
369 _computeManager.initialize(p);
370
371 // Creating execution state
372 auto s = _computeManager.createExecutionState(targetExecutionUnit);
373
374 // Executing RPC
375 _computeManager.start(p, s);
376
377 // Waiting for execution to finish
378 _computeManager.await(p);
379 }
380
// Creates the channels that carry RPC requests: one consumer channel through which this
// instance receives requests (_RPCConsumerChannel) and one producer channel per instance
// through which it sends them (_RPCProducerChannels, keyed by instance id).
//
// For every entry of the instance list it allocates a token buffer and two coordination
// buffers, publishes them as global memory slots under three tags (baseTag + 4, + 5, + 6),
// and then builds the channels from the exchanged slots.
//
// NOTE(review): several lines of this function are missing from this listing (the embedded
// line numbers skip 392, 420, 424, 446, 474, 477 and 515); in particular the definition of
// tokenBufferSize and the trailing arguments of both channel constructors are not visible.
381 __INLINE__ void initializeRPCChannels()
382 {
383 // Defining tag values
384 const uint64_t _HICR_RPC_ENGINE_CHANNEL_CONSUMER_TOKEN_BUFFER_TAG = _baseTag + 4;
385 const uint64_t _HICR_RPC_ENGINE_CHANNEL_PRODUCER_COORDINATION_BUFFER_TAG = _baseTag + 5;
386 const uint64_t _HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_TAG = _baseTag + 6;
387
388 // Getting required buffer sizes
// A token is exactly one RPC payload
389 auto tokenSize = sizeof(RPCPayload_t);
390
391 // Getting required buffer sizes
// NOTE(review): tokenBufferSize is used below but its definition is not visible here
393
394 // Getting my current instance
395 const auto currentInstanceId = _instanceManager.getCurrentInstance()->getId();
396
397 // Getting total instance count
398 const auto instanceCount = _instanceManager.getInstances().size();
399
400 // Creating and exchanging buffers
401
// (key, slot) pairs to be exchanged globally, plus the local handles of the coordination buffers
402 std::vector<HiCR::CommunicationManager::globalKeyMemorySlotPair_t> tokenBuffers;
403 std::vector<HiCR::CommunicationManager::globalKeyMemorySlotPair_t> consumerCoordinationBuffers;
404 std::vector<HiCR::CommunicationManager::globalKeyMemorySlotPair_t> producerCoordinationBuffers;
405 std::vector<std::shared_ptr<HiCR::LocalMemorySlot>> localConsumerCoordinationBuffers;
406 std::vector<std::shared_ptr<HiCR::LocalMemorySlot>> localProducerCoordinationBuffers;
407
408 auto coordinationBufferSize = HiCR::channel::fixedSize::Base::getCoordinationBufferSize();
409
// One set of buffers per entry of the instance list
410 for (size_t i = 0; i < instanceCount; i++)
411 {
412 // Calculating these particular slots' key
// NOTE(review): mixes an instance id with a loop index, so it presumably assumes
// instance ids are 0 .. instanceCount-1 -- confirm
413 const HiCR::GlobalMemorySlot::globalKey_t slotkey = currentInstanceId * instanceCount + i;
414
415 // consumer needs to allocate #producers token buffers for #producers SPSCs
416 auto tokenBufferSlot = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, tokenBufferSize);
417
418 // consumer needs to allocate #producers consumer side coordination buffers for #producers SPSCs
419 auto consumerCoordinationBuffer = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
421
422 // producer needs to allocate #consumers producer-side coordination buffers for #consumer SPSCs
423 auto producerCoordinationBuffer = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
425
426 // Adding to collections
427 localConsumerCoordinationBuffers.push_back(consumerCoordinationBuffer);
428 localProducerCoordinationBuffers.push_back(producerCoordinationBuffer);
429 tokenBuffers.push_back(std::make_pair(slotkey, tokenBufferSlot));
430 consumerCoordinationBuffers.push_back(std::make_pair(slotkey, consumerCoordinationBuffer));
431 producerCoordinationBuffers.push_back(std::make_pair(slotkey, producerCoordinationBuffer));
432 }
433
// Each exchange is followed by a fence on the same tag before the slots are looked up
434 // communicate to producers the token buffer references
435 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_TOKEN_BUFFER_TAG, tokenBuffers);
436 _communicationManager.fence(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_TOKEN_BUFFER_TAG);
437
438 // get from producers their coordination buffer references
439 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_PRODUCER_COORDINATION_BUFFER_TAG, producerCoordinationBuffers);
440 _communicationManager.fence(_HICR_RPC_ENGINE_CHANNEL_PRODUCER_COORDINATION_BUFFER_TAG);
441
442 // communicate to producers the consumer buffers
443 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_TAG, consumerCoordinationBuffers);
444 _communicationManager.fence(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_TAG);
445
// Consumer side: token and consumer-coordination slots are the ones published by this
// instance (localSlotKey); producer-coordination slots are the ones published by the
// other instances for this one (remoteSlotKey)
447 {
448 std::vector<std::shared_ptr<HiCR::GlobalMemorySlot>> globalConsumerTokenBuffers;
449 std::vector<std::shared_ptr<HiCR::GlobalMemorySlot>> globalConsumerCoordinationBuffers;
450 std::vector<std::shared_ptr<HiCR::GlobalMemorySlot>> globalProducerCoordinationBuffers;
451
452 for (size_t i = 0; i < instanceCount; i++)
453 {
454 // Calculating these particular slots' key
455 const HiCR::GlobalMemorySlot::globalKey_t localSlotKey = currentInstanceId * instanceCount + i;
456 const HiCR::GlobalMemorySlot::globalKey_t remoteSlotKey = i * instanceCount + currentInstanceId;
457
458 auto globalConsumerTokenBufferSlot = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_TOKEN_BUFFER_TAG, localSlotKey);
459 globalConsumerTokenBuffers.push_back(globalConsumerTokenBufferSlot);
460
461 auto producerCoordinationBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_PRODUCER_COORDINATION_BUFFER_TAG, remoteSlotKey);
462 globalProducerCoordinationBuffers.push_back(producerCoordinationBuffer);
463
464 auto consumerCoordinationBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_TAG, localSlotKey);
465 globalConsumerCoordinationBuffers.push_back(consumerCoordinationBuffer);
466 }
467
// NOTE(review): the last constructor argument(s) after tokenSize are missing from this listing
468 _RPCConsumerChannel = std::make_shared<HiCR::channel::fixedSize::MPSC::nonlocking::Consumer>(_communicationManager,
469 _communicationManager,
470 globalConsumerTokenBuffers,
471 localConsumerCoordinationBuffers,
472 globalProducerCoordinationBuffers,
473 tokenSize,
475 }
476
478
// Producer side: the key roles are reversed with respect to the consumer block above --
// token and consumer-coordination slots come from the other instances (remoteSlotKey),
// producer-coordination slots are the ones published by this instance (localSlotKey)
479 {
480 std::vector<std::shared_ptr<HiCR::GlobalMemorySlot>> globalConsumerTokenBuffers;
481 std::vector<std::shared_ptr<HiCR::GlobalMemorySlot>> globalConsumerCoordinationBuffers;
482 std::vector<std::shared_ptr<HiCR::GlobalMemorySlot>> globalProducerCoordinationBuffers;
483
484 for (size_t i = 0; i < instanceCount; i++)
485 {
486 // Calculating these particular slots' key
487 const HiCR::GlobalMemorySlot::globalKey_t localSlotKey = currentInstanceId * instanceCount + i;
488 const HiCR::GlobalMemorySlot::globalKey_t remoteSlotKey = i * instanceCount + currentInstanceId;
489
490 auto globalConsumerTokenBufferSlot = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_TOKEN_BUFFER_TAG, remoteSlotKey);
491 globalConsumerTokenBuffers.push_back(globalConsumerTokenBufferSlot);
492
493 auto producerCoordinationBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_PRODUCER_COORDINATION_BUFFER_TAG, localSlotKey);
494 globalProducerCoordinationBuffers.push_back(producerCoordinationBuffer);
495
496 auto consumerCoordinationBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_TAG, remoteSlotKey);
497 globalConsumerCoordinationBuffers.push_back(consumerCoordinationBuffer);
498 }
499
// One producer channel per instance, stored under that instance's id
500 for (size_t i = 0; i < instanceCount; i++)
501 {
502 // Getting consumer instance id
503 const auto consumerInstanceId = _instanceManager.getInstances()[i]->getId();
504
505 // Creating producer channel
506 // This call does the same as the SPSC Producer constructor, as
507 // the producer of MPSC::nonlocking has the same view
// NOTE(review): the last constructor argument(s) after tokenSize are missing from this listing
508 _RPCProducerChannels[consumerInstanceId] =
509 std::make_shared<HiCR::channel::fixedSize::MPSC::nonlocking::Producer>(_communicationManager,
510 _communicationManager,
511 globalConsumerTokenBuffers[i],
512 globalProducerCoordinationBuffers[i]->getSourceLocalMemorySlot(),
513 globalConsumerCoordinationBuffers[i],
514 tokenSize,
516 }
517 }
518 }
519
// Creates the channels that carry return values: one variable-size consumer channel through
// which this instance receives them (_returnValueConsumerChannel) and one producer channel per
// instance through which it sends them (_returnValueProducerChannels, keyed by instance id).
//
// Every instance publishes four buffers as global memory slots (tags baseTag + 0 ... baseTag + 3),
// each under its own instance id as key, and then builds the channels from the exchanged slots.
//
// NOTE(review): several lines of this function are missing from this listing (the embedded line
// numbers skip 531, 534, 543, 583-584, 617 and 619); in particular the definitions of
// tokenSizeBufferSize and coordinationBufferSize and some constructor arguments are not visible.
520 __INLINE__ void initializeReturnValueChannels()
521 {
522 // Defining tag values
523 const uint64_t _HICR_RPC_ENGINE_CHANNEL_CONSUMER_SIZES_BUFFER_TAG = _baseTag + 0;
524 const uint64_t _HICR_RPC_ENGINE_CHANNEL_CONSUMER_PAYLOAD_BUFFER_TAG = _baseTag + 1;
525 const uint64_t _HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_SIZES_TAG = _baseTag + 2;
526 const uint64_t _HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG = _baseTag + 3;
527
528 // Getting my current instance
529 const auto currentInstanceId = _instanceManager.getCurrentInstance()->getId();
530
532
533 // Getting required buffer sizes
// NOTE(review): tokenSizeBufferSize is used below but its definition is not visible here
535
536 // Allocating token size buffer as a local memory slot
537 auto tokenSizeBufferSlot = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, tokenSizeBufferSize);
538
// Payload buffer of the consumer channel, sized by _HICR_RPC_ENGINE_CHANNEL_PAYLOAD_CAPACITY
539 // Allocating token size buffer as a local memory slot
540 auto payloadBufferSlot = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, _HICR_RPC_ENGINE_CHANNEL_PAYLOAD_CAPACITY);
541
542 // Getting required buffer size for coordination buffers
// NOTE(review): coordinationBufferSize is used below but its definition is not visible here
544
545 // Allocating coordination buffers
546 auto localConsumerCoordinationBufferMessageSizes = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
547 auto localConsumerCoordinationBufferMessagePayloads = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
548
549 // Initializing coordination buffers
550 HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localConsumerCoordinationBufferMessageSizes);
551 HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localConsumerCoordinationBufferMessagePayloads);
552
// Each of the four buffers is published under this instance's id, with a fence after each exchange
553 // Exchanging local memory slots to become global for them to be used by the remote end
554 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_SIZES_BUFFER_TAG, {{currentInstanceId, tokenSizeBufferSlot}});
555 _communicationManager.fence(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_SIZES_BUFFER_TAG);
556
557 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_PAYLOAD_BUFFER_TAG, {{currentInstanceId, payloadBufferSlot}});
558 _communicationManager.fence(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_PAYLOAD_BUFFER_TAG);
559
560 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_SIZES_TAG,
561 {{currentInstanceId, localConsumerCoordinationBufferMessageSizes}});
562 _communicationManager.fence(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_SIZES_TAG);
563
564 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG,
565 {{currentInstanceId, localConsumerCoordinationBufferMessagePayloads}});
566 _communicationManager.fence(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG);
567
// The consumer channel uses the slots this instance published itself (key = currentInstanceId)
568 // Obtaining the globally exchanged memory slots
569 auto consumerMessagePayloadBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_PAYLOAD_BUFFER_TAG, currentInstanceId);
570 auto consumerMessageSizesBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_SIZES_BUFFER_TAG, currentInstanceId);
571 auto consumerCoodinationPayloadsBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG, currentInstanceId);
572 auto consumerCoordinationSizesBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_SIZES_TAG, currentInstanceId);
573
// NOTE(review): the trailing constructor arguments are missing from this listing
574 // Creating channel
575 _returnValueConsumerChannel = std::make_shared<HiCR::channel::variableSize::MPSC::locking::Consumer>(_communicationManager,
576 _communicationManager,
577 consumerMessagePayloadBuffer,
578 consumerMessageSizesBuffer,
579 localConsumerCoordinationBufferMessageSizes,
580 localConsumerCoordinationBufferMessagePayloads,
581 consumerCoordinationSizesBuffer,
582 consumerCoodinationPayloadsBuffer,
585
// One producer channel per instance in the instance list, including this instance itself
586 // Creating producer channels
587 for (const auto &instance : _instanceManager.getInstances())
588 {
589 // Getting consumer instance id
590 const auto consumerInstanceId = instance->getId();
591
// Producer-side buffers are local only; they are not exchanged globally in this function
592 // Allocating coordination buffers
593 auto localProducerSizeInfoBuffer = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, sizeof(size_t));
594 auto localProducerCoordinationBufferMessageSizes = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
595 auto localProducerCoordinationBufferMessagePayloads = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
596
597 // Initializing coordination buffers
598 HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localProducerCoordinationBufferMessageSizes);
599 HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localProducerCoordinationBufferMessagePayloads);
600
// The producer targets the slots published by the consumer instance (key = consumerInstanceId)
601 // Obtaining the globally exchanged memory slots
602 auto consumerMessagePayloadBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_PAYLOAD_BUFFER_TAG, consumerInstanceId);
603 auto consumerMessageSizesBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_SIZES_BUFFER_TAG, consumerInstanceId);
604 auto consumerPayloadConsumerBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG, consumerInstanceId);
605 auto consumerSizesConsumerBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_SIZES_TAG, consumerInstanceId);
606
// NOTE(review): the arguments on the missing lines 617 and 619 are not visible in this listing
607 // Creating channel
608 _returnValueProducerChannels[consumerInstanceId] = std::make_shared<HiCR::channel::variableSize::MPSC::locking::Producer>(_communicationManager,
609 _communicationManager,
610 localProducerSizeInfoBuffer,
611 consumerMessagePayloadBuffer,
612 consumerMessageSizesBuffer,
613 localProducerCoordinationBufferMessageSizes,
614 localProducerCoordinationBufferMessagePayloads,
615 consumerSizesConsumerBuffer,
616 consumerPayloadConsumerBuffer,
618 sizeof(uint8_t),
620 }
621 }
622
// Communication manager used for global slot exchange, fences, memcpy and by the channels
626 CommunicationManager &_communicationManager;
627
// Instance manager used to find the current instance and enumerate all instances
631 InstanceManager &_instanceManager;
632
// Memory manager used to allocate, register and free the engine's buffers
636 MemoryManager &_memoryManager;
637
// Compute manager used by executeRPC() to run RPC targets
641 ComputeManager &_computeManager;
642
// Memory space in which every buffer of this engine is allocated or registered
646 const std::shared_ptr<HiCR::MemorySpace> _bufferMemorySpace;
647
// Compute resource on which executeRPC() creates its processing unit
651 const std::shared_ptr<HiCR::ComputeResource> _computeResource;
652
// First global memory slot tag; the channel setup uses _baseTag + 0 ... _baseTag + 6
653 const uint64_t _baseTag;
654
// Requester of the RPC most recently taken from the channel (set in parseAndExecuteRPC()).
// NOTE(review): used both as an index into the instance list (getRPCRequester) and as a key of
// _returnValueProducerChannels, which is keyed by instance id (submitReturnValue) -- confirm ids and indices coincide
655 size_t _requesterInstanceIdx;
// Argument of the RPC most recently taken from the channel (set in parseAndExecuteRPC())
656 RPCArgument_t _currentRPCArgument;
657
// Channel through which this instance receives return values
661 std::shared_ptr<HiCR::channel::variableSize::MPSC::locking::Consumer> _returnValueConsumerChannel;
662
// Channels through which this instance sends return values, one per instance id
666 std::map<HiCR::Instance::instanceId_t, std::shared_ptr<HiCR::channel::variableSize::MPSC::locking::Producer>> _returnValueProducerChannels;
667
// Channel through which this instance receives RPC requests
671 std::shared_ptr<HiCR::channel::fixedSize::MPSC::nonlocking::Consumer> _RPCConsumerChannel;
672
// Channels through which this instance sends RPC requests, one per instance id
676 std::map<HiCR::Instance::instanceId_t, std::shared_ptr<HiCR::channel::fixedSize::MPSC::nonlocking::Producer>> _RPCProducerChannels;
677
// RPC targets registered with addRPCTarget(), keyed by the hash of their name
681 std::map<RPCTargetIndex_t, RPCTarget_t> _RPCTargetMap;
682};
683
684} // namespace HiCR::frontend
#define _HICR_RPC_ENGINE_CHANNEL_PAYLOAD_CAPACITY
Definition RPCEngine.hpp:38
#define _HICR_RPC_ENGINE_CHANNEL_BASE_TAG
Definition RPCEngine.hpp:48
#define _HICR_RPC_ENGINE_CHANNEL_COUNT_CAPACITY
Definition RPCEngine.hpp:43
Definition communicationManager.hpp:54
__INLINE__ std::shared_ptr< GlobalMemorySlot > getGlobalMemorySlot(GlobalMemorySlot::tag_t tag, GlobalMemorySlot::globalKey_t globalKey)
Definition communicationManager.hpp:98
__INLINE__ void memcpy(const std::shared_ptr< LocalMemorySlot > &destination, size_t dst_offset, const std::shared_ptr< LocalMemorySlot > &source, size_t src_offset, size_t size)
Definition communicationManager.hpp:248
__INLINE__ void fence(GlobalMemorySlot::tag_t tag)
Definition communicationManager.hpp:358
__INLINE__ void exchangeGlobalMemorySlots(GlobalMemorySlot::tag_t tag, const std::vector< globalKeyMemorySlotPair_t > &memorySlots)
Definition communicationManager.hpp:83
Definition computeManager.hpp:48
virtual std::unique_ptr< HiCR::ProcessingUnit > createProcessingUnit(std::shared_ptr< HiCR::ComputeResource > resource) const =0
__INLINE__ void initialize(std::unique_ptr< HiCR::ProcessingUnit > &processingUnit)
Definition computeManager.hpp:93
__INLINE__ void start(std::unique_ptr< HiCR::ProcessingUnit > &processingUnit, std::unique_ptr< HiCR::ExecutionState > &executionState)
Definition computeManager.hpp:115
virtual std::unique_ptr< HiCR::ExecutionState > createExecutionState(std::shared_ptr< HiCR::ExecutionUnit > executionUnit, void *const argument=nullptr) const =0
__INLINE__ void await(std::unique_ptr< HiCR::ProcessingUnit > &processingUnit)
Definition computeManager.hpp:189
uint64_t globalKey_t
Definition globalMemorySlot.hpp:44
Definition instanceManager.hpp:57
__INLINE__ std::shared_ptr< HiCR::Instance > getCurrentInstance() const
Definition instanceManager.hpp:85
__INLINE__ instanceList_t & getInstances()
Definition instanceManager.hpp:79
uint64_t instanceId_t
Definition instance.hpp:44
Definition memoryManager.hpp:51
__INLINE__ void freeLocalMemorySlot(const std::shared_ptr< HiCR::LocalMemorySlot > &memorySlot)
Definition memoryManager.hpp:129
virtual std::shared_ptr< LocalMemorySlot > registerLocalMemorySlot(const std::shared_ptr< HiCR::MemorySpace > &memorySpace, void *const ptr, const size_t size)
Definition memoryManager.hpp:83
__INLINE__ std::shared_ptr< LocalMemorySlot > allocateLocalMemorySlot(const std::shared_ptr< MemorySpace > &memorySpace, const size_t size)
Definition memoryManager.hpp:66
static __INLINE__ size_t getTokenBufferSize(const size_t tokenSize, const size_t capacity) noexcept
Definition base.hpp:123
static __INLINE__ void initializeCoordinationBuffer(const std::shared_ptr< LocalMemorySlot > &coordinationBuffer)
Definition base.hpp:100
static __INLINE__ size_t getCoordinationBufferSize() noexcept
Definition base.hpp:92
Definition RPCEngine.hpp:61
uint64_t RPCArgument_t
Definition RPCEngine.hpp:72
__INLINE__ void listen()
Definition RPCEngine.hpp:175
__INLINE__ void addRPCTarget(const std::string &RPCName, const std::shared_ptr< HiCR::ExecutionUnit > e)
Definition RPCEngine.hpp:147
virtual void requestRPC(HiCR::Instance::instanceId_t targetInstanceId, const std::string &RPCName, const HiCR::frontend::RPCEngine::RPCArgument_t argument=0)
Definition RPCEngine.hpp:245
__INLINE__ HiCR::ComputeManager * getComputeManager() const
Definition RPCEngine.hpp:341
__INLINE__ HiCR::CommunicationManager * getCommunicationManager() const
Definition RPCEngine.hpp:320
RPCEngine(CommunicationManager &communicationManager, InstanceManager &instanceManager, MemoryManager &memoryManager, ComputeManager &computeManager, std::shared_ptr< MemorySpace > bufferMemorySpace, std::shared_ptr< ComputeResource > computeResource, const uint64_t baseTag=_HICR_RPC_ENGINE_CHANNEL_BASE_TAG)
Definition RPCEngine.hpp:109
__INLINE__ bool hasPendingRPCs()
Definition RPCEngine.hpp:161
uint64_t RPCTargetIndex_t
Definition RPCEngine.hpp:67
__INLINE__ void parseAndExecuteRPC()
Definition RPCEngine.hpp:206
__INLINE__ HiCR::InstanceManager * getInstanceManager() const
Definition RPCEngine.hpp:327
__INLINE__ void initialize()
Definition RPCEngine.hpp:130
__INLINE__ bool tryListen()
Definition RPCEngine.hpp:191
__INLINE__ void submitReturnValue(void *pointer, const size_t size)
Definition RPCEngine.hpp:264
__INLINE__ std::shared_ptr< HiCR::Instance > getRPCRequester()
Definition RPCEngine.hpp:230
__INLINE__ std::shared_ptr< HiCR::LocalMemorySlot > getReturnValue() const
Definition RPCEngine.hpp:287
__INLINE__ HiCR::MemoryManager * getMemoryManager() const
Definition RPCEngine.hpp:334
__INLINE__ const RPCArgument_t getRPCArgument()
Definition RPCEngine.hpp:237
Provides a definition for the base backend's communication manager class.
Provides a definition for the abstract instance manager class.
Provides a definition for the base backend's memory manager class.
Provides a definition for the abstract device manager class.
#define HICR_THROW_RUNTIME(...)
Definition exceptions.hpp:74
Provides Consumer functionality for MPSC based on SPSC, that is without global locks.
Provides prod.
Definition RPCEngine.hpp:78
RPCTargetIndex_t index
Index of the RPC.
Definition RPCEngine.hpp:80
RPCArgument_t argument
Optional argument to the RPC.
Definition RPCEngine.hpp:83
Definition RPCEngine.hpp:90
std::shared_ptr< HiCR::ExecutionUnit > executionUnit
RPC execution unit.
Definition RPCEngine.hpp:95
std::string name
RPC name.
Definition RPCEngine.hpp:92
Provides variable-sized MPSC consumer channel, locking version.
Provides variable-sized MPSC producer channel, locking version.