/home/runner/work/HiCR/HiCR/include/hicr/frontends/RPCEngine/RPCEngine.hpp Source File
|
HiCR
|
RPCEngine.hpp
Go to the documentation of this file.
147 __INLINE__ void addRPCTarget(const std::string &RPCName, const std::shared_ptr<HiCR::ExecutionUnit> e)
211 RPCPayload_t *buffer = (RPCPayload_t *)(_RPCConsumerChannel->getTokenBuffers()[requester]->getSourceLocalMemorySlot()->getPointer());
230 __INLINE__ std::shared_ptr<HiCR::Instance> getRPCRequester() { return _instanceManager.getInstances()[_requesterInstanceIdx]; }
245 virtual void requestRPC(HiCR::Instance::instanceId_t targetInstanceId, const std::string &RPCName, const HiCR::frontend::RPCEngine::RPCArgument_t argument = 0)
253 auto tempBufferSlot = _memoryManager.registerLocalMemorySlot(_bufferMemorySpace, (void *)&RPCPayload, sizeof(RPCPayload_t));
268 auto sourceBufferSlot = _memoryManager.registerLocalMemorySlot(_bufferMemorySpace, pointer, size);
303 _communicationManager.memcpy(tempBufferSlot, 0, _returnValueConsumerChannel->getPayloadBufferMemorySlot(), msgOffset, msgSize);
320 [[nodiscard]] __INLINE__ HiCR::CommunicationManager *getCommunicationManager() const { return &_communicationManager; }
327 [[nodiscard]] __INLINE__ HiCR::InstanceManager *getInstanceManager() const { return &_instanceManager; }
334 [[nodiscard]] __INLINE__ HiCR::MemoryManager *getMemoryManager() const { return &_memoryManager; }
341 [[nodiscard]] __INLINE__ HiCR::ComputeManager *getComputeManager() const { return &_computeManager; }
351 static RPCTargetIndex_t getRPCTargetIndexFromString(const std::string &name) { return std::hash<std::string>()(name); }
360 if (_RPCTargetMap.contains(rpcIdx) == false) HICR_THROW_RUNTIME("Attempting to run an RPC target (Hash: %lu) that was not defined in this instance (0x%lX).\n", rpcIdx, this);
392 auto tokenBufferSize = HiCR::channel::fixedSize::Base::getTokenBufferSize(tokenSize, _HICR_RPC_ENGINE_CHANNEL_COUNT_CAPACITY);
416 auto tokenBufferSlot = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, tokenBufferSize);
418 // consumer needs to allocate #producers consumer side coordination buffers for #producers SPSCs
419 auto consumerCoordinationBuffer = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
423 auto producerCoordinationBuffer = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
435 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_TOKEN_BUFFER_TAG, tokenBuffers);
439 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_PRODUCER_COORDINATION_BUFFER_TAG, producerCoordinationBuffers);
443 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_TAG, consumerCoordinationBuffers);
456 const HiCR::GlobalMemorySlot::globalKey_t remoteSlotKey = i * instanceCount + currentInstanceId;
458 auto globalConsumerTokenBufferSlot = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_TOKEN_BUFFER_TAG, localSlotKey);
461 auto producerCoordinationBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_PRODUCER_COORDINATION_BUFFER_TAG, remoteSlotKey);
464 auto consumerCoordinationBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_TAG, localSlotKey);
468 _RPCConsumerChannel = std::make_shared<HiCR::channel::fixedSize::MPSC::nonlocking::Consumer>(_communicationManager,
488 const HiCR::GlobalMemorySlot::globalKey_t remoteSlotKey = i * instanceCount + currentInstanceId;
490 auto globalConsumerTokenBufferSlot = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_TOKEN_BUFFER_TAG, remoteSlotKey);
493 auto producerCoordinationBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_PRODUCER_COORDINATION_BUFFER_TAG, localSlotKey);
496 auto consumerCoordinationBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_TAG, remoteSlotKey);
526 const uint64_t _HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG = _baseTag + 3;
534 auto tokenSizeBufferSize = HiCR::channel::variableSize::Base::getTokenBufferSize(sizeof(size_t), _HICR_RPC_ENGINE_CHANNEL_COUNT_CAPACITY);
537 auto tokenSizeBufferSlot = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, tokenSizeBufferSize);
540 auto payloadBufferSlot = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, _HICR_RPC_ENGINE_CHANNEL_PAYLOAD_CAPACITY);
546 auto localConsumerCoordinationBufferMessageSizes = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
547 auto localConsumerCoordinationBufferMessagePayloads = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
550 HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localConsumerCoordinationBufferMessageSizes);
551 HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localConsumerCoordinationBufferMessagePayloads);
554 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_SIZES_BUFFER_TAG, {{currentInstanceId, tokenSizeBufferSlot}});
557 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_PAYLOAD_BUFFER_TAG, {{currentInstanceId, payloadBufferSlot}});
560 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_SIZES_TAG,
564 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG,
566 _communicationManager.fence(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG);
569 auto consumerMessagePayloadBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_PAYLOAD_BUFFER_TAG, currentInstanceId);
570 auto consumerMessageSizesBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_SIZES_BUFFER_TAG, currentInstanceId);
571 auto consumerCoodinationPayloadsBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG, currentInstanceId);
572 auto consumerCoordinationSizesBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_SIZES_TAG, currentInstanceId);
575 _returnValueConsumerChannel = std::make_shared<HiCR::channel::variableSize::MPSC::locking::Consumer>(_communicationManager,
593 auto localProducerSizeInfoBuffer = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, sizeof(size_t));
594 auto localProducerCoordinationBufferMessageSizes = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
595 auto localProducerCoordinationBufferMessagePayloads = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
598 HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localProducerCoordinationBufferMessageSizes);
599 HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localProducerCoordinationBufferMessagePayloads);
602 auto consumerMessagePayloadBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_PAYLOAD_BUFFER_TAG, consumerInstanceId);
603 auto consumerMessageSizesBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_SIZES_BUFFER_TAG, consumerInstanceId);
604 auto consumerPayloadConsumerBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG, consumerInstanceId);
605 auto consumerSizesConsumerBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_SIZES_TAG, consumerInstanceId);
608 _returnValueProducerChannels[consumerInstanceId] = std::make_shared<HiCR::channel::variableSize::MPSC::locking::Producer>(_communicationManager,
661 std::shared_ptr<HiCR::channel::variableSize::MPSC::locking::Consumer> _returnValueConsumerChannel;
666 std::map<HiCR::Instance::instanceId_t, std::shared_ptr<HiCR::channel::variableSize::MPSC::locking::Producer>> _returnValueProducerChannels;
#define _HICR_RPC_ENGINE_CHANNEL_PAYLOAD_CAPACITY
Definition RPCEngine.hpp:38
#define _HICR_RPC_ENGINE_CHANNEL_BASE_TAG
Definition RPCEngine.hpp:48
#define _HICR_RPC_ENGINE_CHANNEL_COUNT_CAPACITY
Definition RPCEngine.hpp:43
Definition communicationManager.hpp:54
__INLINE__ std::shared_ptr< GlobalMemorySlot > getGlobalMemorySlot(GlobalMemorySlot::tag_t tag, GlobalMemorySlot::globalKey_t globalKey)
Definition communicationManager.hpp:98
__INLINE__ void memcpy(const std::shared_ptr< LocalMemorySlot > &destination, size_t dst_offset, const std::shared_ptr< LocalMemorySlot > &source, size_t src_offset, size_t size)
Definition communicationManager.hpp:248
__INLINE__ void fence(GlobalMemorySlot::tag_t tag)
Definition communicationManager.hpp:358
__INLINE__ void exchangeGlobalMemorySlots(GlobalMemorySlot::tag_t tag, const std::vector< globalKeyMemorySlotPair_t > &memorySlots)
Definition communicationManager.hpp:83
Definition computeManager.hpp:48
virtual std::unique_ptr< HiCR::ProcessingUnit > createProcessingUnit(std::shared_ptr< HiCR::ComputeResource > resource) const =0
__INLINE__ void initialize(std::unique_ptr< HiCR::ProcessingUnit > &processingUnit)
Definition computeManager.hpp:93
__INLINE__ void start(std::unique_ptr< HiCR::ProcessingUnit > &processingUnit, std::unique_ptr< HiCR::ExecutionState > &executionState)
Definition computeManager.hpp:115
virtual std::unique_ptr< HiCR::ExecutionState > createExecutionState(std::shared_ptr< HiCR::ExecutionUnit > executionUnit, void *const argument=nullptr) const =0
__INLINE__ void await(std::unique_ptr< HiCR::ProcessingUnit > &processingUnit)
Definition computeManager.hpp:189
Definition instanceManager.hpp:57
__INLINE__ std::shared_ptr< HiCR::Instance > getCurrentInstance() const
Definition instanceManager.hpp:85
__INLINE__ instanceList_t & getInstances()
Definition instanceManager.hpp:79
Definition memoryManager.hpp:51
__INLINE__ void freeLocalMemorySlot(const std::shared_ptr< HiCR::LocalMemorySlot > &memorySlot)
Definition memoryManager.hpp:129
virtual std::shared_ptr< LocalMemorySlot > registerLocalMemorySlot(const std::shared_ptr< HiCR::MemorySpace > &memorySpace, void *const ptr, const size_t size)
Definition memoryManager.hpp:83
__INLINE__ std::shared_ptr< LocalMemorySlot > allocateLocalMemorySlot(const std::shared_ptr< MemorySpace > &memorySpace, const size_t size)
Definition memoryManager.hpp:66
static __INLINE__ size_t getTokenBufferSize(const size_t tokenSize, const size_t capacity) noexcept
Definition base.hpp:123
static __INLINE__ void initializeCoordinationBuffer(const std::shared_ptr< LocalMemorySlot > &coordinationBuffer)
Definition base.hpp:100
static __INLINE__ size_t getCoordinationBufferSize() noexcept
Definition base.hpp:92
Definition RPCEngine.hpp:61
__INLINE__ void addRPCTarget(const std::string &RPCName, const std::shared_ptr< HiCR::ExecutionUnit > e)
Definition RPCEngine.hpp:147
virtual void requestRPC(HiCR::Instance::instanceId_t targetInstanceId, const std::string &RPCName, const HiCR::frontend::RPCEngine::RPCArgument_t argument=0)
Definition RPCEngine.hpp:245
__INLINE__ HiCR::ComputeManager * getComputeManager() const
Definition RPCEngine.hpp:341
__INLINE__ HiCR::CommunicationManager * getCommunicationManager() const
Definition RPCEngine.hpp:320
RPCEngine(CommunicationManager &communicationManager, InstanceManager &instanceManager, MemoryManager &memoryManager, ComputeManager &computeManager, std::shared_ptr< MemorySpace > bufferMemorySpace, std::shared_ptr< ComputeResource > computeResource, const uint64_t baseTag=_HICR_RPC_ENGINE_CHANNEL_BASE_TAG)
Definition RPCEngine.hpp:109
__INLINE__ bool hasPendingRPCs()
Definition RPCEngine.hpp:161
__INLINE__ void parseAndExecuteRPC()
Definition RPCEngine.hpp:206
~RPCEngine()=default
__INLINE__ HiCR::InstanceManager * getInstanceManager() const
Definition RPCEngine.hpp:327
__INLINE__ void submitReturnValue(void *pointer, const size_t size)
Definition RPCEngine.hpp:264
__INLINE__ std::shared_ptr< HiCR::Instance > getRPCRequester()
Definition RPCEngine.hpp:230
__INLINE__ std::shared_ptr< HiCR::LocalMemorySlot > getReturnValue() const
Definition RPCEngine.hpp:287
__INLINE__ HiCR::MemoryManager * getMemoryManager() const
Definition RPCEngine.hpp:334
__INLINE__ const RPCArgument_t getRPCArgument()
Definition RPCEngine.hpp:237
Provides a definition for the base backend's communication manager class.
Provides a definition for the abstract instance manager class.
Provides a definition for the base backend's memory manager class.
Provides a definition for the abstract device manager class.
Provides Consumer functionality for MPSC based on SPSC, that is without global locks.
Provides prod.
Definition RPCEngine.hpp:78
RPCTargetIndex_t index
Index of the RPC.
Definition RPCEngine.hpp:80
RPCArgument_t argument
Optional argument to the RPC.
Definition RPCEngine.hpp:83
Definition RPCEngine.hpp:90
std::shared_ptr< HiCR::ExecutionUnit > executionUnit
RPC execution unit.
Definition RPCEngine.hpp:95
Provides variable-sized MPSC consumer channel, locking version.
Provides variable-sized MPSC producer channel, locking version.
Generated by