Android способы работы с многопоточностью

Содержание
  1. Потоки
  2. Использование фоновых потоков
  3. Плохое приложение
  4. Запуск потока
  5. Усыпить поток
  6. Приоритет потоков
  7. Отмена выполнения потока
  8. Многопоточность в Android. Все что вам нужно знать. Часть 1 — Введение
  9. Многозадачность в Android
  10. Компоненты многопоточности, которые присоединяются к активности / фрагменту
  11. AsyncTask
  12. Загрузчики
  13. Компоненты многопоточности, которые не присоединяются к активности / фрагменту
  14. Service
  15. IntentService
  16. Многопоточность и асинхронность
  17. Создание потоков и визуальный интерфейс
  18. Многопоточное программирование в Android с использованием RxJava 2
  19. Почему реактивное программирование?
  20. Никаких больше обратных вызовов
  21. Простой контроль ошибок
  22. Очень простое использование многопоточности
  23. RxJava НЕ многопоточна по умолчанию
  24. Простой пример
  25. Подружимся с планировщиками (Schedulers)
  26. Schedulers.io()
  27. Schedulers.computation()
  28. Schedulers.newThread()
  29. Schedulers.single()
  30. Schedulers.from(Executor executor)
  31. AndroidSchedulers.mainThread()
  32. Понимание subscribeOn() и observeOn()
  33. subscribeOn()
  34. Под капотом
  35. observeOn()
  36. Под капотом
  37. Резюме

Потоки

Потоки позволяют выполнять несколько задач одновременно, не мешая друг другу, что даёт возможность эффективно использовать системные ресурсы. Потоки используются в тех случаях, когда одно долгоиграющее действие не должно мешать другим действиям. Например, у нас есть музыкальный проигрыватель с кнопками воспроизведения и паузы. Если вы нажимаете кнопку воспроизведения и у вас запускается музыкальный файл в отдельном потоке, то вы не можете нажать на кнопку паузы, пока файл не воспроизведётся полностью. С помощью потоков вы можете обойти данное ограничение.

Использование фоновых потоков

Чтобы быть уверенным, что ваше приложение не теряет отзывчивости, хорошим решением станет перемещение всех медленных, трудоёмких операций из главного потока приложения в дочерний.

Применение фоновых потоков — необходимое условие, если вы хотите избежать появления диалогового окна для принудительного закрытия приложения. Когда активность в Android на протяжении 5 секунд не отвечает на события пользовательского ввода (например, нажатие кнопки) или приёмник широковещательных намерений не завершает работу обработчика onReceive() в течение 10 секунд, считается, что приложение зависло. Подобные ситуации следует избегать любой ценой. Используйте фоновые потоки для всех трудоёмких операций, включая работу с файлами, сетевые запросы, транзакции в базах данных и сложные вычисления.

Android предоставляет несколько механизмов перемещения функциональности в фоновый режим.

  • Activity.runOnUiThread(Runnable)
  • View.post(Runnable)
  • View.postDelayed(Runnable, long)
  • Handlers
  • AsyncTask

Класс AsyncTask позволяет определить операции, которые будут выполняться в фоне, вы также будете иметь доступ к обработчику событий, что позволит отслеживать прогресс выполнения задач и выводить результаты в контексте главного графического потока. Подробнее об этом классе в отдельной статье.

Хотя использование AsyncTask — хорошее решение, случается, что для работы в фоновом режиме приходится создавать собственные потоки и управлять ими.

В Java есть стандартный класс Thread, который вы можете использовать следующим образом:

Данный способ подходит только для операций, связанных с временем. Но вы не сможете обновлять графический интерфейс программы.

Если вам нужно обновлять интерфейс программы, то нужно использовать AsyncTask, о котором говорилось выше, или вы можете реализовать ваш собственный класс, наследованный от Thread, используя объект Handler из пакета android.os для синхронизации с потоком GUI перед обновлением пользовательского интерфейса.

Вы можете создавать дочерние потоки и управлять ими с помощью класса Handler, а также классов, доступных в пространстве имён java.lang.Thread. Ниже показан простой каркас для переноса операций в дочерний поток.

Плохое приложение

Напишем «плохое» приложение, неправильно использующее основной поток. Однажды мы писали программу для подсчёта ворон. На этот раз будем считать чёрных котов, которые перебегают нам дорогу. Зачем они это делают — молчит наука. Может быть собранная статистика поможет разгадать тайну. Добавим на экран активности кнопки и текстовую метку. Код для щелчка кнопки.

Для имитации тяжёлой работы программа делает паузу на двадцать секунд, а потом выводит текст с подсчётом котов. Если нажать на кнопку один раз и подождать двадцать секунд, то программа отработает как положено. Но представьте себе, что вы нажали на кнопку один раз. Программа запустила паузу. Вы, не дожидаясь окончания паузы, снова нажали на кнопку. Программа должна выполнить вашу команду, но предыдущая команда ещё не отработала и наступает конфликт. Попробуйте нажать на кнопку несколько раз с небольшими перерывами. В какой-то момент приложение зависнет и выведет системное диалоговое окно:

В реальных приложениях такое окно может разозлить пользователя и он поставит низкую оценку вашему приложению.

В данном случае ошибку вызывает не сам вывод текста в текстовой метке, который, к слову, тоже выполняется в основном потоке, а сам щелчок кнопки. Если вы закомментируете последние две строчки кода, связанные с TextView, то ошибка сохранится.

Вам необходимо перенести трудоёмкую задачу в отдельный поток. Для этого создаётся экземпляр класса Runnable, у которого есть метод run(). Далее создаётся объект Thread, в конструкторе у которого указывается созданный Runnable. После этого можно запускать новый поток с помощью метода start(). Перепишем пример.

Весь код мы перенесли в метод run(). Теперь вы можете безостановочно щёлкать по кнопке. На этот раз приложение сохранит свою работоспособность. Чтобы в этом убедиться, в код добавлено протоколирование логов Log.i(). При каждом нажатии создаётся новый поток, в котором выполняется код. Потоки друг другу не мешают и дожидаются своей очереди, когда система позволит им отработать.

Основной поток также называют UI-потоком. Имено в главном потоке можно обновить текст у текстовой метки. В создаваемых нами потоках это делать нельзя. Если вы уберёте комментарии с последнего примера и запустите проект, то получите сообщение об ошибке.

Нужен некий посредник между создаваемыми потоками и основным UI-потоком. В роли такого посредника служит класс Handler (полное название класса android.os.Handler, не перепутайте). Вам нужно создать экземпляр класса и указать код, который нужно выполнить.

После строчки кода с Log.i() добавьте вызов метода посредника.

Поток вызывает посредника, который в свою очередь обновляет интерфейс. В нашем случае посредник посылает пустое сообщение от потока.

Но бывает так, что от потока требуется получить информацию для обработки. Ниже упрощённый пример.

Запуск потока

Предположим, мы разрабатываем собственный проигрыватель. У нас есть кнопка Play, которая вызывает метод play() для воспроизведения музыки:

Теперь запустим метод в другом потоке. Сначала создаётся новый поток. Далее описывается объект Runnable в конструкторе потока. А внутри созданного потока вызываем наш метод play(). И, наконец, запускаем поток.

Усыпить поток

Иногда требуется временно приостановить поток («усыпить»):

Приоритет потоков

Для установки приоритета процесса используется метод setPriority(), который вызывается до запуска потока. Значение приоритета может варьироваться от Thread.MIN_PRIORITY (1) до Thread.MAX_PRIORITY (10):

Отмена выполнения потока

У потока есть метод stop(), но использовать его не рекомендуется, поскольку он оставляет приложение в неопределённом состоянии. Обычно используют такой подход:

Существует и другой способ, когда все запускаемые потоки объявляются демонами. В этом случае все запущенные потоки будут автоматически завершены при завершении основного потока приложения:

Источник

Многопоточность в Android. Все что вам нужно знать. Часть 1 — Введение

13.08.2017 в 11:40

Каждый Android разработчик, в тот или иной момент сталкивается с необходимостью иметь дело с потоками в своем приложении.

Читайте также:  Законные способы увольнения работника

Когда приложение запускается, оно создает первый поток выполнения, известный как основной поток или main thread. Основной поток отвечает за отправку событий в соответствующие виджеты пользовательского интерфейса, а также связь с компонентами из набора инструментов Android UI.

Чтобы ваше приложение сохраняло отзывчивость, важно избегать использования основного потока для выполнения любой операции, которая может привести к его блокировке.

Сетевые операции и обращения к базе данных, а также загрузка определенных компонентов, являются типичными примерами операций, которые не следует выполнять в основном потоке. Когда они вызваны в главном потоке, они вызваны синхронно, что означает, что пользовательский интерфейс не будет ни на что реагировать до завершения операции. По этой причине, они обычно выполняются в отдельных потоках, что позволяет избежать блокировки пользовательского интерфейса во время выполнения (т. е. они выполняются асинхронно из UI).

Android предоставляет множество способов создания и управления потоками, и множество сторонних библиотек, которые делают управление потоками гораздо более приятным.

В этой статье вы узнаете о некоторых распространенных сценариях в Android разработке, где многопоточность становится важной, и некоторые простые решения, которые могут быть применены к этим сценариям, и многое другое.

Многозадачность в Android

В Android вы можете классифицировать все компоненты потоков на две основные категории:

Потоки связанные с активностью / фрагментом. Эти потоки привязаны к жизненному циклу активности / фрагмента и завершаются сразу после их уничтожения.

Потоки не связанные с активностью / фрагментом. Эти потоки могут продолжать работать за пределами жизни активности / фрагмента (если есть), из которых они были созданы.

Компоненты многопоточности, которые присоединяются к активности / фрагменту

AsyncTask

AsyncTask это наиболее основной Android компонент для организации потоков. Он прост в использовании и может быть хорошей основой для вашего сценария.

Однако, AsyncTask не подойдет, если вам нужен отложенный запуск задачи, после завершения работы вашей активности / фрагмента. Стоит отметить, что даже такая простая вещь, как вращение экрана может вызвать уничтожение активности.

Загрузчики

Загрузчики могут решить проблемы, упомянутые выше. Загрузчик автоматически останавливается, когда уничтожается активность и перезапускает себя, после пересоздания активности.

В основном есть два типа загрузчиков: AsyncTaskLoader и CursorLoader . О загрузчике CursorLoader вы узнаете далее в этой статье.

AsyncTaskLoader похож на AsyncTask , но немного сложнее.

Компоненты многопоточности, которые не присоединяются к активности / фрагменту

Service

Service это компонент, который полезен для выполнения длинных (или потенциально длительных) операций без какого-либо пользовательского интерфейса.

Service работает в основном потоке своего процесса; не создает свой собственный поток и не запускается в отдельном процессе, если вы это не указали.

Используя Service вы обязаны остановить его, когда его работа будет завершена, вызвав методы stopSelf() или stopService() .

IntentService

IntentService работает в отдельном потоке и автоматически останавливается после завершения работы.

IntentService обычно используется для коротких задач, которые не обязательно должны быть привязаны к какому-либо пользовательскому интерфейсу.

Источник

Многопоточность и асинхронность

Создание потоков и визуальный интерфейс

Когда мы запускаем приложение на Android, система создает поток, который называется основным потоком приложения или UI-поток. Этот поток обрабатывает все изменения и события пользовательского интерфейса. Однако для вспомогательных операций, таких как отправка или загрузка файла, продолжительные вычисления и т.д., мы можем создавать дополнительные потоки.

Для создания новых потоков нам доcтупен стандартный функционал класса Thread из базовой библиотеки Java из пакета java.util.concurrent , которые особой трудности не представляют. Тем не менее трудности могут возникнуть при обновлении визуального интерфейса из потока.

Например, создадим простейшее приложение с использованием потоков. Определим следующую разметку интерфейса в файле activity_main.xml :

Здесь определена кнопка для запуска фонового потока, а также текстовое поле для отображения некоторых данных, которые будут генерироваться в запущенном потоке.

Далее определим в классе MainActivity следующий код:

Итак, здесь к кнопке прикреплен обработчик нажатия, который запускает новый поток. Создавать и запускать поток в Java можно различными способами. В данном случае сами действия, которые выполняются в потоке, определяются в методе run() объекта Runnable :

Для примера получаем текущее время и пытаемся отобразить его в элементе TextView.

Далее определяем объект потока — объект Thread , который принимает объект Runnable. И с помощью метода start() запускаем поток:

Вроде ничего сложного. Но если мы запустим приложение и нажмем на кнопку, то мы столкнемся с ошибкой:

Поскольку изменять состояние визуальных элементов, обращаться к ним мы можем только в основном потоке приложения или UI-потоке.

Для решения этой проблемы — взаимодействия во вторичных потоках с элементами графического интерфейса класс View() определяет метод post() :

В качестве параметра он принимает задачу, которую надо выполнить, и возвращает логическое значение — true , если задача Runnable успешно помещена в очередь сообщение, или false , если не удалось разместить в очереди

Также у класса View есть аналогичный метод:

Он также запускает задачу, только через определенный промежуток времени в миллисекундах, который указывается во втором параметре.

Так, изменим код MainActivity следующим образом

Теперь для обновления TextView применяется метод post:

То есть здесь в методе run() передаемого в метод post() объекта Runnable мы можем обращаться к элементам визуального интерфейса и взаимодействовать с ними.

Подобным образом можно работать и с другими виджетами, которые наследуются от класса View .

Источник

Многопоточное программирование в Android с использованием RxJava 2

Если вы новичок в общении с RxJava или пытались разобраться в этом, но не довели дело до конца, то ниже вы найдете для себя кое-что новое.


Оригинал статьи написан 29 ноября 2017. Перевод вольный.

Нам в GO-JEK требуется выполнять большое количество асинхронных операций в приложениях и мы не можем позволить себе идти на компромиссы в ущерб скорости работы и плавности пользовательского интерфейса.

Написание сложных многопоточных Android приложений может быть достаточно трудоемким процессом, который время от времени будет вас сильно перегружать из-за необходимости заботиться о большом количестве связанных друг с другом вещей. Это и многие другие причины убедили нас использовать RxJava в разрабатываемых Android приложениях.

В этой статье мы поговорим о том как мы использовали реальные возможности работы с многопоточностью в RxJava для того, чтобы сделать процесс разработки приложения максимально простым, легким и веселым. Во всех примерах кода ниже будет использоваться RxJava 2, но описанные концепции можно будет применять и в других реактивных расширениях.

Почему реактивное программирование?

Каждая статья о реактивном программировании начинается с такого обязательного блока и мы не нарушим эту традицию. Существует несколько преимуществ использования реактивного подхода к построению Android приложений, обратим внимание на те, которые вам действительно нужны.

Никаких больше обратных вызовов

Если вы давно разрабатываете под Android, то, должно быть, заметили, как быстро вещи становятся чересчур сложными и неподконтрольными с использованием вложенных обратных вызовов.

Это происходит, когда вы выполняете несколько асинхронных операций последовательно и хотите, чтобы дальнейшие действия зависели от результата предыдущих операций. Почти сразу код становится слишком перегруженным и сложным для поддержки.

Читайте также:  Способы взаимодействия с другими культурами

Простой контроль ошибок

В императивном мире, в ситуации когда выполняется множество сложных асинхронных операций, ошибки могут возникать в большом количестве мест. И в каждом месте вы должны обрабатывать эти ошибки, в результате появляется много повторяющегося шаблонного кода, методы становятся громоздкими.

Очень простое использование многопоточности

Все мы знаем (и тайно признаем) насколько иногда сложной может быть работа с многопоточностью в Java. Например, выполнение части кода в фоновом потоке и возврат результата обратно в главный поток. Это только звучит просто, но на практике появляется много подводных камней, которые нужно обходить.

RxJava делает безумно легким выполнение нескольких сложных операций в любом потоке на ваш выбор, заботясь о корректной синхронизации и позволяя без проблем переключаться между потоками.

Преимущества RxJava бесконечны. Мы можем говорить об этом часами и адски утомить вас, но вместо этого давайте копнем глубже и начнем изучать реальную работу с многопоточностью в RxJava.

RxJava НЕ многопоточна по умолчанию

Да, вы прочли всё верно. RxJava по умолчанию не многопоточна в любом случае. Определение, данное для RxJava на официальном сайте, выглядит примерно следующим образом:
«Библиотека для составления асинхронных и основанных на событиях программ с использованием последовательностей (observable sequences) для виртуальной Java машины».

Увидев слово «асинхронных», многие люди ошибочно полагают, что RxJava многопоточна по умолчанию. Да, RxJava поддерживает многопоточность, предлагает множество мощных возможностей для легкой работы с асинхронными операциями, но это не значит что поведение RxJava по умолчанию многопоточно.

Если вы уже немного работали с RxJava, то её знаете базовые конструкции:

  • Наблюдаемый источник (source Observable), далее
  • несколько операторов (Operators), затем
  • целевой подписчик (Subscriber)

Если вы запустите данный пример кода, то ясно увидите, что все действия выполняются в основном потоке приложения (проследите за именами потоков в логе в консоли). Этот пример показывает, что по умолчанию поведение RxJava блокирующее. Всё выполняется в том же потоке, в котором вызван код.

Бонус: Интересно, что же делает doOnNext() ? Это не что иное, как side-effect оператор. Он помогает внедряться в цепочку объектов observable и выполнять грязные (impure) операции. Например, внедрять дополнительный код в цепочке вызовов для отладки. Прочитать больше можно здесь.

Простой пример

Для того, чтобы начать работать с многопоточностью с применением RxJava необходимо познакомиться с базовыми классами и методами, такими как Schedulers, observeOn/subscribeOn.

Давайте рассмотрим один из самых простых примеров. Допустим, мы хотим получить список объектов Book сетевым запросом и показать его в основном потоке приложения. Довольно общий и понятный пример для начала.

Здесь мы видим метод getBooks() , который осуществляет сетевой вызов и собирает список книг для нас. Сетевой вызов занимает время (несколько миллисекунд или секунд), поэтому мы используем subscribeOn() и указываем планировщик Schedulers.io() для выполнения операции в потоке ввода-вывода.

Также мы используем оператор observeOn() вместе с планировщиком AndroidSchedulers.mainThread() для того, чтобы обрабатывать результат в основном потоке и показать список книг в пользовательском интерфейсе приложения.

Не волнуйтесь, скоро мы перейдем к более продвинутым вещам. Этот пример был предназначен только для того, чтобы вспомнить базовые понятия, прежде чем погрузиться глубже.

Подружимся с планировщиками (Schedulers)

RxJava предоставляет мощный набор планировщиков. Вы не можете получить прямой доступ к потокам или управлять ими. Если вам нужно работать с потоками, то необходимо воспользоваться встроенными планировщиками.

Можете представлять планировщики как потоки или пулы потоков (коллекции потоков) для выполнения разного рода задач.

Говоря проще, если вам нужно выполнить задачу в отдельном потоке — необходимо использовать верный планировщик, который возьмёт поток из своего пула доступных потоков и выполнит в нём задачу.

В RxJava доступны несколько типов планировщиков. Самая сложная часть — выбрать верный планировщик для вашей задачи. Задача никогда не будет выполняться оптимально, если вы не выберете верный планировщик. Давайте разберем каждый планировщик.

Schedulers.io()

Этот планировщик основывается на неограниченном пуле потоков и используется для интенсивной работы с вводом-выводом без использования ЦП, например, доступ к файловой системе, выполнение сетевых вызовов, доступ к базе данных и так далее. Количество потоков в этом планировщике неограничено и может расти по мере необходимости.

Schedulers.computation()

Этот планировщик используется для выполнения работы, высоко нагружающей ЦП, такой как обработка больших объемов данных, изображений и так далее. Планировщик основывается на ограниченном пуле потоков с размером в количество доступных процессоров.
Так как этот планировщик подходит только для интенсивной работы с ЦП — количество его потоков ограничено. Сделано это для того, чтобы потоки не конкурировали за процессорное время и не простаивали.

Schedulers.newThread()

Этот планировщик создает совершенно новый поток при каждом вызове. В данном случае использование пула потоков не принесет никакой выгоды. Потоки очень затратно создавать и уничтожать. Вы должны быть осторожны и не злоупотреблять чрезмерным созданием потоков, так как это может привести в замедлению работы системы и переполнению памяти. Новый поток будет создаваться для обработки каждого элемента, полученного из observable-источника.
В идеале вы должны использовать этот планировщик довольно редко, в основном для выведения в отдельный поток долго работающих частей программы.

Schedulers.single()

Этот планировщик основывается на единственном потоке, который используется для последовательного выполнения задач. Он может быть очень полезен, когда у вас есть набор фоновых заданий в разных местах вашего приложения, но нельзя допустить одновременного выполнения более чем одного из этих заданий.

Schedulers.from(Executor executor)

Этот планировщик будет основываться на вашем собственном Executor . Может возникнуть ситуация, в которой необходимо будет выполнять определенные задачи в планировщике на основании собственной логики распределения потоков.

Допустим, вы хотите ограничить число параллельных сетевых вызовов, которые делает ваше приложение. Можно создать собственный планировщик, который будет работать на базе ограниченного в размерах пула потоков ( Scheduler.from(Executors.newFixedThreadPool(n)) ) и использовать его во всех местах, связанных с сетевыми вызовами.

AndroidSchedulers.mainThread()

Это специальный планировщик, который недоступен в библиотеке RxJava. Необходимо использовать расширяющую библиотеку RxAndroid для доступа к этому планировщику. Этот планировщик полезен в Android приложениях для выполнения действий в потоке пользовательского интерфейса.
По умолчанию этот планировщик ставит задания в очередь в Looper , связанный с основным потоком, но есть возможность переопределения: AndroidSchedulers.from(Looper looper) .

Заметка: Будьте осторожны в использовании планировщиков, основанных на неограниченных пулах потоков, таких как Schedulers.io() . Всегда есть риск бесконечного роста количества потоков.

Понимание subscribeOn() и observeOn()

Теперь, когда у вас есть представление о типах планировщиков, разберем subscribeOn() и observeOn() в деталях.

Вы должны глубоко разбираться в том, как эти два оператора работают по отдельности и вместе, чтобы профессионально работать с многопоточностью в RxJava.

subscribeOn()

Простыми словами, этот оператор говорит в какой поток наблюдаемый источник (source observable) будет передавать элементы. Вы должны уяснить важность слова «источник». Когда у вас цепь наблюдаемых элементов (observables), источник (source observable) — это всегда корневой элемент или верхняя часть цепи, откуда происходит создание событий.

Читайте также:  Способы минимизации угрозы банкротства презентация

Как вы уже видели, если не использовать subscribeOn() , то все события происходят в том потоке, в котором произошел вызов кода (в нашем случае — main поток).

Давайте перенаправим события в вычислительный поток с помощью subscribeOn() и планировщика Schedulers.computation() . Когда вы запустите нижеследующий пример кода, то увидите, что события происходят в одном из вычислительных потоков, доступных в пуле — RxComputThreadPool-1 .

В целях сокращения кода мы не будем полностью переопределять все методы DisposableSubscriber , так как нам не нужно переопределять onError() и onComplete() . Воспользуемся doOnNext() и лямбдами.

Не важно в каком месте в цепочке вызовов вы используете subscribeOn() . Он работает только с наблюдаемым источником (source observable), и контролирует в какой поток наблюдаемый источник передает события.

В нижеследующем примере после observable-источника создаются другие объекты observable (методами map() и filter() ), а оператор subscribeOn() помещен в конце цепочки вызовов. Но как только вы запустите этот код, то заметите, что все события будут возникать в потоке, указанном в subscribeOn() . Это станет более понятным при добавлении observeOn() в цепь вызовов. И даже если мы разместим subscribeOn() ниже observeOn() , то логика работы не изменится. subscribeOn() работает только с наблюдаемым источником (source observable).

Также важно понять, что нельзя использовать subscribeOn() несколько раз в одной цепочке вызовов. Можно, конечно, написать ещё раз, но никаких изменений это не повлечет. В примере ниже мы последовательно вызываем три различных планировщика, можете ли вы догадаться, какой планировщик сработает при запуске?

Если вы ответили Schedulers.io() , то вы правы! Даже если делать вызов многократно — сработает только первый subscribeOn() , вызванный после observable-источника.

Под капотом

Стоит потратить ещё немного времени на более подробное изучение рассмотренного примера. Почему срабатывает только планировщик Schedulers.io() ? Обычно все думают, что сработает Schedulers.newThread() , так как этот вызов находится в конце цепочки.

Необходимо понять, что в RxJava подписка создаётся после обратного вызова всех экземпляров Observable . Код ниже поможет нам разобраться в этом. Это ранее рассмотренный пример, но расписанный подробнее.

Для того, чтобы понять как всё работает — начнем разбирать всё с последней строки примера. В ней целевой подписчик (target subscriber), вызывает метод subscribe() у observable объекта o3 , который затем делает неявный вызов subscribe() у своего родительского observable объекта o2 . Реализация наблюдателя (observer), предоставляемая объектом o3 , умножает переданные числа на 10.

Процесс повторяется и o2 неявно вызывает subscribe() у объекта o1 , передавая реализацию наблюдателя, которая позволяет обрабатывать только четные числа. Теперь мы достигли корневого элемента ( o1 ), у которого нет родителя для последующего вызова subscribe() . На этом этапе завершается цепочка наблюдаемых (observable) элементов, после чего observable-источник начинает передавать (emit) элементы.

Теперь для вас должна быть понятна концепция работы подписок в RxJava. К настоящему времени у вас должно появиться понимание того, как формируются цепочки наблюдаемых (observable) объектов и как события распространяются, начиная с observable-источника.

observeOn()

Как мы уже видели, subscribeOn() указывает observable-источнику передавать элементы в определенный поток и этот поток будет отвечать за продвижение элементов вплоть до подписчика (Subscriber). Поэтому, по умолчанию, подписчик получает обработанные элементы в этом же потоке.

Но это может быть не то поведение, которого вы ожидаете. Предположим, вы хотите получить некие данные из сети и отобразить их в пользовательском интерфейсе.

Нужно выполнить две вещи:

  • Сделать сетевой вызов в неблокирующем потоке ввода-вывода
  • Получить результат в основном потоке приложения

У вас будет Observable , который осуществляет сетевой вызов в потоке ввода-вывода и передает результат подписчику. Если вы используете только subscribeOn(Schedulers.io()) , то целевой подписчик будет обрабатывать результат в том же потоке ввода-вывода. И нам не повезло, так как работать с пользовательским интерфейсом в Android можно только в основном потоке.

Теперь нам крайне необходимо переключить потоки и мы используем для этого observeOn() . Когда observeOn() встречается в цепочке вызовов, то передаваемые observable-источником элементы незамедлительно перебрасываются в поток, указанный в observeOn() .

В этом придуманном примере мы наблюдаем получение целых чисел из сети и их дальнейшую передачу из observable-источника. В реальных примерах это может быть любая другая асинхронная операция, например, чтение большого файла, выборка данных из базы данных и т.д. Вы можете запустить данный пример и посмотреть на результаты, просто следите за логами в консоли.

Теперь рассмотрим более сложный пример, в котором observeOn() будет вызываться несколько раз для переключения потоков в процессе обработки данных.

В примере выше observable-источник передаёт элементы в цепочку обработчиков в потоке ввода вывода, так как мы использовали subscribeOn() вместе с Schedulers.io() . Далее мы хотим преобразовать каждый элемент, используя оператор map() , но сделать это нужно в вычислительном потоке. Для этого используем observeOn() вместе с Schedulers.computation() перед вызовом map() для переключения потока и передачи элементов в целевой вычислительный поток.

Следующим шагом отфильтруем некоторые элементы и по какой-то причине мы хотим выполнить эту операцию в новом потоке для каждого из элементов. Используем снова observeOn() , но уже в паре с Schedulers.newThread() перед вызовом оператора filter() для передачи каждого элемента в новый поток.

В итоге мы хотим, чтобы подписчик получил результат обработки в потоке пользовательского интерфейса. Для этого используем observeOn() вместе с планировщиком AndroidSchedulers.mainThread() .

Но что произойдет, если использовать observeOn() несколько раз последовательно? В примере ниже в каком потоке подписчик получит результат?

Если запустите пример, то увидите, что подписчик получит элементы в вычислительном потоке RxComputationThreadPool-1 . Это значит, что сработал последний вызванный observeOn() . Интересно почему?

Под капотом

Возможно вы уже догадались. Как мы знаем, подписка (subscription) вызывается после обратного обхода всех Obsevable , но с передачей событий (emissions) всё происходит наоборот, то есть в обычном порядке, как записан код. Вызов происходит от observable-источника и далее вниз по цепочке вызова вплоть до подписчика.

Оператор observeOn() всегда работает в прямом порядке, поэтому последовательно происходит переключение потоков и последним происходит переключение на вычислительный поток ( observeOn(Schedulers.computation()) ). Итак, когда нужно переключить поток для обработки данных в новом потоке, то просто сначала вызовите observeOn() , а далее обрабатывайте элементы. Синхронизация, исключение состояния гонки, всё это и многие другие сложности многопоточности RxJava обрабатывает за вас.

Резюме

Сейчас у вас должно быть достаточно хорошее представление о том, как правильно использовать RxJava для написания многопоточных приложений, обеспечивающих быструю и плавную работу пользовательского интерфейса.

Если понимание не пришло сразу, ничего страшного. Прочитайте статью ещё раз, поэкспериментируйте с примерами кода. Здесь достаточно много нюансов для понимания, не торопитесь.

Источник

Оцените статью
Разные способы