안녕하세요. Narvis2 입니다.
이번 포스팅에서는 StateFlow
, SharedFlow
, Coroutine Channel
에 대하여 알아보고자 합니다.
아직 Flow
에 대하여 잘 모르시면 Flow
포스팅을 참고하시길 바랍니다. 👉 Flow에 관하여
Flow
는 Cold Stream
입니다. 즉, collector
를 공유할 수 없어 collect
를 할때마다 각기 다른 데이터가 수집됩니다.
StateFlow
, SharedFlow
는 Flow
의 단점을 극복하기 위해 나왔습니다.
StateFlow
, SharedFlow
는 Hot Stream
으로 collector
를 공유 합니다. 자세한건 밑에서 알아보도록 하겠습니다.
🍀 Flow의 단점
Flow
는 스스로Android
의Lifecycle
에 대해 알지 못합니다.- ✅ 즉,
Lifecycle
에 따른 중지나 재개가 어렵습니다.
- ✅ 즉,
Flow
는 상태가 없어 값이 할당된 것인지, 현재 값은 무엇인지 알기 어렵습니다.Flow
는Cold Stream
방식으로, 연속해서 계속 들어오는 데이터를 처리할 수 없으며,Collect
되었을 때만 생성되고 값을 반환 합니다.참고 👉 만약, 하나의
flow builder
에 대해 다수의Collector
가 있다면Collector
하나마다 하나씩 데이터를 호출 하기 때문에UpSteam
이 비싼 비용을 요구하는DB 접근
이나서버 통신
등이라면 여러 번 리소스 요청 을 하게 될 수 있습니다.
🍀 StateFlow
Hot Stream
방식입니다. 즉,collect
가 공유 되어 오직 하나의Flow
만을 실행 하게 합니다.✅ 참고 👉
Flow
와 다르게 하나의StateFlow
를 통해DB에 접근
할 때 여러개의Collect
를 쓰더라도DB
에는 한번만 접근 합니다.- 초기 데이터(기본 값)이 항상 존재 해야 합니다.
- 마지막 값의 개념이 있으며 생성하자 마자 활성화 됩니다.
- 값이 업데이트 된 경우에만 반환하고 동일한 값을 반환하지 않습니다.
Flow
의distinctUtilChanged()
가 항상 포함되어 있다고 생각하시면 됩니다. .value
를 사용하여 현재 값에 접근 할 수 있습니다.SharedFlow
의replay
값이1
로 고정된 경우와 같습니다.참고 👉 새로운
subscriber
가 등록될 때 바로 최신의 값을 가져 옵니다.1️⃣ 예제 )
StateFlow
생성 👇🏾1 2 3 4 5 6 7 8 9 10 11
@HiltViewModel class MainViewModel @Inject constructor() : ViewModel() { // StateFlow 생성 private val _isAdult = MutableStateFlow(false) val isAdult: StateFlow<Boolean> = _isAudult.asStateFlow() // StateFlow에 값 넣기 fun onIsAdultClick() { _isAdult.value = !isAdult.value } }
2️⃣ 예제 )
StateFlow
Collect
👇🏾1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
@AndroidEntryPoint class MainActivity : AppCompatActivity() { private lateinit var binding: ActivityMainBinding private val mainViewModel: MainViewModel by viewModels() override fun onCreate(saveInstanceState: Bundle?) { binding = DataBindingUtil.setContentView(this, R.layout.activity_main) binding.lifecycleOwner = this lifecycleScope.launchWhenStarted { mainViewModel.isAdult.collect { it: Boolean // TODO : true, false 동작 넣기 } } } }
🍀 SharedFlow
Hot Stream
방식으로 여러개의Collector
를 공유하여 오직 하나의Flow
만을 실행하게 합니다.( 아무리 많은Collector
가 존재해도 오직 하나의Flow
만 실행 )firstOrNull
을 통해 현재 값에 접근 가능 합니다.- 초기 값이 필요없으며, 중복된 값이 필요한 경우 사용
✅ 참고 👉 중복 값이 필요한 경우란
Event
를 말하는데 예를들어 API를 호출하였을 때 실패의 경우 연속 적으로 실패할 수 있으므로 이럴때는SharedFlow
를 사용합니다. - 만약,
SharedFlow
한 개를 정의하고Database
결과값을 공유하게 한 뒤, 여러 개의Collector
를 달아 준다면Database
접근은 오직 한 번만 일어납니다. SharedFlow
파라미터 👇replay
:Collect
시 전달받을 이전 데이터의 개수 지정,replay
가0
이면Collect
시점에 담겨있던 데이터부터 전달 받습니다.replay
가1
이면collect
시점 직전의 데이터부터 전달 받으며 시작(최신 데이터)합니다.0
:collect
이후의 데이터를 전달 받습니다. (새로운 구독자에게 이전 이벤트를 전달하지 않습니다.)1
:collect
시점 직전의 데이터부터 전달받으며 시작합니다.( 최신 데이터 )2
: 현재 데이터 이전 두개의 데이터 부터 전달받으면서 시작합니다.
extraBufferCapacity
:buffer
의 개수를 설정합니다.flow
의emit
이 빠르고collect
가 느릴 때 지정된 개수만큼buffer
에 저장되고, 저장된 개수가 넘어가면'onBufferOverFlow'
에 설정된 정책에 따라 동작 합니다.onBufferOverFlow
:Buffer
가 꽉 찼을 때 동작을 정의합니다.SUSPEND
:buffer
가 꽉 찼을 때emit
을 수행하면emit
코드가blocking
됩니다. 즉,buffer
의 빈자리가 생겨야emit
코드 이후의 코드가 수행 될 수 있습니다.DROP_OLDSET
:buffer
가 꽉 찼을 때emit
을 수행하면 오래된 데이터부터 삭제하면서 새로운 데이터를 넣습니다.DROP_LATEST
:buffer
가 꽉 찼을 때emit
을 수행하면 최근 데이터를 삭제하고 새로운 데이터를 넣습니다.
🍀 StateIn / SharedIn
Flow Builder
로 만든Clod Flow
를Hot Flow
로 변경 할 수 있는 확장함수 입니다.- 하나의
Flow
에서 방출된 값을 여러개의Collector
에서 받아야할 경우 에 유용하게 사용됩니다. stateIn()
/sharedIn()
파라미터scope
👉 공유가 시작되는Coroutine Scope
를 설정합니다.started
👉 공유가 시작 및 중지되는 시기를 제어 하는 전략을 설정합니다.Eagerly
👇Collector
가 존재하지 않더라도 바로Sharing
이 시작되며, 중간에 중지되지 않습니다.- 값이
replay
크기 보다 많이 들어오면 바로 삭제됩니다. 즉, 즉시 시작되며Scope
가 취소되면 중지됩니다.
Lazily
👇Collector
가 등록된 이후부터Sharing
이 시작되며 중간에 중지되지 않습니다.- 첫 번째
Collector
는 그 동안emit
된 모든 값들을 얻으며, 이후에Collector
는replay
개수 만큼 값을 얻어갑니다. - ✅ 즉,
Collector
가 모두 없어지더라도Sharing
동작을 유지되며replay
개수만큼Cache
하고Sopce
가 취소되면 중지됩니다.
WhileSubscribed
👇Collector
가 등록되면 바로Sharing
을 시작하며,Collector
가 전부 없어지면 바로Sharing
을 중지 합니다.- 이때
replay
개수만큼Cache
처리 됩니다.stopTimeOutMillis
👇collector
가 모두 사라진 이후에 정지할delay
를 넣습니다. 즉,Collect
가 사라지고 몇 초 후에Sharing
을 중지할지 설정 합니다.0
이면Collector
가 모두 사라지는 순간에 바로 정지합니다.5,000
을 사용하면Configuration(구성요소)
변경과 같은 특정 상황에서 이득을 볼 수 있습니다.
replayExpirationMillis
👇cache
한 값을 유지할 시간을 정합니다.- 시간이 지나면
stateIn
의 초기 값으로 복원 됩니다. - 기본 값 👉
replay cache
를 영구적으로 유지하며buffer
를 재 생성하지 않습니다. 0
을 사용 👉cache
를 즉각적으로 만료 시킬 수 있습니다.
initialValue
👉stataIn
사용 시 초기 값을 설정 합니다.replay
👇sharedIn
사용 시 사용되며(StateFlow
는replace
값이1
로 고정되어 있습니다. ),Collect
시 전달받을 이전 데이터의 개수 지정 지정replay
가0
👉Collect
시점에 담겨있던 데이터부터 전달 받습니다.replay
가1
👉collect
시점 직전의 데이터부터 전달 받으며 시작합니다.( 최신 데이터 )- ❗️주의❗️ 👉
stateIn
,sharedIn
을 함수로 만들면 매번 재 사용되지 않는 새로운instance
를 만들게 됩니다. - 아래는 해당 코드의 샘플 예제 입니다. 👇🏾
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
@HiltViewModel class MainViewModel @Inject constructor( private val getCastInfoUseCase: GetCastInfoUseCase ) : ViewModel() { // 해당 Result Sealed Class 는 보다 이해를 돕기위해 ViewModel에서 생성하였습니다. sealed class Result<T>( val data: T? = null, val message: String? = null ) { class Loading<T>: Result<T>() class Success<T>(data: T) : Result<T>(data) class NetworkError<T>(message: String?, data: T? = null) : Result<T>(data, message) class Error<T>(message: String?, data: T? = null) : Result<T>(data, message) } private val getCastInfo: StateFlow<Result<CastInfoResponseModel>> = getCastInfoUseCase() .stateIn( scope = viewModelScope, started = SharingStarted.WhileSubscribed(5000), initialValue = Result.Loading() ) fun requestCastInfo() = viewModelScope.launch { getCastInfo().collect { result -> when (result) { is Result.Loading -> { showLoadingDialog() } is Result.Success -> { hideLoadingDialog() // TODO : API 요청 성공일 때 동작 넣기 } is Result.NetworkError -> { hideLoadingDialog() // TODO : Network 오류 발생 때 동작 넣기 } is Result.Error -> { hideLoadingDialog() // TODO : API ERROR 발생 때 동작 넣기 } } } } }
- ❗️주의❗️ 👉
🍀 Coroutine Channel
Channel
은 단방향Observing
이라고 생각하시면 쉽습니다.✅ 참고
LiveData
에서는 단방향 옵저빙을 위해SingleLiveEvent
혹은Event Wrapper
를 사용합니다.Coroutine Channel
은Flow
의 단방향 옵저빙 이라고 생각하시면 됩니다.
Channel
은 정확히 한 번만 처리해야하는 이벤트 를 처리하는 데 사용됩니다. 이는 일반적으로 단일 구독자 가있는 이벤트 유형의 설계에서 사용됩니다.✅ 참고
- 예를 들면 Click Event Listener 가 이에 해당합니다.
Click Event Listener
는 항상Observing
할 필요없이 눌렀을 때만Observing
하면 되므로 이럴 때Channel
을 사용합니다.
send
값이 오지않으면Observing
을 하지않고,send
값이 들어오면Observing
시작합니다.Channel
의Buffer Type
Rendezvous
(Unbuffered) : 기본 타입으로 버퍼가 없습니다.Conflated
: 크기가 1인 고정 버퍼가 있는 채널이 생성 됩니다. 만약에 수신하는Coroutine
이 송신하는Coroutine
을 따라잡지 못했다면, 송신하는 쪽은 새로운 값을Buffer
의 마지막 아이템에 덮어씌웁니다. 즉, 최신 값을 받습니다.Buffered
: 이 모드는 고정된 크기의 버퍼를 생성 (Buffer
는Array
형식)합니다.송신
Coroutine
은Buffer
가 꽉 차있으면 새로운 값을 보내는 걸 중단 합니다.수신 Coroutine
은Buffer
가 빌때까지 계속해서 꺼내서 수행합니다.Unlimited
: 제한 없는 크기의 버퍼를 생성 (Buffer
는LinkedList
형식)합니다. 만약에Buffer
가 소비되지 않았다면 메모리가 힘들어할때까지 계속해서 아이템을 착착 채우고 결국엔OutOfMemeoryException
을 일으키게 됩니다.
1️⃣ 다음은
Coroutine Channel
예제 입니다. 👇🏾1 2 3 4 5 6 7 8 9
@HiltViewModel class MainViewModel @Inject constructor() : ViewModel() { private val _actionLogin = Channel<Unit>(Channel.CONFLATED) val actionLogin = _actionLogin.receiveAsFlow() fun onLoginClick() = viewModelScope.launch { _actionLogin.send(Unit) } }
2️⃣ 다음은
수신
을 받아서 사용하는 쪽 예제 입니다.1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
@AndroidEntryPoint class MainActivity : AppCompatActivity() { private lateinit var binding: ActivityMainBinding private val mainViewModel: MainViewModel by viewModels() override fun onCreate(saveInstanceState: Bundle?) { binding = DataBindingUtil.setContentView(this, R.layout.activity_main) binding.lifecycleOwner = this lifecycleScope.launchWhenStarted { mainViewModel.actionLogin.collect { // TODO: 로그인 버튼 클릭 시 동작할 코드 넣기 } } } }
🍀 Flow Observer
Flow
는 스스로Android
의Lifecycle
에 대해 알지 못합니다. (Lifecycle
에 따른 중지나 재개가 어렵습니다. ) 따라서 해당 문제를 해결하기 위해Custom Class
를 만듭니다.해당 코드는
lifecycle
이onStart
가 되면 구독을시작
하고,onStop
이 되면 구독취소
합니다.1️⃣ 다음은
FlowObserver
예제 코드 입니다. 👇1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
class FlowObserverInStop<T>( lifecycleOwner: LifecycleOwner, private val flow: Flow<T>, private val collector: suspend (T) -> Unit ) { private var job: Job? = null init { lifecycleOwner.lifecycle.addObserver(LifecycleEventObserver { source, event -> when (event) { Lifecycle.Event.ON_START -> { job = source.lifecycleScope.launch { flow.collect { collector(it) } } } Lifecycle.Event.ON_STOP -> { job?.cancel() job = null } else -> {} } }) } } inline fun <reified T> Flow<T>.observeOnLifecycleStop( lifecycleOwner: LifecycleOwner, noinline collector: suspend (T) -> Unit ) = FlowObserverInStop(lifecycleOwner, this, collector) // .onEach{ } 사용할때 사용 inline fun <reified T> Flow<T>.observeInLifecycleStop( lifecycleOwner: LifecycleOwner ) = FlowObserverInStop(lifecycleOwner, this) {}
✅ 설명 👇
observeOnLifecycleStop()
의 매개변수에는lifecycle
을 넣습니다.Activity
의 경우this
이고,Fragment
인 경우viewLifecycleOwner
를 넣어주시면 됩니다.
2️⃣
FlowObserver
사용 예제 입니다. 👇🏾1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
@AndroidEntryPoint class MainActivity : AppCompatActivity() { private lateinit var binding: ActivityMainBinding private val mainViewModel: MainViewModel by viewModels() override fun onCreate(saveInstanceState: Bundle?) { binding = DataBindingUtil.setContentView(this, R.layout.activity_main) binding.lifecycleOwner = this // StateFlow Observing mainViewModel.actionLogin.observeOnLifecycleStop(this) { // TODO : true, false 동작 넣기 } // Coroutine Channel Observing mainViewModel.actionLogin.onEach { // TODO: 로그인 버튼 클릭 시 동작할 코드 넣기 }.observeInLifecycleStop(this) } }
🍀 마치며
이번 포스팅에서는 StateFlow
, SharedFlow
, Coroutine Channel
에 대하여 알아보았습니다.
LiveData
를 사용하고 Event Wrapper
를 통해 단방향 Observing
을 처리해도 되지만 문제는 Clean Architecture
에 있습니다.
LiveData
는 UI
에 밀접하게 연관되어 있기 때문에 Clean Architecture
로 Project
를 구성하면 Domain
, Data layer
에서 비동기 방식으로 데이터를 처리하기에 자연스러운 방법이 없습니다.
또한 LiveData
는 안드로이드 플랫폼에 속해 있기 때문에 순수 Java
/ Kotlin
을 사용해야 하는 Domain Layer
에서 사용하기에 적합하지 않습니다.
이럴 경우 LiveData
를 대체하여 StateFlow
나 SharedFlow
또는 Coroutine Channel
을 사용 할 수 있습니다.