Android Retrofit 2 + RxJava: Listen To Endless Stream


Answer:

Here is my solution:

You can use the @Streaming annotation:

    public interface ITwitterAPI {
        @GET("/2/rsvps")
        @Streaming
        Observable<ResponseBody> twitterStream();
    }

    ITwitterAPI api = new Retrofit.Builder()
            .baseUrl("http://stream.meetup.com")
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
            .build()
            .create(ITwitterAPI.class);

With @Streaming, the response is not buffered into memory first, so we can read the raw input directly from the ResponseBody as it arrives.

Here is my function that wraps the body in an Observable, emitting one event per line:

    public static Observable<String> events(BufferedSource source) {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    while (!source.exhausted()) {
                        subscriber.onNext(source.readUtf8Line());
                    }
                    subscriber.onCompleted();
                } catch (IOException e) {
                    e.printStackTrace();
                    subscriber.onError(e);
                }
            }
        });
    }

And the resulting usage:

    api.twitterStream()
            .flatMap(responseBody -> events(responseBody.source()))
            .subscribe(System.out::println);
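The line-splitting loop inside events() can be tried without a network connection or any of the libraries above. The following is only a minimal sketch under some assumptions: LineSplitter and forEachLine are hypothetical names, java.io.BufferedReader stands in for Okio's BufferedSource, a plain Consumer stands in for the RxJava Subscriber, and the "event-1"/"event-2" payload is made-up sample data.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// Hypothetical helper; not part of Retrofit, RxJava, or Okio.
final class LineSplitter {

    // Reads the stream line by line and hands each line to onLine,
    // mirroring the while (!source.exhausted()) loop in events().
    static void forEachLine(InputStream in, Consumer<String> onLine) {
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                onLine.accept(line);
            }
        } catch (IOException e) {
            // Sketch only: surface read failures as unchecked exceptions.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Made-up stand-in for a streamed body with one event per line.
        byte[] body = "event-1\nevent-2\n".getBytes(StandardCharsets.UTF_8);
        forEachLine(new ByteArrayInputStream(body), System.out::println);
    }
}
```

With a real streamed response the loop would block on each read until the server sends the next line, which is why the Observable version has to be subscribed off the main thread on Android.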

Update: stopping gracefully

When we unsubscribe, Retrofit closes the input stream. However, there is no way to ask the stream itself whether it has been closed. The only way to find out is to try reading from it, which throws an exception with the message "Socket closed". We can interpret this exception as completion:

        @Override
        public void call(Subscriber<? super String> subscriber) {
            boolean isCompleted = false;
            try {
                while (!source.exhausted()) {
                    subscriber.onNext(source.readUtf8Line());
                }
            } catch (IOException e) {
                // Constant-first equals avoids a NullPointerException
                // when the exception carries no message.
                if ("Socket closed".equals(e.getMessage())) {
                    isCompleted = true;
                    subscriber.onCompleted();
                } else {
                    throw new UncheckedIOException(e);
                }
            }
            // If the response ended normally, we get here.
            if (!isCompleted) {
                subscriber.onCompleted();
            }
        }

If the connection closes because the response ended, no exception is thrown and the loop simply exits. The isCompleted check handles that case. Let me know if I am wrong :)
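The two ways the loop can end can be illustrated without a real socket. This is a rough sketch and not the Retrofit or RxJava API: GracefulLineReader, its End enum, and readLines are hypothetical names, BufferedReader again stands in for Okio's BufferedSource, and matching on the "Socket closed" message is taken from the approach above rather than from any documented contract.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// Hypothetical helper showing both termination paths of the read loop.
final class GracefulLineReader {

    // How the loop ended: the stream ran out, or the socket was closed under us.
    enum End { STREAM_FINISHED, SOCKET_CLOSED }

    static End readLines(InputStream in, Consumer<String> onLine) {
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                onLine.accept(line);
            }
            // No exception: the response simply ended.
            return End.STREAM_FINISHED;
        } catch (IOException e) {
            // Assumption from the text above: a closed stream reports this message.
            if ("Socket closed".equals(e.getMessage())) {
                return End.SOCKET_CLOSED;
            }
            // Any other I/O failure is a real error.
            throw new UncheckedIOException(e);
        }
    }
}
```

In the Observable version both End values would map to subscriber.onCompleted(), while the rethrown exception would reach the subscriber as an error. Comparing exception messages is fragile, so treat the string match as a workaround rather than a guarantee.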

