【问题标题】:Wait for AWS Athena query execution in Go SDK在 Go SDK 中等待 AWS Athena 查询执行
【发布时间】:2022-11-27 10:02:35
【问题描述】:

我有一个运行 Athena 查询的工作代码,并使用以下代码通过轮询 errorGetQueryResults 返回来等待查询完成:

func GetQueryResults(client *athena.Client, QueryID *string) []types.Row {

    params := &athena.GetQueryResultsInput{
        QueryExecutionId: QueryID,
    }

    data, err := client.GetQueryResults(context.TODO(), params)

    for err != nil {
        println(err.Error())
        time.Sleep(time.Second)
        data, err = client.GetQueryResults(context.TODO(), params)
    }

    return data.ResultSet.Rows
}

问题是万一查询失败,我绝对没有办法打破循环。

例如,在 Python 中我可以这样做:

    while athena.get_query_execution(QueryExecutionId=execution_id)["QueryExecution"][
        "Status"
    ]["State"] in ["RUNNING", "QUEUED"]:
        sleep(2)

我可以在 for 循环中进行类似 strings.Contains(err.Error(),"FAILED") 的检查,但我正在寻找一种更简洁的方法。

我尝试寻找 Go 的等价物,但没有成功。 Go SDK 有返回执行状态的函数吗?有没有更好的方法来进一步检查 Go 中的错误而不是 err != nil

【问题讨论】:

    标签: amazon-web-services go aws-sdk amazon-athena aws-sdk-go


    【解决方案1】:

    SDK已经提供了重试。

    这是一个使用 aws-sdk-go-v2 的示例。

    package main
    
    import (
        "context"
        "fmt"
        "time"
    
        "github.com/aws/aws-sdk-go-v2/aws"
        "github.com/aws/aws-sdk-go-v2/service/athena"
        "github.com/aws/aws-sdk-go-v2/service/athena/types"
    )
    
    func main() {
        cfg := aws.NewConfig()
        ath := athena.NewFromConfig(*cfg)
    
        ctx, cancelFunc := context.WithTimeout(context.Background(), time.Minute*5)
        defer cancelFunc()
    
        rows, err := GetQueryResults(ctx, ath, aws.String("query-id"), 10)
        if err != nil {
            panic(err) // TODO: handle error
        }
    
        fmt.Println(rows)
    }
    
    func GetQueryResults(ctx context.Context, client *athena.Client, QueryID *string, attempts int) ([]types.Row, error) {
        t := time.NewTicker(time.Second * 5)
        defer t.Stop()
    
        attemptsFunc := func(o *athena.Options) { o.RetryMaxAttempts = attempts }
    
    WAIT:
        for {
            select {
            case <-t.C:
                out, err := client.GetQueryExecution(ctx, &athena.GetQueryExecutionInput{
                    QueryExecutionId: QueryID,
                }, attemptsFunc)
                if err != nil {
                    return nil, err
                }
    
                switch out.QueryExecution.Status.State {
                case types.QueryExecutionStateCancelled,
                    types.QueryExecutionStateFailed,
                    types.QueryExecutionStateSucceeded:
                    break WAIT
                }
    
            case <-ctx.Done():
                break WAIT
            }
        }
    
        data, err := client.GetQueryResults(ctx, &athena.GetQueryResultsInput{
            QueryExecutionId: QueryID,
        })
        if err != nil {
            return nil, err
        }
    
        return data.ResultSet.Rows, nil
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-09-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-09-16
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多