Creating An RxJS Observable From A (server Sent) EventSource


Answer :

You could use the following code to manually create Observable for EventSource stream:



export class AppComponent implements OnInit {
someStrings:string[] = [];

constructor(private zone: NgZone) {}

ngOnInit(){
const observable = Observable.create(observer => {
const eventSource = new EventSource('/interval-sse-observable');
eventSource.onmessage = x => observer.next(x.data);
eventSource.onerror = x => observer.error(x);

return () => {
eventSource.close();
};
});

this.subscription = observable.subscribe({
next: guid => {
this.zone.run(() => this.someStrings.push(guid));
},
error: err => console.error('something wrong occurred: ' + err)
});
}
}

// somewhere
// this.subscription.unsubscribe()


Don't forget to import the NgZone class:



import {Component, OnInit, NgZone} from '@angular/core';


See also Angular2 View Not Changing After Data Is Updated



I should complete Yurzui's answer:



In my case, working with Angular 6 I had some weird behavior when assigning a function to onmessage. I therefore added an event listener instead and it worked like a charm:



const observable = Observable.create(observer => {
const eventSource = new EventSource('/interval-sse-observable');
eventSource.addEventListener("message", (event: MessageEvent) => observer.next(event.data));
eventSource.addEventListener("error", (event: MessageEvent) => observer.error(event));

return () => {
eventSource.close();
};
});


Comments

Popular posts from this blog

Converting A String To Int In Groovy

"Cannot Create Cache Directory /home//.composer/cache/repo/https---packagist.org/, Or Directory Is Not Writable. Proceeding Without Cache"

Android SDK Location Should Not Contain Whitespace, As This Cause Problems With NDK Tools