Kotlin/Coroutines / / 2023. 4. 26. 01:25

[Kotlin Coroutine] Flow 공식 문서 번역 및 정리 (3)


  • 불필요한 코드나 잘못 작성된 내용에 대한 지적은 언제나 환영합니다. 👍
  • Kotlin Coroutine 공식 문서 내의 Asynchronous Flow 페이지를 공부하며 이해하기 쉽도록 한글 문맥으로 더 자연스럽게 번역 및 정리한 글입니다.

 

Buffering

 

다른 코루틴에서 플로우의 다른 부분을 실행하는 것은 플로우를 수집하는 데 걸리는 총 시간 관점에서 도움이 될 수 있습니다. 특히 오랜 시간이 걸리는 비동기 작업이 포함된 경우가 그렇습니다. 예시를 한번 보도록 하겠습니다. simple 플로우를 생성하는 게 100ms 시간이 소요되고, 콜렉터도 해당 플로우를 처리하는데 조금 더 느려서 300ms 시간이 소요되는 케이스가 있다고 가정해 봅시다. 3개의 숫자로 구성된 이러한 플로우를 수집하는 데 얼마나 걸리는지 확인해 보겠습니다.

 

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple().collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
    }   
    println("Collected in $time ms")
}

다음과 같은 결과를 출력하며, 전체 수집에 약 1200ms (숫자 3개, 각각 400ms)가 걸립니다.

1
2
3
Collected in 1220 ms

 

우리는 buffer 연산자를 사용하여 simple 플로우의 emitting 코드를 순차적으로 실행하는 대신, 수집 코드와 동시에 실행할 수 있도록 할 수 있습니다.

val time = measureTimeMillis {
    simple()
        .buffer() // buffer emissions, don't wait
        .collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
}   
println("Collected in $time ms")

이렇게 하면 숫자들은 같지만, 순차적으로 실행하는 대신 simple 플로우의 emitting code를 동시에 실행하여 collecting code와 함께 처리하는 효율적인 파이프라인을 만듭니다. 따라서 첫 번째 숫자를 기다리는 데 약 100ms가 걸린 다음, 각 숫자를 처리하는 데는 300ms만 소요됩니다. 이 방법으로 실행하면 약 1000ms가 걸립니다.

1
2
3
Collected in 1071 ms
✏️ flowOn 연산자는 CoroutineDispatcher를 변경할 때 동일한 버퍼링 메커니즘을 사용하지만, 여기서는 실행 컨텍스트를 변경하지 않고 버퍼링을 요청합니다.

 

 

Conflation

 

플로우가 작업 또는 작업 상태의 부분적인 결과만 나타낼 때엔, 각각의 모든 값을 처리할 필요가 없고 대신 가장 최근 값만 처리할 수도 있습니다. 예를 들어, 콜렉터가 결과를 처리하기 너무 느릴 때, conflate 연산자를 사용하여 중간 값을 건너뛸 수 있습니다.

 

val time = measureTimeMillis {
    simple()
        .conflate() // conflate emissions, don't process each one
        .collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
}   
println("Collected in $time ms")

결과

1
3
Collected in 758 ms

첫 번째 번호가 아직 처리되는 동안 두 번째와 세 번째 번호가 이미 생성되었음을 알 수 있습니다. 그 결과 두 번째 숫자가 conflated 되어, ​​가장 최근의 숫자(세 번째 숫자)만 콜렉터에게 전달되었습니다.

 

 

Processing the latest value

 

Conflation은 발행자와 콜렉터 모두 느릴 때 처리 속도를 높이는 한 가지 방법입니다. 중간 값을 건너뛰고 가장 최근 값만 처리함으로써 불필요한 작업을 피하고 잠재적으로 처리 시간을 줄일 수 있습니다. 다른 방법은 느린 콜렉터를 취소하고, 새 값이 발행될 때마다 다시 시작하는 것입니다. 

xxxLatest 연산자는 xxx 연산자와 동일하게 작동하지만 새 값에 대한 블록의 코드를 취소합니다. 예를 들어, 이전 예시에서 conflatecollectLatest로 변경할 수 있습니다.

val time = measureTimeMillis {
    simple()
        .collectLatest { value -> // cancel & restart on the latest value
            println("Collecting $value") 
            delay(300) // pretend we are processing it for 300 ms
            println("Done $value") 
        } 
}   
println("Collected in $time ms")

결과

Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms

collectLatest는 300ms가 걸리지만, 100ms마다 새 값이 발행되기 때문에 블록이 모든 값에서 실행은 되지만 끝까지 프로세싱이 완료되는 건 마지막 값뿐 인 것을 볼 수 있습니다.

 


 

Composing multiple flows

 

여러 플로우를 구성하는 방법은 여러 가지가 있습니다.

 

Zip

 

Kotlin 표준 라이브러리의 Sequence.zip 확장 함수와 마찬가지로, 플로우에는 두 개 플로우의 해당 값을 결합하는 zip 연산자가 있습니다.

val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings 
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
    .collect { println(it) } // collect and print

결과

1 -> one
2 -> two
3 -> three

 

Combine

 

어떤 플로우가 변수나 작업의 가장 최근 값을 나타내는 경우(관련된 conflation 섹션도 참조), 해당 플로우에 따라 계산을 수행하고 업스트림 플로우에서 값을 발행할 때마다 다시 계산해야 할 수도 있습니다. 이에 해당하는 연산자의 그룹을 combine이라고 합니다.

예를 들어 이전 예제에서 숫자는 300ms마다 업데이트되지만 문자열은 400ms마다 업데이트되는 경우, zip 연산자를 사용하여 압축하면 400ms마다 결과가 출력되고 동일한 결과가 생성됩니다.

 

val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time 
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    }

결과

1 -> one at 428 ms from start
2 -> two at 828 ms from start
3 -> three at 1231 ms from start

 

이제 여기서 zip 연산자 대신 combine 연산자를 사용하는 경우를 살펴보겠습니다.

 

val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms          
val startTime = System.currentTimeMillis() // remember the start time 
nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    }

zip 대신 combine 연산자를 사용할 때 완전히 다른 출력을 얻습니다. 이 경우 nums 또는 strs 플로우의 각각 발행될 때마다 출력됩니다.

1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start

 


 

Flattening flows

 

플로우는 값을 비동기적으로 받은 시퀀스로 나타냅니다. 그래서 각각의 값이 다른 값의 시퀀스를 요청하는 상황이 매우 쉽게 발생할 수 있습니다. 예시로 살펴보겠습니다. 500ms 차이로 두 개의 문자열 플로우를 반환하는 다음 함수를 보겠습니다.

 

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

이제 만약 세 개의 정수 Flow가 있고, 다음과 같이 각각에 대해 'requestFlow'를 호출한다면:

(1..3).asFlow().map { requestFlow(it) }

그러면 우리는 플로우가 플로우를 감싸고 있는 Flow<Flow<String>> 형태의 결과를 얻게 되며, 이를 사용하려면 하나의 Flow로 평탄화(flatten)해야 합니다. Collections과 sequences는 이를 위한 flatten 및 flatMap 연산자를 가지고 있습니다. 그러나 플로우는 비동기적인 특성 때문에 다른 평탄화 방식이 필요하며, 이에 대한 플로우의 평탄화 연산자(flattening operators) 집합이 존재합니다.

 


flatMapConcat

 

플로우의 플로우 연결(Concatenation)flatMapConcat 및 flattenConcat 연산자에 의해 제공됩니다. 이들은 해당 시퀀스 연산자의 가장 직접적인 유사체입니다. 다음 예제에서 알 수 있듯이 내부 플로우가 완료될 때까지 기다렸다가 다음 플로우를 수집합니다.

val startTime = System.currentTimeMillis() // remember the start time 
(1..3).asFlow().onEach { delay(100) } // emit a number every 100 ms 
    .flatMapConcat { requestFlow(it) }                                                                           
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    }

flatMapConcat의 순차적 특성이 결과에서 ​​명확하게 나타납니다.

1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start

 

 

flatMapMerge

 

또 다른 평탄화 작업은 들어오는 모든 플로우를 동시에 수집하고, 해당 값을 단일 플로우로 병합하여 값이 가능한 한 빨리 방출되도록 하는 것입니다. flatMapMergeflattenMerge 연산자를 통해 가능합니다. 두 연산자 모두 동시에 수집되는 플로우의 수를 제한하는 concurrency 매개변수를 받습니다 (옵셔널, 기본값 DEFAULT_CONCURRENCY)

val startTime = System.currentTimeMillis() // remember the start time 
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
    .flatMapMerge { requestFlow(it) }                                                                           
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    }

결과에서 flatMapMerge의 성질을 명확하게 볼 수 있습니다.

1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
✏️ flatMapMerge는 코드 블록 { requestFlow(it) }을 순차적으로 호출하지만, 결과적으로 나온 Flow들을 동시에 수집합니다. 이는 순차적으로 map { requestFlow(it) }을 수행한 후에 결과에 대해 flattenMerge를 호출하는 것과 동일합니다.

 

 

flatMapLatest

 

flatMapLatest 연산자는 collectLatest 연산자와 유사한 방식으로 작동하면서 플로우를 평탄화합니다. 새로운 플로우가 방출되면 이전 플로우를 취소하고 새 플로우 수집을 시작합니다. flatMapLatest 연산자를 사용하여 가장 최근에 내보낸 플로우만 수집 및 처리되도록 할 수 있습니다.

 

val startTime = System.currentTimeMillis() // remember the start time 
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
    .flatMapLatest { requestFlow(it) }                                                                           
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    }

위 예시의 실행 결과는 flatMapLatest가 작동하는 방식을 잘 보여줍니다:

1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start

 

이 예제에서 flatMapLatest는 새로운 값이 수신되면 { requestFlow(it) } 블록 내의 모든 코드를 취소합니다. 이것은 호출하는 requestFlow 자체가 빠르고, suspending function이 아니며, 취소될 수 없는 함수이기 때문에 이 특정 예제에서는 특별한 차이를 만들지 않습니다. 그러나 requestFlow에서 delay와 같은 suspending function을 사용한다면 출력에 차이가 있을 것입니다.


[Kotlin Coroutine] Flow 공식 문서 번역 및 정리 (4) 에서 계속됩니다.


 

 

  • 네이버 블로그 공유
  • 네이버 밴드 공유
  • 페이스북 공유
  • 카카오스토리 공유