/home/runner/work/HiCR/HiCR/include/hicr/frontends/RPCEngine/RPCEngine.hpp Source File

HiCR: /home/runner/work/HiCR/HiCR/include/hicr/frontends/RPCEngine/RPCEngine.hpp Source File
HiCR
RPCEngine.hpp
Go to the documentation of this file.
1/*
2 * Copyright 2025 Huawei Technologies Co., Ltd.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
24#pragma once
25
34
// Size passed to allocateLocalMemorySlot() for the payload buffer of the return-value channel.
38#define _HICR_RPC_ENGINE_CHANNEL_PAYLOAD_CAPACITY 1048576
39
// Capacity argument handed to the RPC request consumer channel on construction
// (presumably the maximum number of pending tokens -- confirm against the channel API).
43#define _HICR_RPC_ENGINE_CHANNEL_COUNT_CAPACITY 1024
44
// Default value of the constructor's baseTag argument; the engine derives its
// global memory slot tags as baseTag + 0 ... baseTag + 6.
48#define _HICR_RPC_ENGINE_CHANNEL_BASE_TAG 0xF0000000
49
50namespace HiCR::frontend
51{
52
61{
62 public:
63
// Identifier of an RPC target: the value produced by getRPCTargetIndexFromString()
// from the RPC name, used as the key of the RPC target map.
67 using RPCTargetIndex_t = uint64_t;
68
// Type of the single optional argument that travels with an RPC request.
72 using RPCArgument_t = uint64_t;
73
85
90 {
// Entry of the RPC target map (stored as RPCTarget_t); the struct declaration line itself
// is not visible in this listing.
// Name the RPC was registered under
92 std::string name;
93
// Execution unit that is run when this RPC is requested
95 std::shared_ptr<HiCR::ExecutionUnit> executionUnit;
96 };
97
109 RPCEngine(CommunicationManager &communicationManager,
110 InstanceManager &instanceManager,
111 MemoryManager &memoryManager,
112 ComputeManager &computeManager,
113 std::shared_ptr<MemorySpace> bufferMemorySpace,
114 std::shared_ptr<ComputeResource> computeResource,
115 const uint64_t baseTag = _HICR_RPC_ENGINE_CHANNEL_BASE_TAG)
116 : _communicationManager(communicationManager),
117 _instanceManager(instanceManager),
118 _memoryManager(memoryManager),
119 _computeManager(computeManager),
120 _bufferMemorySpace(bufferMemorySpace),
121 _computeResource(computeResource),
122 _baseTag(baseTag)
123 {}
124
130 __INLINE__ void initialize()
131 {
132 // Creating MPSC channels to receive RPC requests
133 initializeRPCChannels();
134 initializeReturnValueChannels();
135 }
136
140 ~RPCEngine() = default;
141
147 __INLINE__ void addRPCTarget(const std::string &RPCName, const std::shared_ptr<HiCR::ExecutionUnit> e)
148 {
149 // Obtaining hash from the RPC name
150 const auto idx = getRPCTargetIndexFromString(RPCName);
151
152 // Inserting the new entry
153 _RPCTargetMap[idx] = {RPCName, e};
154 }
155
161 __INLINE__ bool hasPendingRPCs()
162 {
163 // Updating depth
164 _RPCConsumerChannel->updateDepth();
165
166 // Checking if empty
167 return _RPCConsumerChannel->isEmpty() == false;
168 }
169
173 __INLINE__ void listen()
174 {
175 // Calling the backend-specific implementation of the listen function
176 while (_RPCConsumerChannel->getDepth() == 0) _RPCConsumerChannel->updateDepth();
177
178 // Once a request has arrived, gather its value from the channel
179 auto request = _RPCConsumerChannel->peek();
180 auto requester = request[0];
181 RPCPayload_t *buffer = (RPCPayload_t *)(_RPCConsumerChannel->getTokenBuffers()[requester]->getSourceLocalMemorySlot()->getPointer());
182 RPCPayload_t payload = buffer[request[1]];
183 _RPCConsumerChannel->pop();
184
185 // Setting requester instance index
186 _requesterInstanceIdx = requester;
187
188 // Storing rpc argument
189 _currentRPCArgument = payload.argument;
190
191 // Execute RPC
192 executeRPC(payload.index);
193 }
194
200 __INLINE__ std::shared_ptr<HiCR::Instance> getRPCRequester() { return _instanceManager.getInstances()[_requesterInstanceIdx]; }
201
207 [[nodiscard]] __INLINE__ const RPCArgument_t getRPCArgument() { return _currentRPCArgument; }
208
215 virtual void requestRPC(HiCR::Instance &instance, const std::string &RPCName, const HiCR::frontend::RPCEngine::RPCArgument_t argument = 0)
216 {
217 const auto targetInstanceId = instance.getId();
218
219 // Creating message payload
220 RPCPayload_t RPCPayload;
221 RPCPayload.index = getRPCTargetIndexFromString(RPCName);
222 RPCPayload.argument = argument;
223
224 // Registering source buffer
225 auto tempBufferSlot = _memoryManager.registerLocalMemorySlot(_bufferMemorySpace, (void *)&RPCPayload, sizeof(RPCPayload_t));
226
227 // Sending source buffer
228 _RPCProducerChannels.at(targetInstanceId)->push(tempBufferSlot);
229 }
230
236 __INLINE__ void submitReturnValue(void *pointer, const size_t size)
237 {
238 // Getting source buffers
239 auto tempBufferSlot = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, size);
240 auto sourceBufferSlot = _memoryManager.registerLocalMemorySlot(_bufferMemorySpace, pointer, size);
241
242 // Copying data
243 _communicationManager.memcpy(tempBufferSlot, 0, sourceBufferSlot, 0, size);
244
245 // Waiting for communication to end
246 _communicationManager.fence(tempBufferSlot, 1, 0);
247
248 // Sending return value data
249 _returnValueProducerChannels.at(_requesterInstanceIdx)->push(tempBufferSlot);
250
251 // Freeing up local memory slot
252 _memoryManager.freeLocalMemorySlot(tempBufferSlot);
253 }
254
260 __INLINE__ std::shared_ptr<HiCR::LocalMemorySlot> getReturnValue(HiCR::Instance &instance) const
261 {
262 // Calling the backend-specific implementation of the listen function
263 while (_returnValueConsumerChannel->isEmpty());
264
265 // Calling backend-specific implementation of this function
266 auto returnValue = _returnValueConsumerChannel->peek();
267
268 // Getting message info
269 auto msgOffset = returnValue[0];
270 auto msgSize = returnValue[1];
271
272 // Creating local buffer
273 auto tempBufferSlot = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, msgSize);
274
275 // Copying data
276 _communicationManager.memcpy(tempBufferSlot, 0, _returnValueConsumerChannel->getPayloadBufferMemorySlot(), msgOffset, msgSize);
277
278 // Waiting for communication to end
279 _communicationManager.fence(tempBufferSlot, 1, 0);
280
281 // Freeing up channel
282 _returnValueConsumerChannel->pop();
283
284 // Returning internal buffer
285 return tempBufferSlot;
286 }
287
293 [[nodiscard]] __INLINE__ HiCR::CommunicationManager *getCommunicationManager() const { return &_communicationManager; }
294
300 [[nodiscard]] __INLINE__ HiCR::InstanceManager *getInstanceManager() const { return &_instanceManager; }
301
307 [[nodiscard]] __INLINE__ HiCR::MemoryManager *getMemoryManager() const { return &_memoryManager; }
308
314 [[nodiscard]] __INLINE__ HiCR::ComputeManager *getComputeManager() const { return &_computeManager; }
315
316 private:
317
324 static RPCTargetIndex_t getRPCTargetIndexFromString(const std::string &name) { return std::hash<std::string>()(name); }
325
330 __INLINE__ void executeRPC(const RPCTargetIndex_t rpcIdx) const
331 {
332 // Getting RPC target from the index
333 if (_RPCTargetMap.contains(rpcIdx) == false) HICR_THROW_RUNTIME("Attempting to run an RPC target (Hash: %lu) that was not defined in this instance (0x%lX).\n", rpcIdx, this);
334 auto &target = _RPCTargetMap.at(rpcIdx);
335 auto &targetName = target.name;
336 auto &targetExecutionUnit = target.executionUnit;
337
338 printf("Running: %s\n", targetName.c_str());
339
340 // Creating new processing unit to execute the RPC
341 auto p = _computeManager.createProcessingUnit(_computeResource);
342 _computeManager.initialize(p);
343
344 // Creating execution state
345 auto s = _computeManager.createExecutionState(targetExecutionUnit);
346
347 // Executing RPC
348 _computeManager.start(p, s);
349
350 // Waiting for execution to finish
351 _computeManager.await(p);
352 }
353
// Creates the fixed-size MPSC (non-locking) channels that carry RPC requests: one consumer
// channel for this instance (_RPCConsumerChannel) and one producer channel per entry of the
// instance list (_RPCProducerChannels, keyed by the consumer's instance id).
// Token buffers and both kinds of coordination buffers are allocated once per instance-list
// entry and published through three exchangeGlobalMemorySlots + fence rounds, using tags
// _baseTag + 4, _baseTag + 5 and _baseTag + 6.
// NOTE(review): this listing omits several original lines (the definition of tokenBufferSize
// and the end of the producer make_shared call, among others); consult the original header
// before changing this function.
354 __INLINE__ void initializeRPCChannels()
355 {
356 // Defining tag values
357 const uint64_t _HICR_RPC_ENGINE_CHANNEL_CONSUMER_TOKEN_BUFFER_TAG = _baseTag + 4;
358 const uint64_t _HICR_RPC_ENGINE_CHANNEL_PRODUCER_COORDINATION_BUFFER_TAG = _baseTag + 5;
359 const uint64_t _HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_TAG = _baseTag + 6;
360
361 // Getting required buffer sizes
362 auto tokenSize = sizeof(RPCPayload_t);
363
364 // Getting required buffer sizes
// NOTE(review): tokenBufferSize, used below, is defined on a line missing from this listing --
// presumably derived from tokenSize and the channel capacity; confirm.
366
367 // Getting my current instance
368 const auto currentInstanceId = _instanceManager.getCurrentInstance()->getId();
369
370 // Getting total instance count
371 const auto instanceCount = _instanceManager.getInstances().size();
372
373 // Creating and exchanging buffers
374
375 std::vector<HiCR::CommunicationManager::globalKeyMemorySlotPair_t> tokenBuffers;
376 std::vector<HiCR::CommunicationManager::globalKeyMemorySlotPair_t> consumerCoordinationBuffers;
377 std::vector<HiCR::CommunicationManager::globalKeyMemorySlotPair_t> producerCoordinationBuffers;
378 std::vector<std::shared_ptr<HiCR::LocalMemorySlot>> localConsumerCoordinationBuffers;
379 std::vector<std::shared_ptr<HiCR::LocalMemorySlot>> localProducerCoordinationBuffers;
380
381 auto coordinationBufferSize = HiCR::channel::fixedSize::Base::getCoordinationBufferSize();
382
383 for (size_t i = 0; i < instanceCount; i++)
384 {
385 // Calculating these particular slots' key
// NOTE(review): the key mixes an instance id (currentInstanceId) with a position in the
// instance list (i); this looks like it assumes ids are 0..instanceCount-1 -- confirm.
386 const HiCR::GlobalMemorySlot::globalKey_t slotkey = currentInstanceId * instanceCount + i;
387
388 // consumer needs to allocate #producers token buffers for #producers SPSCs
389 auto tokenBufferSlot = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, tokenBufferSize);
390
391 // consumer needs to allocate #producers consumer side coordination buffers for #producers SPSCs
392 auto consumerCoordinationBuffer = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
394
395 // producer needs to allocate #consumers producer-side coordination buffers for #consumer SPSCs
396 auto producerCoordinationBuffer = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
398
399 // Adding to collections
400 localConsumerCoordinationBuffers.push_back(consumerCoordinationBuffer);
401 localProducerCoordinationBuffers.push_back(producerCoordinationBuffer);
402 tokenBuffers.push_back(std::make_pair(slotkey, tokenBufferSlot));
403 consumerCoordinationBuffers.push_back(std::make_pair(slotkey, consumerCoordinationBuffer));
404 producerCoordinationBuffers.push_back(std::make_pair(slotkey, producerCoordinationBuffer));
405 }
406
407 // communicate to producers the token buffer references
408 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_TOKEN_BUFFER_TAG, tokenBuffers);
409 _communicationManager.fence(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_TOKEN_BUFFER_TAG);
410
411 // get from producers their coordination buffer references
412 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_PRODUCER_COORDINATION_BUFFER_TAG, producerCoordinationBuffers);
413 _communicationManager.fence(_HICR_RPC_ENGINE_CHANNEL_PRODUCER_COORDINATION_BUFFER_TAG);
414
415 // communicate to producers the consumer buffers
416 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_TAG, consumerCoordinationBuffers);
417 _communicationManager.fence(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_TAG);
418
// Consumer side: gather the global slots of this instance's channel and build the consumer
420 {
421 std::vector<std::shared_ptr<HiCR::GlobalMemorySlot>> globalConsumerTokenBuffers;
422 std::vector<std::shared_ptr<HiCR::GlobalMemorySlot>> globalConsumerCoordinationBuffers;
423 std::vector<std::shared_ptr<HiCR::GlobalMemorySlot>> globalProducerCoordinationBuffers;
424
425 for (size_t i = 0; i < instanceCount; i++)
426 {
427 // Calculating these particular slots' key
428 const HiCR::GlobalMemorySlot::globalKey_t localSlotKey = currentInstanceId * instanceCount + i;
429 const HiCR::GlobalMemorySlot::globalKey_t remoteSlotKey = i * instanceCount + currentInstanceId;
430
// As consumer: token and consumer coordination buffers use the local key,
// the producer coordination buffer uses the remote key
431 auto globalConsumerTokenBufferSlot = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_TOKEN_BUFFER_TAG, localSlotKey);
432 globalConsumerTokenBuffers.push_back(globalConsumerTokenBufferSlot);
433
434 auto producerCoordinationBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_PRODUCER_COORDINATION_BUFFER_TAG, remoteSlotKey);
435 globalProducerCoordinationBuffers.push_back(producerCoordinationBuffer);
436
437 auto consumerCoordinationBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_TAG, localSlotKey);
438 globalConsumerCoordinationBuffers.push_back(consumerCoordinationBuffer);
439 }
440
441 _RPCConsumerChannel = std::make_shared<HiCR::channel::fixedSize::MPSC::nonlocking::Consumer>(
442 _communicationManager, globalConsumerTokenBuffers, localConsumerCoordinationBuffers, globalProducerCoordinationBuffers, tokenSize, _HICR_RPC_ENGINE_CHANNEL_COUNT_CAPACITY);
443 }
444
// Producer side: gather the global slots of every consumer and build one producer per instance
446
447 {
448 std::vector<std::shared_ptr<HiCR::GlobalMemorySlot>> globalConsumerTokenBuffers;
449 std::vector<std::shared_ptr<HiCR::GlobalMemorySlot>> globalConsumerCoordinationBuffers;
450 std::vector<std::shared_ptr<HiCR::GlobalMemorySlot>> globalProducerCoordinationBuffers;
451
452 for (size_t i = 0; i < instanceCount; i++)
453 {
454 // Calculating these particular slots' key
455 const HiCR::GlobalMemorySlot::globalKey_t localSlotKey = currentInstanceId * instanceCount + i;
456 const HiCR::GlobalMemorySlot::globalKey_t remoteSlotKey = i * instanceCount + currentInstanceId;
457
// As producer the roles are swapped: token and consumer coordination buffers use the remote key,
// the producer coordination buffer uses the local key
458 auto globalConsumerTokenBufferSlot = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_TOKEN_BUFFER_TAG, remoteSlotKey);
459 globalConsumerTokenBuffers.push_back(globalConsumerTokenBufferSlot);
460
461 auto producerCoordinationBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_PRODUCER_COORDINATION_BUFFER_TAG, localSlotKey);
462 globalProducerCoordinationBuffers.push_back(producerCoordinationBuffer);
463
464 auto consumerCoordinationBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_TAG, remoteSlotKey);
465 globalConsumerCoordinationBuffers.push_back(consumerCoordinationBuffer);
466 }
467
468 for (size_t i = 0; i < instanceCount; i++)
469 {
470 // Getting consumer instance id
471 const auto consumerInstanceId = _instanceManager.getInstances()[i]->getId();
472
473 // Creating producer channel
474 // This call does the same as the SPSC Producer constructor, as
475 // the producer of MPSC::nonlocking has the same view
476 _RPCProducerChannels[consumerInstanceId] =
477 std::make_shared<HiCR::channel::fixedSize::MPSC::nonlocking::Producer>(_communicationManager,
478 globalConsumerTokenBuffers[i],
479 globalProducerCoordinationBuffers[i]->getSourceLocalMemorySlot(),
480 globalConsumerCoordinationBuffers[i],
481 tokenSize,
// NOTE(review): the final argument(s) and the closing of this call are on a line missing from this listing
483 }
484 }
485 }
486
// Creates the variable-size MPSC (locking) channels that carry RPC return values: one consumer
// channel for this instance (_returnValueConsumerChannel) and one producer channel per entry of
// the instance list (_returnValueProducerChannels, keyed by the consumer's instance id).
// This instance publishes four buffers (message sizes, payload, and one coordination buffer for
// each) under tags _baseTag + 0 ... _baseTag + 3, using its own instance id as the global key.
// NOTE(review): this listing omits several original lines (the definitions of
// tokenSizeBufferSize and coordinationBufferSize, and the trailing arguments of both
// make_shared calls); consult the original header before changing this function.
487 __INLINE__ void initializeReturnValueChannels()
488 {
489 // Defining tag values
490 const uint64_t _HICR_RPC_ENGINE_CHANNEL_CONSUMER_SIZES_BUFFER_TAG = _baseTag + 0;
491 const uint64_t _HICR_RPC_ENGINE_CHANNEL_CONSUMER_PAYLOAD_BUFFER_TAG = _baseTag + 1;
492 const uint64_t _HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_SIZES_TAG = _baseTag + 2;
493 const uint64_t _HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG = _baseTag + 3;
494
495 // Getting my current instance
496 const auto currentInstanceId = _instanceManager.getCurrentInstance()->getId();
497
499
500 // Getting required buffer sizes
// NOTE(review): tokenSizeBufferSize, used below, is defined on a line missing from this listing
502
503 // Allocating token size buffer as a local memory slot
504 auto tokenSizeBufferSlot = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, tokenSizeBufferSize);
505
506 // Allocating payload buffer as a local memory slot
507 auto payloadBufferSlot = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, _HICR_RPC_ENGINE_CHANNEL_PAYLOAD_CAPACITY);
508
509 // Getting required buffer size for coordination buffers
// NOTE(review): coordinationBufferSize, used below, is defined on a line missing from this listing
511
512 // Allocating coordination buffers
513 auto localConsumerCoordinationBufferMessageSizes = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
514 auto localConsumerCoordinationBufferMessagePayloads = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
515
516 // Initializing coordination buffers
517 HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localConsumerCoordinationBufferMessageSizes);
518 HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localConsumerCoordinationBufferMessagePayloads);
519
520 // Exchanging local memory slots to become global for them to be used by the remote end
521 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_SIZES_BUFFER_TAG, {{currentInstanceId, tokenSizeBufferSlot}});
522 _communicationManager.fence(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_SIZES_BUFFER_TAG);
523
524 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_PAYLOAD_BUFFER_TAG, {{currentInstanceId, payloadBufferSlot}});
525 _communicationManager.fence(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_PAYLOAD_BUFFER_TAG);
526
527 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_SIZES_TAG,
528 {{currentInstanceId, localConsumerCoordinationBufferMessageSizes}});
529 _communicationManager.fence(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_SIZES_TAG);
530
531 _communicationManager.exchangeGlobalMemorySlots(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG,
532 {{currentInstanceId, localConsumerCoordinationBufferMessagePayloads}});
533 _communicationManager.fence(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG);
534
535 // Obtaining the globally exchanged memory slots
// Consumer side: this instance's own slots, looked up under its own instance id
536 auto consumerMessagePayloadBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_PAYLOAD_BUFFER_TAG, currentInstanceId);
537 auto consumerMessageSizesBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_SIZES_BUFFER_TAG, currentInstanceId);
538 auto consumerCoodinationPayloadsBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG, currentInstanceId);
539 auto consumerCoordinationSizesBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_SIZES_TAG, currentInstanceId);
540
541 // Creating channel
542 _returnValueConsumerChannel = std::make_shared<HiCR::channel::variableSize::MPSC::locking::Consumer>(_communicationManager,
543 consumerMessagePayloadBuffer,
544 consumerMessageSizesBuffer,
545 localConsumerCoordinationBufferMessageSizes,
546 localConsumerCoordinationBufferMessagePayloads,
547 consumerCoordinationSizesBuffer,
548 consumerCoodinationPayloadsBuffer,
// NOTE(review): the remaining arguments and the closing of this call are on lines missing from this listing
551
552 // Creating producer channels
553 for (const auto &instance : _instanceManager.getInstances())
554 {
555 // Getting consumer instance id
556 const auto consumerInstanceId = instance->getId();
557
558 // Allocating coordination buffers
559 auto localProducerSizeInfoBuffer = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, sizeof(size_t));
560 auto localProducerCoordinationBufferMessageSizes = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
561 auto localProducerCoordinationBufferMessagePayloads = _memoryManager.allocateLocalMemorySlot(_bufferMemorySpace, coordinationBufferSize);
562
563 // Initializing coordination buffers
564 HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localProducerCoordinationBufferMessageSizes);
565 HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localProducerCoordinationBufferMessagePayloads);
566
567 // Obtaining the globally exchanged memory slots
// Producer side: the slots published by the consumer, looked up under the consumer's instance id
568 auto consumerMessagePayloadBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_PAYLOAD_BUFFER_TAG, consumerInstanceId);
569 auto consumerMessageSizesBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_SIZES_BUFFER_TAG, consumerInstanceId);
570 auto consumerPayloadConsumerBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_PAYLOADS_TAG, consumerInstanceId);
571 auto consumerSizesConsumerBuffer = _communicationManager.getGlobalMemorySlot(_HICR_RPC_ENGINE_CHANNEL_CONSUMER_COORDINATION_BUFFER_SIZES_TAG, consumerInstanceId);
572
573 // Creating channel
574 _returnValueProducerChannels[consumerInstanceId] = std::make_shared<HiCR::channel::variableSize::MPSC::locking::Producer>(_communicationManager,
575 localProducerSizeInfoBuffer,
576 consumerMessagePayloadBuffer,
577 consumerMessageSizesBuffer,
578 localProducerCoordinationBufferMessageSizes,
579 localProducerCoordinationBufferMessagePayloads,
580 consumerSizesConsumerBuffer,
581 consumerPayloadConsumerBuffer,
// NOTE(review): arguments on the lines before and after sizeof(uint8_t) are missing from this listing
583 sizeof(uint8_t),
585 }
586 }
587
591 CommunicationManager &_communicationManager;
592
596 InstanceManager &_instanceManager;
597
601 MemoryManager &_memoryManager;
602
606 ComputeManager &_computeManager;
607
611 const std::shared_ptr<HiCR::MemorySpace> _bufferMemorySpace;
612
616 const std::shared_ptr<HiCR::ComputeResource> _computeResource;
617
618 const uint64_t _baseTag;
619
620 size_t _requesterInstanceIdx;
621 RPCArgument_t _currentRPCArgument;
622
626 std::shared_ptr<HiCR::channel::variableSize::MPSC::locking::Consumer> _returnValueConsumerChannel;
627
631 std::map<HiCR::Instance::instanceId_t, std::shared_ptr<HiCR::channel::variableSize::MPSC::locking::Producer>> _returnValueProducerChannels;
632
636 std::shared_ptr<HiCR::channel::fixedSize::MPSC::nonlocking::Consumer> _RPCConsumerChannel;
637
641 std::map<HiCR::Instance::instanceId_t, std::shared_ptr<HiCR::channel::fixedSize::MPSC::nonlocking::Producer>> _RPCProducerChannels;
642
646 std::map<RPCTargetIndex_t, RPCTarget_t> _RPCTargetMap;
647};
648
649} // namespace HiCR::frontend
#define _HICR_RPC_ENGINE_CHANNEL_PAYLOAD_CAPACITY
Definition RPCEngine.hpp:38
#define _HICR_RPC_ENGINE_CHANNEL_BASE_TAG
Definition RPCEngine.hpp:48
#define _HICR_RPC_ENGINE_CHANNEL_COUNT_CAPACITY
Definition RPCEngine.hpp:43
Definition communicationManager.hpp:54
__INLINE__ std::shared_ptr< GlobalMemorySlot > getGlobalMemorySlot(GlobalMemorySlot::tag_t tag, GlobalMemorySlot::globalKey_t globalKey)
Definition communicationManager.hpp:115
__INLINE__ void memcpy(const std::shared_ptr< LocalMemorySlot > &destination, size_t dst_offset, const std::shared_ptr< LocalMemorySlot > &source, size_t src_offset, size_t size)
Definition communicationManager.hpp:267
__INLINE__ void fence(GlobalMemorySlot::tag_t tag)
Definition communicationManager.hpp:377
__INLINE__ void exchangeGlobalMemorySlots(GlobalMemorySlot::tag_t tag, const std::vector< globalKeyMemorySlotPair_t > &memorySlots)
Definition communicationManager.hpp:100
Definition computeManager.hpp:48
virtual std::unique_ptr< HiCR::ProcessingUnit > createProcessingUnit(std::shared_ptr< HiCR::ComputeResource > resource) const =0
__INLINE__ void initialize(std::unique_ptr< HiCR::ProcessingUnit > &processingUnit)
Definition computeManager.hpp:81
__INLINE__ void start(std::unique_ptr< HiCR::ProcessingUnit > &processingUnit, std::unique_ptr< HiCR::ExecutionState > &executionState)
Definition computeManager.hpp:103
virtual std::unique_ptr< HiCR::ExecutionState > createExecutionState(std::shared_ptr< HiCR::ExecutionUnit > executionUnit, void *const argument=nullptr) const =0
__INLINE__ void await(std::unique_ptr< HiCR::ProcessingUnit > &processingUnit)
Definition computeManager.hpp:177
uint64_t globalKey_t
Definition globalMemorySlot.hpp:44
Definition instanceManager.hpp:57
__INLINE__ std::shared_ptr< HiCR::Instance > getCurrentInstance() const
Definition instanceManager.hpp:95
__INLINE__ instanceList_t & getInstances()
Definition instanceManager.hpp:89
Definition instance.hpp:38
__INLINE__ instanceId_t getId() const
Definition instance.hpp:55
Definition memoryManager.hpp:51
__INLINE__ void freeLocalMemorySlot(const std::shared_ptr< HiCR::LocalMemorySlot > &memorySlot)
Definition memoryManager.hpp:138
virtual std::shared_ptr< LocalMemorySlot > registerLocalMemorySlot(const std::shared_ptr< HiCR::MemorySpace > &memorySpace, void *const ptr, const size_t size)
Definition memoryManager.hpp:86
__INLINE__ std::shared_ptr< LocalMemorySlot > allocateLocalMemorySlot(const std::shared_ptr< MemorySpace > &memorySpace, const size_t size)
Definition memoryManager.hpp:66
static __INLINE__ size_t getTokenBufferSize(const size_t tokenSize, const size_t capacity) noexcept
Definition base.hpp:123
static __INLINE__ void initializeCoordinationBuffer(const std::shared_ptr< LocalMemorySlot > &coordinationBuffer)
Definition base.hpp:100
static __INLINE__ size_t getCoordinationBufferSize() noexcept
Definition base.hpp:92
Definition RPCEngine.hpp:61
uint64_t RPCArgument_t
Definition RPCEngine.hpp:72
__INLINE__ void listen()
Definition RPCEngine.hpp:173
__INLINE__ void addRPCTarget(const std::string &RPCName, const std::shared_ptr< HiCR::ExecutionUnit > e)
Definition RPCEngine.hpp:147
__INLINE__ HiCR::ComputeManager * getComputeManager() const
Definition RPCEngine.hpp:314
__INLINE__ HiCR::CommunicationManager * getCommunicationManager() const
Definition RPCEngine.hpp:293
RPCEngine(CommunicationManager &communicationManager, InstanceManager &instanceManager, MemoryManager &memoryManager, ComputeManager &computeManager, std::shared_ptr< MemorySpace > bufferMemorySpace, std::shared_ptr< ComputeResource > computeResource, const uint64_t baseTag=_HICR_RPC_ENGINE_CHANNEL_BASE_TAG)
Definition RPCEngine.hpp:109
virtual void requestRPC(HiCR::Instance &instance, const std::string &RPCName, const HiCR::frontend::RPCEngine::RPCArgument_t argument=0)
Definition RPCEngine.hpp:215
__INLINE__ bool hasPendingRPCs()
Definition RPCEngine.hpp:161
uint64_t RPCTargetIndex_t
Definition RPCEngine.hpp:67
__INLINE__ HiCR::InstanceManager * getInstanceManager() const
Definition RPCEngine.hpp:300
__INLINE__ void initialize()
Definition RPCEngine.hpp:130
__INLINE__ std::shared_ptr< HiCR::LocalMemorySlot > getReturnValue(HiCR::Instance &instance) const
Definition RPCEngine.hpp:260
__INLINE__ void submitReturnValue(void *pointer, const size_t size)
Definition RPCEngine.hpp:236
__INLINE__ std::shared_ptr< HiCR::Instance > getRPCRequester()
Definition RPCEngine.hpp:200
__INLINE__ HiCR::MemoryManager * getMemoryManager() const
Definition RPCEngine.hpp:307
__INLINE__ const RPCArgument_t getRPCArgument()
Definition RPCEngine.hpp:207
Provides a definition for the base backend's communication manager class.
Provides a definition for the abstract instance manager class.
Provides a definition for the base backend's memory manager class.
Provides a definition for the abstract device manager class.
#define HICR_THROW_RUNTIME(...)
Definition exceptions.hpp:74
Provides Consumer functionality for MPSC based on SPSC, that is without global locks.
Provides Producer functionality for MPSC based on SPSC, that is without global locks.
Definition RPCEngine.hpp:78
RPCTargetIndex_t index
Index of the RPC.
Definition RPCEngine.hpp:80
RPCArgument_t argument
Optional argument to the RPC.
Definition RPCEngine.hpp:83
Definition RPCEngine.hpp:90
std::shared_ptr< HiCR::ExecutionUnit > executionUnit
RPC execution unit.
Definition RPCEngine.hpp:95
std::string name
RPC name.
Definition RPCEngine.hpp:92
Provides variable-sized MPSC consumer channel, locking version.
Provides variable-sized MPSC producer channel, locking version.