Protobuf 定义
首先,我们先定义好 .proto 文件,这是我们讨论的基础。
Protocol Buffers
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| syntax = "proto3";
package com.example.grpc;
option java_multiple_files = true; option java_package = "com.example.grpc"; option java_outer_classname = "ImageProto";
service ImageProcessor { rpc ProcessImages(stream ImageChunk) returns (Responses);
rpc ProcessImagesStream(stream ImageChunk) returns (stream Response); }
message ImageChunk { bytes data = 1; int32 index = 2; bool is_last = 3; }
message Response { string status = 1; string message = 2; }
message Responses { repeated Response results = 1; }
|
- 绝不阻塞gRPC线程:gRPC的
onNext方法由Netty的I/O线程(Event Loop)驱动。任何同步的、耗时的操作(包括等待异步任务完成)都会阻塞这个线程,导致服务器无法接收新的请求,吞吐量急剧下降。
- 异步化业务逻辑:你的
processImage 必须是异步的。在Java中,最佳实践是让它返回一个 CompletableFuture<Response>。我们会为这个业务逻辑创建一个独立的线程池。
- 精细化内存管理:只在内存中保留当前正在处理的一张图片的数据。一旦一张图片的数据收集完毕并提交给处理线程池,就立刻释放其内存,为下一张图片做准备。
方案一:非流式返回 (repeated Response)
这种方案下,服务器需要收集所有图片的处理结果,然后在客户端数据流结束后,将所有结果一次性返回。
策略:
在onNext中接收数据块,拼装成完整的图片后,将其提交给一个专门的业务线程池进行异步处理。处理会返回一个 CompletableFuture。我们将这些Future收集起来。在onCompleted方法中,我们使用 CompletableFuture.allOf() 来注册一个回调,当所有图片都处理完毕后,再统一打包发送响应。
实现代码 (ImageProcessorImpl.java):
Java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
| import com.example.grpc.*; import com.google.protobuf.ByteString; import io.grpc.stub.StreamObserver;
import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors;
public class ImageProcessorImpl extends ImageProcessorGrpc.ImageProcessorImplBase {
private final ExecutorService imageProcessingExecutor = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() );
@Override public StreamObserver<ImageChunk> processImages(StreamObserver<Responses> responseObserver) { return new ImageProcessingObserver(responseObserver, imageProcessingExecutor); }
private static class ImageProcessingObserver implements StreamObserver<ImageChunk> {
private final StreamObserver<Responses> responseObserver; private final ExecutorService executor;
private ByteArrayOutputStream currentImageBuffer; private ImageChunk firstChunkOfCurrentImage; private final List<CompletableFuture<Response>> processingFutures = new ArrayList<>();
public ImageProcessingObserver(StreamObserver<Responses> responseObserver, ExecutorService executor) { this.responseObserver = responseObserver; this.executor = executor; }
@Override public void onNext(ImageChunk chunk) { try { if (chunk.getIndex() == 0) { currentImageBuffer = new ByteArrayOutputStream(); firstChunkOfCurrentImage = chunk; }
if (currentImageBuffer != null) { currentImageBuffer.write(chunk.getData().toByteArray()); }
if (chunk.getIndex() < 0) { byte[] imageData = currentImageBuffer.toByteArray(); ImageChunk firstChunk = firstChunkOfCurrentImage;
CompletableFuture<Response> future = processImageAsync(imageData, firstChunk, executor); processingFutures.add(future);
currentImageBuffer = null; firstChunkOfCurrentImage = null; } } catch (IOException e) { responseObserver.onError(io.grpc.Status.INTERNAL.withDescription("Failed to write image chunk").asRuntimeException()); } }
@Override public void onError(Throwable t) { System.err.println("Client-side error: " + t.getMessage()); processingFutures.clear(); }
@Override public void onCompleted() { System.out.println("Client stream completed. Waiting for all images to be processed.");
CompletableFuture.allOf(processingFutures.toArray(new CompletableFuture[0])) .whenComplete((v, throwable) -> { if (throwable != null) { responseObserver.onError(io.grpc.Status.INTERNAL .withDescription("Error processing one or more images: " + throwable.getCause().getMessage()) .asRuntimeException()); } else { List<Response> results = processingFutures.stream() .map(CompletableFuture::join) .collect(Collectors.toList());
Responses finalResponse = Responses.newBuilder().addAllResults(results).build(); responseObserver.onNext(finalResponse); responseObserver.onCompleted(); } }); } }
private static CompletableFuture<Response> processImageAsync(byte[] data, ImageChunk firstChunk, ExecutorService executor) { return CompletableFuture.supplyAsync(() -> { System.out.printf("Processing image of size %d bytes on thread %s%n", data.length, Thread.currentThread().getName()); try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
return Response.newBuilder() .setStatus("OK") .setMessage("Processed image of size " + data.length) .build(); }, executor); } }
|
优点:
- 客户端可以一次性拿到所有结果,逻辑简单。
- 符合“请求-响应”的聚合模式。
缺点:
- 客户端必须等待最慢的那个图片处理完成才能收到任何响应。
- 服务器在
onCompleted之前需要持有所有CompletableFuture对象。
方案二:流式返回 (stream Response)
这种方案下,服务器每处理完一张图片,就立刻将结果通过流返回给客户端。
策略:
与方案一类似,onNext的逻辑基本不变。不同之处在于,当异步处理任务 CompletableFuture 创建后,我们不把它存入列表,而是直接为其注册一个 thenAccept 回调。在这个回调中,一旦图片处理完成,就立即调用 responseObserver.onNext() 将结果发送出去。
实现代码 (ImageProcessorImpl.java):
只需修改 ImageProcessorImpl 中的一个方法和对应的 StreamObserver 实现。
Java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
|
@Override public StreamObserver<ImageChunk> processImagesStream(StreamObserver<Response> responseObserver) { return new ImageProcessingStreamingObserver(responseObserver, imageProcessingExecutor); } private static class ImageProcessingStreamingObserver implements StreamObserver<ImageChunk> {
private final StreamObserver<Response> responseObserver; private final ExecutorService executor;
private ByteArrayOutputStream currentImageBuffer; private ImageChunk firstChunkOfCurrentImage;
public ImageProcessingStreamingObserver(StreamObserver<Response> responseObserver, ExecutorService executor) { this.responseObserver = responseObserver; this.executor = executor; }
@Override public void onNext(ImageChunk chunk) { try { if (chunk.getIndex() == 0) { currentImageBuffer = new ByteArrayOutputStream(); firstChunkOfCurrentImage = chunk; }
if (currentImageBuffer != null) { currentImageBuffer.write(chunk.getData().toByteArray()); }
if (chunk.getIndex() < 0) { byte[] imageData = currentImageBuffer.toByteArray(); ImageChunk firstChunk = firstChunkOfCurrentImage;
processImageAsync(imageData, firstChunk, executor) .thenAccept(response -> { responseObserver.onNext(response); }) .exceptionally(ex -> { System.err.println("Error processing image: " + ex.getMessage()); Response errorResponse = Response.newBuilder() .setStatus("ERROR") .setMessage(ex.getMessage()) .build(); responseObserver.onNext(errorResponse); return null; });
currentImageBuffer = null; firstChunkOfCurrentImage = null; } } catch (IOException e) { responseObserver.onError(io.grpc.Status.INTERNAL.withDescription("Failed to write image chunk").asRuntimeException()); } }
@Override public void onError(Throwable t) { System.err.println("Client-side error: " + t.getMessage()); }
@Override public void onCompleted() { responseObserver.onCompleted(); System.out.println("Client stream completed. Server is closing the response stream."); } }
|
优点:
- 低延迟:客户端可以尽快收到已处理完的图片结果,无需等待其他图片。
- 内存占用更平滑:服务器端不需要维护一个不断增长的
Future 列表。
缺点:
- 客户端需要处理一个响应流,逻辑比接收单个响应略复杂。
总结与选择
| 特性 |
方案一 (非流式返回) |
方案二 (流式返回) |
| 实现核心 |
CompletableFuture.allOf() |
future.thenAccept() |
| 客户端延迟 |
高(取决于最慢的任务) |
低(结果立即可用) |
| 服务器内存 |
需要维护Future列表 |
更低,无需列表 |
| 适用场景 |
客户端需要所有结果才能进行下一步操作的“批处理”场景。 |
实时性要求高,希望尽快展示或处理部分结果的场景,如实时相册上传。 |
对于你的问题,两种方案都实现了零阻塞和低内存消耗的核心目标。
- 零阻塞:通过将业务逻辑放入独立的
ExecutorService并使用CompletableFuture实现。
- 低内存消耗:通过在处理完一张图片后立即清空
ByteArrayOutputStream (currentImageBuffer = null),确保内存中最多只有一张完整图片的数据。
推荐使用方案二(流式返回),因为它通常提供了更好的用户体验和更强的系统响应能力,是gRPC流式处理的典型优势体现。