Coroutine Flow
Flow
Coroutine의 suspend 함수는 비동기적으로 하나 값을 반환합니다. 그리고 지속적으로 혹은 한 번에 여러개의 값들을 반환하는 경우를 대응하기 위해서 Coroutine의 비동기 스트림 API flow를 사용합니다.

Coroutine Flow를 이용해서 연속적인 데이터 스트림을 구현하기 위해서는 Producer(생산자), Intermediary(중간 연산자), Consumer(소비자) 3가지가 필요합니다.
Producer
class WeatherDataSource( | |
private val weatherApi: WeatherApi | |
) { | |
fun getWeatherInfoFlow(): Flow<List<WeatherInfo>> = flow { | |
while(true) { | |
val weatherInfo = weatherApi.fetchLastedWeatherInfo() | |
emit(weatherInfo) | |
delay(60000) | |
} | |
} | |
} |
1. flow { } 블록 안에서 비동기로 동작
2. 날씨 정보를 서버로부터 받아온다.
3. Producer가 데이터를 방출한다.(emit)
4. 2~3번의 과정을 60초마다 반복한다.
Intermediary
class WeatherRepository( | |
private val weatherDataSource: WeatherDataSource, | |
) { | |
fun getWeatherItem(): Flow<List<WeatherItem>> { | |
return weatherDataSource.getWeatherInfoFlow().map { | |
it.map { weatherInfo -> | |
WeatherItem( | |
id = weatherInfo.id, | |
date = weatherInfo.date, | |
speed = weatherInfo.speed, | |
clouds = weatherInfo.clouds, | |
temp = weatherInfo.temp | |
) | |
} | |
} | |
} | |
} |
1. Produce에서 생성한 값에서 필요한 형태로 변환한다.
2. 변환 연산자는 map(데이터 변형), filter(데이터 필터링), onEach(모든 데이터마다 연산 수행) 등이 있다.
Consumer
@HiltViewModel | |
class WeatherViewModel @Inject constructor( | |
private val weatherRepository: WeatherRepository | |
): ViewModel() { | |
private val _weatherItems = MutableSharedFlow<List<WeatherItem>>() | |
val weatherItems = _weatherItems.asSharedFlow() | |
fun getWeatherItem() { | |
viewModelScope.launch { | |
weatherRepository.getWeatherItem().collect { | |
_weatherItems.emit(it) | |
} | |
} | |
} | |
} |
1. 소비자는 flow의 collect() 메소드를 이용하여 중간 연산자에서 변형된 데이터를 전달받을 수 있다.
2. 전달 받은 데이터를 Live Data, StateFlow 등을 이용하여 데이터의 상태를 업데이트 한다.
UI
@AndroidEntryPoint | |
class MainActivity : AppCompatActivity() { | |
private val viewModel by viewModels<WeatherViewModel>() | |
override fun onCreate(savedInstanceState: Bundle?) { | |
super.onCreate(savedInstanceState) | |
setContentView(R.layout.activity_main) | |
startObserver() | |
viewModel.getWeatherItem() | |
} | |
private fun startObserver() { | |
lifecycleScope.launch { | |
repeatOnLifecycle(Lifecycle.State.STARTED) { | |
viewModel.weatherItems.collect { | |
// Update UI | |
} | |
} | |
} | |
} | |
} |
1. 액티비티에서는 StateFlow 혹은 SharedFlow 등으로 업데이트 된 데이터를 전달받아서 UI를 갱신할 수 있다.
Flow의 Cancel
1. withTimeoutOrNull
withTimeoutOrNull을 이용하여 stream을 소비하는 측에서 timeout을 설정할 수 있습니다.
아래 코드에서는 11초 후 부터 collect { } 블럭 내부에 값이 들어오지 않습니다.
@HiltViewModel | |
class WeatherViewModel @Inject constructor( | |
private val weatherRepository: WeatherRepository | |
): ViewModel() { | |
private val _weatherItems = MutableSharedFlow<List<WeatherItem>>() | |
val weatherItems = _weatherItems.asSharedFlow() | |
fun getWeatherItem() { | |
viewModelScope.launch { | |
withTimeout(11000) { | |
weatherRepository.getWeatherItem().collect { | |
_weatherItems.emit(it) | |
} | |
} | |
} | |
} | |
} |
2. cancel()
cancle() 메소드를 이용하여 collect { } 블럭 내부에서 값을 수신하지 않도록 할 수 있습니다.
@HiltViewModel | |
class WeatherViewModel @Inject constructor( | |
private val weatherRepository: WeatherRepository | |
): ViewModel() { | |
private val _weatherItems = MutableSharedFlow<List<WeatherItem>>() | |
val weatherItems = _weatherItems.asSharedFlow() | |
fun getWeatherItem() { | |
viewModelScope.launch { | |
withTimeout(11000) { | |
weatherRepository.getWeatherItem().collect { | |
if (it.isEmpty()) { | |
cancel() | |
} | |
_weatherItems.emit(it) | |
} | |
} | |
} | |
} | |
} |
주의
이는 flow { } 빌더에서만 동작하고,
아래와 같이 asFlow같은 확장함수를 사용하는 경우는 동작하지 않는다고 합니다.
viewModelScope.launch { | |
(0..10).asFlow().collect { | |
if (it == 5) { | |
cancel() | |
} | |
println(it) | |
} | |
} |
Flow 빌더
1. flow
flow { }를 통해서 새로운 flow를 생성할 수 있다.
2. flowOf
flowOf 빌더는 정해진 값을 반복문을 돌려서 방출해준다.
viewModelScope.launch { | |
flowOf(0, 1, 2, 3, 5).collect { | |
println(it) | |
} | |
} |
3. asFlow
asFlow 코틀린 Collection이나 Sequence를 Flow로 변환하는 함수이다.
viewModelScope.launch { | |
(0..10).asFlow().collect { | |
println(it) | |
} | |
} |
연산자
1. map
stream에서 흘러들어온 데이터를 변형할 때 사용하는 연산자
fun getWeatherItem(): Flow<List<WeatherItem>> { | |
return weatherDataSource.getWeatherInfoFlow().map { | |
it.map { weatherInfo -> | |
WeatherItem( | |
id = weatherInfo.id, | |
date = weatherInfo.date, | |
speed = weatherInfo.speed, | |
clouds = weatherInfo.clouds, | |
temp = weatherInfo.temp | |
) | |
} | |
} | |
} |
2. filter
filter는 filter { } 블럭 내부의 조건에 맞는 값만 내보내는 필터 함수입니다.
이와 반대로 조건에 맞지 않는 값만 내보내는 filterNot { } 함수가 있습니다.
fun getWeatherItem() { | |
(0..10) | |
.asFlow() | |
.filter { it % 2 == 0 } | |
.collect { | |
// 짝수만 출력 됨 | |
} | |
} |
3. onEach
onEach는 업스트림의 아이템들이 다운스트림으로 emit 되기전에 주어진 action을 처리하는 함수입니다.
fun getWeatherItem() { | |
(0..10) | |
.asFlow() | |
.onEach { | |
delay(1000) | |
} | |
.collect { | |
// 1, 2, 3 이 1초간격으로 방출됨 | |
} | |
} |
4. transform
transform은 업스트림에서 흘러들어온 아이템을 변형시켜서 방출하거나 아에 새로운 값을 방출할 수 있습니다. 또한 여러 개의 값을 방출할 수도 있습니다.
fun getWeatherItem() { | |
viewModelScope.launch { | |
(0..3) | |
.asFlow() | |
.transform { | |
emit(listOf((it + 1).toString(), (it + 2).toString())) | |
emit(it + 1) | |
} | |
.collect { result -> | |
Log.e("TEST", "result : $result") | |
} | |
} | |
} | |
//결과 - result : [1, 2] | |
// result : 1 | |
//결과 - result : [2, 3] | |
// result : 2 | |
//결과 - result : [3, 4] | |
// result : 3 | |
//결과 - result : [4, 5] | |
// result : 4 |
5. take
take는 몇 개의 값을 방출할 지 설정하는 함수 입니다.
fun getWeatherItem() { | |
(0..3) | |
.asFlow() | |
.take(2) | |
.collect { | |
// 0 | |
// 1 | |
} | |
} |
6. drop
drop은 처음 몇개의 값을 방출하지 않을지 설정하는 함수입니다.
fun getWeatherItem() { | |
(0..3) | |
.asFlow() | |
.drop(2) | |
.collect { | |
// 2, 3 방출됨 | |
} | |
} |