Skip to content

Commit c8503da

Browse files
chore: refactor background tasks processing (#2031)
Co-authored-by: sangjanai <sang@jan.ai>
1 parent b9e9e15 commit c8503da

File tree

6 files changed

+120
-20
lines changed

6 files changed

+120
-20
lines changed

Diff for: engine/controllers/models.cc

+4-5
Original file line numberDiff line numberDiff line change
@@ -218,11 +218,10 @@ void Models::ListModel(
218218
obj["id"] = model_entry.model;
219219
obj["model"] = model_entry.model;
220220
obj["status"] = "downloaded";
221-
// TODO(sang) Temporarily remove this estimation
222-
// auto es = model_service_->GetEstimation(model_entry.model);
223-
// if (es.has_value() && !!es.value()) {
224-
// obj["recommendation"] = hardware::ToJson(*(es.value()));
225-
// }
221+
auto es = model_service_->GetEstimation(model_entry.model);
222+
if (es.has_value()) {
223+
obj["recommendation"] = hardware::ToJson(*es);
224+
}
226225
data.append(std::move(obj));
227226
yaml_handler.Reset();
228227
} else if (model_config.engine == kPythonEngine) {

Diff for: engine/main.cc

+6-2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include "utils/file_manager_utils.h"
3838
#include "utils/logging_utils.h"
3939
#include "utils/system_info_utils.h"
40+
#include "utils/task_queue.h"
4041

4142
#if defined(__APPLE__) && defined(__MACH__)
4243
#include <libgen.h> // for dirname()
@@ -177,8 +178,11 @@ void RunServer(std::optional<std::string> host, std::optional<int> port,
177178
download_service, dylib_path_manager, db_service);
178179
auto inference_svc = std::make_shared<InferenceService>(engine_service);
179180
auto model_src_svc = std::make_shared<ModelSourceService>(db_service);
180-
auto model_service = std::make_shared<ModelService>(
181-
db_service, hw_service, download_service, inference_svc, engine_service);
181+
cortex::TaskQueue task_queue(
182+
std::min(2u, std::thread::hardware_concurrency()), "background_task");
183+
auto model_service =
184+
std::make_shared<ModelService>(db_service, hw_service, download_service,
185+
inference_svc, engine_service, task_queue);
182186
inference_svc->SetModelService(model_service);
183187

184188
auto file_watcher_srv = std::make_shared<FileWatcherService>(

Diff for: engine/services/hardware_service.cc

+1-3
Original file line numberDiff line numberDiff line change
@@ -207,9 +207,6 @@ bool HardwareService::Restart(const std::string& host, int port) {
207207
if (!TryConnectToServer(host, port)) {
208208
return false;
209209
}
210-
std::cout << "Server started" << std::endl;
211-
std::cout << "API Documentation available at: http://" << host << ":"
212-
<< port << std::endl;
213210
}
214211

215212
#endif
@@ -348,6 +345,7 @@ void HardwareService::UpdateHardwareInfos() {
348345
return false;
349346
return true;
350347
};
348+
351349
auto res = db_service_->AddHardwareEntry(
352350
HwEntry{.uuid = gpu.uuid,
353351
.type = "gpu",

Diff for: engine/services/model_service.cc

+51-3
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,21 @@ cpp::result<DownloadTask, std::string> GetDownloadTask(
143143
}
144144
} // namespace
145145

146+
ModelService::ModelService(std::shared_ptr<DatabaseService> db_service,
147+
std::shared_ptr<HardwareService> hw_service,
148+
std::shared_ptr<DownloadService> download_service,
149+
std::shared_ptr<InferenceService> inference_service,
150+
std::shared_ptr<EngineServiceI> engine_svc,
151+
cortex::TaskQueue& task_queue)
152+
: db_service_(db_service),
153+
hw_service_(hw_service),
154+
download_service_{download_service},
155+
inference_svc_(inference_service),
156+
engine_svc_(engine_svc),
157+
task_queue_(task_queue) {
158+
ProcessBgrTasks();
159+
};
160+
146161
void ModelService::ForceIndexingModelList() {
147162
CTL_INF("Force indexing model list");
148163

@@ -331,8 +346,17 @@ cpp::result<DownloadTask, std::string> ModelService::HandleDownloadUrlAsync(
331346
return download_service_->AddTask(downloadTask, on_finished);
332347
}
333348

349+
std::optional<hardware::Estimation> ModelService::GetEstimation(
350+
const std::string& model_handle) {
351+
std::lock_guard l(es_mtx_);
352+
if (auto it = es_.find(model_handle); it != es_.end()) {
353+
return it->second;
354+
}
355+
return std::nullopt;
356+
}
357+
334358
cpp::result<std::optional<hardware::Estimation>, std::string>
335-
ModelService::GetEstimation(const std::string& model_handle,
359+
ModelService::EstimateModel(const std::string& model_handle,
336360
const std::string& kv_cache, int n_batch,
337361
int n_ubatch) {
338362
namespace fs = std::filesystem;
@@ -548,7 +572,7 @@ ModelService::DownloadModelFromCortexsoAsync(
548572
// Close the file
549573
pyvenv_cfg.close();
550574
// Add executable permission to python
551-
set_permission_utils::SetExecutePermissionsRecursive(venv_path);
575+
(void)set_permission_utils::SetExecutePermissionsRecursive(venv_path);
552576
} else {
553577
CTL_ERR("Failed to extract venv.zip");
554578
};
@@ -828,7 +852,7 @@ cpp::result<StartModelResult, std::string> ModelService::StartModel(
828852
CTL_WRN("Error: " + res.error());
829853
for (auto& depend : depends) {
830854
if (depend != model_handle) {
831-
StopModel(depend);
855+
auto sr = StopModel(depend);
832856
}
833857
}
834858
return cpp::fail("Model failed to start dependency '" + depend +
@@ -1390,4 +1414,28 @@ std::string ModelService::GetEngineByModelId(
13901414
auto mc = yaml_handler.GetModelConfig();
13911415
CTL_DBG(mc.engine);
13921416
return mc.engine;
1417+
}
1418+
1419+
// Schedules the background job that (re-)estimates hardware resource
// usage for every locally downloaded model. Results are cached in es_
// under es_mtx_ so GetEstimation() can answer cheaply from any thread.
// The job runs once immediately and then every 10 seconds on the shared
// background TaskQueue.
void ModelService::ProcessBgrTasks() {
  CTL_INF("Start processing background tasks")
  auto cb = [this] {
    CTL_DBG("Estimate model resource usage");
    auto list_entry = db_service_->LoadModelList();
    if (!list_entry) {
      return;
    }
    for (const auto& model_entry : list_entry.value()) {
      // Only process local models
      if (model_entry.status != cortex::db::ModelStatus::Downloaded) {
        continue;
      }
      auto es = EstimateModel(model_entry.model);
      if (es.has_value()) {
        // Hold the lock only for the cache write, not for the estimation.
        std::lock_guard<std::mutex> l(es_mtx_);
        es_[model_entry.model] = std::move(es.value());
      }
    }
  };

  // Pass a copy to the one-shot run so the periodic timer can take
  // ownership of the original (the old code made an extra named clone).
  task_queue_.RunInQueue(cb);
  task_queue_.RunEvery(std::chrono::seconds(10), std::move(cb));
}

Diff for: engine/services/model_service.h

+13-7
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include "services/download_service.h"
1111
#include "services/hardware_service.h"
1212
#include "utils/hardware/gguf/gguf_file_estimate.h"
13+
#include "utils/task_queue.h"
1314

1415
class InferenceService;
1516

@@ -35,12 +36,8 @@ class ModelService {
3536
std::shared_ptr<HardwareService> hw_service,
3637
std::shared_ptr<DownloadService> download_service,
3738
std::shared_ptr<InferenceService> inference_service,
38-
std::shared_ptr<EngineServiceI> engine_svc)
39-
: db_service_(db_service),
40-
hw_service_(hw_service),
41-
download_service_{download_service},
42-
inference_svc_(inference_service),
43-
engine_svc_(engine_svc) {};
39+
std::shared_ptr<EngineServiceI> engine_svc,
40+
cortex::TaskQueue& task_queue);
4441

4542
cpp::result<std::string, std::string> AbortDownloadModel(
4643
const std::string& task_id);
@@ -81,7 +78,10 @@ class ModelService {
8178

8279
bool HasModel(const std::string& id) const;
8380

84-
cpp::result<std::optional<hardware::Estimation>, std::string> GetEstimation(
81+
std::optional<hardware::Estimation> GetEstimation(
82+
const std::string& model_handle);
83+
84+
cpp::result<std::optional<hardware::Estimation>, std::string> EstimateModel(
8585
const std::string& model_handle, const std::string& kv_cache = "f16",
8686
int n_batch = 2048, int n_ubatch = 2048);
8787

@@ -112,6 +112,8 @@ class ModelService {
112112
const std::string& model_path, int ngl, int ctx_len, int n_batch = 2048,
113113
int n_ubatch = 2048, const std::string& kv_cache_type = "f16");
114114

115+
void ProcessBgrTasks();
116+
115117
int GetCpuThreads() const;
116118

117119
std::shared_ptr<DatabaseService> db_service_;
@@ -126,4 +128,8 @@ class ModelService {
126128
*/
127129
std::unordered_map<std::string, std::shared_ptr<ModelMetadata>>
128130
loaded_model_metadata_map_;
131+
132+
std::mutex es_mtx_;
133+
std::unordered_map<std::string, std::optional<hardware::Estimation>> es_;
134+
cortex::TaskQueue& task_queue_;
129135
};

Diff for: engine/utils/task_queue.h

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#pragma once
#include <chrono>    // std::chrono::duration (was only available transitively)
#include <cstddef>   // size_t
#include <cstdint>   // uint64_t
#include <memory>
#include <string>
#include <utility>   // std::forward
#include "trantor/net/EventLoopThreadPool.h"

namespace cortex {
// Thin wrapper around trantor's EventLoopThreadPool providing a named
// pool of background task threads. Tasks are dispatched round-robin
// across the pool's event loops; submission is safe from any thread.
class TaskQueue {
 public:
  // Starts `num_threads` event-loop threads; `name` labels them for
  // debugging/profiling tools.
  TaskQueue(size_t num_threads, const std::string& name)
      : ev_loop_pool_(
            std::make_unique<trantor::EventLoopThreadPool>(num_threads, name)) {
    ev_loop_pool_->start();
  }

  // Non-copyable (was already implicitly deleted via the unique_ptr
  // member); stated explicitly to document intent.
  TaskQueue(const TaskQueue&) = delete;
  TaskQueue& operator=(const TaskQueue&) = delete;

  // Rule of Zero: the pool's own destructor stops/joins the loops, so no
  // user-declared destructor is needed.

  // Runs `f` as soon as possible on one of the pool's threads.
  template <typename Functor>
  void RunInQueue(Functor&& f) {
    if (ev_loop_pool_) {
      ev_loop_pool_->getNextLoop()->runInLoop(std::forward<Functor>(f));
    }
  }

  // Runs `cb` repeatedly with period `interval`. Returns the trantor
  // timer id, or 0 if the pool is unavailable.
  template <typename Functor>
  uint64_t RunEvery(const std::chrono::duration<double>& interval,
                    Functor&& cb) {
    if (ev_loop_pool_) {
      return ev_loop_pool_->getNextLoop()->runEvery(interval,
                                                    std::forward<Functor>(cb));
    }
    return 0;
  }

  // Runs `cb` once after `delay`. Returns the trantor timer id, or 0 if
  // the pool is unavailable.
  template <typename Functor>
  uint64_t RunAfter(const std::chrono::duration<double>& delay, Functor&& cb) {
    if (ev_loop_pool_) {
      return ev_loop_pool_->getNextLoop()->runAfter(delay,
                                                    std::forward<Functor>(cb));
    }
    return 0;
  }

 private:
  // unique_ptr default-constructs to null; `= nullptr` was redundant.
  std::unique_ptr<trantor::EventLoopThreadPool> ev_loop_pool_;
};
}  // namespace cortex

0 commit comments

Comments (0)