【问题标题】:Read buffered response with Angular2/RxJS使用 Angular2/RxJS 读取缓冲响应
【发布时间】:2017-02-10 18:07:50
【问题描述】:

我正在构建一个从后端读取数据的网站。该数据是即时计算的,并以缓冲的方式发送回客户端。 IE。一旦计算出第一个块,它就会发送给客户端,然后它计算下一个块并将其发送给客户端。这整个过程发生在同一个 HTTP 请求中。客户端不应等待完整的响应完成,而应在发送后立即自行处理每个块。此类响应通常可以使用 XHR 进度处理程序(例如 How to get progress from XMLHttpRequest)来使用。

如何使用 Angular2 中的 HttpModule 使用 RxJS 和 Observables 来处理这样的响应?


编辑:peeskillet 在下面给出了出色而详细的答案。另外,我做了一些进一步的挖掘,发现了一个feature request for the HttpModule of Angular和一个StackOverflow question with another approach on how to solve it

【问题讨论】:

    标签: angular rxjs observable angular2-http


    【解决方案1】:

    注意:以下答案只是一个 POC。它旨在教育 Http 的体系结构,并提供一个简单的工作 POC 实现。应该查看XHRConnection 的源代码,了解在实现此功能时还应考虑的其他事项。

    在尝试实现这一点时,我看不到任何直接利用 XHR 的方法。似乎我们可能只需要提供使用Http 所涉及的一些组件的自定义实现。我们应该考虑的三个主要组成部分是

    • Connection
    • ConnectionBackend
    • Http

    HttpConnectionBackend 作为其构造函数的参数。发出请求时,例如使用getHttp 将创建与ConnectionBackend.createConnection 的连接,并返回ConnectionObservable 属性(从createConnection 返回)。在最精简(简化)的视图中,它看起来像这样

    class XHRConnection implements Connection {
      response: Observable<Response>;
      constructor( request, browserXhr) {
        this.response = new Observable((observer: Observer<Response>) => {
          let xhr = browserXhr.create();
          let onLoad = (..) => {
            observer.next(new Response(...));
          };
          xhr.addEventListener('load', onLoad);
        })
      }
    }
    
    class XHRBackend implements ConnectionBackend {
      constructor(private browserXhr) {}
      createConnection(request): XHRConnection {
        return new XHRConnection(request, this.broswerXhr).response;
      }
    }
    
    class Http {
      constructor(private backend: ConnectionBackend) {}
    
      get(url, options): Observable<Response> {
        return this.backend.createConnection(createRequest(url, options)).response;
      }
    }
    

    所以知道了这个架构,我们可以尝试实现类似的东西。

    对于Connection,这里是 POC。为简洁起见省略了导入,但在大多数情况下,所有内容都可以从 @angular/http 导入,Observable/Observer 可以从 rxjs/{Type} 导入。

    export class Chunk {
      data: string;
    }
    
    export class ChunkedXHRConnection implements Connection {
      request: Request;
      response: Observable<Response>;
      readyState: ReadyState;
    
      chunks: Observable<Chunk>;
    
      constructor(req: Request, browserXHR: BrowserXhr, baseResponseOptions?: ResponseOptions) {
        this.request = req;
        this.chunks = new Observable<Chunk>((chunkObserver: Observer<Chunk>) => {
          let _xhr: XMLHttpRequest = browserXHR.build();
          let previousLen = 0;
          let onProgress = (progress: ProgressEvent) => {
            let text = _xhr.responseText;
            text = text.substring(previousLen);
            chunkObserver.next({ data: text });
            previousLen += text.length;
    
            console.log(`chunk data: ${text}`);
          };
          _xhr.addEventListener('progress', onProgress);
          _xhr.open(RequestMethod[req.method].toUpperCase(), req.url);
          _xhr.send(this.request.getBody());
          return () => {
            _xhr.removeEventListener('progress', onProgress);
            _xhr.abort();
          };
        });
      }
    }
    

    这里我们只是订阅 XHR progress 事件。由于XHR.responseText 喷出整个连接文本,我们只需substring 获取块,并通过Observer 发出每个卡盘。

    对于XHRBackend,我们有以下内容(没什么特别的)。同样,一切都可以从@angular/http 导入;

    @Injectable()
    export class ChunkedXHRBackend implements ConnectionBackend {
      constructor(
          private _browserXHR: BrowserXhr, private _baseResponseOptions: ResponseOptions,
          private _xsrfStrategy: XSRFStrategy) {}
    
      createConnection(request: Request): ChunkedXHRConnection {
        this._xsrfStrategy.configureRequest(request);
        return new ChunkedXHRConnection(request, this._browserXHR, this._baseResponseOptions);
      }
    }
    

    对于Http,我们将对其进行扩展,添加一个getChunks 方法。如果需要,您可以添加更多方法。

    @Injectable()
    export class ChunkedHttp extends Http {
      constructor(protected backend: ChunkedXHRBackend, protected defaultOptions: RequestOptions) {
        super(backend, defaultOptions);
      }
    
      getChunks(url, options?: RequestOptionsArgs): Observable<Chunk> {
        return this.backend.createConnection(
           new Request(mergeOptions(this.defaultOptions, options, RequestMethod.Get, url))).chunks;
      }
    }
    

    mergeOptions 方法可以在Http source 中找到。

    现在我们可以为它创建一个模块。用户应直接使用ChunkedHttp 而不是Http。但是因为不尝试覆盖Http 令牌,如果需要,您仍然可以使用Http

    @NgModule({
      imports: [ HttpModule ],
      providers: [
        {
          provide: ChunkedHttp,
          useFactory: (backend: ChunkedXHRBackend, options: RequestOptions) => {
            return new ChunkedHttp(backend, options);
          },
          deps: [ ChunkedXHRBackend, RequestOptions ]
        },
        ChunkedXHRBackend
      ]
    })
    export class ChunkedHttpModule {
    }
    

    我们导入 HttpModule 是因为它提供了我们需要注入的其他服务,但如果我们不需要,我们不想重新实现这些服务。

    要测试只需将ChunkedHttpModule 导入AppModule。另外为了测试我使用了以下组件

    @Component({
      selector: 'app',
      encapsulation: ViewEncapsulation.None,
      template: `
        <button (click)="onClick()">Click Me!</button>
        <h4 *ngFor="let chunk of chunks">{{ chunk }}</h4>
      `,
      styleUrls: ['./app.style.css']
    })
    export class App {
      chunks: string[] = [];
    
      constructor(private http: ChunkedHttp) {}
    
      onClick() {
        this.http.getChunks('http://localhost:8080/api/resource')
          .subscribe(chunk => this.chunks.push(chunk.data));
      }
    }
    

    我设置了一个后端端点,它每半秒吐出 10 个块 "Message #x"。这就是结果

    某处似乎存在错误。只有九个 :-)。我认为它与服务器端有关。

    【讨论】:

    • 谢谢你,很好的回答 :) 如果你有兴趣,你也可以看看我在进一步挖掘时添加到问题中的链接。
    • 您介意将其发布为可重用模块吗?它可能是大量数据角度应用程序的必备品。
    • 这个讨论发生在 2017 年初,是否有任何 Angular 内置或插件可用于在 2021 年实现相同的功能?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-09-30
    • 1970-01-01
    • 2016-01-23
    • 1970-01-01
    • 1970-01-01
    • 2017-09-04
    • 2017-08-24
    相关资源
    最近更新 更多