【问题标题】:Long Polling on HttpClient result and streaming into the CSV file对 HttpClient 结果进行长轮询并流式传输到 CSV 文件中
【发布时间】:2018-08-07 14:04:34
【问题描述】:

问题 1: 如何实现相同的行为?但不是Observable.interval 它将被回调调用。

例如: 我有5000ms 间隔,但我的服务器非常慢,并且在5000ms 之后没有返回结果。但是下一个调用是在5000ms 之后调用的。我不想要那样的。我想从服务器返回结果后,它会调用下一个调用。

问题 2: 如何在不一个接一个地创建多个文件的情况下立即将结果流式传输到 csv 文件。对于当前的实现,我使用FileSaver,它在IE11 中工作。我想继续使用它。 有没有办法将数据流式传输到文件而不是将其收集到数组中,因为我有很大的数据集。像 1m 行等等...... 示例:

const progress = Observable.interval(1000)
  .switchMap(() => this.messageService.getResults(query))
  .map(messageResult => messageResult)
  .subscribe((data: MessagesResult) => {
    inProcess = true;
    if (!data.isMoreResults && data.auditMessageList.length === 0) {
      this.fileSaver.save(`result.csv`, csvData);
      inProcess = false;
      this.logger.info('Download file finished...');
      progress.unsubscribe();
    }
    const start = this.filterModel.offset * this.filterModel.limit;
    const rows = [...csvData];
    rows.splice(start, 0, ...data.auditMessageList);
    csvData = rows;
    if (inProcess) {
      this.logger.info('Exporting in progress...');
    }
    query.offset++;
  }, error => this.logger.error(error));

}

【问题讨论】:

    标签: angular rxjs httpclient


    【解决方案1】:

    正如您发现的那样,使用 Observable.interval 不会“等待”流的其余部分。

    我一般使用repeatWhendelay

    const progress = Observable.defer(() => this.messageService.getResults(query))
      .repeatWhen(notifications => notifications.delay(1000)) 
      ...
    

    这是工作示例:https://jsfiddle.net/a0rz6nLv/19/

    我不太了解你们其他人的代码。

    不要在subscribe 方法中使用progress.unsubscribe();。请考虑使用 takeWhiletakeUntil - 两者都会为您完成 observable。

    .takeWhile(data => data.isMoreResults  data.auditMessageList.length > 0)
    

    也可以通过使用reducetoArray 来缓冲结果

    .reduce((accumulator, data) => data.auditMessageList.concat(accumulator), [])
    

    副作用最好由do 操作员处理

    .do({
      next: () => {
        inProgress = true;
        this.logger.info('Exporting in progress...');
      },
      complete: () => {
        inProgress = false;
        this.logger.info('Download file finished...');
      }
    })
    

    关于第二个问题-我不知道-您应该能够从服务器流式传输 csv。如果您无法修改服务器,也许其他人会知道如何在客户端进行修改...

    【讨论】:

    • @m1ch4这是在第一次请求后死亡。如何让它运行到终止状态?
    • @IamStalker 老实说,我不明白你的代码 - 我已经写了一些可能有帮助的建议。也许尝试从问题中删除以this. 开头的所有内容(使其更加独立)或创建jsfiddle
    【解决方案2】:

    问题 1

    这是一个实现一个函数的示例,该函数在收到响应时调用自身。

    后端:

    1. 模拟一个在 5 秒和 10 秒内响应的慢速后端
    2. 在每次响应时,服务器都会给出当前的request_numberstate
    3. 对于 3 个第一响应,stateactive,之后,stateclosed

    代码:

    /* Mocked backend. I'm slow, like really slow */
    class SlowBackend {
      MAX_ITERATIONS = 3; // suppose you're reading a table and you have pagination, with 3 pages
      currentIteration = 0;
    
      constructor() {}
    
      getStuff() {
        console.log(`**Request N. ${this.currentIteration}**\n[Back] : received a request from the front`);
        const responseDelay = Math.random() * 5000 + 5000; // response between 5s and 10s
        let state = "open";
        if(++this.currentIteration > this.MAX_ITERATIONS)
          state = "closed";
    
        return Observable
          .timer(responseDelay)
          .map( () => {
          console.log(`[Back] : Responding after ${responseDelay} ms`)
            return {
              request_number : this.currentIteration,
              state : state
            };
    
          })
      }
    }
    

    正面:

    这基本上是你的组件。

    class Frontend {
    
      isPollingActivated = true;
      responses = [];
    
    
      constructor(private backendService) {
        this.backendService = new SlowBackend(); // connection to backend
        this.requestOnRegularBasis();
      }
    
      requestOnRegularBasis() {
        if (!this.isPollingActivated)
          return;
    
        this.backendService.getStuff()
          .subscribe(response => {
            console.log(`[Front] : received response from server. State : ${response.state}`);
    
            // Choose one of the following blocks, comment the other according to what you need
    
            // Block 1 : Sync processing example
            console.log(`[Front] : doing some sync processing`);
            this.doSomeSyncProcessing(response);
            this.requestOnRegularBasis();
    
            // Block 2 : Async processing example
            // console.log(`[Front] : doing some async processing`);
            // this.doSomeAsyncProcessing(response)
            //    .subscribe(this.requestOnRegularBasis);
    
          })
      }
    
      private doSomeSyncProcessing(response){
        if(response.state == 'closed'){
          this.isPollingActivated = false; // stop polling
          this.saveDataToCsv();
        }
        else
          this.responses.push(Object.values(response).join(';')) // csv line separated by ';'
      }
    
      private saveDataToCsv(){
        const headers = ['current_request;state']
        this.responses = headers.concat(this.responses)
        console.log('saving to csv : ', this.responses.join('\n'));
    
        // Uncomment this to use FileSaver API
        /*
        const blob = new Blob(headers.concat(this.responses), {type: "text/csv;charset=utf-8"});
        saveAs(blob, "my_responses.csv");*
        */
      }
    
      private doSomeAsyncProcessing(response){
        return Observable.timer(1000).map(() => this.doSomeSyncProcessing(response));
      }
    
    }
    

    输出:

    **Request N. 0**
    [Back] : received a request from the front
    [Back] : Responding after 5482 ms
    [Front] : received response from server. State : open
    [Front] : doing some sync processing
    **Request N. 1**
    [Back] : received a request from the front
    [Back] : Responding after 7489 ms
    [Front] : received response from server. State : open
    [Front] : doing some sync processing
    **Request N. 2**
    [Back] : received a request from the front
    [Back] : Responding after 9627 ms
    [Front] : received response from server. State : open
    [Front] : doing some sync processing
    **Request N. 3**
    [Back] : received a request from the front
    [Back] : Responding after 5806 ms
    [Front] : received response from server. State : closed
    [Front] : doing some sync processing
    saving to csv :
    current_request;state
    1;open
    2;open
    3;open
    

    问题 2

    你不能。

    至少不使用FileSaver。因为它不支持逐块写入。当您实例化您的Blob 时,您必须 准备好所有数据。 有一些库支持块,但它们要么用于服务器端(例如 node.js),要么是特定于浏览器的。

    检查这个:Save client generated data as file in JavaScript in chunks

    注意:

    如果您尝试使用 js 在客户端机器中存储 1M 行 csv,那么架构可能有问题。 因为这不是浏览器的常见用例。客户应该有弱机器,因此接收处理, 轻巧,易于解析信息。就此而言,您可以例如在服务器端构建 csv,这将 拥有写入流文件的所有权利,以及不错的处理/内存容量。

    演示:问题 1

    http://jsbin.com/rojutudayu/2/edit?html,js,console

    演示:如何下载 blob?

        <script src="https://cdn.rawgit.com/eligrey/FileSaver.js/e9d941381475b5df8b7d7691013401e171014e89/FileSaver.min.js"> </script>
    
    <script> 
    var blob = new Blob(["Hello, world!"], {type: "text/plain;charset=utf-8"});
    saveAs(blob, "hello world.txt");
    </script>

    【讨论】:

    • 假设我正在调用 API,我不知道最大迭代次数并且我没有持续超时。假设服务器非常慢,并且在每次不同的超时时间后返回响应。
    • 在我的示例中,没有固定超时。它在 5 秒到 10 秒之间变化,但如果它从 1 秒到 1 分钟变化,那将是相同的。如果服务器在约 150 秒后未能响应,则连接将挂断,如果您使用 http 请求,则无法解决此问题。在这种情况下,您将不得不打开一个 websocket,或者使用类似 firebase 的技术。至于最大迭代次数,没问题,可以把3换成5000次迭代。问题是,一旦 blob(或 csv)的大小达到 600Mb 左右,您将不得不存储它,并重置您的响应数组。
    • 嗯,有趣的是,我在帖子中读到了您在此处提供的 js 块,它可以处理 1gb 的数据。那对我有好处。以及如何使用浏览器下载进度开始将其作为一个块下载。喜欢你下载简单的大文件吗?
    • FileSaver documentation 在 Chrome 中你可以达到 500Mb,尽管其他帖子说你可以达到 2Gb。我想您应该在某些浏览器中对此进行测试以获得答案。至于保存文件,我编辑了我的示例并包含了一个演示。基本上,一旦构建了 blob,就可以像从 URL 一样下载它。
    • 当您可以在每个服务器响应时创建一个 csv。但是您不能使用 FileServer 将数据附加到已经存在的 csv,它不处理这个问题。问题是浏览器不允许您轻松访问本地存储,这将是一个巨大的安全问题。所以你不能追加到现有文件,因为这涉及读+写。但是,您可以编写多个 csv 文件,最后将它们全部合并到一个文件中。这对您来说是一个合理的解决方案吗?
    【解决方案3】:

    问题 1:

    使用forkJoin。它将等待所有 Observables 完成。 与delay(5000)结合时,最短时间为5s。如果 5s 前没有返回 API 响应,仍然等待返回结果 (demo)

    const stream1$ = of(1).pipe(
      delay(5000)
    );
    
    const intervalTime = Math.random() * 5000 + 5000
    
    // replace with your API stream
    const stream2$ = of(intervalTime).pipe(
      delay(intervalTime)
    );
    
    forkJoin(stream1$, stream2$)
      .subscribe(([_, s2]) => {
        console.log(s2);
      })
    

    问题 2:

    如果文件很大,你应该让 Web Browser 来处理它。最好将文件保存在服务器中,然后返回下载链接。对于小文件,性能不是问题。您可以将文件数据存储在 RAM 中,然后保存文件一次。

    编辑:如果文件很大,FileSaver 开发人员建议使用StreamSaver。你应该看看它

    StreamSaver.js 采用不同的方法。现在,您实际上可以直接创建一个可写流到文件系统,而不是将数据保存在客户端存储或内存中(我不是在谈论 chromes 沙盒文件系统)

    StreamSaver.js 是在客户端保存流的解决方案。它非常适合需要保存在客户端创建的大量数据的网络应用程序,其中 RAM 非常有限,例如在移动设备上。

    【讨论】:

      猜你喜欢
      • 2017-05-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2010-11-07
      • 2012-10-11
      • 2019-05-15
      • 2011-07-06
      • 2016-06-20
      相关资源
      最近更新 更多