grpc-image-processor-impl-2

Protobuf 定义

首先,我们先定义好 .proto 文件,这是我们讨论的基础。

Protocol Buffers

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
syntax = "proto3";

package com.example.grpc;

option java_multiple_files = true;
option java_package = "com.example.grpc";
option java_outer_classname = "ImageProto";

// 图片处理服务
service ImageProcessor {
// 方案一:客户端流式上传,服务器一次性返回所有结果
rpc ProcessImages(stream ImageChunk) returns (Responses);

// 方案二:客户端流式上传,服务器流式返回每个结果
rpc ProcessImagesStream(stream ImageChunk) returns (stream Response);
}

// 图片分块
message ImageChunk {
bytes data = 1; // 图片的二进制数据块
int32 index = 2; // 块的索引。0, 1, 2... 负数代表当前图片的最后一个块
bool is_last = 3; // 是否是整个RPC调用的最后一个块
}

// 单个处理结果
message Response {
string status = 1;
string message = 2; // 例如:图片处理成功/失败信息,或者处理结果的ID等
}

// 多个处理结果的集合(用于方案一)
message Responses {
repeated Response results = 1;
}
  1. 绝不阻塞gRPC线程:gRPC的onNext方法由Netty的I/O线程(Event Loop)驱动。任何同步的、耗时的操作(包括等待异步任务完成)都会阻塞这个线程,导致服务器无法接收新的请求,吞吐量急剧下降。
  2. 异步化业务逻辑:你的 processImage 必须是异步的。在Java中,最佳实践是让它返回一个 CompletableFuture<Response>。我们会为这个业务逻辑创建一个独立的线程池。
  3. 精细化内存管理:只在内存中保留当前正在处理的一张图片的数据。一旦一张图片的数据收集完毕并提交给处理线程池,就立刻释放其内存,为下一张图片做准备。

方案一:非流式返回 (repeated Response)

这种方案下,服务器需要收集所有图片的处理结果,然后在客户端数据流结束后,将所有结果一次性返回。

策略:

在onNext中接收数据块,拼装成完整的图片后,将其提交给一个专门的业务线程池进行异步处理。处理会返回一个 CompletableFuture。我们将这些Future收集起来。在onCompleted方法中,我们使用 CompletableFuture.allOf() 来注册一个回调,当所有图片都处理完毕后,再统一打包发送响应。

实现代码 (ImageProcessorImpl.java):

Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import com.example.grpc.*;
import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class ImageProcessorImpl extends ImageProcessorGrpc.ImageProcessorImplBase {

// 关键:为耗时的图片处理逻辑创建一个独立的线程池,避免阻塞gRPC的I/O线程
private final ExecutorService imageProcessingExecutor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);

@Override
public StreamObserver<ImageChunk> processImages(StreamObserver<Responses> responseObserver) {
return new ImageProcessingObserver(responseObserver, imageProcessingExecutor);
}

// 这是一个内部类,每个RPC调用都会创建一个新的实例。
// 这使得它可以安全地持有单个RPC调用的状态,天然支持并发。
private static class ImageProcessingObserver implements StreamObserver<ImageChunk> {

private final StreamObserver<Responses> responseObserver;
private final ExecutorService executor;

// 状态变量:用于缓存当前正在接收的图片
private ByteArrayOutputStream currentImageBuffer;
private ImageChunk firstChunkOfCurrentImage;

// 关键:用于收集所有图片处理任务的Future
private final List<CompletableFuture<Response>> processingFutures = new ArrayList<>();

public ImageProcessingObserver(StreamObserver<Responses> responseObserver, ExecutorService executor) {
this.responseObserver = responseObserver;
this.executor = executor;
}

@Override
public void onNext(ImageChunk chunk) {
try {
// index为0表示一张新图片的开始
if (chunk.getIndex() == 0) {
currentImageBuffer = new ByteArrayOutputStream();
firstChunkOfCurrentImage = chunk;
}

// 将数据块写入当前图片的缓冲区
if (currentImageBuffer != null) {
currentImageBuffer.write(chunk.getData().toByteArray());
}

// 负数index表示当前图片的结束
if (chunk.getIndex() < 0) {
byte[] imageData = currentImageBuffer.toByteArray();
ImageChunk firstChunk = firstChunkOfCurrentImage;

// 关键:异步提交处理任务
CompletableFuture<Response> future = processImageAsync(imageData, firstChunk, executor);
processingFutures.add(future);

// 关键:重置状态,释放内存,为下一张图片做准备
currentImageBuffer = null;
firstChunkOfCurrentImage = null;
}
} catch (IOException e) {
// 处理IO异常
responseObserver.onError(io.grpc.Status.INTERNAL.withDescription("Failed to write image chunk").asRuntimeException());
// 可以在这里请求取消流
}
}

@Override
public void onError(Throwable t) {
// 客户端发生错误,记录日志
System.err.println("Client-side error: " + t.getMessage());
// 清理资源
processingFutures.clear();
}

@Override
public void onCompleted() {
// 客户端所有数据发送完毕
System.out.println("Client stream completed. Waiting for all images to be processed.");

// 关键:使用 allOf 组合所有Future,注册一个回调。
// 这个操作本身是非阻塞的。
CompletableFuture.allOf(processingFutures.toArray(new CompletableFuture[0]))
.whenComplete((v, throwable) -> {
if (throwable != null) {
// 如果任何一个处理任务失败,则向客户端报告错误
responseObserver.onError(io.grpc.Status.INTERNAL
.withDescription("Error processing one or more images: " + throwable.getCause().getMessage())
.asRuntimeException());
} else {
// 所有任务成功完成
List<Response> results = processingFutures.stream()
.map(CompletableFuture::join) // join()在这里是安全的,因为allOf保证了它们都已完成
.collect(Collectors.toList());

Responses finalResponse = Responses.newBuilder().addAllResults(results).build();

responseObserver.onNext(finalResponse);
responseObserver.onCompleted();
}
});
}
}

/**
* 模拟一个非阻塞的图片处理方法
* @param data 图片完整二进制数据
* @param firstChunk 图片的第一个分块信息
* @param executor 执行任务的线程池
* @return 一个包含Response的CompletableFuture
*/
private static CompletableFuture<Response> processImageAsync(byte[] data, ImageChunk firstChunk, ExecutorService executor) {
return CompletableFuture.supplyAsync(() -> {
// 在这里执行你的图片处理逻辑,例如:
System.out.printf("Processing image of size %d bytes on thread %s%n",
data.length, Thread.currentThread().getName());

// 模拟耗时操作
try {
Thread.sleep(500); // 模拟
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

// 返回处理结果
return Response.newBuilder()
.setStatus("OK")
.setMessage("Processed image of size " + data.length)
.build();
}, executor);
}
}

优点:

  • 客户端可以一次性拿到所有结果,逻辑简单。
  • 符合“请求-响应”的聚合模式。

缺点:

  • 客户端必须等待最慢的那个图片处理完成才能收到任何响应。
  • 服务器在onCompleted之前需要持有所有CompletableFuture对象。

方案二:流式返回 (stream Response)

这种方案下,服务器每处理完一张图片,就立刻将结果通过流返回给客户端。

策略:

与方案一类似,onNext的逻辑基本不变。不同之处在于,当异步处理任务 CompletableFuture 创建后,我们不把它存入列表,而是直接为其注册一个 thenAccept 回调。在这个回调中,一旦图片处理完成,就立即调用 responseObserver.onNext() 将结果发送出去。

实现代码 (ImageProcessorImpl.java):

只需修改 ImageProcessorImpl 中的一个方法和对应的 StreamObserver 实现。

Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// ... 省略 imports 和 ImageProcessorImpl 类的定义 ...

@Override
public StreamObserver<ImageChunk> processImagesStream(StreamObserver<Response> responseObserver) {
// 注意,这里的 responseObserver 泛型是 Response,而不是 Responses
return new ImageProcessingStreamingObserver(responseObserver, imageProcessingExecutor);
}

// 方案二的StreamObserver实现
private static class ImageProcessingStreamingObserver implements StreamObserver<ImageChunk> {

private final StreamObserver<Response> responseObserver;
private final ExecutorService executor;

// 状态变量与方案一相同
private ByteArrayOutputStream currentImageBuffer;
private ImageChunk firstChunkOfCurrentImage;

// 注意:这里不再需要 a list of futures

public ImageProcessingStreamingObserver(StreamObserver<Response> responseObserver, ExecutorService executor) {
this.responseObserver = responseObserver;
this.executor = executor;
}

@Override
public void onNext(ImageChunk chunk) {
try {
if (chunk.getIndex() == 0) {
currentImageBuffer = new ByteArrayOutputStream();
firstChunkOfCurrentImage = chunk;
}

if (currentImageBuffer != null) {
currentImageBuffer.write(chunk.getData().toByteArray());
}

if (chunk.getIndex() < 0) {
byte[] imageData = currentImageBuffer.toByteArray();
ImageChunk firstChunk = firstChunkOfCurrentImage;

// 关键:异步提交处理任务,并直接注册回调来发送结果
processImageAsync(imageData, firstChunk, executor)
.thenAccept(response -> {
// 这个回调会在 imageProcessingExecutor 线程池中执行
// responseObserver 的 onNext 是线程安全的
responseObserver.onNext(response);
})
.exceptionally(ex -> {
// 处理单个任务的异常,可以决定是忽略还是关闭流
System.err.println("Error processing image: " + ex.getMessage());
// 也可以在这里发送一个错误的Response
Response errorResponse = Response.newBuilder()
.setStatus("ERROR")
.setMessage(ex.getMessage())
.build();
responseObserver.onNext(errorResponse);
return null;
});

// 同样重置状态,释放内存
currentImageBuffer = null;
firstChunkOfCurrentImage = null;
}
} catch (IOException e) {
responseObserver.onError(io.grpc.Status.INTERNAL.withDescription("Failed to write image chunk").asRuntimeException());
}
}

@Override
public void onError(Throwable t) {
System.err.println("Client-side error: " + t.getMessage());
// 客户端出错了,我们只需要确保我们的响应流也关闭了
// onError 会自动传播
}

@Override
public void onCompleted() {
// 客户端数据流结束
// 因为每个结果都已经异步发送了,这里我们只需要告诉客户端,服务器这边也结束了。
// 但要注意,可能还有一些任务正在处理中。
// 一个健壮的实现可能会等待所有已提交的任务完成后再调用 onCompleted。
// 但对于流式响应,通常直接关闭即可,客户端会继续接收完已在管道中的消息。
responseObserver.onCompleted();
System.out.println("Client stream completed. Server is closing the response stream.");
}
}

// processImageAsync 方法与方案一完全相同,此处省略

优点:

  • 低延迟:客户端可以尽快收到已处理完的图片结果,无需等待其他图片。
  • 内存占用更平滑:服务器端不需要维护一个不断增长的 Future 列表。

缺点:

  • 客户端需要处理一个响应流,逻辑比接收单个响应略复杂。

总结与选择

特性 方案一 (非流式返回) 方案二 (流式返回)
实现核心 CompletableFuture.allOf() future.thenAccept()
客户端延迟 高(取决于最慢的任务) 低(结果立即可用)
服务器内存 需要维护Future列表 更低,无需列表
适用场景 客户端需要所有结果才能进行下一步操作的“批处理”场景。 实时性要求高,希望尽快展示或处理部分结果的场景,如实时相册上传。

对于你的问题,两种方案都实现了零阻塞低内存消耗的核心目标。

  • 零阻塞:通过将业务逻辑放入独立的ExecutorService并使用CompletableFuture实现。
  • 低内存消耗:通过在处理完一张图片后立即清空 ByteArrayOutputStream (currentImageBuffer = null),确保内存中最多只有一张完整图片的数据。

推荐使用方案二(流式返回),因为它通常提供了更好的用户体验和更强的系统响应能力,是gRPC流式处理的典型优势体现。