Асинхронный вызов для каждого элемента внутри коллекции

У меня проблема, которую я не смог решить до сих пор. Я новичок в RxKotlin, так что это может быть легко. Посмотрите на код:

override fun infos(): Stream<Info> = client.infoAboutItem(identifier) .map { val itemId = it.itemId ?: "" val item = client.itemForId(itemId) ClientInfo(client, it, source, item) as Info } .let { AccessStream(it) } 

Где поток – наша собственная коллекция. Карта – это метод, который позволяет выполнять итерацию по каждому элементу внутри этой коллекции.

Проблема здесь в том, что

  client.itemForId(itemId) 

это http-вызов, который возвращает Single, который не является идеальным.

Я хотел бы создать асинхронный вызов внутри карты, который будет возвращать Item вместо Single, а затем передать его в ClientInfo. То, что я пробовал до сих пор, заключалось в использовании подписки внутри карты и использовании метода blockingGet (), но это блокирует основной поток, даже если я наблюдаю и подписываюсь на другой поток

Поэтому он включает в себя асинхронный вызов для каждой вещи в коллекции.

Спасибо за помощь

Вы можете попытаться вернуть Observable<Stream<Info>> а затем это будет выглядеть так:

  override fun infos(): Observable<Stream<Info>> = Observable.from(client.infoAboutItem(identifier)) .flatMapSingle { val itemId = it.itemId ?: "" client.itemForId(itemId) } .map { ClientInfo(client, it, source, item) as Info } .toList() .flatMap { AccessStream(it) } 

Вы должны перенести эту дорогостоящую операцию в наблюдаемую и использовать плоскую карту, чтобы занести эти данные в Информацию о клиенте.

Я написал небольшой образец, чтобы показать его.

 class SimpleTest { val testScheduler = TestScheduler() @Test fun test() { infos().observeOn(Schedulers.immediate()) .subscribe { logger("Output", it.toString()) } testScheduler.advanceTimeBy(10, TimeUnit.MINUTES) } fun infos(): Single<List<ClientInfo>> { return Observable.from(infoAboutItem("some_identifier")) .doOnNext { logger("Next", it.toString()) } .flatMap { aboutItem -> Observable.fromCallable { itemForId(aboutItem.itemId) } .subscribeOn(testScheduler) .map { ClientInfo(aboutItem = aboutItem, item = it) } } .doOnNext { logger("Next", it.toString()) } .toList() .toSingle() } data class ClientInfo( val id: String = UUID.randomUUID().toString(), val aboutItem: AboutItem, val item: Item ) data class AboutItem(val itemId: String = UUID.randomUUID().toString()) data class Item(val id: String = UUID.randomUUID().toString()) fun infoAboutItem(identifier: String): List<AboutItem> { return (1..10).map { AboutItem() } } fun itemForId(itemId: String): Item { val sleepTime = Random().nextInt(1000).toLong() Thread.sleep(sleepTime) return Item() } fun logger(tag: String, message: String): Unit { val formattedDate = Date(Schedulers.immediate().now()).format() System.out.println("$tag @ $formattedDate: $message") } fun Date.format(): String { return SimpleDateFormat("HH:mm:ss.SSS", Locale.US).format(this) } } о class SimpleTest { val testScheduler = TestScheduler() @Test fun test() { infos().observeOn(Schedulers.immediate()) .subscribe { logger("Output", it.toString()) } testScheduler.advanceTimeBy(10, TimeUnit.MINUTES) } fun infos(): Single<List<ClientInfo>> { return Observable.from(infoAboutItem("some_identifier")) .doOnNext { logger("Next", it.toString()) } .flatMap { aboutItem -> Observable.fromCallable { itemForId(aboutItem.itemId) } .subscribeOn(testScheduler) .map { ClientInfo(aboutItem = aboutItem, item = it) } } .doOnNext { logger("Next", it.toString()) } .toList() .toSingle() } data class ClientInfo( val id: String = UUID.randomUUID().toString(), val aboutItem: AboutItem, val item: Item ) data class AboutItem(val itemId: String = UUID.randomUUID().toString()) data class Item(val id: String = UUID.randomUUID().toString()) fun infoAboutItem(identifier: String): List<AboutItem> { return (1..10).map { AboutItem() } } fun itemForId(itemId: String): Item { val sleepTime = Random().nextInt(1000).toLong() Thread.sleep(sleepTime) return Item() } fun logger(tag: String, message: String): Unit { val formattedDate = Date(Schedulers.immediate().now()).format() System.out.println("$tag @ $formattedDate: $message") } fun Date.format(): String { return SimpleDateFormat("HH:mm:ss.SSS", Locale.US).format(this) } } 
  • Как показать диалог «Прогресс» и «Отклонить»
  • Как сделать groupBy и собирать с помощью RxJava и Kotlin?
  • RxKotlin - Single.just () не выделяется при подписке на TestSubscriber
  • Несколько запросов retrofit2 с использованием Flowable в Котлине
  • Rx-Kotlin ждутTerminalEvent, никогда не получающего onComplete
  • как реализовать Switch, используя привязку данных в android
  • RxJava - Входы клавиатуры с обратным давлением?
  • Как закрепить несколько наблюдаемых в языке Kotlin с помощью RxAndroid
  • RecyclerView не является прокручиваемым, а элементный клик не работает в recyclerview
  • Rxjava с kotlin
  • RxKotlin - Динамический массив наблюдателей
  • Давайте будем гением компьютера.