ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Lessons learnt using Coroutines Flow in the Android Dev Summit 2019 app
    프로그래밍/Kotlin 2020. 1. 29. 08:56
    반응형

    [Lessons learnt using Coroutines Flow in the Android Dev Summit 2019 app]

    https://medium.com/androiddevelopers/lessons-learnt-using-coroutines-flow-4a6b285c0d06 한글 번역본입니다. 

     

     

     

    Android Dev Summit 2019 App에 사용된 Flow(https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/)에 대한 내용이다. 이 앱에서 data stream을 어떻게 handle 하는지 알아보자. 

     

     

     

     

    이 앱은 architecture는 recommended app architecture guide(https://developer.android.com/jetpack/docs/guide#recommended-app-arch) 를 따른다. 

     

    Repository와 ViewMdel 사이에 추가적인 domain layer가 있는데 UseCase로 class의 역할을 분담해 class를 작게 만들고 reusable하게 하고 test code 작성을 쉽고록 도와준다.(ex. Repository에서 가져온 데이터를 merge 등) 

     

    많은 Android 앱들은 필요한 데이터를 network나 cache로 lazy하게 load한다. 그래서 몇 가지 특징을 찾을 수 있는데 View와 ViewModel 사이에 LiveData로 communication 하는 것과 UseCase와 그 밑 layer들은 Coroutine과 Flow을 사용한다는 것을 볼 수 있다. 

     

    1. Prefer exposing streams as Flows (not Channels)

    Coroutine으로 data stream을 다루는 두가지 방법이 있다. Flow API와 Channel API이다. Channel은 Coroutine 사이에 single value를 전달하기 위해 사용되고 Flow는 data stream을 모델링하기 위해 구축되었다. Flow는 data stream을 subscribe하기 위한 Facoty이고 Channel은 Flow의 흐름을 support하기 위해 존재한다. 

     

    “Flow는 유연하고 명시적인 contract와 operator들을 제공하므로 Channel보다 Flow를 사용해라” 

     

    Flow는 data stream의 실행을 트리거하고 producer 측에서 제공하는 모든 흐름에 따라 성공 혹은 예외적으로 완료되는 특성으로 data stream을 자동적으로 종료한다. 따라서 producer 측에서 subscribe하는 쪽에 리소스를 쉽게 노출할 수 없다. Channel과 함께 사용하면 더 쉬울 수 있지만, Channel은 노출했다가는 제대로 닫히지 않으면 리소스가 clean up되지 않을 수도 있다.

     

    앱의 data layer는 DB나 network로부터 data를 제공할 책임이 있다. Android summit 앱에는 DataSource interface가 있는데 user event data stream을 제공한다. 

    interface UserEventDataSource {
    
    fun getObservableUserEvent(userId: String): Flow<UserEventResult>
    
    }

    https://github.com/google/iosched/blob/adssched/shared/src/main/java/com/google/samples/apps/iosched/shared/data/userevent/UserEventDataSource.kt

     

    2. How to use Flow in your Android app architecture

    ViewModel에서 사용되기 전에 View/ViewModel과 DataSource(UseCase와 Repository) 사이에서는 종종 여러 종류의 query나 data 연산 후에 data를 다시 조합하는 경우가 많다. Kotlin sequence의 map, filter 등과 같은 operation과 비슷하게 Flow도 많은 data 연산 operator를 제공한다. DB쿼리 이후에 data 연산 시에는 그렇게 오래걸리는 task가 없을 것이지만 Flow의 operator들은 내부적으로 suspend function을 부르도록 되어있다.

     

    Android summit 앱에서도 UserEventResult와 event session data를 Repository layer에서 합치도록 되어있다. 우리는 Flow로부터 받은 각 값을 map operator를 이용해서 DataSource로부터 각 UserEvent value와 event session 정보를 합칠 것이다. 

    class DefaultSessionAndUserEventRepository(
    
        private val userEventDataSource: UserEventDataSource,
    
        private val sessionRepository: SessionRepository
    
    ) : SessionAndUserEventRepository {
    
    
    
        override fun getObservableUserEvent(
    
            userId: String?,
    
            eventId: SessionId
    
        ): Flow<Result<LoadUserSessionUseCaseResult>> {
    
            // Handles null userId
    
    
    
            // Observes the user events and merges them with session data
    
            return userEventDataSource.getObservableUserEvent(userId, eventId).map { userEventResult ->
    
                // lambda of the map operator that can call suspend functions
    
                val event = sessionRepository.getSession(eventId)
    
    
    
                // Merges session with user data and emits the result
    
                val userSession = UserSession(
    
                    event,
    
                    userEventResult.userEvent ?: createDefaultUserEvent(event)
    
                )
    
                Result.Success(LoadUserSessionUseCaseResult(userSession))
    
            }
    
        }
    
    }

     

     

    ViewModel

    UI와 ViewModel 의 communication을 LiveData로 할 때, ViewModel layer는 data stream의 소비를 Flow의 terminal operator(subscribe용)인 (collect, first, toList)로 할 수 있다.

    또한 Flow를 LiveData로 변환하고 싶을 때, Flow.asLiveData() 를 사용하면 된다. (https://mvnrepository.com/artifact/androidx.lifecycle/lifecycle-livedata-ktx?repo=google) Flow만으로 LiveData를 생성할 수 있어 아주 편리하고 observer들의 lifecycle에 기반한 subscription을 관리할 수 있다. 

    class SimplifiedSessionDetailViewModel(
    
      private val loadUserSessionUseCase: LoadUserSessionUseCase,
    
      ...
    
    ): ViewModel() {
    
      val sessions = loadUserSessionUseCase(sessionId).asLiveData()
    
    }

     

    3. When to use a BroadCastChannel or Flow as an implementation detail

    언제 Flow를 사용할까?

    DataSource implementation으로 돌아가서, getObservableUserEvent function을 어떻게 구현할까? Flow나 BroadCastChannel을 return으로 돌릴 수 있다. 각 차이에 대해서 알아보자.

    Flow는 Cold stream이다. Cold stream이란 data producer가 각 event를 consume하는 리스너들에게 data를 제공할 때, 각 리스너들에 대해서 새로운 data stream 의 result를 만드는 것이다. 즉, 각 리스너가 consume을 stop하거나 producer가 Complete나 Error로 종료하면 data의 stream은 자동적으로 close 된다. 

     

    “Flow는 data생성과 종료가 observer들의 lifecycle과 일치할 때 적합하다”

     

    Flow를 사용할 때, limited/unlimited 갯수로 생성이 가능하다.

    val oneElementFlow: Flow<Int> = flow {
    
      // producer block starts here, stream starts
    
      emit(1)
    
      // producer block finishes here, stream will be closed
    
    }
    
    val unlimitedElementFlow: Flow<Int> = flow {
    
      // producer block starts here, stream starts
    
      while(true) {
    
        // Do calculations
    
        emit(result)
    
        delay(100)
    
      }
    
      // producer block finishes here, stream will be closed
    
    }

     

    Flow는 coroutine cancellation에 의해 자동적으로 cleanup을 제공하는 expensive task로 사용되는 경항이 있다. cancellation은 상호적이다. 즉, Flow가 suspend하지 않으면 cancel되지 않는다. 예를 들면 delay는 cancellation 타이밍을 체크하는 suspend function인데, subscriber가 listening을 종료하면 Flow는 resource를 정리하고 data stream 연산을 stop한다.

     

    언제 BroadCast를 사용할까?

    Channel은 coroutine들 사이의 communication할 때 동시성고려 모델로 적합하다. BroadCastChannel은 Channel의 구현체로 Multicast 기능이 있다. 우리의 Data source layer에서 BroadCastChannel을 사용하고 싶다면, 

     

    “BroadcastChannel은 producer와 observer들과 다른 lifecycle을 갖거나 각각 독립적으로 동작할 때 사용하면 유용하다”

     

    BroadcastChannel은 producer가 listener들이 등록을 했던 중지했던 현재 result를 broadcasting해주는 역할이다. 이 방식에서는 producer가 매번 새로운 리스너가 등록될 때마다 새로 시작할 필요가 없고 알 필요도 없다.

     

    더보기

    ==> "Flows are conceptually and usually"cold". They typically don't have any state when they have no subscribers. An event bus is more directly modelled by a hot entity like a broadcast channel."

    https://github.com/Kotlin/kotlinx.coroutines/issues/1294

     

    Flow를 return하고 싶다면 Flow의 implentation을 알필요가 없다. extension function인  BraodcastChannel.asFlow()로 BroadcastChannel을 Flow로 변환할 수 있다. 

     

    단, Flow를 중단한다고 해도 BroadcastChannel의 subscribtion을 취소하지 않는다. BroadcastChannel을 사용할 때 listener들의 lifecycle을 모르기때문에, lifecycle은 각자 알아서 관리해야한다. BroadcastChannel이 close나 stop될 때까지 리소스를 계속 물고 있게 될 수도 있다. BroadcastChannel을 더이상 사용하지 않을 때, 제대로 close()를 호출해줘야한다. BroadcastChannel을 close한다는 것은 더이상 사용할 수 없다는 의미이다. 새로운 instance를 만들어야한다. 

     

    주의

    Flow나 BroadcastChannel API는 여전히 experimental하다. 특히 StateFlow와 share operator는 Channel 사용을 줄이게 할 것이다.

    https://github.com/Kotlin/kotlinx.coroutines/issues/1261

     

    4. Convert data streams callback-based APIs to Coroutines

    많은 library들이 이미 data stream 연산용으로 coroutine을 사용하고 있다. (ex. Room) library가 coroutine operator를 support하지 않는다면 callback-based coroutine을 사용하면 된다. 

     

    Flow implementation

    Flow로 data stream을 전달하고 싶다면, channelFlow() function이나 callbackFlow() function을 사용할 수 있다. channelFlow function은 Channel로 element들이 보내어지는 Flow의 instance를 생성한다. 다른 context 혹은 concurrently하게 동작하는 환경에서 element들을 전달받을 수 있도록 한다. 

     

    아래 예제에서, 우리는 Flow에 listener를 달고 element들을 emit받으려고 한다. 

    1. channelFlow로 Flow를 생성하고 callback listener를 third party library에 단다.

    2. callback에서 수신한 모든 element들을 Flow로 emit한다.

    3. subscriber들이 listening을 멈추면, suspend fun awaitClose로 unregister해준다. 

    override fun getObservableUserEvent(userId: String, eventId: SessionId): Flow<UserEventResult> {
    
    
    
        // 1) Create Flow with channelFlow
    
        return channelFlow<UserEventResult> {
    
    
    
            val eventDocument = firestore.collection(USERS_COLLECTION)
    
                .document(userId)
    
                .collection(EVENTS_COLLECTION)
    
                .document(eventId)
    
        
    
            // 1) Register callback to the API
    
            val subscription = eventDocument.addSnapshotListener { snapshot, _ ->
    
                val userEvent = if (snapshot.exists()) {
    
                    parseUserEvent(snapshot)
    
                } else { null }
    
    
    
                // 2) Send items to the Flow
    
                channel.offer(UserEventResult(userEvent))
    
            }
    
    
    
            // 3) Don't close the stream of data, keep it open until the consumer
    
            // stops listening or the API calls onCompleted or onError. 
    
            // When that happens, cancel the subscription to the 3P library
    
            awaitClose { subscription.remove() }
    
        }
    
    }

     

    BroadcastChannel implementation

    Firestore라는 API로 user 인증을 추적하는 data stream을 사용한다고 하자. 우리는 인증 API쪽에 listener를 등록하고 BroadcastChannel API를 사용했다. listener와 BroadcastChannel의 lifecycle은 다르고 현재 result를 누가 listening하고 있는지 BroadcastChannel은 모른다. 

     

    API를 BroadcastChannel로 변환하기 위해서 우리는 Flow보다 코드를 더 작성해야한다. 클래스를 생성하는데, BroadcastChannel을 variable로 들고 있는 형태이다. initialisation동안 callback을 등록하고 BroadcastChannel로 element들을 전달했다.

    class FirebaseAuthStateUserDataSource(...) : AuthStateUserDataSource {
    
    
    
        private val channel = ConflatedBroadcastChannel<Result<AuthenticatedUserInfo>>()
    
    
    
        private val listener: ((FirebaseAuth) -> Unit) = { auth ->
    
            // Data processing logic
    
    
    
            // Send the current user for observers
    
            if (!channel.isClosedForSend) {
    
                channel.offer(Success(FirebaseUserInfo(auth.currentUser)))
    
            } else {
    
                unregisterListener()
    
            }
    
        }
    
        
    
        @Synchronized
    
        override fun getBasicUserInfo(): Flow<Result<AuthenticatedUserInfo>> {
    
            if (!isListening) {
    
                firebase.addAuthStateListener(listener)
    
                isListening = true
    
            }
    
            return channel.asFlow()
    
        }
    
    }

     

    요약

    -Channel보다는 Flow를 return하라, 대부분의 유용한 contract와 operator는 Flow가 제공해준다.

    -Flow는 새로운 리스너가 등록될 때마다 수행되고 data stream의 ifecycle이 자동적으로 관리된다. 

    -BroadcastChannel은 여러 리스너가 공유하면 lifecycle을 listener쪽에서 알아서 관리해줘야한다.

    -callback-based API를 coroutine으로 변경(Flow나 Channel을 이용)을 고려해보아라. 더 이상적인 앱을 만들 수 있다.

     

    https://medium.com/androiddevelopers/lessons-learnt-using-coroutines-flow-4a6b285c0d06

     

     

     

     

     

     

     

     

     

     

     

     

    반응형
Designed by Tistory.