Произносится как "Репликс"
Golang библиотека для мульти-мастер репликации целых чисел (int64) с поддержкой TTL
todo
func main() {
r = rplx.New(
rplx.WithRemoteNodesProvider(remoteNodes()),
)
ln, err := net.Listen("tcp4", "127.0.0.1:3001")
if err != nil {
panic(err)
}
if err := r.StartReplicationServer(ln); err != nil {
panic(err)
}
}
func remoteNodes() []*rplx.RemoteNodeOption {
nodes := make([]*rplx.RemoteNodeOption, 0)
nodes = append(nodes, &rplx.DefaultRemoteNodeOption("127.0.0.1:3002"))
return nodes
}
Также смотрите примеры в папке test
данного репозитория
Get(name string) (int64, error)
Возвращает значение переменной или ошибку, если переменная просрочена или не существует
Ошибки:
- ErrVariableNotExists
- ErrVariableExpired
Delete(name string) error
Удаляет переменную
Ошибки:
- ErrVariableNotExists
По факту этот метод устанавливает для переменной TTL в значение "Сейчас минус 1 секунда", отправляет эту информацию на репликацию и удаляет переменную из локального кеша
UpdateTTL(name string, ttl time.Time) error
Обновление TTL для переменной
Ошибки:
- ErrVariableNotExists
Upsert(name string, delta int64)
Обновление значения переменной на указанную дельту. Либо создание переменной, если она не существует
All() (notExpired map[string]int64, expired map[string]int64)
Возвращает две карты (map), где ключ - имя переменной, а значение - значение переменной
Первая карта - активные, не просроченные переменные Вторая карта - просроченные, но еще не удаленные из локального кеша перееменные
docker-compose up -d
docker build -t client -f ./test/client/Dockerfile .
docker run --rm --net host client
docker-compose down -v
Причины появления
Кейс в одной из рабочих систем:
Работают десятки/сотни воркеров (stateless экземпляров приложений)
Требуется иметь единое хранилище числовых переменных, к которому обращаются воркеры при каждом запросе и получают значения переменных, либо меняют (создают/удаляют) их.
Значения этих числовых переменных обновляется с очень большой частотой. От единиц до 100к обновлений в секунду. При этом, само количество переменных меняется не сильно на протяжении работы.
При использовании сторонних инструментов со стандартной репликацией, с записью wal лога, проявлялись проблемы синхронизации огромного количества мелких обновлений.
В то же время, исходя из нашей бизнес-логики, было допустимо знать только итоговое значение переменной. То есть, нет необходимости знать все этапы обновления. Плюс, мы были готовы на некоторые задержки (несколько секунд) при получении актуального значения. Но, конечно, чем задержки меньше, тем лучше.
Не хранится история изменения переменной. Только ее текущее значение.
Каждый экземпляр rplx
кластера имеет свое "место" хранения значения в каждой переменной.
Подробнее:
Каждая переменная внутри rplx
описывается следующей структурой:
type variable struct {
name string
self *variableItem
ttl int64
ttlVersion int64
remoteItemsMx sync.RWMutex
remoteItems map[string]*variableItem
}
type variableItem struct {
val int64
ver int64
}
Каждый инстанс rplx
хранит внутри перменной свое значение отдельно от других.
Например, у нас есть три экземпляра приложения с rplx
, которые сообщаются между собой многие-ко-многим (full mesh).
В экземпляре приложения #1 (далее - service1
) мы изменили значение переменной var1
на +100
.
(плюс пишем для того, чтобы обратить внимание, что ниже будет минус).
В service2
эту же переменную изменили на +170
, а в service3
изменили на -90
.
После обмена данными между экземплярами приложения, переменная в service1
будет выглядеть примерно так:
{
name: "var1",
... // пропускаем поля, относящиеся к TTL
self: {
val: 100,
ver: 1,
},
remoteItems: {
"service2": {
val: 170,
ver: 1,
},
"service3": {
val: -90,
ver: 1,
},
},
}
Итоговое значение переменной рассчитываеися путем сложения всех значений из remoteItems, плюc значение из self. 100 + 170 - 90 = 180
В поле self хранится значение текущего экземпляра. Например, после примера выше, нужная нам переменная на service3
будет выглядеть примерно так:
{
name: "var1",
... // пропускаем поля, относящиеся к TTL
self: {
val: -90,
ver: 1,
},
remoteItems: {
"service1": {
val: 100,
ver: 1,
},
"service2": {
val: 170,
ver: 1,
},
},
}
Такой способ хранения позволяет нам безопасно обновлять переменные, не ожидая конфликтов. Разумеется, должно быть гарантировано, что экземпляры имеют уникальные имена.
В примерах выше можно увидеть поле ver
(version
) в структуре variableItem
Это счетчик, который увеличивается на 1 при каждом изменении значения переменной в соответствующем экземпляре.
Учет версий позволяет не отправлять на другие ноды кластера данные, которые уже были отправлены. За это ответственно поле replicatedVersions
в структуре node
type node struct {
...
replicatedVersions map[string]int64
...
}
Ключом карты является строка, строящаяся по следующему принципу: <var_name>@<node_id>
, а значением - является как раз версия переменной.
При отправке данных синхронизации на другую ноду кластера происходит следующее:
- наша нода имеет id
service1
- мы отправляем данные о переменной
var1
на нодуservice2
- переменная
var1
на нашей нодеservice1
выглядит примерно так:
{
name: "var1",
... // пропускаем поля, относящиеся к TTL
self: { // значения нашей локальной ноды service1
val: -90,
ver: 10,
},
remoteItems: {
"service20": {
val: 100,
ver: 20,
},
"service30": {
val: 170,
ver: 30,
},
},
}
- смотрим в replicatedVersions значение "var1@service1" - то есть, "какую версию значения нашей переменной мы отправляли последний раз"?
- если это значение равно текущему (10 - в поле self), то это значение мы не включаем в отправку.
- то же самое делаем для всех элементов из
remoteItems
Таким образом, на другие ноды кластера уходят только те данные, которые были обновлены и еще не отправлялись на конкретную ноду