Библиотека для реализации асинхронной согласованности
Данная библиотека предназначения для реализации паттерна асинхронной согласованности для микросервисной архитектуры
Принцип достаточно прост:
1. Происходит какое либо изменение
2. Генерируется событие изменения для п.1
3. Событие сохраняется на сервисе источнике
4. Событие на сервисе источнике переправляется в шину событий (Например Apache Kafka)
5. Событие принимается на сервисе-приемнике
6. Событие обрабатывается
7. Для данного события устанавливается изменение метки принятых событий
1. Потенциальные проблеммы
1. События должны гарантированно доставляться, иначе будут дыры в потоке и сам паттерн потеряет смысл.
2. События должны доставляться быстро. Если это не так, то зависимые сервисы будут сильно отставать.
3. Постоянный рассинхрон сервисов. По сути из-за того, что события доставляются асинхронно, целевые реплики будут всегда отставать. Сколько будет данное отставание - заранее не известно.
4. События должны приниматься быстро. Если это не так, то сервис будет сильно отставать от потока в шине.
5. Шина должна быть отказоустойчивой. В противном случае потеряются события и цепочка разорвется.
6. События должны доставляться даже после отвала различных частей концепта
2. Решения
1. Отказы/остановки. Для них надо использовать правильную остановку. При этом необходимо учитывать, что события либо должны обработаться полностью, либо не обработаться вовсе.
2. Использовать отказоустойчивую шину.
3. Параллелить обработку событий по возможности, либо выносить их в отдельные шины.
3. Использование библиотеки
Импортируем ее:
import "bitbucket.org/asynchronous-consistency/edispatcher/v2"
Создаем диспетчер, передавая все необходимые компоненты:
dispatcher := edispatcher.NewEventsDispatcher(
&channel{},
&store{},
&receiveStore{},
&txManager{},
&busManager{},
50,
)
Последний параметр тут это размер буффера очереди событий. По сути он необходимо для того, чтоб наполнять его
заранее, когда есть такая возможность. Это обеспечит ускорение обработки событий, т.к. не придется ждать после
каждой обработки вызова шины для получения следующего события.
Создаем серверы для работы диспетчера или реализуем свои:
service := multiservice.MultiService(
map[string]multiservice.ServiceInterface{
"Dispatch-Service": edispatcher.NewDispatchService(dispatcher, isLeaderCallback),
"Receive-Service": edispatcher.NewReceiveService(dispatcher, isLeaderCallback, []edispatcher.EventReceiverInterface{
&receiver{},
}),
},
39000,
)
3.1. Настраиваемые таймауты
Для каждой имплементации составных частей сервиса добавлена дополнительная фабрика с суффиксом WithTimeout
, которая
позволяет задавать таймаут ожидания событий.
Логика работы:
- Для отправки - оиждание поступления событий. Каждый раз, когда диспетчер отправляет событие, он обращается к стору, который в свою очередь дергает БД. Для минимизации пустых дерганий добавлен таймаут
- Для приема - если событий нет, то нет необходимости каждый раз проверять, пришли они или нет, лучше подождать немного. Поэтому добавлен таймаут.
Пример:
dispatcher := edispatcher.NewEventsDispatcherWithTimeout(
&channel{bus: make(chan string, 500)},
&store{},
&receiveStore{},
&txManager{},
&busManager{},
50,
500,
)
Таймаут по умолчанию составляет 50 ms!