Пакет asyncutils
Функции
SliceAction
Используется для асинхронной обработки слайса данных.
Функция использует паттерн асинхронной обработка данных fan-out/fan-in.
В данной реализации, что все возникшие ошибки будут логироваться в функции action.
Входные параметры:
- goroutinesCount: кол-во горутин в которых будет производиться обработка.
- channelBufferSize: размер буфера для канала
- input: канал с входными данными
- action: функция которая производит обработку элемента из слайса данных
Выходные параметры:
- output: канал с выходными данными
Примеры использования:
func EntityList() []Entity {
entities := GetEntitiesFromDB()
input := make(chan Entity, chanBufferSize)
go func() {
defer close(input)
for idx := range entities {
input <- entities[idx]
}
}()
output := SliceAction(goroutinesCount, chanBufferSize, input, func(entity Entity) Entity {
title, err := GetTitleFromOtherSource(entity.ID)
if err != nil {
logrus.Errorf("failed to get title from other source: %v", err)
} else {
entity.Title = title
}
return entity
})
entitiesResponse := make([]Entity, 0, len(entities))
for entity := range output {
entitiesResponse = append(entitiesResponse, entity)
}
return entitiesResponse
}
Также есть пример, когда мы передаем канал с id, а на выходе получаем канал сущностей.
func EntityList() []Entity {
entityIDs := GetEntitiesFromDB()
input := make(chan uint64, chanBufferSize)
go func() {
defer close(input)
for idx := range entityIDs {
input <- entityIDs[idx]
}
}()
output := SliceAction(goroutinesCount, chanBufferSize, input, func(entityID uint64) Entity {
entity, err := GetEntityFromOtherSource(entityID)
if err != nil {
logrus.Errorf("failed to get entity from other source: %v", err)
}
return entity
})
entitiesResponse := make([]Entity, 0, len(entityIDs))
for entity := range output {
entitiesResponse = append(entitiesResponse, entity)
}
return entitiesResponse
}