Skip to content

Commit fd6acde

Browse files
authored
add snapshot check and retry (#159)
1 parent 2e9127c commit fd6acde

10 files changed

+211
-87
lines changed

include/raft/rocksdb_file_system_adaptor.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,8 @@ friend class RocksdbFileSystemAdaptor;
130130
bool _is_meta_reader = false;
131131
bool _closed = true;
132132
size_t _num_lines = 0;
133+
butil::IOPortal _last_package;
134+
off_t _last_offset = 0;
133135
};
134136

135137
class SstWriterAdaptor : public braft::FileAdaptor {
@@ -153,7 +155,7 @@ friend class RocksdbFileSystemAdaptor;
153155

154156
private:
155157
bool finish_sst();
156-
int parse_from_iobuf(const butil::IOBuf& data, std::vector<std::string>& keys, std::vector<std::string>& values);
158+
int iobuf_to_sst(butil::IOBuf data);
157159
int64_t _region_id;
158160
SmartRegion _region_ptr;
159161
std::string _path;

include/store/region.h

+14
Original file line numberDiff line numberDiff line change
@@ -735,6 +735,18 @@ friend class Backup;
735735
std::shared_ptr<Region> get_ptr() {
736736
return shared_from_this();
737737
}
738+
uint64_t snapshot_data_size() const {
739+
return _snapshot_data_size;
740+
}
741+
void set_snapshot_data_size(size_t size) {
742+
_snapshot_data_size = size;
743+
}
744+
uint64_t snapshot_meta_size() const {
745+
return _snapshot_meta_size;
746+
}
747+
void set_snapshot_meta_size(size_t size) {
748+
_snapshot_meta_size = size;
749+
}
738750
uint64_t get_approx_size() const {
739751
//分裂后一段时间每超过10分钟,或者超过10%的数据量diff则需要重新获取
740752
if (_approx_info.time_cost.get_time() > 10 * 60 * 1000 * 1000LL &&
@@ -899,6 +911,8 @@ friend class Backup;
899911
std::mutex _region_lock;
900912
//split后缓存分裂出去的region信息供baikaldb使用
901913
std::vector<pb::RegionInfo> _new_region_infos;
914+
size_t _snapshot_data_size = 0;
915+
size_t _snapshot_meta_size = 0;
902916
pb::RegionInfo _new_region_info;
903917
int64_t _region_id;
904918

include/store/rpc_sender.h

+2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ class RpcSender {
2525
int64_t request_version);
2626

2727
static int64_t get_peer_applied_index(const std::string& peer, int64_t region_id);
28+
static void get_peer_snapshot_size(const std::string& peer, int64_t region_id,
29+
uint64_t* data_size, uint64_t* meta_size);
2830
static int send_query_method(const pb::StoreReq& request,
2931
const std::string& instance,
3032
int64_t receive_region_id);

proto/store.interface.proto

+8
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,12 @@ message IndexRecords {
121121
repeated bytes records = 2;
122122
};
123123

124+
message RegionRaftStat {
125+
optional int64 applied_index = 1;
126+
optional uint64 snapshot_data_size = 2;
127+
optional uint64 snapshot_meta_size = 3;
128+
};
129+
124130
message StoreRes {
125131
required ErrCode errcode = 1;
126132
optional bytes errmsg = 2;
@@ -145,6 +151,7 @@ message StoreRes {
145151
repeated int64 commit_ts = 20; //与上面的binlog一一对应
146152
optional PeerStatus region_status = 21;
147153
optional int64 last_insert_id = 22;
154+
optional RegionRaftStat region_raft_stat = 23;
148155
};
149156
message InitRegion {
150157
required RegionInfo region_info = 1;
@@ -222,6 +229,7 @@ service StoreService {
222229
rpc add_peer(AddPeer) returns (StoreRes);
223230

224231
//get region的applied_index, 用来分裂完成后优化选择transfer leader的对象
232+
//复用,获取snapshot数据大小,用作校验
225233
rpc get_applied_index(GetAppliedIndex) returns (StoreRes);
226234

227235
rpc compact_region(RegionIds) returns (StoreRes);

src/meta_server/region_manager.cpp

+8
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,23 @@
2121
#include "meta_util.h"
2222
#include "table_manager.h"
2323
#include "meta_rocksdb.h"
24+
#ifdef BAIDU_INTERNAL
25+
#include "baidu/rpc/reloadable_flags.h"
26+
#else
2427
#include "brpc/reloadable_flags.h"
28+
#endif
2529

2630
namespace baikaldb {
2731
DECLARE_int32(concurrency_num);
2832
DECLARE_int64(store_heart_beat_interval_us);
2933
DECLARE_int32(store_dead_interval_times);
3034
DECLARE_int32(region_faulty_interval_times);
3135
DEFINE_int32(balance_add_peer_num, 10, "add peer num each time, default(10)");
36+
#ifdef BAIDU_INTERNAL
37+
BAIDU_RPC_VALIDATE_GFLAG(balance_add_peer_num, brpc::PositiveInteger);
38+
#else
3239
BRPC_VALIDATE_GFLAG(balance_add_peer_num, brpc::PositiveInteger);
40+
#endif
3341

3442
//增加或者更新region信息
3543
//如果是增加,则需要更新表信息, 只有leader的上报会调用该接口

0 commit comments

Comments
 (0)