/home/runner/work/HiCR/HiCR/include/hicr/frontends/RPCEngine/RPCEngine.hpp Source File
HiCR
|
RPCEngine.hpp
Go to the documentation of this file.
147 __INLINE__ void addRPCTarget(const std::string &RPCName, const std::shared_ptr<HiCR::ExecutionUnit> e)
181 RPCPayload_t *buffer = (RPCPayload_t *)(_RPCConsumerChannel->getTokenBuffers()[requester]->getSourceLocalMemorySlot()->getPointer());
200 __INLINE__ std::shared_ptr<HiCR::Instance> getRPCRequester() { return _instanceManager.getInstances()[_requesterInstanceIdx]; }
215 virtual void requestRPC(HiCR::Instance &instance, const std::string &RPCName, const HiCR::frontend::RPCEngine::RPCArgument_t argument = 0)
225 auto tempBufferSlot = _memoryManager.registerLocalMemorySlot(_bufferMemorySpace, (void *)&RPCPayload, sizeof(RPCPayload_t));
240 auto sourceBufferSlot = _memoryManager.registerLocalMemorySlot(_bufferMemorySpace, pointer, size);
260 __INLINE__ std::shared_ptr<HiCR::LocalMemorySlot> getReturnValue(HiCR::Instance &instance) const
276 _communicationManager.memcpy(tempBufferSlot, 0, _returnValueConsumerChannel->getPayloadBufferMemorySlot(), msgOffset, msgSize);
293 [[nodiscard]] __INLINE__ HiCR::CommunicationManager *getCommunicationManager() const { return &_communicationManager; }
300 [[nodiscard]] __INLINE__ HiCR::InstanceManager *getInstanceManager() const { return &_instanceManager; }
307 [[nodiscard]] __INLINE__ HiCR::MemoryManager *getMemoryManager() const { return &_memoryManager; }
314 [[nodiscard]] __INLINE__ HiCR::ComputeManager *getComputeManager() const { return &_computeManager; }
324 static RPCTargetIndex_t getRPCTargetIndexFromString(const std::string &name) { return std::hash<std::string>()(name); }
333 if (_RPCTargetMap.contains(rpcIdx) == false) HICR_THROW_RUNTIME("Attempting to run an RPC target (Hash: %lu) that was not defined in this instance (0x%lX).\n", rpcIdx, this);
365 auto tokenBufferSize = HiCR::channel::fixedSize::Base::getTokenBufferSize(tokenSize, _HICR_RPC_ENGINE_CHANNEL_COUNT_CAPACITY);
389 auto tokenBufferSlot = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, tokenBufferSize);
391 // consumer needs to allocate #producers consumer side coordination buffers for #producers SPSCs
392 auto consumerCoordinationBuffer = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
396 auto producerCoordinationBuffer = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
408 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_TOKEN_BUFFER_TAG, tokenBuffers);
412 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_PRODUCER_COORDINATION_BUFFER_TAG, producerCoordinationBuffers);
416 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_TAG, consumerCoordinationBuffers);
429 const HiCR::GlobalMemorySlot::globalKey_t remoteSlotKey = i * instanceCount + currentInstanceId;
431 auto globalConsumerTokenBufferSlot = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_TOKEN_BUFFER_TAG, localSlotKey);
434 auto producerCoordinationBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_PRODUCER_COORDINATION_BUFFER_TAG, remoteSlotKey);
437 auto consumerCoordinationBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_TAG, localSlotKey);
442 _communicationManager, globalConsumerTokenBuffers, localConsumerCoordinationBuffers, globalProducerCoordinationBuffers, tokenSize, _HICR_RPC_ENGINE_CHANNEL_COUNT_CAPACITY);
456 const HiCR::GlobalMemorySlot::globalKey_t remoteSlotKey = i * instanceCount + currentInstanceId;
458 auto globalConsumerTokenBufferSlot = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_TOKEN_BUFFER_TAG, remoteSlotKey);
461 auto producerCoordinationBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_PRODUCER_COORDINATION_BUFFER_TAG, localSlotKey);
464 auto consumerCoordinationBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_TAG, remoteSlotKey);
493 const uint64_t _HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG = _baseTag + 3;
501 auto tokenSizeBufferSize = HiCR::channel::variableSize::Base::getTokenBufferSize(sizeof(size_t), _HICR_RPC_ENGINE_CHANNEL_COUNT_CAPACITY);
504 auto tokenSizeBufferSlot = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, tokenSizeBufferSize);
507 auto payloadBufferSlot = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, _HICR_RPC_ENGINE_CHANNEL_PAYLOAD_CAPACITY);
513 auto localConsumerCoordinationBufferMessageSizes = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
514 auto localConsumerCoordinationBufferMessagePayloads = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
517 HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localConsumerCoordinationBufferMessageSizes);
518 HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localConsumerCoordinationBufferMessagePayloads);
521 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_SIZES_BUFFER_TAG, {{currentInstanceId, tokenSizeBufferSlot}});
524 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_PAYLOAD_BUFFER_TAG, {{currentInstanceId, payloadBufferSlot}});
527 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_SIZES_TAG,
531 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG,
533 _communicationManager.fence(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG);
536 auto consumerMessagePayloadBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_PAYLOAD_BUFFER_TAG, currentInstanceId);
537 auto consumerMessageSizesBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_SIZES_BUFFER_TAG, currentInstanceId);
538 auto consumerCoodinationPayloadsBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG, currentInstanceId);
539 auto consumerCoordinationSizesBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_SIZES_TAG, currentInstanceId);
542 _returnValueConsumerChannel = std::make_shared<HiCR::channel::variableSize::MPSC::locking::Consumer>(_communicationManager,
559 auto localProducerSizeInfoBuffer = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, sizeof(size_t));
560 auto localProducerCoordinationBufferMessageSizes = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
561 auto localProducerCoordinationBufferMessagePayloads = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
564 HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localProducerCoordinationBufferMessageSizes);
565 HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localProducerCoordinationBufferMessagePayloads);
568 auto consumerMessagePayloadBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_PAYLOAD_BUFFER_TAG, consumerInstanceId);
569 auto consumerMessageSizesBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_SIZES_BUFFER_TAG, consumerInstanceId);
570 auto consumerPayloadConsumerBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG, consumerInstanceId);
571 auto consumerSizesConsumerBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_SIZES_TAG, consumerInstanceId);
574 _returnValueProducerChannels[consumerInstanceId] = std::make_shared<HiCR::channel::variableSize::MPSC::locking::Producer>(_communicationManager,
626 std::shared_ptr<HiCR::channel::variableSize::MPSC::locking::Consumer> _returnValueConsumerChannel;
631 std::map<HiCR::Instance::instanceId_t, std::shared_ptr<HiCR::channel::variableSize::MPSC::locking::Producer>> _returnValueProducerChannels;
#define _HICR_RPC_ENGINE_CHANNEL_PAYLOAD_CAPACITY
Definition RPCEngine.hpp:38
#define _HICR_RPC_ENGINE_CHANNEL_BASE_TAG
Definition RPCEngine.hpp:48
#define _HICR_RPC_ENGINE_CHANNEL_COUNT_CAPACITY
Definition RPCEngine.hpp:43
Definition communicationManager.hpp:54
__INLINE__ std::shared_ptr< GlobalMemorySlot > getGlobalMemorySlot(GlobalMemorySlot::tag_t tag, GlobalMemorySlot::globalKey_t globalKey)
Definition communicationManager.hpp:115
__INLINE__ void memcpy(const std::shared_ptr< LocalMemorySlot > &destination, size_t dst_offset, const std::shared_ptr< LocalMemorySlot > &source, size_t src_offset, size_t size)
Definition communicationManager.hpp:267
__INLINE__ void fence(GlobalMemorySlot::tag_t tag)
Definition communicationManager.hpp:377
__INLINE__ void exchangeGlobalMemorySlots(GlobalMemorySlot::tag_t tag, const std::vector< globalKeyMemorySlotPair_t > &memorySlots)
Definition communicationManager.hpp:100
Definition computeManager.hpp:48
virtual std::unique_ptr< HiCR::ProcessingUnit > createProcessingUnit(std::shared_ptr< HiCR::ComputeResource > resource) const =0
__INLINE__ void initialize(std::unique_ptr< HiCR::ProcessingUnit > &processingUnit)
Definition computeManager.hpp:81
__INLINE__ void start(std::unique_ptr< HiCR::ProcessingUnit > &processingUnit, std::unique_ptr< HiCR::ExecutionState > &executionState)
Definition computeManager.hpp:103
virtual std::unique_ptr< HiCR::ExecutionState > createExecutionState(std::shared_ptr< HiCR::ExecutionUnit > executionUnit, void *const argument=nullptr) const =0
__INLINE__ void await(std::unique_ptr< HiCR::ProcessingUnit > &processingUnit)
Definition computeManager.hpp:177
Definition instanceManager.hpp:57
__INLINE__ std::shared_ptr< HiCR::Instance > getCurrentInstance() const
Definition instanceManager.hpp:95
__INLINE__ instanceList_t & getInstances()
Definition instanceManager.hpp:89
Definition instance.hpp:38
Definition memoryManager.hpp:51
__INLINE__ void freeLocalMemorySlot(const std::shared_ptr< HiCR::LocalMemorySlot > &memorySlot)
Definition memoryManager.hpp:138
virtual std::shared_ptr< LocalMemorySlot > registerLocalMemorySlot(const std::shared_ptr< HiCR::MemorySpace > &memorySpace, void *const ptr, const size_t size)
Definition memoryManager.hpp:86
__INLINE__ std::shared_ptr< LocalMemorySlot > allocateLocalMemorySlot(const std::shared_ptr< MemorySpace > &memorySpace, const size_t size)
Definition memoryManager.hpp:66
static __INLINE__ size_t getTokenBufferSize(const size_t tokenSize, const size_t capacity) noexcept
Definition base.hpp:123
static __INLINE__ void initializeCoordinationBuffer(const std::shared_ptr< LocalMemorySlot > &coordinationBuffer)
Definition base.hpp:100
static __INLINE__ size_t getCoordinationBufferSize() noexcept
Definition base.hpp:92
Definition RPCEngine.hpp:61
__INLINE__ void addRPCTarget(const std::string &RPCName, const std::shared_ptr< HiCR::ExecutionUnit > e)
Definition RPCEngine.hpp:147
__INLINE__ HiCR::ComputeManager * getComputeManager() const
Definition RPCEngine.hpp:314
__INLINE__ HiCR::CommunicationManager * getCommunicationManager() const
Definition RPCEngine.hpp:293
RPCEngine(CommunicationManager &communicationManager, InstanceManager &instanceManager, MemoryManager &memoryManager, ComputeManager &computeManager, std::shared_ptr< MemorySpace > bufferMemorySpace, std::shared_ptr< ComputeResource > computeResource, const uint64_t baseTag=_HICR_RPC_ENGINE_CHANNEL_BASE_TAG)
Definition RPCEngine.hpp:109
virtual void requestRPC(HiCR::Instance &instance, const std::string &RPCName, const HiCR::frontend::RPCEngine::RPCArgument_t argument=0)
Definition RPCEngine.hpp:215
__INLINE__ bool hasPendingRPCs()
Definition RPCEngine.hpp:161
~RPCEngine()=default
__INLINE__ HiCR::InstanceManager * getInstanceManager() const
Definition RPCEngine.hpp:300
__INLINE__ std::shared_ptr< HiCR::LocalMemorySlot > getReturnValue(HiCR::Instance &instance) const
Definition RPCEngine.hpp:260
__INLINE__ void submitReturnValue(void *pointer, const size_t size)
Definition RPCEngine.hpp:236
__INLINE__ std::shared_ptr< HiCR::Instance > getRPCRequester()
Definition RPCEngine.hpp:200
__INLINE__ HiCR::MemoryManager * getMemoryManager() const
Definition RPCEngine.hpp:307
__INLINE__ const RPCArgument_t getRPCArgument()
Definition RPCEngine.hpp:207
Provides a definition for the base backend's communication manager class.
Provides a definition for the abstract instance manager class.
Provides a definition for the base backend's memory manager class.
Provides a definition for the abstract device manager class.
Provides Consumer functionality for MPSC based on SPSC, that is without global locks.
Provides prod.
Definition RPCEngine.hpp:78
RPCTargetIndex_t index
Index of the RPC.
Definition RPCEngine.hpp:80
RPCArgument_t argument
Optional argument to the RPC.
Definition RPCEngine.hpp:83
Definition RPCEngine.hpp:90
std::shared_ptr< HiCR::ExecutionUnit > executionUnit
RPC execution unit.
Definition RPCEngine.hpp:95
Provides variable-sized MPSC consumer channel, locking version.
Provides variable-sized MPSC producer channel, locking version.
Generated by