/home/runner/work/HiCR/HiCR/include/hicr/frontends/RPCEngine/RPCEngine.hpp Source File
HiCR
|
RPCEngine.hpp
Go to the documentation of this file.
147 __INLINE__ void addRPCTarget(const std::string &RPCName, const std::shared_ptr<HiCR::ExecutionUnit> e)
181 RPCPayload_t *buffer = (RPCPayload_t *)(_RPCConsumerChannel->getTokenBuffers()[requester]->getSourceLocalMemorySlot()->getPointer());
200 __INLINE__ std::shared_ptr<HiCR::Instance> getRPCRequester() { return _instanceManager.getInstances()[_requesterInstanceIdx]; }
215 virtual void requestRPC(HiCR::Instance::instanceId_t targetInstanceId, const std::string &RPCName, const HiCR::frontend::RPCEngine::RPCArgument_t argument = 0)
223 auto tempBufferSlot = _memoryManager.registerLocalMemorySlot(_bufferMemorySpace, (void *)&RPCPayload, sizeof(RPCPayload_t));
238 auto sourceBufferSlot = _memoryManager.registerLocalMemorySlot(_bufferMemorySpace, pointer, size);
273 _communicationManager.memcpy(tempBufferSlot, 0, _returnValueConsumerChannel->getPayloadBufferMemorySlot(), msgOffset, msgSize);
290 [[nodiscard]] __INLINE__ HiCR::CommunicationManager *getCommunicationManager() const { return &_communicationManager; }
297 [[nodiscard]] __INLINE__ HiCR::InstanceManager *getInstanceManager() const { return &_instanceManager; }
304 [[nodiscard]] __INLINE__ HiCR::MemoryManager *getMemoryManager() const { return &_memoryManager; }
311 [[nodiscard]] __INLINE__ HiCR::ComputeManager *getComputeManager() const { return &_computeManager; }
321 static RPCTargetIndex_t getRPCTargetIndexFromString(const std::string &name) { return std::hash<std::string>()(name); }
330 if (_RPCTargetMap.contains(rpcIdx) == false) HICR_THROW_RUNTIME("Attempting to run an RPC target (Hash: %lu) that was not defined in this instance (0x%lX).\n", rpcIdx, this);
362 auto tokenBufferSize = HiCR::channel::fixedSize::Base::getTokenBufferSize(tokenSize, _HICR_RPC_ENGINE_CHANNEL_COUNT_CAPACITY);
386 auto tokenBufferSlot = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, tokenBufferSize);
388 // the consumer must allocate one consumer-side coordination buffer per producer, i.e. #producers buffers for the #producers SPSC channels
389 auto consumerCoordinationBuffer = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
393 auto producerCoordinationBuffer = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
405 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_TOKEN_BUFFER_TAG, tokenBuffers);
409 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_PRODUCER_COORDINATION_BUFFER_TAG, producerCoordinationBuffers);
413 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_TAG, consumerCoordinationBuffers);
426 const HiCR::GlobalMemorySlot::globalKey_t remoteSlotKey = i * instanceCount + currentInstanceId;
428 auto globalConsumerTokenBufferSlot = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_TOKEN_BUFFER_TAG, localSlotKey);
431 auto producerCoordinationBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_PRODUCER_COORDINATION_BUFFER_TAG, remoteSlotKey);
434 auto consumerCoordinationBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_TAG, localSlotKey);
439 _communicationManager, globalConsumerTokenBuffers, localConsumerCoordinationBuffers, globalProducerCoordinationBuffers, tokenSize, _HICR_RPC_ENGINE_CHANNEL_COUNT_CAPACITY);
453 const HiCR::GlobalMemorySlot::globalKey_t remoteSlotKey = i * instanceCount + currentInstanceId;
455 auto globalConsumerTokenBufferSlot = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_TOKEN_BUFFER_TAG, remoteSlotKey);
458 auto producerCoordinationBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_PRODUCER_COORDINATION_BUFFER_TAG, localSlotKey);
461 auto consumerCoordinationBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_TAG, remoteSlotKey);
490 const uint64_t _HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG = _baseTag + 3;
498 auto tokenSizeBufferSize = HiCR::channel::variableSize::Base::getTokenBufferSize(sizeof(size_t), _HICR_RPC_ENGINE_CHANNEL_COUNT_CAPACITY);
501 auto tokenSizeBufferSlot = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, tokenSizeBufferSize);
504 auto payloadBufferSlot = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, _HICR_RPC_ENGINE_CHANNEL_PAYLOAD_CAPACITY);
510 auto localConsumerCoordinationBufferMessageSizes = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
511 auto localConsumerCoordinationBufferMessagePayloads = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
514 HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localConsumerCoordinationBufferMessageSizes);
515 HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localConsumerCoordinationBufferMessagePayloads);
518 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_SIZES_BUFFER_TAG, {{currentInstanceId, tokenSizeBufferSlot}});
521 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_PAYLOAD_BUFFER_TAG, {{currentInstanceId, payloadBufferSlot}});
524 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_SIZES_TAG,
528 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG,
530 _communicationManager.fence(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG);
533 auto consumerMessagePayloadBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_PAYLOAD_BUFFER_TAG, currentInstanceId);
534 auto consumerMessageSizesBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_SIZES_BUFFER_TAG, currentInstanceId);
535 auto consumerCoodinationPayloadsBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG, currentInstanceId);
536 auto consumerCoordinationSizesBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_SIZES_TAG, currentInstanceId);
539 _returnValueConsumerChannel = std::make_shared<HiCR::channel::variableSize::MPSC::locking::Consumer>(_communicationManager,
556 auto localProducerSizeInfoBuffer = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, sizeof(size_t));
557 auto localProducerCoordinationBufferMessageSizes = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
558 auto localProducerCoordinationBufferMessagePayloads = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
561 HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localProducerCoordinationBufferMessageSizes);
562 HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localProducerCoordinationBufferMessagePayloads);
565 auto consumerMessagePayloadBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_PAYLOAD_BUFFER_TAG, consumerInstanceId);
566 auto consumerMessageSizesBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_SIZES_BUFFER_TAG, consumerInstanceId);
567 auto consumerPayloadConsumerBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG, consumerInstanceId);
568 auto consumerSizesConsumerBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_SIZES_TAG, consumerInstanceId);
571 _returnValueProducerChannels[consumerInstanceId] = std::make_shared<HiCR::channel::variableSize::MPSC::locking::Producer>(_communicationManager,
623 std::shared_ptr<HiCR::channel::variableSize::MPSC::locking::Consumer> _returnValueConsumerChannel;
628 std::map<HiCR::Instance::instanceId_t, std::shared_ptr<HiCR::channel::variableSize::MPSC::locking::Producer>> _returnValueProducerChannels;
#define _HICR_RPC_ENGINE_CHANNEL_PAYLOAD_CAPACITY
Definition RPCEngine.hpp:38
#define _HICR_RPC_ENGINE_CHANNEL_BASE_TAG
Definition RPCEngine.hpp:48
#define _HICR_RPC_ENGINE_CHANNEL_COUNT_CAPACITY
Definition RPCEngine.hpp:43
Definition communicationManager.hpp:54
__INLINE__ std::shared_ptr< GlobalMemorySlot > getGlobalMemorySlot(GlobalMemorySlot::tag_t tag, GlobalMemorySlot::globalKey_t globalKey)
Definition communicationManager.hpp:98
__INLINE__ void memcpy(const std::shared_ptr< LocalMemorySlot > &destination, size_t dst_offset, const std::shared_ptr< LocalMemorySlot > &source, size_t src_offset, size_t size)
Definition communicationManager.hpp:250
__INLINE__ void fence(GlobalMemorySlot::tag_t tag)
Definition communicationManager.hpp:360
__INLINE__ void exchangeGlobalMemorySlots(GlobalMemorySlot::tag_t tag, const std::vector< globalKeyMemorySlotPair_t > &memorySlots)
Definition communicationManager.hpp:83
Definition computeManager.hpp:48
virtual std::unique_ptr< HiCR::ProcessingUnit > createProcessingUnit(std::shared_ptr< HiCR::ComputeResource > resource) const =0
__INLINE__ void initialize(std::unique_ptr< HiCR::ProcessingUnit > &processingUnit)
Definition computeManager.hpp:81
__INLINE__ void start(std::unique_ptr< HiCR::ProcessingUnit > &processingUnit, std::unique_ptr< HiCR::ExecutionState > &executionState)
Definition computeManager.hpp:103
virtual std::unique_ptr< HiCR::ExecutionState > createExecutionState(std::shared_ptr< HiCR::ExecutionUnit > executionUnit, void *const argument=nullptr) const =0
__INLINE__ void await(std::unique_ptr< HiCR::ProcessingUnit > &processingUnit)
Definition computeManager.hpp:177
Definition instanceManager.hpp:57
__INLINE__ std::shared_ptr< HiCR::Instance > getCurrentInstance() const
Definition instanceManager.hpp:95
__INLINE__ instanceList_t & getInstances()
Definition instanceManager.hpp:89
Definition memoryManager.hpp:51
__INLINE__ void freeLocalMemorySlot(const std::shared_ptr< HiCR::LocalMemorySlot > &memorySlot)
Definition memoryManager.hpp:138
virtual std::shared_ptr< LocalMemorySlot > registerLocalMemorySlot(const std::shared_ptr< HiCR::MemorySpace > &memorySpace, void *const ptr, const size_t size)
Definition memoryManager.hpp:86
__INLINE__ std::shared_ptr< LocalMemorySlot > allocateLocalMemorySlot(const std::shared_ptr< MemorySpace > &memorySpace, const size_t size)
Definition memoryManager.hpp:66
static __INLINE__ size_t getTokenBufferSize(const size_t tokenSize, const size_t capacity) noexcept
Definition base.hpp:123
static __INLINE__ void initializeCoordinationBuffer(const std::shared_ptr< LocalMemorySlot > &coordinationBuffer)
Definition base.hpp:100
static __INLINE__ size_t getCoordinationBufferSize() noexcept
Definition base.hpp:92
Definition RPCEngine.hpp:61
__INLINE__ void addRPCTarget(const std::string &RPCName, const std::shared_ptr< HiCR::ExecutionUnit > e)
Definition RPCEngine.hpp:147
virtual void requestRPC(HiCR::Instance::instanceId_t targetInstanceId, const std::string &RPCName, const HiCR::frontend::RPCEngine::RPCArgument_t argument=0)
Definition RPCEngine.hpp:215
__INLINE__ HiCR::ComputeManager * getComputeManager() const
Definition RPCEngine.hpp:311
__INLINE__ HiCR::CommunicationManager * getCommunicationManager() const
Definition RPCEngine.hpp:290
RPCEngine(CommunicationManager &communicationManager, InstanceManager &instanceManager, MemoryManager &memoryManager, ComputeManager &computeManager, std::shared_ptr< MemorySpace > bufferMemorySpace, std::shared_ptr< ComputeResource > computeResource, const uint64_t baseTag=_HICR_RPC_ENGINE_CHANNEL_BASE_TAG)
Definition RPCEngine.hpp:109
__INLINE__ bool hasPendingRPCs()
Definition RPCEngine.hpp:161
~RPCEngine()=default
__INLINE__ HiCR::InstanceManager * getInstanceManager() const
Definition RPCEngine.hpp:297
__INLINE__ void submitReturnValue(void *pointer, const size_t size)
Definition RPCEngine.hpp:234
__INLINE__ std::shared_ptr< HiCR::Instance > getRPCRequester()
Definition RPCEngine.hpp:200
__INLINE__ std::shared_ptr< HiCR::LocalMemorySlot > getReturnValue() const
Definition RPCEngine.hpp:257
__INLINE__ HiCR::MemoryManager * getMemoryManager() const
Definition RPCEngine.hpp:304
__INLINE__ const RPCArgument_t getRPCArgument()
Definition RPCEngine.hpp:207
Provides a definition for the base backend's communication manager class.
Provides a definition for the abstract instance manager class.
Provides a definition for the base backend's memory manager class.
Provides a definition for the abstract compute manager class.
Provides Consumer functionality for MPSC based on SPSC, that is without global locks.
Provides Producer functionality for MPSC based on SPSC, that is without global locks.
Definition RPCEngine.hpp:78
RPCTargetIndex_t index
Index of the RPC.
Definition RPCEngine.hpp:80
RPCArgument_t argument
Optional argument to the RPC.
Definition RPCEngine.hpp:83
Definition RPCEngine.hpp:90
std::shared_ptr< HiCR::ExecutionUnit > executionUnit
RPC execution unit.
Definition RPCEngine.hpp:95
Provides variable-sized MPSC consumer channel, locking version.
Provides variable-sized MPSC producer channel, locking version.
Generated by Doxygen