안녕하세요. Narvis2 입니다.
이번 포스팅에서는 StateFlow, SharedFlow, Coroutine Channel 에 대하여 알아보고자 합니다.
아직 Flow에 대하여 잘 모르시면 Flow 포스팅을 참고하시길 바랍니다. 👉 Flow에 관하여
Flow는 Cold Stream입니다. 즉, collector 를 공유할 수 없어 collect를 할때마다 각기 다른 데이터가 수집됩니다.
StateFlow, SharedFlow는 Flow의 단점을 극복하기 위해 나왔습니다.
StateFlow, SharedFlow는 Hot Stream으로 collector를 공유 합니다. 자세한건 밑에서 알아보도록 하겠습니다.
🍀 Flow의 단점
Flow는 스스로Android의Lifecycle에 대해 알지 못합니다.- ✅ 즉,
Lifecycle에 따른 중지나 재개가 어렵습니다.
- ✅ 즉,
Flow는 상태가 없어 값이 할당된 것인지, 현재 값은 무엇인지 알기 어렵습니다.Flow는Cold Stream방식으로, 연속해서 계속 들어오는 데이터를 처리할 수 없으며,Collect되었을 때만 생성되고 값을 반환 합니다.참고 👉 만약, 하나의
flow builder에 대해 다수의Collector가 있다면Collector하나마다 하나씩 데이터를 호출 하기 때문에UpSteam이 비싼 비용을 요구하는DB 접근이나서버 통신등이라면 여러 번 리소스 요청 을 하게 될 수 있습니다.
🍀 StateFlow
Hot Stream방식입니다. 즉,collect가 공유 되어 오직 하나의Flow만을 실행 하게 합니다.✅ 참고 👉
Flow와 다르게 하나의StateFlow를 통해DB에 접근할 때 여러개의Collect를 쓰더라도DB에는 한번만 접근 합니다.- 초기 데이터(기본 값)이 항상 존재 해야 합니다.
- 마지막 값의 개념이 있으며 생성하자 마자 활성화 됩니다.
- 값이 업데이트 된 경우에만 반환하고 동일한 값을 반환하지 않습니다.
Flow의distinctUtilChanged()가 항상 포함되어 있다고 생각하시면 됩니다. .value를 사용하여 현재 값에 접근 할 수 있습니다.SharedFlow의replay값이1로 고정된 경우와 같습니다.참고 👉 새로운
subscriber가 등록될 때 바로 최신의 값을 가져 옵니다.1️⃣ 예제 )
StateFlow생성 👇🏾1 2 3 4 5 6 7 8 9 10 11
@HiltViewModel class MainViewModel @Inject constructor() : ViewModel() { // StateFlow 생성 private val _isAdult = MutableStateFlow(false) val isAdult: StateFlow<Boolean> = _isAudult.asStateFlow() // StateFlow에 값 넣기 fun onIsAdultClick() { _isAdult.value = !isAdult.value } }
2️⃣ 예제 )
StateFlowCollect👇🏾1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
@AndroidEntryPoint class MainActivity : AppCompatActivity() { private lateinit var binding: ActivityMainBinding private val mainViewModel: MainViewModel by viewModels() override fun onCreate(saveInstanceState: Bundle?) { binding = DataBindingUtil.setContentView(this, R.layout.activity_main) binding.lifecycleOwner = this lifecycleScope.launchWhenStarted { mainViewModel.isAdult.collect { it: Boolean // TODO : true, false 동작 넣기 } } } }
🍀 SharedFlow
Hot Stream방식으로 여러개의Collector를 공유하여 오직 하나의Flow만을 실행하게 합니다.( 아무리 많은Collector가 존재해도 오직 하나의Flow만 실행 )firstOrNull을 통해 현재 값에 접근 가능 합니다.- 초기 값이 필요없으며, 중복된 값이 필요한 경우 사용
✅ 참고 👉 중복 값이 필요한 경우란
Event를 말하는데 예를들어 API를 호출하였을 때 실패의 경우 연속 적으로 실패할 수 있으므로 이럴때는SharedFlow를 사용합니다. - 만약,
SharedFlow한 개를 정의하고Database결과값을 공유하게 한 뒤, 여러 개의Collector를 달아 준다면Database접근은 오직 한 번만 일어납니다. SharedFlow파라미터 👇replay:Collect시 전달받을 이전 데이터의 개수 지정,replay가0이면Collect시점에 담겨있던 데이터부터 전달 받습니다.replay가1이면collect시점 직전의 데이터부터 전달 받으며 시작(최신 데이터)합니다.0:collect이후의 데이터를 전달 받습니다. (새로운 구독자에게 이전 이벤트를 전달하지 않습니다.)1:collect시점 직전의 데이터부터 전달받으며 시작합니다.( 최신 데이터 )2: 현재 데이터 이전 두개의 데이터 부터 전달받으면서 시작합니다.
extraBufferCapacity:buffer의 개수를 설정합니다.flow의emit이 빠르고collect가 느릴 때 지정된 개수만큼buffer에 저장되고, 저장된 개수가 넘어가면'onBufferOverFlow'에 설정된 정책에 따라 동작 합니다.onBufferOverFlow:Buffer가 꽉 찼을 때 동작을 정의합니다.SUSPEND:buffer가 꽉 찼을 때emit을 수행하면emit코드가blocking됩니다. 즉,buffer의 빈자리가 생겨야emit코드 이후의 코드가 수행 될 수 있습니다.DROP_OLDSET:buffer가 꽉 찼을 때emit을 수행하면 오래된 데이터부터 삭제하면서 새로운 데이터를 넣습니다.DROP_LATEST:buffer가 꽉 찼을 때emit을 수행하면 최근 데이터를 삭제하고 새로운 데이터를 넣습니다.
🍀 StateIn / SharedIn
Flow Builder로 만든Clod Flow를Hot Flow로 변경 할 수 있는 확장함수 입니다.- 하나의
Flow에서 방출된 값을 여러개의Collector에서 받아야할 경우 에 유용하게 사용됩니다. stateIn()/sharedIn()파라미터scope👉 공유가 시작되는Coroutine Scope를 설정합니다.started👉 공유가 시작 및 중지되는 시기를 제어 하는 전략을 설정합니다.Eagerly👇Collector가 존재하지 않더라도 바로Sharing이 시작되며, 중간에 중지되지 않습니다.- 값이
replay크기 보다 많이 들어오면 바로 삭제됩니다. 즉, 즉시 시작되며Scope가 취소되면 중지됩니다.
Lazily👇Collector가 등록된 이후부터Sharing이 시작되며 중간에 중지되지 않습니다.- 첫 번째
Collector는 그 동안emit된 모든 값들을 얻으며, 이후에Collector는replay개수 만큼 값을 얻어갑니다. - ✅ 즉,
Collector가 모두 없어지더라도Sharing동작을 유지되며replay개수만큼Cache하고Sopce가 취소되면 중지됩니다.
WhileSubscribed👇Collector가 등록되면 바로Sharing을 시작하며,Collector가 전부 없어지면 바로Sharing을 중지 합니다.- 이때
replay개수만큼Cache처리 됩니다.stopTimeOutMillis👇collector가 모두 사라진 이후에 정지할delay를 넣습니다. 즉,Collect가 사라지고 몇 초 후에Sharing을 중지할지 설정 합니다.0이면Collector가 모두 사라지는 순간에 바로 정지합니다.5,000을 사용하면Configuration(구성요소)변경과 같은 특정 상황에서 이득을 볼 수 있습니다.
replayExpirationMillis👇cache한 값을 유지할 시간을 정합니다.- 시간이 지나면
stateIn의 초기 값으로 복원 됩니다. - 기본 값 👉
replay cache를 영구적으로 유지하며buffer를 재 생성하지 않습니다. 0을 사용 👉cache를 즉각적으로 만료 시킬 수 있습니다.
initialValue👉stataIn사용 시 초기 값을 설정 합니다.replay👇sharedIn사용 시 사용되며(StateFlow는replace값이1로 고정되어 있습니다. ),Collect시 전달받을 이전 데이터의 개수 지정 지정replay가0👉Collect시점에 담겨있던 데이터부터 전달 받습니다.replay가1👉collect시점 직전의 데이터부터 전달 받으며 시작합니다.( 최신 데이터 )- ❗️주의❗️ 👉
stateIn,sharedIn을 함수로 만들면 매번 재 사용되지 않는 새로운instance를 만들게 됩니다. - 아래는 해당 코드의 샘플 예제 입니다. 👇🏾
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
@HiltViewModel class MainViewModel @Inject constructor( private val getCastInfoUseCase: GetCastInfoUseCase ) : ViewModel() { // 해당 Result Sealed Class 는 보다 이해를 돕기위해 ViewModel에서 생성하였습니다. sealed class Result<T>( val data: T? = null, val message: String? = null ) { class Loading<T>: Result<T>() class Success<T>(data: T) : Result<T>(data) class NetworkError<T>(message: String?, data: T? = null) : Result<T>(data, message) class Error<T>(message: String?, data: T? = null) : Result<T>(data, message) } private val getCastInfo: StateFlow<Result<CastInfoResponseModel>> = getCastInfoUseCase() .stateIn( scope = viewModelScope, started = SharingStarted.WhileSubscribed(5000), initialValue = Result.Loading() ) fun requestCastInfo() = viewModelScope.launch { getCastInfo().collect { result -> when (result) { is Result.Loading -> { showLoadingDialog() } is Result.Success -> { hideLoadingDialog() // TODO : API 요청 성공일 때 동작 넣기 } is Result.NetworkError -> { hideLoadingDialog() // TODO : Network 오류 발생 때 동작 넣기 } is Result.Error -> { hideLoadingDialog() // TODO : API ERROR 발생 때 동작 넣기 } } } } }
- ❗️주의❗️ 👉
🍀 Coroutine Channel
Channel은 단방향Observing이라고 생각하시면 쉽습니다.✅ 참고
LiveData에서는 단방향 옵저빙을 위해SingleLiveEvent혹은Event Wrapper를 사용합니다.Coroutine Channel은Flow의 단방향 옵저빙 이라고 생각하시면 됩니다.
Channel은 정확히 한 번만 처리해야하는 이벤트 를 처리하는 데 사용됩니다. 이는 일반적으로 단일 구독자 가있는 이벤트 유형의 설계에서 사용됩니다.✅ 참고
- 예를 들면 Click Event Listener 가 이에 해당합니다.
Click Event Listener는 항상Observing할 필요없이 눌렀을 때만Observing하면 되므로 이럴 때Channel을 사용합니다.
send값이 오지않으면Observing을 하지않고,send값이 들어오면Observing시작합니다.Channel의Buffer TypeRendezvous(Unbuffered) : 기본 타입으로 버퍼가 없습니다.Conflated: 크기가 1인 고정 버퍼가 있는 채널이 생성 됩니다. 만약에 수신하는Coroutine이 송신하는Coroutine을 따라잡지 못했다면, 송신하는 쪽은 새로운 값을Buffer의 마지막 아이템에 덮어씌웁니다. 즉, 최신 값을 받습니다.Buffered: 이 모드는 고정된 크기의 버퍼를 생성 (Buffer는Array형식)합니다.송신Coroutine은Buffer가 꽉 차있으면 새로운 값을 보내는 걸 중단 합니다.수신 Coroutine은Buffer가 빌때까지 계속해서 꺼내서 수행합니다.Unlimited: 제한 없는 크기의 버퍼를 생성 (Buffer는LinkedList형식)합니다. 만약에Buffer가 소비되지 않았다면 메모리가 힘들어할때까지 계속해서 아이템을 착착 채우고 결국엔OutOfMemeoryException을 일으키게 됩니다.
1️⃣ 다음은
Coroutine Channel예제 입니다. 👇🏾1 2 3 4 5 6 7 8 9
@HiltViewModel class MainViewModel @Inject constructor() : ViewModel() { private val _actionLogin = Channel<Unit>(Channel.CONFLATED) val actionLogin = _actionLogin.receiveAsFlow() fun onLoginClick() = viewModelScope.launch { _actionLogin.send(Unit) } }
2️⃣ 다음은
수신을 받아서 사용하는 쪽 예제 입니다.1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
@AndroidEntryPoint class MainActivity : AppCompatActivity() { private lateinit var binding: ActivityMainBinding private val mainViewModel: MainViewModel by viewModels() override fun onCreate(saveInstanceState: Bundle?) { binding = DataBindingUtil.setContentView(this, R.layout.activity_main) binding.lifecycleOwner = this lifecycleScope.launchWhenStarted { mainViewModel.actionLogin.collect { // TODO: 로그인 버튼 클릭 시 동작할 코드 넣기 } } } }
🍀 Flow Observer
Flow는 스스로Android의Lifecycle에 대해 알지 못합니다. (Lifecycle에 따른 중지나 재개가 어렵습니다. ) 따라서 해당 문제를 해결하기 위해Custom Class를 만듭니다.해당 코드는
lifecycle이onStart가 되면 구독을시작하고,onStop이 되면 구독취소합니다.1️⃣ 다음은
FlowObserver예제 코드 입니다. 👇1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
class FlowObserverInStop<T>( lifecycleOwner: LifecycleOwner, private val flow: Flow<T>, private val collector: suspend (T) -> Unit ) { private var job: Job? = null init { lifecycleOwner.lifecycle.addObserver(LifecycleEventObserver { source, event -> when (event) { Lifecycle.Event.ON_START -> { job = source.lifecycleScope.launch { flow.collect { collector(it) } } } Lifecycle.Event.ON_STOP -> { job?.cancel() job = null } else -> {} } }) } } inline fun <reified T> Flow<T>.observeOnLifecycleStop( lifecycleOwner: LifecycleOwner, noinline collector: suspend (T) -> Unit ) = FlowObserverInStop(lifecycleOwner, this, collector) // .onEach{ } 사용할때 사용 inline fun <reified T> Flow<T>.observeInLifecycleStop( lifecycleOwner: LifecycleOwner ) = FlowObserverInStop(lifecycleOwner, this) {}
✅ 설명 👇
observeOnLifecycleStop()의 매개변수에는lifecycle을 넣습니다.Activity의 경우this이고,Fragment인 경우viewLifecycleOwner를 넣어주시면 됩니다.
2️⃣
FlowObserver사용 예제 입니다. 👇🏾1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
@AndroidEntryPoint class MainActivity : AppCompatActivity() { private lateinit var binding: ActivityMainBinding private val mainViewModel: MainViewModel by viewModels() override fun onCreate(saveInstanceState: Bundle?) { binding = DataBindingUtil.setContentView(this, R.layout.activity_main) binding.lifecycleOwner = this // StateFlow Observing mainViewModel.actionLogin.observeOnLifecycleStop(this) { // TODO : true, false 동작 넣기 } // Coroutine Channel Observing mainViewModel.actionLogin.onEach { // TODO: 로그인 버튼 클릭 시 동작할 코드 넣기 }.observeInLifecycleStop(this) } }
🍀 마치며
이번 포스팅에서는 StateFlow, SharedFlow, Coroutine Channel 에 대하여 알아보았습니다.
LiveData를 사용하고 Event Wrapper를 통해 단방향 Observing을 처리해도 되지만 문제는 Clean Architecture에 있습니다.
LiveData는 UI에 밀접하게 연관되어 있기 때문에 Clean Architecture로 Project를 구성하면 Domain, Data layer에서 비동기 방식으로 데이터를 처리하기에 자연스러운 방법이 없습니다.
또한 LiveData 는 안드로이드 플랫폼에 속해 있기 때문에 순수 Java / Kotlin 을 사용해야 하는 Domain Layer 에서 사용하기에 적합하지 않습니다.
이럴 경우 LiveData를 대체하여 StateFlow 나 SharedFlow 또는 Coroutine Channel을 사용 할 수 있습니다.