【Question title】: Global Exception Handling in gRPC C#
【Posted】: 2018-08-29 13:25:07
【Question description】:

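I want a global exception handler for my gRPC service. I normally configure global exception handling as shown below, but exceptions thrown inside my service methods are not handled this way. Is there any way to do this?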

static void Main(string[] args)
{
    AppDomain.CurrentDomain.UnhandledException += GlobalExceptionHandler;
    throw new Exception();
    // Shutdown.WaitOne();
}

static void GlobalExceptionHandler(object sender, UnhandledExceptionEventArgs e)
{
    throw new RpcException(new Status(StatusCode.Internal, e.ExceptionObject.ToString()));
}

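【Comments】:

  • The global "unhandled exception" handler is for exceptions that application code does not catch at all. To keep the execution environment sane, the gRPC framework does catch exceptions thrown by method handlers (implemented by the user), terminates the call in progress (if the user code throws an RpcException, the StatusCode in the exception is propagated to the client), and logs that the handler threw an exception (which normally should not happen). I'd say this is the behavior most users expect, so there are no plans to change it.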

Tags: c# exception-handling grpc


【Solution 1】:

You need to use an interceptor to do this:

On the server, catch the exception and send it to the client as JSON:

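using System;
using System.Text;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Interceptors;
using Newtonsoft.Json;

public class GrpcServerInterceptor : Interceptor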
{
    public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(TRequest request, ServerCallContext context, UnaryServerMethod<TRequest, TResponse> continuation)
    {
        try
        {
            return await base.UnaryServerHandler(request, context, continuation);
        }
        catch (Exception exp)
        {
            throw this.TreatException(exp);
        }
    }
    
    public override async Task<TResponse> ClientStreamingServerHandler<TRequest, TResponse>(IAsyncStreamReader<TRequest> requestStream, ServerCallContext context, ClientStreamingServerMethod<TRequest, TResponse> continuation)
    {
        try
        {
            return await base.ClientStreamingServerHandler(requestStream, context, continuation);
        }
        catch (Exception exp)
        {
            throw this.TreatException(exp);
        }
    }
    
    public override async Task ServerStreamingServerHandler<TRequest, TResponse>(TRequest request, IServerStreamWriter<TResponse> responseStream, ServerCallContext context, ServerStreamingServerMethod<TRequest, TResponse> continuation)
    {
        try
        {
            await base.ServerStreamingServerHandler(request, responseStream, context, continuation);
        }
        catch (Exception exp)
        {
            throw this.TreatException(exp);
        }
    }
    
    public override async Task DuplexStreamingServerHandler<TRequest, TResponse>(IAsyncStreamReader<TRequest> requestStream, IServerStreamWriter<TResponse> responseStream, ServerCallContext context, DuplexStreamingServerMethod<TRequest, TResponse> continuation)
    {
        try
        {
            await base.DuplexStreamingServerHandler(requestStream, responseStream, context, continuation);
        }
        catch (Exception exp)
        {
            throw this.TreatException(exp);
        }
    }

    private RpcException TreatException(Exception exp)
    {
        // Convert exp to Json
        string exception = JsonConvert.SerializeObject(exp, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.Auto });

        // Convert Json to byte[]
        byte[] exceptionByteArray = Encoding.UTF8.GetBytes(exception);

        // Add a trailer carrying the exception as byte[] (binary metadata keys must end in "-bin")
        Metadata metadata = new Metadata { { "exception-bin", exceptionByteArray } };

        // New RpcException with original exception
        return new RpcException(new Status(StatusCode.Internal, "Error"), metadata);
    }
}
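
The trailer name in `TreatException` is not arbitrary: gRPC only accepts byte[] metadata values under keys that end in `-bin`. The sketch below isolates that packing and unpacking step so it can be checked without a running server. `TrailerRoundTrip`, `Pack` and `Unpack` are hypothetical names introduced for illustration, and the payload is assumed to be a JSON string that has already been produced.

```csharp
using System;
using System.Text;
using Grpc.Core;

// Hypothetical helper (not part of the answer above): isolates the
// "exception-bin" trailer handling used by both interceptors.
public static class TrailerRoundTrip
{
    public const string Key = "exception-bin";

    // Server side: wrap an already-serialized payload in binary metadata.
    public static Metadata Pack(string payload)
    {
        return new Metadata { { Key, Encoding.UTF8.GetBytes(payload) } };
    }

    // Client side: read the payload back, or return null when the trailer is absent.
    public static string Unpack(Metadata trailers)
    {
        byte[] bytes = trailers.GetValueBytes(Key);
        return bytes == null ? null : Encoding.UTF8.GetString(bytes);
    }
}
```

Keeping the key in a single constant avoids the server and the client drifting apart on the trailer name.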

Use the server interceptor:

// Startup -> ConfigureServices
services.AddGrpc(
    config =>
    {
        config.Interceptors.Add<GrpcServerInterceptor>();
    });
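
If shipping the whole serialized exception to the client is more than you need, the catch blocks in the server interceptor could instead translate exceptions into status codes, since the StatusCode of an RpcException is what reaches the client. The following is a minimal sketch under that assumption. `ExceptionMapper` and the particular exception-to-status mapping are hypothetical choices for illustration, not something gRPC prescribes.

```csharp
using System;
using System.Collections.Generic;
using Grpc.Core;

// Hypothetical helper: maps common .NET exceptions to gRPC status codes.
public static class ExceptionMapper
{
    public static RpcException ToRpcException(Exception ex)
    {
        // Already a gRPC error: keep the status chosen by the service code.
        if (ex is RpcException rpc)
        {
            return rpc;
        }

        // The mapping is an assumption; adjust it to your own error model.
        StatusCode code = ex switch
        {
            ArgumentException _ => StatusCode.InvalidArgument,
            KeyNotFoundException _ => StatusCode.NotFound,
            UnauthorizedAccessException _ => StatusCode.PermissionDenied,
            NotImplementedException _ => StatusCode.Unimplemented,
            TimeoutException _ => StatusCode.DeadlineExceeded,
            OperationCanceledException _ => StatusCode.Cancelled,
            _ => StatusCode.Internal,
        };

        return new RpcException(new Status(code, ex.Message));
    }
}
```

With this variant, a catch block in the interceptor would read `throw ExceptionMapper.ToRpcException(exp);`, and the client can branch on `RpcException.StatusCode` without deserializing anything. Note that `ex.Message` is sent to the client as the status detail, so it should not contain sensitive information.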

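Now you also need to define an interceptor on the client:

// Nested in the containing client class. Needs System.Linq, System.Reflection,
// System.Runtime.ExceptionServices, System.Text, System.Threading and System.Threading.Tasks.
private class GrpcClientInterceptor : Interceptor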
{
    public override TResponse BlockingUnaryCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, BlockingUnaryCallContinuation<TRequest, TResponse> continuation)
    {
        try
        {
            return base.BlockingUnaryCall(request, context, continuation);
        }
        catch (RpcException exp)
        {
            TreatException(exp);
            throw;
        }
    }

    public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
    {
        AsyncUnaryCall<TResponse> chamada = continuation(request, context);
        return new AsyncUnaryCall<TResponse>(
            this.TreatResponseUnique(chamada.ResponseAsync),
            chamada.ResponseHeadersAsync,
            chamada.GetStatus,
            chamada.GetTrailers,
            chamada.Dispose);
    }

    public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context, AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation)
    {
        AsyncClientStreamingCall<TRequest, TResponse> chamada = continuation(context);
        return new AsyncClientStreamingCall<TRequest, TResponse>(
            chamada.RequestStream,
            this.TreatResponseUnique(chamada.ResponseAsync),
            chamada.ResponseHeadersAsync,
            chamada.GetStatus,
            chamada.GetTrailers,
            chamada.Dispose);
    }

    public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, AsyncServerStreamingCallContinuation<TRequest, TResponse> continuation)
    {
        AsyncServerStreamingCall<TResponse> chamada = continuation(request, context);
        return new AsyncServerStreamingCall<TResponse>(
            new TreatResponseStream<TResponse>(chamada.ResponseStream),
            chamada.ResponseHeadersAsync,
            chamada.GetStatus,
            chamada.GetTrailers,
            chamada.Dispose);
    }

    public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context, AsyncDuplexStreamingCallContinuation<TRequest, TResponse> continuation)
    {
        AsyncDuplexStreamingCall<TRequest, TResponse> chamada = continuation(context);
        return new AsyncDuplexStreamingCall<TRequest, TResponse>(
            chamada.RequestStream,
            new TreatResponseStream<TResponse>(chamada.ResponseStream),
            chamada.ResponseHeadersAsync,
            chamada.GetStatus,
            chamada.GetTrailers,
            chamada.Dispose);
    }

    internal static void TreatException(RpcException exp)
    {
        // Check if there's a trailer that we defined in the server
        if (!exp.Trailers.Any(x => x.Key.Equals("exception-bin")))
        {
            return;
        }

        // Convert exception from byte[] to string
        string exceptionString = Encoding.UTF8.GetString(exp.Trailers.GetValueBytes("exception-bin"));

        // Convert string to exception.
        // Note: TypeNameHandling other than None is risky when the JSON comes from an untrusted source.
        Exception exception = JsonConvert.DeserializeObject<Exception>(exceptionString, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.Auto });

        // Required to keep the original stacktrace (https://stackoverflow.com/questions/66707139/how-to-throw-a-deserialized-exception)
        // The field is private to System.Exception, so look it up on typeof(Exception):
        // GetField on a derived exception type would not find it.
        typeof(Exception).GetField("_remoteStackTraceString", BindingFlags.NonPublic | BindingFlags.Instance)?.SetValue(exception, exception.StackTrace);

        // Throw the original exception
        ExceptionDispatchInfo.Capture(exception).Throw();
    }

    private async Task<TResponse> TreatResponseUnique<TResponse>(Task<TResponse> resposta)
    {
        try
        {
            return await resposta;
        }
        catch (RpcException exp)
        {
            TreatException(exp);
            throw;
        }
    }
}

private class TreatResponseStream<TResponse> : IAsyncStreamReader<TResponse>
{
    private readonly IAsyncStreamReader<TResponse> stream;

    public TreatResponseStream(IAsyncStreamReader<TResponse> stream)
    {
        this.stream = stream;
    }

    public TResponse Current => this.stream.Current;

    public async Task<bool> MoveNext(CancellationToken cancellationToken)
    {
        try
        {
            return await this.stream.MoveNext(cancellationToken).ConfigureAwait(false);
        }
        catch (RpcException exp)
        {
            GrpcClientInterceptor.TreatException(exp);
            throw;
        }
    }
}

Now use the client interceptor:

this.MyGrpcChannel.Intercept(new GrpcClientInterceptor()).CreateGrpcService<IService>();

【Comments】:
