/home/runner/work/HiCR/HiCR/include/hicr/frontends/tasking/worker.hpp Source File

HiCR: /home/runner/work/HiCR/HiCR/include/hicr/frontends/tasking/worker.hpp Source File
HiCR
worker.hpp
Go to the documentation of this file.
1/*
2 * Copyright 2025 Huawei Technologies Co., Ltd.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
24#pragma once
25
26#include <thread>
27#include <memory>
28#include <utility>
29#include <vector>
30#include <set>
31#include <unistd.h>
32#include <hicr/core/definitions.hpp>
37#include "task.hpp"
38
39namespace HiCR::tasking
40{
41
/**
 * Default time, in milliseconds, a suspended worker sleeps between two
 * consecutive checks of its resume conditions.
 *
 * Declared 'inline' so that every translation unit including this header
 * refers to one and the same object instead of a private internal-linkage copy.
 */
inline constexpr size_t _DEFAULT_SUSPEND_INTERVAL_MS = 1000;

/**
 * Factor by which the worker multiplies its suspend interval (expressed in
 * milliseconds) before passing it to usleep(), i.e. the number of
 * microseconds in one millisecond.
 *
 * NOTE(review): the identifier reads "milliseconds per second", which does not
 * match how the value is used; the name is kept because it is publicly visible.
 */
inline constexpr size_t _MILISECONDS_PER_SECOND = 1000;
51
// Type of the callable a worker invokes, without arguments, on every iteration of its main loop
// to obtain the next task to run. The worker treats a nullptr result as "no task was returned".
55using pullFunction_t = std::function<HiCR::tasking::Task *()>;
56
64class Worker
65{
66 public:
67
98
103
149
158 Worker(HiCR::ComputeManager *executionStateComputeManager,
159 HiCR::ComputeManager *processingUnitComputeManager,
160 pullFunction_t pullFunction,
161 workerCallbackMap_t *callbackMap = nullptr)
162 : _executionStateComputeManager(executionStateComputeManager),
163 _processingUnitComputeManager(processingUnitComputeManager),
164 _pullFunction(std::move(pullFunction)),
165 _callbackMap(callbackMap)
166 {}
167
168 virtual ~Worker() = default;
169
175 __INLINE__ const state_t getState() { return _state; }
176
182 __INLINE__ void setCallbackMap(workerCallbackMap_t *callbackMap) { _callbackMap = callbackMap; }
183
189 __INLINE__ workerCallbackMap_t *getCallbackMap() { return _callbackMap; }
190
196 __INLINE__ HiCR::tasking::Task *getCurrentTask() { return _currentTask; }
197
201 __INLINE__ void initialize()
202 {
203 // Grabbing state value
204 auto prevState = _state.load();
205
206 // Checking we have at least one assigned resource
207 if (_processingUnits.empty()) HICR_THROW_LOGIC("Attempting to initialize worker without any assigned resources");
208
209 // Checking state
210 if (prevState != state_t::uninitialized && prevState != state_t::terminated) HICR_THROW_RUNTIME("Attempting to initialize already initialized worker");
211
212 // Initializing all resources
213 for (auto &r : _processingUnits) _processingUnitComputeManager->initialize(r);
214
215 // Transitioning state
216 _state = state_t::ready;
217 }
218
222 __INLINE__ void start()
223 {
224 // Grabbing state value
225 auto prevState = _state.load();
226
227 // Checking state
228 if (prevState != state_t::ready) HICR_THROW_RUNTIME("Attempting to start worker that is not in the 'initialized' state");
229
230 // Setting state
231 _state = state_t::running;
232
233 // Creating new execution unit (the processing unit must support an execution unit of 'host' type)
234 auto executionUnit = HiCR::backend::pthreads::ComputeManager::createExecutionUnit([](void *worker) { static_cast<HiCR::tasking::Worker *>(worker)->mainLoop(); });
235
236 // Creating worker's execution state
237 auto executionState = _executionStateComputeManager->createExecutionState(executionUnit, this);
238
239 // Launching worker in the lead resource (first one to be added)
240 _processingUnitComputeManager->start(_processingUnits[0], executionState);
241 }
242
249 __INLINE__ bool suspend()
250 {
251 // Doing an atomic exchange
252 state_t expected = state_t::running;
253 bool succeeded = _state.compare_exchange_weak(expected, state_t::suspending);
254
255 // Checking exchange
256 return succeeded;
257 }
258
264 __INLINE__ bool resume()
265 {
266 // Doing an atomic exchange
267 state_t expected = state_t::suspended;
268 bool succeeded = _state.compare_exchange_weak(expected, state_t::resuming);
269
270 // Checking exchange
271 return succeeded;
272 }
273
277 __INLINE__ void terminate()
278 {
279 // Transitioning state
280 auto prevState = _state.exchange(state_t::terminating);
281
282 // Checking state
283 if (prevState != state_t::running && prevState != state_t::suspending) HICR_THROW_RUNTIME("Attempting to stop worker that is not in a terminate-able state");
284 }
285
289 __INLINE__ void await()
290 {
291 // Getting state
292 auto prevState = _state.load();
293
294 if (prevState != state_t::terminating && prevState != state_t::running && prevState != state_t::suspended && prevState != state_t::suspending && prevState != state_t::resuming)
295 HICR_THROW_RUNTIME("Attempting to wait for a worker that has not yet started or has already terminated");
296
297 // Wait for the resources to free up
298 for (auto &p : _processingUnits) _processingUnitComputeManager->await(p);
299
300 // Transitioning state
301 _state = state_t::terminated;
302 }
303
309 __INLINE__ void addProcessingUnit(std::unique_ptr<HiCR::ProcessingUnit> pu) { _processingUnits.push_back(std::move(pu)); }
310
316 __INLINE__ std::vector<std::unique_ptr<HiCR::ProcessingUnit>> &getProcessingUnits() { return _processingUnits; }
317
323 __INLINE__ void setSuspendInterval(size_t suspendIntervalMs) { _suspendIntervalMs = suspendIntervalMs; }
324
325 protected:
326
332 __INLINE__ virtual bool checkResumeConditions() { return _state == state_t::resuming; }
333
334 private:
335
// Compute manager used to create execution states, both for the worker's own main loop and for the tasks it runs
339 HiCR::ComputeManager *const _executionStateComputeManager;
340
// Compute manager used to initialize, start, suspend, resume, terminate and await the worker's processing units
344 HiCR::ComputeManager *const _processingUnitComputeManager;
345
// Task most recently returned by the pull function (nullptr if the last pull returned no task)
349 HiCR::tasking::Task *_currentTask = nullptr;
350
// Callable invoked on every iteration of the main loop to obtain the next task
354 const pullFunction_t _pullFunction;
355
// Time, in milliseconds, a suspended worker sleeps between consecutive checks of its resume conditions
359 size_t _suspendIntervalMs = _DEFAULT_SUSPEND_INTERVAL_MS;
360
// Lifecycle state of the worker; read and updated both by the public control methods and by mainLoop()
364 std::atomic<state_t> _state = state_t::uninitialized;
365
// Processing units owned by the worker; the first one is the lead unit on which mainLoop() is launched
369 std::vector<std::unique_ptr<HiCR::ProcessingUnit>> _processingUnits;
370
// Optional (possibly null) map of callbacks triggered on worker events; never deleted by this class
374 workerCallbackMap_t *_callbackMap = nullptr;
375
// Body of the worker: the function that start() launches on the lead processing unit.
// Each iteration pulls a task through the pull function, runs it if one was returned, and then
// services a pending suspension request followed by a pending termination request.
// The function only returns once the worker state is found to be 'terminating'.
379 __INLINE__ void mainLoop()
380 {
381 // Calling appropriate callback
382 if (_callbackMap != nullptr) _callbackMap->trigger(this, callback_t::onWorkerStart);
383
384 // Start main worker loop (run until terminated)
385 while (true)
386 {
387 // Attempt to get a task by executing the pull function
388 _currentTask = _pullFunction();
389
390 // Calling appropriate callback
// (triggered after every pull, including those that returned no task)
391 if (_callbackMap != nullptr) _callbackMap->trigger(this, callback_t::onWorkerTaskPulled);
392
393 // If a task was returned, then start or execute it
394 if (_currentTask != nullptr) [[likely]]
395 {
396 // If the task hasn't been initialized yet, we need to do it now
// NOTE(review): the statement guarding the following block (original line 397) is not visible in
// this listing; going by the comment above it presumably tests whether the task is still
// uninitialized -- confirm against the real source.
398 {
399 // First, create new execution state for the processing unit
400 auto executionState = _executionStateComputeManager->createExecutionState(_currentTask->getExecutionUnit(), _currentTask);
401
402 // Then initialize the task with the new execution state
403 _currentTask->initialize(std::move(executionState));
404 }
405
406 // Now actually run the task
407 _currentTask->run();
408 }
409
410 // Servicing a pending suspension request, if any (the state is set to 'suspending' by suspend())
411 if (_state == state_t::suspending) [[unlikely]]
412 {
413 // Setting state as suspended
// NOTE(review): this is a plain store rather than a compare-exchange. terminate() accepts the
// 'suspending' state, so a termination request landing between the check above and this store
// would be overwritten -- confirm whether callers can produce that interleaving.
414 _state = state_t::suspended;
415
416 // Calling appropriate callback
417 if (_callbackMap != nullptr) _callbackMap->trigger(this, callback_t::onWorkerSuspend);
418
419 // Suspending other processing units
// (index 0 is the lead unit running this very loop, hence the loop starts at 1)
420 for (size_t i = 1; i < _processingUnits.size(); i++) _processingUnitComputeManager->suspend(_processingUnits[i]);
421
422 // Putting current processing unit to check every so often
// usleep() takes microseconds: the interval in milliseconds is scaled by _MILISECONDS_PER_SECOND (1000)
423 while (checkResumeConditions() == false) usleep(_suspendIntervalMs * _MILISECONDS_PER_SECOND);
424
425 // Calling appropriate callback
426 if (_callbackMap != nullptr) _callbackMap->trigger(this, callback_t::onWorkerResume);
427
428 // Resuming other processing units
429 for (size_t i = 1; i < _processingUnits.size(); i++) _processingUnitComputeManager->resume(_processingUnits[i]);
430
431 // Setting worker as running
432 _state = state_t::running;
433 }
434
435 // Requesting processing units to terminate as soon as possible
// (the state is set to 'terminating' by terminate(); checked once per iteration, after any suspension handling)
436 if (_state == state_t::terminating) [[unlikely]]
437 {
438 // Calling appropriate callback
439 if (_callbackMap != nullptr) _callbackMap->trigger(this, callback_t::onWorkerTerminate);
440
441 // Terminate secondary processing units first
442 for (size_t i = 1; i < _processingUnits.size(); i++) _processingUnitComputeManager->terminate(_processingUnits[i]);
443
444 // Then terminate current processing unit
445 _processingUnitComputeManager->terminate(_processingUnits[0]);
446
447 // Return immediately
448 return;
449 }
450 }
451 }
452}; // class Worker
453
454} // namespace HiCR::tasking
This file implements the compute manager for Pthreads backend.
Definition computeManager.hpp:48
__INLINE__ void initialize(std::unique_ptr< HiCR::ProcessingUnit > &processingUnit)
Definition computeManager.hpp:81
__INLINE__ void suspend(std::unique_ptr< HiCR::ProcessingUnit > &processingUnit)
Definition computeManager.hpp:123
__INLINE__ void start(std::unique_ptr< HiCR::ProcessingUnit > &processingUnit, std::unique_ptr< HiCR::ExecutionState > &executionState)
Definition computeManager.hpp:103
virtual std::unique_ptr< HiCR::ExecutionState > createExecutionState(std::shared_ptr< HiCR::ExecutionUnit > executionUnit, void *const argument=nullptr) const =0
__INLINE__ void terminate(std::unique_ptr< HiCR::ProcessingUnit > &processingUnit)
Definition computeManager.hpp:163
__INLINE__ void await(std::unique_ptr< HiCR::ProcessingUnit > &processingUnit)
Definition computeManager.hpp:177
__INLINE__ void resume(std::unique_ptr< HiCR::ProcessingUnit > &processingUnit)
Definition computeManager.hpp:143
@ uninitialized
Definition executionState.hpp:49
static __INLINE__ std::shared_ptr< HiCR::ExecutionUnit > createExecutionUnit(const pthreadFc_t &threadFunction)
Definition computeManager.hpp:67
Definition callbackMap.hpp:40
Definition task.hpp:57
__INLINE__ std::shared_ptr< HiCR::ExecutionUnit > getExecutionUnit() const
Definition task.hpp:152
__INLINE__ void run()
Definition task.hpp:173
__INLINE__ void initialize(std::unique_ptr< HiCR::ExecutionState > executionState)
Definition task.hpp:159
__INLINE__ const HiCR::ExecutionState::state_t getState()
Definition task.hpp:131
Definition worker.hpp:65
HiCR::tasking::CallbackMap< Worker *, callback_t > workerCallbackMap_t
Definition worker.hpp:102
__INLINE__ bool suspend()
Definition worker.hpp:249
__INLINE__ void setSuspendInterval(size_t suspendIntervalMs)
Definition worker.hpp:323
__INLINE__ std::vector< std::unique_ptr< HiCR::ProcessingUnit > > & getProcessingUnits()
Definition worker.hpp:316
Worker(HiCR::ComputeManager *executionStateComputeManager, HiCR::ComputeManager *processingUnitComputeManager, pullFunction_t pullFunction, workerCallbackMap_t *callbackMap=nullptr)
Definition worker.hpp:158
__INLINE__ HiCR::tasking::Task * getCurrentTask()
Definition worker.hpp:196
callback_t
Definition worker.hpp:72
@ onWorkerStart
Definition worker.hpp:76
@ onWorkerTaskPulled
Definition worker.hpp:81
@ onWorkerSuspend
Definition worker.hpp:86
@ onWorkerResume
Definition worker.hpp:91
@ onWorkerTerminate
Definition worker.hpp:96
__INLINE__ workerCallbackMap_t * getCallbackMap()
Definition worker.hpp:189
__INLINE__ void start()
Definition worker.hpp:222
__INLINE__ void initialize()
Definition worker.hpp:201
__INLINE__ void addProcessingUnit(std::unique_ptr< HiCR::ProcessingUnit > pu)
Definition worker.hpp:309
__INLINE__ void terminate()
Definition worker.hpp:277
state_t
Definition worker.hpp:108
@ suspending
Definition worker.hpp:127
@ running
Definition worker.hpp:122
@ uninitialized
Definition worker.hpp:112
@ suspended
Definition worker.hpp:132
@ terminated
Definition worker.hpp:147
@ resuming
Definition worker.hpp:137
@ ready
Definition worker.hpp:117
@ terminating
Definition worker.hpp:142
__INLINE__ void await()
Definition worker.hpp:289
__INLINE__ const state_t getState()
Definition worker.hpp:175
virtual __INLINE__ bool checkResumeConditions()
Definition worker.hpp:332
__INLINE__ void setCallbackMap(workerCallbackMap_t *callbackMap)
Definition worker.hpp:182
__INLINE__ bool resume()
Definition worker.hpp:264
Provides a definition for the abstract compute manager class.
Provides a definition for a HiCR ProcessingUnit class.
Provides a failure model and corresponding exception classes.
#define HICR_THROW_RUNTIME(...)
Definition exceptions.hpp:74
#define HICR_THROW_LOGIC(...)
Definition exceptions.hpp:67
This file implements the HiCR task class.
constexpr size_t _MILISECONDS_PER_SECOND
Definition worker.hpp:50
std::function< HiCR::tasking::Task *()> pullFunction_t
Definition worker.hpp:55
constexpr size_t _DEFAULT_SUSPEND_INTERVAL_MS
Definition worker.hpp:45