grpc-image-processor-impl-3

为了在gRPC Java实现中实现零阻塞和少内存消耗,针对客户端流式发送图片分块(ImageChunk)的场景,我们需要确保gRPC线程不阻塞于图片处理操作,并通过高效的内存管理减少内存消耗。以下将分别针对两种返回响应形式(repeated Response和streaming Response)提供实现方案。

共同设计要点

  • 非阻塞处理:使用异步线程池处理图片,避免阻塞gRPC线程。
  • 内存效率:使用ByteArrayOutputStream收集分块数据,减少中间对象的内存开销。
  • 顺序处理:由于分块顺序发送,每张图片的分块索引从0开始递增,负数表示当前图片结束,isLast表示整个流结束。
  • 错误处理:确保错误处理不会多次调用onErroronCompleted

流式响应(streaming Response)实现

服务定义(proto):

1
2
3
service ImageService {
rpc ProcessImages(stream ImageChunk) returns (stream Response);
}

实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.stub.StreamObserver;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

public class ImageServiceStreamingImpl extends ImageServiceGrpc.ImageServiceImplBase {

// 使用监听ExecutorService用于回调处理
private final ListeningExecutorService processingExecutor =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));

@Override
public StreamObserver<ImageChunk> processImages(StreamObserver<Response> responseObserver) {
return new StreamObserver<ImageChunk>() {
private ImageCollector currentImage = null;
private final List<ListenableFuture<Response>> futures = new ArrayList<>();
private final AtomicBoolean isErrorCalled = new AtomicBoolean(false);

@Override
public void onNext(ImageChunk chunk) {
if (currentImage == null) {
currentImage = new ImageCollector(chunk);
} else {
currentImage.addChunk(chunk);
}

// 检查当前图片是否结束(index为负数)
if (chunk.getIndex() < 0) {
byte[] imageData = currentImage.getData();
ImageChunk firstChunk = currentImage.getFirstChunk();
currentImage = null; // 重置收集器

// 提交处理任务
ListenableFuture<Response> future = processingExecutor.submit(
() -> processImage(imageData, firstChunk));
futures.add(future);

// 添加回调,处理完成后发送响应
Futures.addCallback(future, new FutureCallback<Response>() {
@Override
public void onSuccess(Response result) {
responseObserver.onNext(result);
}

@Override
public void onFailure(Throwable t) {
// 确保只调用一次onError
if (isErrorCalled.compareAndSet(false, true)) {
responseObserver.onError(t);
}
}
}, processingExecutor);
}

// 如果isLast为true,整个流结束,但实际处理在onCompleted中
}

@Override
public void onError(Throwable t) {
if (isErrorCalled.compareAndSet(false, true)) {
responseObserver.onError(t);
}
}

@Override
public void onCompleted() {
// 等待所有处理任务完成,然后调用onCompleted
for (ListenableFuture<Response> future : futures) {
try {
future.get(); // 等待任务完成,回调中已发送响应
} catch (InterruptedException | ExecutionException e) {
// 回调中已处理错误,这里仅记录或忽略
}
}
responseObserver.onCompleted();
}
};
}

private Response processImage(byte[] data, ImageChunk firstChunk) {
// 实现图片处理逻辑,返回Response
return Response.newBuilder().build();
}

private static class ImageCollector {
private final ImageChunk firstChunk;
private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

public ImageCollector(ImageChunk firstChunk) {
this.firstChunk = firstChunk;
addChunk(firstChunk);
}

public void addChunk(ImageChunk chunk) {
outputStream.write(chunk.getData().toByteArray(), 0, chunk.getData().size());
}

public byte[] getData() {
return outputStream.toByteArray();
}

public ImageChunk getFirstChunk() {
return firstChunk;
}
}
}

非流式响应(repeated Response)实现

服务定义(proto):

1
2
3
4
5
6
7
service ImageService {
rpc ProcessImages(stream ImageChunk) returns (ResponseList);
}

message ResponseList {
repeated Response responses = 1;
}

实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import io.grpc.stub.StreamObserver;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ImageServiceRepeatedImpl extends ImageServiceGrpc.ImageServiceImplBase {

private final ExecutorService processingExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

@Override
public StreamObserver<ImageChunk> processImages(StreamObserver<ResponseList> responseObserver) {
return new StreamObserver<ImageChunk>() {
private ImageCollector currentImage = null;
private final List<Future<Response>> futures = new ArrayList<>();

@Override
public void onNext(ImageChunk chunk) {
if (currentImage == null) {
currentImage = new ImageCollector(chunk);
} else {
currentImage.addChunk(chunk);
}

if (chunk.getIndex() < 0) {
byte[] imageData = currentImage.getData();
ImageChunk firstChunk = currentImage.getFirstChunk();
currentImage = null; // 重置收集器

// 提交处理任务
Future<Response> future = processingExecutor.submit(
() -> processImage(imageData, firstChunk));
futures.add(future);
}
}

@Override
public void onError(Throwable t) {
responseObserver.onError(t);
}

@Override
public void onCompleted() {
List<Response> responses = new ArrayList<>();
for (Future<Response> future : futures) {
try {
responses.add(future.get());
} catch (InterruptedException | ExecutionException e) {
responseObserver.onError(e);
return;
}
}
ResponseList responseList = ResponseList.newBuilder()
.addAllResponses(responses)
.build();
responseObserver.onNext(responseList);
responseObserver.onCompleted();
}
};
}

private Response processImage(byte[] data, ImageChunk firstChunk) {
// 实现图片处理逻辑,返回Response
return Response.newBuilder().build();
}

private static class ImageCollector {
private final ImageChunk firstChunk;
private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

public ImageCollector(ImageChunk firstChunk) {
this.firstChunk = firstChunk;
addChunk(firstChunk);
}

public void addChunk(ImageChunk chunk) {
outputStream.write(chunk.getData().toByteArray(), 0, chunk.getData().size());
}

public byte[] getData() {
return outputStream.toByteArray();
}

public ImageChunk getFirstChunk() {
return firstChunk;
}
}
}

优化说明

  • 零阻塞:通过线程池异步处理图片,gRPC线程仅用于接收分块和提交任务,不阻塞。
  • 少内存消耗:使用ByteArrayOutputStream直接累积数据,避免多次复制和中间对象存储。每张图片处理完成后立即释放收集器内存。
  • 错误处理:使用原子布尔值确保流式响应中只调用一次onError,避免非法状态异常。
  • 线程池配置:根据实际处理需求调整线程池大小(如使用Runtime.getRuntime().availableProcessors())。

使用建议

  • 根据实际场景选择流式或非流式响应。流式响应可实时返回结果,而非流式响应需等待所有处理完成。
  • 如果图片很大,考虑使用磁盘存储或更高效的内存管理(如直接字节缓冲区),但上述方案已优化内存使用。
  • 监控线程池队列长度,避免任务堆积导致内存溢出。

以上实现确保了gRPC线程零阻塞和较少的内存消耗,同时处理多张图片的流式请求。