Android Retrofit 2 + RxJava: Listen To Endless Stream
Answer :
Here is my solution:
You can use the @Streaming annotation:
public interface ITwitterAPI {
    @GET("/2/rsvps")
    @Streaming
    Observable<ResponseBody> twitterStream();
}

ITwitterAPI api = new Retrofit.Builder()
        .baseUrl("http://stream.meetup.com")
        .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
        .build()
        .create(ITwitterAPI.class);
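With @Streaming we can read the raw input from the ResponseBody. Without it, Retrofit buffers the whole body in memory before handing it over, which never finishes for an endless stream.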
Here is my function that wraps the body, split into lines, as a stream of events:
public static Observable<String> events(BufferedSource source) {
    return Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            try {
                while (!source.exhausted()) {
                    subscriber.onNext(source.readUtf8Line());
                }
                subscriber.onCompleted();
            } catch (IOException e) {
                e.printStackTrace();
                subscriber.onError(e);
            }
        }
    });
}
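The line-splitting loop can also be tried without Retrofit, RxJava, or a network connection. Below is a minimal sketch using only the JDK; the class name LineEvents and the method forEachLine are made up for illustration, and BufferedReader stands in for Okio's BufferedSource, so this is an analogy to events() rather than a drop-in replacement.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class LineEvents {

    // Hypothetical helper: hands each line of the stream to onLine, one at a time,
    // and returns when the stream reports end of input.
    static void forEachLine(InputStream in, Consumer<String> onLine) throws IOException {
        BufferedReader reader =
                new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        String line;
        while ((line = reader.readLine()) != null) {
            onLine.accept(line);
        }
    }

    public static void main(String[] args) throws IOException {
        // An in-memory stream stands in for the HTTP response body (assumption for the demo).
        InputStream fake = new ByteArrayInputStream(
                "{\"id\":1}\n{\"id\":2}\n".getBytes(StandardCharsets.UTF_8));
        List<String> seen = new ArrayList<>();
        forEachLine(fake, seen::add);
        System.out.println(seen);
    }
}
```

The Consumer callback plays the role of subscriber.onNext, and returning normally from forEachLine corresponds to onCompleted.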
And the resulting usage:
api.twitterStream()
        .flatMap(responseBody -> events(responseBody.source()))
        .subscribe(System.out::println);
Update: stopping gracefully
When we unsubscribe, Retrofit closes the input stream. However, the input stream itself gives us no way to check whether it has been closed, so the only option is to try reading from it: we then get an exception with the message "Socket closed". We can interpret this exception as the stream being closed:
@Override
public void call(Subscriber<? super String> subscriber) {
    boolean isCompleted = false;
    try {
        while (!source.exhausted()) {
            subscriber.onNext(source.readUtf8Line());
        }
    } catch (IOException e) {
        // Constant-first equals() avoids a NullPointerException when the message is null.
        if ("Socket closed".equals(e.getMessage())) {
            isCompleted = true;
            subscriber.onCompleted();
        } else {
            throw new UncheckedIOException(e);
        }
    }
    // If the response ended normally, we get here.
    if (!isCompleted) {
        subscriber.onCompleted();
    }
}
If the connection is closed because the response ended, no exception is thrown; the isCompleted check covers that case. Let me know if I am wrong :)
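To make the two ways of finishing easier to see, here is a small JDK-only sketch of the same idea. The names GracefulLineReader, Outcome, readLines and closingAfter are hypothetical and exist only for this example; closingAfter fakes a stream that throws an IOException with the message "Socket closed" once its data runs out, which is an assumption standing in for what happens after unsubscribing. Matching on the exception message is fragile, so treat this as an illustration rather than a recommended pattern.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class GracefulLineReader {

    // Hypothetical outcome type, introduced only for this sketch.
    enum Outcome { END_OF_RESPONSE, CLOSED_BY_CLIENT }

    // Reads lines until the stream ends or fails with "Socket closed".
    static Outcome readLines(InputStream in, Consumer<String> onLine) throws IOException {
        BufferedReader reader =
                new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                onLine.accept(line);
            }
            return Outcome.END_OF_RESPONSE;       // the response simply ended
        } catch (IOException e) {
            // Null-safe comparison: getMessage() may return null.
            if ("Socket closed".equals(e.getMessage())) {
                return Outcome.CLOSED_BY_CLIENT;  // treated as a normal completion
            }
            throw e;                              // any other failure is a real error
        }
    }

    // Test double: serves the given data, then throws instead of signalling end of input.
    static InputStream closingAfter(String data) {
        final ByteArrayInputStream bytes =
                new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
        return new InputStream() {
            @Override
            public int read() throws IOException {
                int b = bytes.read();
                if (b == -1) {
                    throw new IOException("Socket closed");
                }
                return b;
            }
        };
    }

    public static void main(String[] args) throws IOException {
        List<String> lines = new ArrayList<>();
        Outcome outcome = readLines(closingAfter("first\nsecond\n"), lines::add);
        System.out.println(lines + " -> " + outcome);
    }
}
```

Both outcomes map to onCompleted in the Rx version above; only an IOException with a different message is passed on as an error.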