Skip to content

Commit 84ef9c2

Browse files
authored
04_19 fix binlog & add date, hour (#223)
* 04_19 fix binlog & add date, hour * 04_19 fix binlog & add date, hour * 04_19 fix binlog & add date, hour * fix truncate
1 parent b5ff95b commit 84ef9c2

14 files changed

+163
-59
lines changed

BUILD

-9
Original file line numberDiff line numberDiff line change
@@ -822,12 +822,3 @@ baikaldb_proto_library(
822822
include = "proto",
823823
visibility = ["//visibility:public"],
824824
)
825-
826-
cc_binary(
827-
name = "test_date_time",
828-
srcs = ["test/test_date_time.cpp"],
829-
copts = ["-Iexternal/gtest/include"],
830-
deps = [
831-
":common",
832-
],
833-
)

include/common/datetime.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ extern time_t str_to_timestamp(const char* str_time);
2323
// encode DATETIME to string format
2424
// ref: https://dev.mysql.com/doc/internals/en/date-and-time-data-type-representation.html
2525
extern std::string datetime_to_str(uint64_t datetime);
26-
extern uint64_t str_to_datetime(const char* str_time);
26+
extern uint64_t str_to_datetime(const char* str_time, bool* is_full_datetime = nullptr);
2727

2828
extern time_t datetime_to_timestamp(uint64_t datetime);
2929
extern uint64_t timestamp_to_datetime(time_t timestamp);

include/exec/fetcher_store.h

+2-5
Original file line numberDiff line numberDiff line change
@@ -536,10 +536,8 @@ class FetcherStore {
536536
if (client_conn->need_send_binlog()) {
537537
return true;
538538
}
539-
} else if (op_type == pb::OP_ROLLBACK) {
540-
if (state->open_binlog() && binlog_prepare_success) {
541-
return true;
542-
}
539+
} else if (op_type == pb::OP_ROLLBACK && state->open_binlog()) {
540+
return true;
543541
}
544542
return false;
545543
}
@@ -577,7 +575,6 @@ class FetcherStore {
577575
bool is_cancelled = false;
578576
BthreadCond binlog_cond;
579577
NetworkSocket* client_conn = nullptr;
580-
bool binlog_prepare_success = false;
581578
std::atomic<bool> primary_timestamp_updated{false};
582579
std::set<int64_t> no_copy_cache_plan_set;
583580
int64_t dynamic_timeout_ms = -1;

include/expr/internal_functions.h

+2
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ ExprValue curtime(const std::vector<ExprValue>& input);
8484
ExprValue current_time(const std::vector<ExprValue>& input);
8585
ExprValue current_timestamp(const std::vector<ExprValue>& input);
8686
ExprValue timestamp(const std::vector<ExprValue>& input);
87+
ExprValue date(const std::vector<ExprValue>& input);
88+
ExprValue hour(const std::vector<ExprValue>& input);
8789
ExprValue day(const std::vector<ExprValue>& input);
8890
ExprValue dayname(const std::vector<ExprValue>& input);
8991
ExprValue dayofweek(const std::vector<ExprValue>& input);

include/sqlparser/sql_parse.y

+1-1
Original file line numberDiff line numberDiff line change
@@ -2035,7 +2035,7 @@ FunctionaNameCurdate:
20352035
;
20362036

20372037
FunctionaNameDateRelate:
2038-
DAY | MONTH | YEAR
2038+
HOUR | DAY | MONTH | YEAR | DATE
20392039
;
20402040

20412041
FunctionCallNonKeyword:

src/common/datetime.cpp

+21-1
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,12 @@ std::string datetime_to_str(uint64_t datetime) {
6161
return std::string(buf);
6262
}
6363

64-
uint64_t str_to_datetime(const char* str_time) {
64+
uint64_t str_to_datetime(const char* str_time, bool* is_full_datetime) {
6565
//[YY]YY-MM-DD HH:MM:SS.xxxxxx
6666
//[YY]YYMMDDHHMMSS.xxxxxx
67+
68+
bool is_full = false;
69+
6770
while (*str_time == ' ') {
6871
str_time++;
6972
}
@@ -112,6 +115,7 @@ uint64_t str_to_datetime(const char* str_time) {
112115
sscanf(buf, "%4lu%*[^0-9a-z]%2lu%*[^0-9a-z]%2lu"
113116
"%*[^0-9a-z]%2lu%*[^0-9a-z]%2lu%*[^0-9a-z]%2lu.%6lu",
114117
&year, &month, &day, &hour, &minute, &second, &macrosec);
118+
is_full = true;
115119
} else {
116120
if (idx <= 6) {
117121
sscanf(buf, "%2lu%2lu%2lu", &year, &month, &day);
@@ -120,11 +124,14 @@ uint64_t str_to_datetime(const char* str_time) {
120124
} else if (idx == 12) {
121125
sscanf(buf, "%2lu%2lu%2lu%2lu%2lu%2lu.%6lu",
122126
&year, &month, &day, &hour, &minute, &second, &macrosec);
127+
is_full = true;
123128
} else if (idx <= 13) {
124129
sscanf(buf, "%2lu%2lu%2lu%2lu%2lu%2lu", &year, &month, &day, &hour, &minute, &second);
130+
is_full = true;
125131
} else if (idx >= 14) {
126132
sscanf(buf, "%4lu%2lu%2lu%2lu%2lu%2lu.%6lu",
127133
&year, &month, &day, &hour, &minute, &second, &macrosec);
134+
is_full = true;
128135
} else {
129136
return 0;
130137
}
@@ -144,6 +151,10 @@ uint64_t str_to_datetime(const char* str_time) {
144151
return 0;
145152
}
146153

154+
if (is_full_datetime != nullptr) {
155+
*is_full_datetime = is_full;
156+
}
157+
147158
//datetime中间计算时会转化成int64, 最高位必须为0
148159
uint64_t datetime = 0;
149160
uint64_t year_month = year * 13 + month;
@@ -363,6 +374,15 @@ int32_t str_to_time(const char* str_time) {
363374
}
364375
}
365376

377+
// 先判断是否是完整的datetime类型字符串, 12为YYMMDDHHMMSS
378+
if (idx >= 12) {
379+
bool is_full_datetime = false;
380+
uint64_t datetime = str_to_datetime(str_time, &is_full_datetime);
381+
if (is_full_datetime) {
382+
return datetime_to_time(datetime);
383+
}
384+
}
385+
366386
if (has_blank) {
367387
sscanf(str_time, "%d %u:%2u:%2u",
368388
&day, &hour, &minute, &second);

src/engine/transaction.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -1509,8 +1509,10 @@ int Transaction::remove_columns(const TableKey& primary_key) {
15091509

15101510
static TimeCost print_lock_last_time;
15111511
void Transaction::print_txninfo_holding_lock(const std::string& key) {
1512+
return; //pthread会卡bthread,基本不用这个追case了
15121513
//内部有pthread锁
15131514
if (print_lock_last_time.get_time() > 10 * 1000 * 1000) {
1515+
print_lock_last_time.reset();
15141516
auto lock_info = _db->get_db()->GetLockStatusData();
15151517
for (auto& it : lock_info) {
15161518
if (it.second.key.size() == key.size() && it.second.key == key) {
@@ -1521,7 +1523,6 @@ void Transaction::print_txninfo_holding_lock(const std::string& key) {
15211523
break;
15221524
}
15231525
}
1524-
print_lock_last_time.reset();
15251526
}
15261527
}
15271528
} //nanespace baikaldb

src/exec/fetcher_store.cpp

+32-33
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ namespace baikaldb {
3131
DEFINE_int64(retry_interval_us, 500 * 1000, "retry interval ");
3232
DEFINE_int32(single_store_concurrency, 20, "max request for one store");
3333
DEFINE_int64(max_select_rows, 10000000, "query will be fail when select too much rows");
34-
DEFINE_int64(max_affected_rows, 10000000, "query will be fail when select too much rows");
34+
DEFINE_int64(max_affected_rows, 10000000, "query will be fail when affect too much rows");
3535
DEFINE_int64(print_time_us, 10000, "print log when time_cost > print_time_us(us)");
3636
DEFINE_int64(baikaldb_alive_time_s, 10 * 60, "obervation time length in baikaldb, default:10 min");
3737
BRPC_VALIDATE_GFLAG(print_time_us, brpc::NonNegativeInteger);
@@ -828,7 +828,8 @@ ErrorType OnRPCDone::handle_response(const std::string& remote_side) {
828828
if (_op_type != pb::OP_SELECT && _op_type != pb::OP_SELECT_FOR_UPDATE && _op_type != pb::OP_ROLLBACK) {
829829
_fetcher_store->affected_rows += _response.affected_rows();
830830
_client_conn->txn_affected_rows += _response.affected_rows();
831-
if (_client_conn->txn_affected_rows > FLAGS_max_affected_rows) {
831+
// 事务限制affected_rows,非事务限制会导致部分成功
832+
if (_client_conn->txn_affected_rows > FLAGS_max_affected_rows && _state->txn_id != 0) {
832833
DB_DONE(FATAL, "_affected_row:%ld > %ld FLAGS_max_affected_rows",
833834
_client_conn->txn_affected_rows.load(), FLAGS_max_affected_rows);
834835
return E_BIG_SQL;
@@ -1025,39 +1026,37 @@ ErrorType FetcherStore::process_binlog_start(RuntimeState* state, pb::OpType op_
10251026
if (need_process_binlog(state, op_type)) {
10261027
auto binlog_ctx = client_conn->get_binlog_ctx();
10271028
uint64_t log_id = state->log_id();
1028-
if (op_type == pb::OP_PREPARE || binlog_prepare_success) {
1029-
binlog_cond.increase();
1030-
auto write_binlog_func = [this, state, binlog_ctx, op_type, log_id]() {
1031-
ON_SCOPE_EXIT([this]() {
1032-
binlog_cond.decrease_signal();
1033-
});
1034-
if (op_type == pb::OP_PREPARE) {
1035-
int64_t timestamp = TsoFetcher::get_instance()->get_tso(binlog_ctx->tso_count());
1036-
if (timestamp < 0) {
1037-
DB_WARNING("get tso failed log_id: %lu txn_id:%lu op_type:%s", log_id, state->txn_id,
1038-
pb::OpType_Name(op_type).c_str());
1039-
error = E_FATAL;
1040-
return;
1041-
}
1042-
write_binlog_param.txn_id = state->txn_id;
1043-
write_binlog_param.log_id = log_id;
1044-
write_binlog_param.primary_region_id = client_conn->primary_region_id;
1045-
write_binlog_param.global_conn_id = client_conn->get_global_conn_id();
1046-
write_binlog_param.username = client_conn->user_info->username;
1047-
write_binlog_param.ip = client_conn->ip;
1048-
write_binlog_param.client_conn = client_conn;
1049-
write_binlog_param.fetcher_store = this;
1050-
binlog_ctx->set_start_ts(timestamp);
1051-
}
1052-
write_binlog_param.op_type = op_type;
1053-
auto ret = binlog_ctx->write_binlog(&write_binlog_param);
1054-
if (ret != E_OK) {
1029+
binlog_cond.increase();
1030+
auto write_binlog_func = [this, state, binlog_ctx, op_type, log_id]() {
1031+
ON_SCOPE_EXIT([this]() {
1032+
binlog_cond.decrease_signal();
1033+
});
1034+
if (op_type == pb::OP_PREPARE) {
1035+
int64_t timestamp = TsoFetcher::get_instance()->get_tso(binlog_ctx->tso_count());
1036+
if (timestamp < 0) {
1037+
DB_WARNING("get tso failed log_id: %lu txn_id:%lu op_type:%s", log_id, state->txn_id,
1038+
pb::OpType_Name(op_type).c_str());
10551039
error = E_FATAL;
1040+
return;
10561041
}
1057-
};
1058-
Bthread bth(&BTHREAD_ATTR_SMALL);
1059-
bth.run(write_binlog_func);
1060-
}
1042+
write_binlog_param.txn_id = state->txn_id;
1043+
write_binlog_param.log_id = log_id;
1044+
write_binlog_param.primary_region_id = client_conn->primary_region_id;
1045+
write_binlog_param.global_conn_id = client_conn->get_global_conn_id();
1046+
write_binlog_param.username = client_conn->user_info->username;
1047+
write_binlog_param.ip = client_conn->ip;
1048+
write_binlog_param.client_conn = client_conn;
1049+
write_binlog_param.fetcher_store = this;
1050+
binlog_ctx->set_start_ts(timestamp);
1051+
}
1052+
write_binlog_param.op_type = op_type;
1053+
auto ret = binlog_ctx->write_binlog(&write_binlog_param);
1054+
if (ret != E_OK) {
1055+
error = E_FATAL;
1056+
}
1057+
};
1058+
Bthread bth(&BTHREAD_ATTR_SMALL);
1059+
bth.run(write_binlog_func);
10611060
return E_OK;
10621061
}
10631062
return E_OK;

src/expr/fn_manager.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,8 @@ void FunctionManager::register_operators() {
176176
register_object_ret("current_time", current_time, pb::TIME);
177177
register_object_ret("current_timestamp", current_timestamp, pb::TIMESTAMP);
178178
register_object_ret("timestamp", timestamp, pb::TIMESTAMP);
179+
register_object_ret("date", date, pb::DATE);
180+
register_object_ret("hour", hour, pb::UINT32);
179181
register_object_ret("day", day, pb::UINT32);
180182
register_object_ret("dayname", dayname, pb::STRING);
181183
register_object_ret("dayofweek", dayofweek, pb::UINT32);

src/expr/internal_functions.cpp

+27
Original file line numberDiff line numberDiff line change
@@ -1077,6 +1077,33 @@ ExprValue curtime(const std::vector<ExprValue>& input) {
10771077
ExprValue current_time(const std::vector<ExprValue>& input) {
10781078
return curtime(input);
10791079
}
1080+
ExprValue date(const std::vector<ExprValue>& input) {
1081+
if (input.size() == 0 || input[0].is_null()) {
1082+
return ExprValue::Null();
1083+
}
1084+
ExprValue in = input[0];
1085+
if (in.type == pb::INT64) {
1086+
in.cast_to(pb::STRING);
1087+
}
1088+
ExprValue tmp(pb::DATE);
1089+
uint64_t dt = in.cast_to(pb::DATETIME)._u.uint64_val;
1090+
tmp._u.uint32_val = datetime_to_date(dt);
1091+
return tmp;
1092+
}
1093+
ExprValue hour(const std::vector<ExprValue>& input) {
1094+
// Mysql最大值为838,BaikalDB最大值为1023
1095+
if (input.size() == 0 || input[0].is_null()) {
1096+
return ExprValue::Null();
1097+
}
1098+
ExprValue in = input[0];
1099+
time_t t = in.cast_to(pb::TIME)._u.int32_val;
1100+
if (t < 0) {
1101+
t = -t;
1102+
}
1103+
ExprValue tmp(pb::UINT32);
1104+
tmp._u.uint32_val = (t >> 12) & 0x3FF;
1105+
return tmp;
1106+
}
10801107
ExprValue day(const std::vector<ExprValue>& input) {
10811108
if (input.size() == 0 || input[0].is_null()) {
10821109
return ExprValue::Null();

src/session/binlog_context.cpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,7 @@ int BinlogContext::send_binlog_data(const WriteBinlogParam* param, const SmartPa
531531
req.set_region_id(region_id);
532532
req.set_region_version(info.version());
533533
int retry_times = 0;
534+
bool binlog_prepare_success = false;
534535
do {
535536
brpc::Channel channel;
536537
brpc::Controller cntl;
@@ -603,7 +604,7 @@ int BinlogContext::send_binlog_data(const WriteBinlogParam* param, const SmartPa
603604
return -1;
604605
} else {
605606
// success
606-
param->fetcher_store->binlog_prepare_success = true;
607+
binlog_prepare_success = true;
607608
break;
608609
}
609610
} while (retry_times < 5);
@@ -612,7 +613,7 @@ int BinlogContext::send_binlog_data(const WriteBinlogParam* param, const SmartPa
612613
DB_WARNING("write binlog region_id:%ld log_id:%lu txn_id:%ld cost time:%ld op_type:%s ip:%s",
613614
region_id, param->log_id, param->txn_id, query_cost, pb::OpType_Name(param->op_type).c_str(), info.leader().c_str());
614615
}
615-
if (param->fetcher_store->binlog_prepare_success) {
616+
if (binlog_prepare_success) {
616617
if (param->op_type == pb::OP_PREPARE) {
617618
partition_binlog_ptr->binlog_prewrite_time.reset();
618619
} else if (param->op_type == pb::OP_COMMIT) {

src/store/region.cpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -833,7 +833,7 @@ void Region::exec_in_txn_query(google::protobuf::RpcController* controller,
833833
remote_side, _region_id, txn_id, pb::OpType_Name(op_type).c_str(), last_seq, seq_id, log_id);
834834
// OP_SELECT_FOR_UPDATE强制KV模式保证一致
835835
// 不走幂等,重新执行一遍,这样db那边能拿到数据
836-
if (op_type != pb::OP_SELECT_FOR_UPDATE) {
836+
if (op_type != pb::OP_SELECT_FOR_UPDATE && op_type != pb::OP_SELECT) {
837837
txn->load_last_response(*response);
838838
response->set_affected_rows(txn->dml_num_affected_rows);
839839
response->set_errcode(txn->err_code);
@@ -2827,7 +2827,8 @@ int Region::select(const pb::StoreReq& request,
28272827
}
28282828
txn->push_cmd_to_cache(seq_id, plan_item);
28292829
//DB_WARNING("put txn cmd to cache: region_id: %ld, txn_id: %lu:%d", _region_id, txn_info.txn_id(), seq_id);
2830-
txn->save_last_response(response);
2830+
// OP_SELECT_FOR_UPDATE幂等重新执行一次,不保存response
2831+
//txn->save_last_response(response);
28312832
}
28322833

28332834
//DB_NOTICE("select rows:%d", rows);

src/store/region_binlog.cpp

+9-3
Original file line numberDiff line numberDiff line change
@@ -686,9 +686,15 @@ int Region::binlog_update_map_when_apply(const std::map<std::string, ExprValue>&
686686

687687
auto iter = _binlog_param.ts_binlog_map.find(start_ts);
688688
if (iter == _binlog_param.ts_binlog_map.end()) {
689-
DB_FATAL("region_id: %ld, type: %s, txn_id: %ld, commit_ts: %ld, %s, start_ts: %ld, %s can not find in map, remote_side: %s",
690-
_region_id, binlog_type_name(type), txn_id, ts, ts_to_datetime_str(ts).c_str(), start_ts, ts_to_datetime_str(start_ts).c_str(),
691-
remote_side.c_str());
689+
if (type == COMMIT_BINLOG) {
690+
DB_FATAL("region_id: %ld, type: %s, txn_id: %ld, commit_ts: %ld, %s, start_ts: %ld, %s can not find in map, remote_side: %s",
691+
_region_id, binlog_type_name(type), txn_id, ts, ts_to_datetime_str(ts).c_str(), start_ts, ts_to_datetime_str(start_ts).c_str(),
692+
remote_side.c_str());
693+
} else {
694+
DB_WARNING("region_id: %ld, type: %s, txn_id: %ld, commit_ts: %ld, %s, start_ts: %ld, %s can not find in map, remote_side: %s",
695+
_region_id, binlog_type_name(type), txn_id, ts, ts_to_datetime_str(ts).c_str(), start_ts, ts_to_datetime_str(start_ts).c_str(),
696+
remote_side.c_str());
697+
}
692698
return 0;
693699
} else {
694700
bool repeated_commit = false;

0 commit comments

Comments
 (0)