/home/runner/work/HiCR/HiCR/include/hicr/frontends/RPCEngine/RPCEngine.hpp Source File

HiCR: /home/runner/work/HiCR/HiCR/include/hicr/frontends/RPCEngine/RPCEngine.hpp Source File
HiCR
RPCEngine.hpp
Go to the documentation of this file.
1/*
2 * Copyright 2025 Huawei Technologies Co., Ltd.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
24#pragma once
25
34
38#define _HICR_RPC_ENGINE_CHANNEL_PAYLOAD_CAPACITY 1048576
39
43#define _HICR_RPC_ENGINE_CHANNEL_COUNT_CAPACITY 1024
44
48#define _HICR_RPC_ENGINE_CHANNEL_BASE_TAG 0xF0000000
49
50namespace HiCR::frontend
51{
52
61{
62 public:
63
67 using RPCTargetIndex_t = uint64_t;
68
72 using RPCArgument_t = uint64_t;
73
85
90 {
92 std::string name;
93
95 std::shared_ptr<HiCR::ExecutionUnit> executionUnit;
96 };
97
109 RPCEngine(CommunicationManager &communicationManager,
110 InstanceManager &instanceManager,
111 MemoryManager &memoryManager,
112 ComputeManager &computeManager,
113 std::shared_ptr<MemorySpace> bufferMemorySpace,
114 std::shared_ptr<ComputeResource> computeResource,
115 const uint64_t baseTag = _HICR_RPC_ENGINE_CHANNEL_BASE_TAG)
116 : _communicationManager(communicationManager),
117 _instanceManager(instanceManager),
118 _memoryManager(memoryManager),
119 _computeManager(computeManager),
120 _bufferMemorySpace(bufferMemorySpace),
121 _computeResource(computeResource),
122 _baseTag(baseTag)
123 {}
124
130 __INLINE__ void initialize()
131 {
132 // Creating MPSC channels to receive RPC requests
133 initializeRPCChannels();
134 initializeReturnValueChannels();
135 }
136
140 ~RPCEngine() = default;
141
147 __INLINE__ void addRPCTarget(const std::string &RPCName, const std::shared_ptr<HiCR::ExecutionUnit> e)
148 {
149 // Obtaining hash from the RPC name
150 const auto idx = getRPCTargetIndexFromString(RPCName);
151
152 // Inserting the new entry
153 _RPCTargetMap[idx] = {RPCName, e};
154 }
155
161 __INLINE__ bool hasPendingRPCs()
162 {
163 // Updating depth
164 _RPCConsumerChannel->updateDepth();
165
166 // Checking if empty
167 return _RPCConsumerChannel->isEmpty() == false;
168 }
169
173 __INLINE__ void listen()
174 {
175 // Calling the backend-specific implementation of the listen function
176 while (_RPCConsumerChannel->getDepth() == 0) _RPCConsumerChannel->updateDepth();
177
178 // Once a request has arrived, gather its value from the channel
179 auto request = _RPCConsumerChannel->peek();
180 auto requester = request[0];
181 RPCPayload_t *buffer = (RPCPayload_t *)(_RPCConsumerChannel->getTokenBuffers()[requester]->getSourceLocalMemorySlot()->getPointer());
182 RPCPayload_t payload = buffer[request[1]];
183 _RPCConsumerChannel->pop();
184
185 // Setting requester instance index
186 _requesterInstanceIdx = requester;
187
188 // Storing rpc argument
189 _currentRPCArgument = payload.argument;
190
191 // Execute RPC
192 executeRPC(payload.index);
193 }
194
200 __INLINE__ std::shared_ptr<HiCR::Instance> getRPCRequester() { return _instanceManager.getInstances()[_requesterInstanceIdx]; }
201
207 [[nodiscard]] __INLINE__ const RPCArgument_t getRPCArgument() { return _currentRPCArgument; }
208
215 virtual void requestRPC(HiCR::Instance::instanceId_t targetInstanceId, const std::string &RPCName, const HiCR::frontend::RPCEngine::RPCArgument_t argument = 0)
216 {
217 // Creating message payload
218 RPCPayload_t RPCPayload;
219 RPCPayload.index = getRPCTargetIndexFromString(RPCName);
220 RPCPayload.argument = argument;
221
222 // Registering source buffer
223 auto tempBufferSlot = _memoryManager.registerLocalMemorySlot(_bufferMemorySpace, (void *)&RPCPayload, sizeof(RPCPayload_t));
224
225 // Sending source buffer
226 _RPCProducerChannels.at(targetInstanceId)->push(tempBufferSlot);
227 }
228
234 __INLINE__ void submitReturnValue(void *pointer, const size_t size)
235 {
236 // Getting source buffers
237 auto tempBufferSlot = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, size);
238 auto sourceBufferSlot = _memoryManager.registerLocalMemorySlot(_bufferMemorySpace, pointer, size);
239
240 // Copying data
241 _communicationManager.memcpy(tempBufferSlot, 0, sourceBufferSlot, 0, size);
242
243 // Waiting for communication to end
244 _communicationManager.fence(tempBufferSlot, 1, 0);
245
246 // Sending return value data
247 _returnValueProducerChannels.at(_requesterInstanceIdx)->push(tempBufferSlot);
248
249 // Freeing up local memory slot
250 _memoryManager.freeLocalMemorySlot(tempBufferSlot);
251 }
252
257 __INLINE__ std::shared_ptr<HiCR::LocalMemorySlot> getReturnValue() const
258 {
259 // Calling the backend-specific implementation of the listen function
260 while (_returnValueConsumerChannel->isEmpty());
261
262 // Calling backend-specific implementation of this function
263 auto returnValue = _returnValueConsumerChannel->peek();
264
265 // Getting message info
266 auto msgOffset = returnValue[0];
267 auto msgSize = returnValue[1];
268
269 // Creating local buffer
270 auto tempBufferSlot = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, msgSize);
271
272 // Copying data
273 _communicationManager.memcpy(tempBufferSlot, 0, _returnValueConsumerChannel->getPayloadBufferMemorySlot(), msgOffset, msgSize);
274
275 // Waiting for communication to end
276 _communicationManager.fence(tempBufferSlot, 1, 0);
277
278 // Freeing up channel
279 _returnValueConsumerChannel->pop();
280
281 // Returning internal buffer
282 return tempBufferSlot;
283 }
284
290 [[nodiscard]] __INLINE__ HiCR::CommunicationManager *getCommunicationManager() const { return &_communicationManager; }
291
297 [[nodiscard]] __INLINE__ HiCR::InstanceManager *getInstanceManager() const { return &_instanceManager; }
298
304 [[nodiscard]] __INLINE__ HiCR::MemoryManager *getMemoryManager() const { return &_memoryManager; }
305
311 [[nodiscard]] __INLINE__ HiCR::ComputeManager *getComputeManager() const { return &_computeManager; }
312
313 private:
314
321 static RPCTargetIndex_t getRPCTargetIndexFromString(const std::string &name) { return std::hash<std::string>()(name); }
322
327 __INLINE__ void executeRPC(const RPCTargetIndex_t rpcIdx) const
328 {
329 // Getting RPC target from the index
330 if (_RPCTargetMap.contains(rpcIdx) == false) HICR_THROW_RUNTIME("Attempting to run an RPC target (Hash: %lu) that was not defined in this instance (0x%lX).\n", rpcIdx, this);
331 auto &target = _RPCTargetMap.at(rpcIdx);
332 // auto &targetName = target.name;
333 auto &targetExecutionUnit = target.executionUnit;
334
335 // printf("Running: %s\n", targetName.c_str());
336
337 // Creating new processing unit to execute the RPC
338 auto p = _computeManager.createProcessingUnit(_computeResource);
339 _computeManager.initialize(p);
340
341 // Creating execution state
342 auto s = _computeManager.createExecutionState(targetExecutionUnit);
343
344 // Executing RPC
345 _computeManager.start(p, s);
346
347 // Waiting for execution to finish
348 _computeManager.await(p);
349 }
350
351 __INLINE__ void initializeRPCChannels()
352 {
353 // Defining tag values
354 const uint64_t _HICR_RPC_ENGINE_CHANNEL_CONSUMER_TOKEN_BUFFER_TAG = _baseTag + 4;
355 const uint64_t _HICR_RPC_ENGINE_CHANNEL_PRODUCER_COORDINATION_BUFFER_TAG = _baseTag + 5;
356 const uint64_t _HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_TAG = _baseTag + 6;
357
358 // Getting required buffer sizes
359 auto tokenSize = sizeof(RPCPayload_t);
360
361 // Getting required buffer sizes
363
364 // Getting my current instance
365 const auto currentInstanceId = _instanceManager.getCurrentInstance()->getId();
366
367 // Getting total instance count
368 const auto instanceCount = _instanceManager.getInstances().size();
369
370 // Creating and exchanging buffers
371
372 std::vector<HiCR::CommunicationManager::globalKeyMemorySlotPair_t> tokenBuffers;
373 std::vector<HiCR::CommunicationManager::globalKeyMemorySlotPair_t> consumerCoordinationBuffers;
374 std::vector<HiCR::CommunicationManager::globalKeyMemorySlotPair_t> producerCoordinationBuffers;
375 std::vector<std::shared_ptr<HiCR::LocalMemorySlot>> localConsumerCoordinationBuffers;
376 std::vector<std::shared_ptr<HiCR::LocalMemorySlot>> localProducerCoordinationBuffers;
377
378 auto coordinationBufferSize = HiCR::channel::fixedSize::Base::getCoordinationBufferSize();
379
380 for (size_t i = 0; i < instanceCount; i++)
381 {
382 // Calculating these particular slots' key
383 const HiCR::GlobalMemorySlot::globalKey_t slotkey = currentInstanceId * instanceCount + i;
384
385 // consumer needs to allocate #producers token buffers for #producers SPSCs
386 auto tokenBufferSlot = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, tokenBufferSize);
387
388 // consumer needs to allocate #producers consumer side coordination buffers for #producers SPSCs
389 auto consumerCoordinationBuffer = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
391
392 // producer needs to allocate #consumers producer-side coordination buffers for #consumer SPSCs
393 auto producerCoordinationBuffer = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
395
396 // Adding to collections
397 localConsumerCoordinationBuffers.push_back(consumerCoordinationBuffer);
398 localProducerCoordinationBuffers.push_back(producerCoordinationBuffer);
399 tokenBuffers.push_back(std::make_pair(slotkey, tokenBufferSlot));
400 consumerCoordinationBuffers.push_back(std::make_pair(slotkey, consumerCoordinationBuffer));
401 producerCoordinationBuffers.push_back(std::make_pair(slotkey, producerCoordinationBuffer));
402 }
403
404 // communicate to producers the token buffer references
405 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_TOKEN_BUFFER_TAG, tokenBuffers);
406 _communicationManager.fence(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_TOKEN_BUFFER_TAG);
407
408 // get from producers their coordination buffer references
409 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_PRODUCER_COORDINATION_BUFFER_TAG, producerCoordinationBuffers);
410 _communicationManager.fence(_HICR_RPC_ENGINE_CHANNEL_PRODUCER_COORDINATION_BUFFER_TAG);
411
412 // communicate to producers the consumer buffers
413 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_TAG, consumerCoordinationBuffers);
414 _communicationManager.fence(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_TAG);
415
417 {
418 std::vector<std::shared_ptr<HiCR::GlobalMemorySlot>> globalConsumerTokenBuffers;
419 std::vector<std::shared_ptr<HiCR::GlobalMemorySlot>> globalConsumerCoordinationBuffers;
420 std::vector<std::shared_ptr<HiCR::GlobalMemorySlot>> globalProducerCoordinationBuffers;
421
422 for (size_t i = 0; i < instanceCount; i++)
423 {
424 // Calculating these particular slots' key
425 const HiCR::GlobalMemorySlot::globalKey_t localSlotKey = currentInstanceId * instanceCount + i;
426 const HiCR::GlobalMemorySlot::globalKey_t remoteSlotKey = i * instanceCount + currentInstanceId;
427
428 auto globalConsumerTokenBufferSlot = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_TOKEN_BUFFER_TAG, localSlotKey);
429 globalConsumerTokenBuffers.push_back(globalConsumerTokenBufferSlot);
430
431 auto producerCoordinationBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_PRODUCER_COORDINATION_BUFFER_TAG, remoteSlotKey);
432 globalProducerCoordinationBuffers.push_back(producerCoordinationBuffer);
433
434 auto consumerCoordinationBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_TAG, localSlotKey);
435 globalConsumerCoordinationBuffers.push_back(consumerCoordinationBuffer);
436 }
437
438 _RPCConsumerChannel = std::make_shared<HiCR::channel::fixedSize::MPSC::nonlocking::Consumer>(
439 _communicationManager, globalConsumerTokenBuffers, localConsumerCoordinationBuffers, globalProducerCoordinationBuffers, tokenSize, _HICR_RPC_ENGINE_CHANNEL_COUNT_CAPACITY);
440 }
441
443
444 {
445 std::vector<std::shared_ptr<HiCR::GlobalMemorySlot>> globalConsumerTokenBuffers;
446 std::vector<std::shared_ptr<HiCR::GlobalMemorySlot>> globalConsumerCoordinationBuffers;
447 std::vector<std::shared_ptr<HiCR::GlobalMemorySlot>> globalProducerCoordinationBuffers;
448
449 for (size_t i = 0; i < instanceCount; i++)
450 {
451 // Calculating these particular slots' key
452 const HiCR::GlobalMemorySlot::globalKey_t localSlotKey = currentInstanceId * instanceCount + i;
453 const HiCR::GlobalMemorySlot::globalKey_t remoteSlotKey = i * instanceCount + currentInstanceId;
454
455 auto globalConsumerTokenBufferSlot = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_TOKEN_BUFFER_TAG, remoteSlotKey);
456 globalConsumerTokenBuffers.push_back(globalConsumerTokenBufferSlot);
457
458 auto producerCoordinationBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_PRODUCER_COORDINATION_BUFFER_TAG, localSlotKey);
459 globalProducerCoordinationBuffers.push_back(producerCoordinationBuffer);
460
461 auto consumerCoordinationBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_TAG, remoteSlotKey);
462 globalConsumerCoordinationBuffers.push_back(consumerCoordinationBuffer);
463 }
464
465 for (size_t i = 0; i < instanceCount; i++)
466 {
467 // Getting consumer instance id
468 const auto consumerInstanceId = _instanceManager.getInstances()[i]->getId();
469
470 // Creating producer channel
471 // This call does the same as the SPSC Producer constructor, as
472 // the producer of MPSC::nonlocking has the same view
473 _RPCProducerChannels[consumerInstanceId] =
474 std::make_shared<HiCR::channel::fixedSize::MPSC::nonlocking::Producer>(_communicationManager,
475 globalConsumerTokenBuffers[i],
476 globalProducerCoordinationBuffers[i]->getSourceLocalMemorySlot(),
477 globalConsumerCoordinationBuffers[i],
478 tokenSize,
480 }
481 }
482 }
483
484 __INLINE__ void initializeReturnValueChannels()
485 {
486 // Defining tag values
487 const uint64_t _HICR_RPC_ENGINE_CHANNEL_CONSUMER_SIZES_BUFFER_TAG = _baseTag + 0;
488 const uint64_t _HICR_RPC_ENGINE_CHANNEL_CONSUMER_PAYLOAD_BUFFER_TAG = _baseTag + 1;
489 const uint64_t _HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_SIZES_TAG = _baseTag + 2;
490 const uint64_t _HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG = _baseTag + 3;
491
492 // Getting my current instance
493 const auto currentInstanceId = _instanceManager.getCurrentInstance()->getId();
494
496
497 // Getting required buffer sizes
499
500 // Allocating token size buffer as a local memory slot
501 auto tokenSizeBufferSlot = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, tokenSizeBufferSize);
502
503 // Allocating token size buffer as a local memory slot
504 auto payloadBufferSlot = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, _HICR_RPC_ENGINE_CHANNEL_PAYLOAD_CAPACITY);
505
506 // Getting required buffer size for coordination buffers
508
509 // Allocating coordination buffers
510 auto localConsumerCoordinationBufferMessageSizes = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
511 auto localConsumerCoordinationBufferMessagePayloads = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
512
513 // Initializing coordination buffers
514 HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localConsumerCoordinationBufferMessageSizes);
515 HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localConsumerCoordinationBufferMessagePayloads);
516
517 // Exchanging local memory slots to become global for them to be used by the remote end
518 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_SIZES_BUFFER_TAG, {{currentInstanceId, tokenSizeBufferSlot}});
519 _communicationManager.fence(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_SIZES_BUFFER_TAG);
520
521 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_PAYLOAD_BUFFER_TAG, {{currentInstanceId, payloadBufferSlot}});
522 _communicationManager.fence(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_PAYLOAD_BUFFER_TAG);
523
524 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_SIZES_TAG,
525 {{currentInstanceId, localConsumerCoordinationBufferMessageSizes}});
526 _communicationManager.fence(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_SIZES_TAG);
527
528 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG,
529 {{currentInstanceId, localConsumerCoordinationBufferMessagePayloads}});
530 _communicationManager.fence(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG);
531
532 // Obtaining the globally exchanged memory slots
533 auto consumerMessagePayloadBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_PAYLOAD_BUFFER_TAG, currentInstanceId);
534 auto consumerMessageSizesBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_SIZES_BUFFER_TAG, currentInstanceId);
535 auto consumerCoodinationPayloadsBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG, currentInstanceId);
536 auto consumerCoordinationSizesBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_SIZES_TAG, currentInstanceId);
537
538 // Creating channel
539 _returnValueConsumerChannel = std::make_shared<HiCR::channel::variableSize::MPSC::locking::Consumer>(_communicationManager,
540 consumerMessagePayloadBuffer,
541 consumerMessageSizesBuffer,
542 localConsumerCoordinationBufferMessageSizes,
543 localConsumerCoordinationBufferMessagePayloads,
544 consumerCoordinationSizesBuffer,
545 consumerCoodinationPayloadsBuffer,
548
549 // Creating producer channels
550 for (const auto &instance : _instanceManager.getInstances())
551 {
552 // Getting consumer instance id
553 const auto consumerInstanceId = instance->getId();
554
555 // Allocating coordination buffers
556 auto localProducerSizeInfoBuffer = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, sizeof(size_t));
557 auto localProducerCoordinationBufferMessageSizes = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
558 auto localProducerCoordinationBufferMessagePayloads = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
559
560 // Initializing coordination buffers
561 HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localProducerCoordinationBufferMessageSizes);
562 HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localProducerCoordinationBufferMessagePayloads);
563
564 // Obtaining the globally exchanged memory slots
565 auto consumerMessagePayloadBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_PAYLOAD_BUFFER_TAG, consumerInstanceId);
566 auto consumerMessageSizesBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_SIZES_BUFFER_TAG, consumerInstanceId);
567 auto consumerPayloadConsumerBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG, consumerInstanceId);
568 auto consumerSizesConsumerBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_SIZES_TAG, consumerInstanceId);
569
570 // Creating channel
571 _returnValueProducerChannels[consumerInstanceId] = std::make_shared<HiCR::channel::variableSize::MPSC::locking::Producer>(_communicationManager,
572 localProducerSizeInfoBuffer,
573 consumerMessagePayloadBuffer,
574 consumerMessageSizesBuffer,
575 localProducerCoordinationBufferMessageSizes,
576 localProducerCoordinationBufferMessagePayloads,
577 consumerSizesConsumerBuffer,
578 consumerPayloadConsumerBuffer,
580 sizeof(uint8_t),
582 }
583 }
584
588 CommunicationManager &_communicationManager;
589
593 InstanceManager &_instanceManager;
594
598 MemoryManager &_memoryManager;
599
603 ComputeManager &_computeManager;
604
608 const std::shared_ptr<HiCR::MemorySpace> _bufferMemorySpace;
609
613 const std::shared_ptr<HiCR::ComputeResource> _computeResource;
614
615 const uint64_t _baseTag;
616
617 size_t _requesterInstanceIdx;
618 RPCArgument_t _currentRPCArgument;
619
623 std::shared_ptr<HiCR::channel::variableSize::MPSC::locking::Consumer> _returnValueConsumerChannel;
624
628 std::map<HiCR::Instance::instanceId_t, std::shared_ptr<HiCR::channel::variableSize::MPSC::locking::Producer>> _returnValueProducerChannels;
629
633 std::shared_ptr<HiCR::channel::fixedSize::MPSC::nonlocking::Consumer> _RPCConsumerChannel;
634
638 std::map<HiCR::Instance::instanceId_t, std::shared_ptr<HiCR::channel::fixedSize::MPSC::nonlocking::Producer>> _RPCProducerChannels;
639
643 std::map<RPCTargetIndex_t, RPCTarget_t> _RPCTargetMap;
644};
645
646} // namespace HiCR::frontend
#define _HICR_RPC_ENGINE_CHANNEL_PAYLOAD_CAPACITY
Definition RPCEngine.hpp:38
#define _HICR_RPC_ENGINE_CHANNEL_BASE_TAG
Definition RPCEngine.hpp:48
#define _HICR_RPC_ENGINE_CHANNEL_COUNT_CAPACITY
Definition RPCEngine.hpp:43
Definition communicationManager.hpp:54
__INLINE__ std::shared_ptr< GlobalMemorySlot > getGlobalMemorySlot(GlobalMemorySlot::tag_t tag, GlobalMemorySlot::globalKey_t globalKey)
Definition communicationManager.hpp:98
__INLINE__ void memcpy(const std::shared_ptr< LocalMemorySlot > &destination, size_t dst_offset, const std::shared_ptr< LocalMemorySlot > &source, size_t src_offset, size_t size)
Definition communicationManager.hpp:250
__INLINE__ void fence(GlobalMemorySlot::tag_t tag)
Definition communicationManager.hpp:360
__INLINE__ void exchangeGlobalMemorySlots(GlobalMemorySlot::tag_t tag, const std::vector< globalKeyMemorySlotPair_t > &memorySlots)
Definition communicationManager.hpp:83
Definition computeManager.hpp:48
virtual std::unique_ptr< HiCR::ProcessingUnit > createProcessingUnit(std::shared_ptr< HiCR::ComputeResource > resource) const =0
__INLINE__ void initialize(std::unique_ptr< HiCR::ProcessingUnit > &processingUnit)
Definition computeManager.hpp:81
__INLINE__ void start(std::unique_ptr< HiCR::ProcessingUnit > &processingUnit, std::unique_ptr< HiCR::ExecutionState > &executionState)
Definition computeManager.hpp:103
virtual std::unique_ptr< HiCR::ExecutionState > createExecutionState(std::shared_ptr< HiCR::ExecutionUnit > executionUnit, void *const argument=nullptr) const =0
__INLINE__ void await(std::unique_ptr< HiCR::ProcessingUnit > &processingUnit)
Definition computeManager.hpp:177
uint64_t globalKey_t
Definition globalMemorySlot.hpp:44
Definition instanceManager.hpp:57
__INLINE__ std::shared_ptr< HiCR::Instance > getCurrentInstance() const
Definition instanceManager.hpp:95
__INLINE__ instanceList_t & getInstances()
Definition instanceManager.hpp:89
uint64_t instanceId_t
Definition instance.hpp:44
Definition memoryManager.hpp:51
__INLINE__ void freeLocalMemorySlot(const std::shared_ptr< HiCR::LocalMemorySlot > &memorySlot)
Definition memoryManager.hpp:138
virtual std::shared_ptr< LocalMemorySlot > registerLocalMemorySlot(const std::shared_ptr< HiCR::MemorySpace > &memorySpace, void *const ptr, const size_t size)
Definition memoryManager.hpp:86
__INLINE__ std::shared_ptr< LocalMemorySlot > allocateLocalMemorySlot(const std::shared_ptr< MemorySpace > &memorySpace, const size_t size)
Definition memoryManager.hpp:66
static __INLINE__ size_t getTokenBufferSize(const size_t tokenSize, const size_t capacity) noexcept
Definition base.hpp:123
static __INLINE__ void initializeCoordinationBuffer(const std::shared_ptr< LocalMemorySlot > &coordinationBuffer)
Definition base.hpp:100
static __INLINE__ size_t getCoordinationBufferSize() noexcept
Definition base.hpp:92
Definition RPCEngine.hpp:61
uint64_t RPCArgument_t
Definition RPCEngine.hpp:72
__INLINE__ void listen()
Definition RPCEngine.hpp:173
__INLINE__ void addRPCTarget(const std::string &RPCName, const std::shared_ptr< HiCR::ExecutionUnit > e)
Definition RPCEngine.hpp:147
virtual void requestRPC(HiCR::Instance::instanceId_t targetInstanceId, const std::string &RPCName, const HiCR::frontend::RPCEngine::RPCArgument_t argument=0)
Definition RPCEngine.hpp:215
__INLINE__ HiCR::ComputeManager * getComputeManager() const
Definition RPCEngine.hpp:311
__INLINE__ HiCR::CommunicationManager * getCommunicationManager() const
Definition RPCEngine.hpp:290
RPCEngine(CommunicationManager &communicationManager, InstanceManager &instanceManager, MemoryManager &memoryManager, ComputeManager &computeManager, std::shared_ptr< MemorySpace > bufferMemorySpace, std::shared_ptr< ComputeResource > computeResource, const uint64_t baseTag=_HICR_RPC_ENGINE_CHANNEL_BASE_TAG)
Definition RPCEngine.hpp:109
__INLINE__ bool hasPendingRPCs()
Definition RPCEngine.hpp:161
uint64_t RPCTargetIndex_t
Definition RPCEngine.hpp:67
__INLINE__ HiCR::InstanceManager * getInstanceManager() const
Definition RPCEngine.hpp:297
__INLINE__ void initialize()
Definition RPCEngine.hpp:130
__INLINE__ void submitReturnValue(void *pointer, const size_t size)
Definition RPCEngine.hpp:234
__INLINE__ std::shared_ptr< HiCR::Instance > getRPCRequester()
Definition RPCEngine.hpp:200
__INLINE__ std::shared_ptr< HiCR::LocalMemorySlot > getReturnValue() const
Definition RPCEngine.hpp:257
__INLINE__ HiCR::MemoryManager * getMemoryManager() const
Definition RPCEngine.hpp:304
__INLINE__ const RPCArgument_t getRPCArgument()
Definition RPCEngine.hpp:207
Provides a definition for the base backend's communication manager class.
Provides a definition for the abstract instance manager class.
Provides a definition for the base backend's memory manager class.
Provides a definition for the abstract device manager class.
#define HICR_THROW_RUNTIME(...)
Definition exceptions.hpp:74
Provides Consumer functionality for MPSC based on SPSC, that is without global locks.
Provides prod.
Definition RPCEngine.hpp:78
RPCTargetIndex_t index
Index of the RPC.
Definition RPCEngine.hpp:80
RPCArgument_t argument
Optional argument to the RPC.
Definition RPCEngine.hpp:83
Definition RPCEngine.hpp:90
std::shared_ptr< HiCR::ExecutionUnit > executionUnit
RPC execution unit.
Definition RPCEngine.hpp:95
std::string name
RPC name.
Definition RPCEngine.hpp:92
Provides variable-sized MPSC consumer channel, locking version.
Provides variable-sized MPSC producer channel, locking version.