grpc-image-processor-impl-1

要实现gRPC在处理客户端流式图片分块时的零阻塞和少内存消耗,我们需要设计一个高效的异步处理架构。

下面是一个基于Java的实现方案:

这个实现的核心特点和优势:

  1. 零阻塞设计
    1. 使用独立的处理线程池,避免阻塞gRPC的I/O线程
    2. 图片分块的接收和处理完全异步分离
    3. 采用队列缓冲机制,实现生产者-消费者模式
  2. 内存优化
    1. 图片分块组装完成后立即释放缓冲区
    2. 使用Semaphore控制并发处理的图片数量
    3. 实现背压机制,防止客户端发送过快导致内存溢出
  3. 两种响应模式
    1. 流式响应:每处理完一张图片立即发送响应
    2. 批量响应:所有图片处理完成后统一发送
  4. 高效处理
    1. 图片处理相互独立,并行执行
    2. 线程池配置根据CPU核心数动态调整
    3. 使用轻量级同步机制减少开销 使用时,只需根据需要选择响应模式创建服务实例:
    4. new ImageProcessingService(10, ImageProcessingService.ResponseMode.STREAMING)
    5. new ImageProcessingService(10, ImageProcessingService.ResponseMode.BATCHED)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
public class ImageProcessingService extends ImageServiceGrpc.ImageServiceImplBase {

// 处理线程池 - 用于图片组装和处理
private final ExecutorService processingExecutor;

// 控制并发处理的图片数量,防止内存溢出
private final Semaphore processingSemaphore;

// 每个客户端队列的最大大小,用于背压控制
private static final int MAX_QUEUE_SIZE = 100;

// 图片处理响应的两种模式
public enum ResponseMode {
STREAMING, // 流式响应,处理完一张发送一张
BATCHED // 批量响应,所有处理完后一起发送
}

private final ResponseMode responseMode;

public ImageProcessingService(int maxConcurrentImages, ResponseMode mode) {
this.responseMode = mode;

// 配置处理线程池
this.processingExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2,
60L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new ThreadFactory() {
private int counter = 0;
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "image-processor-" + counter++);
thread.setDaemon(true);
return thread;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 实现背压
);

// 限制并发处理的图片数量
this.processingSemaphore = new Semaphore(maxConcurrentImages);
}

@Override
public StreamObserver<ImageChunk> processImages(StreamObserver<Response> responseObserver) {
if (responseMode == ResponseMode.STREAMING) {
return new StreamingImageChunkObserver(responseObserver);
} else {
return new BatchedImageChunkObserver(responseObserver);
}
}

// 流式响应的观察者实现
private class StreamingImageChunkObserver implements StreamObserver<ImageChunk> {
protected final StreamObserver<Response> responseObserver;
protected final Queue<ImageChunk> chunkQueue = new ArrayDeque<>();
protected final AtomicBoolean isProcessing = new AtomicBoolean(false);
protected ImageAssembler currentImageAssembler;

public StreamingImageChunkObserver(StreamObserver<Response> responseObserver) {
this.responseObserver = responseObserver;
this.currentImageAssembler = new ImageAssembler();
}

@Override
public void onNext(ImageChunk chunk) {
// 实现背压:当队列满时阻塞
synchronized (chunkQueue) {
while (chunkQueue.size() >= MAX_QUEUE_SIZE) {
try {
chunkQueue.wait(100); // 等待队列有空间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
chunkQueue.add(chunk);
chunkQueue.notify();
}

// 确保只有一个处理线程在运行
if (isProcessing.compareAndSet(false, true)) {
processingExecutor.execute(this::processQueue);
}
}

@Override
public void onError(Throwable t) {
responseObserver.onError(t);
cleanup();
}

@Override
public void onCompleted() {
// 添加结束标记
synchronized (chunkQueue) {
chunkQueue.add(new ImageChunk().setIsLast(true).setIndex(-1));
chunkQueue.notify();
}
}

protected void processQueue() {
try {
while (true) {
ImageChunk chunk;
synchronized (chunkQueue) {
while (chunkQueue.isEmpty()) {
chunkQueue.wait();
}
chunk = chunkQueue.poll();
chunkQueue.notify();
}

// 检查是否是流的最后一个包
if (chunk.getIsLast()) {
// 处理完当前正在组装的图片
if (currentImageAssembler.hasChunks()) {
processCurrentImage();
}
responseObserver.onCompleted();
cleanup();
return;
}

// 处理图片分块
if (chunk.getIndex() < 0) {
// 负数index表示当前图片的最后一包
currentImageAssembler.addChunk(chunk);
processCurrentImage();
// 创建新的图片组装器
currentImageAssembler = new ImageAssembler();
} else {
// 添加到当前图片组装器
currentImageAssembler.addChunk(chunk);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
responseObserver.onError(e);
} catch (Exception e) {
responseObserver.onError(e);
} finally {
isProcessing.set(false);
}
}

protected void processCurrentImage() throws InterruptedException {
// 等待获取处理许可
processingSemaphore.acquire();

// 获取当前图片的数据和第一个分块
byte[] imageData = currentImageAssembler.getImageData();
ImageChunk firstChunk = currentImageAssembler.getFirstChunk();

// 清除当前图片组装器的数据,立即释放内存
currentImageAssembler.clear();

// 异步处理图片,非阻塞
processingExecutor.execute(() -> {
try {
Response response = processImage(imageData, firstChunk);
// 发送响应
responseObserver.onNext(response);
} catch (Exception e) {
responseObserver.onError(e);
} finally {
processingSemaphore.release();
}
});
}

protected void cleanup() {
synchronized (chunkQueue) {
chunkQueue.clear();
}
isProcessing.set(false);
if (currentImageAssembler != null) {
currentImageAssembler.clear();
currentImageAssembler = null;
}
}
}

// 批量响应的观察者实现
private class BatchedImageChunkObserver extends StreamingImageChunkObserver {
private final Queue<Response> responseQueue = new ConcurrentLinkedQueue<>();

public BatchedImageChunkObserver(StreamObserver<Response> responseObserver) {
super(responseObserver);
}

@Override
protected void processCurrentImage() throws InterruptedException {
processingSemaphore.acquire();

byte[] imageData = currentImageAssembler.getImageData();
ImageChunk firstChunk = currentImageAssembler.getFirstChunk();
currentImageAssembler.clear();

processingExecutor.execute(() -> {
try {
Response response = processImage(imageData, firstChunk);
responseQueue.add(response);
} catch (Exception e) {
responseObserver.onError(e);
} finally {
processingSemaphore.release();
}
});
}

@Override
protected void processQueue() {
try {
super.processQueue();
// 所有图片处理完成后,发送批量响应
Response batchResponse = Response.newBuilder()
.addAllResponses(responseQueue)
.build();
responseObserver.onNext(batchResponse);
responseObserver.onCompleted();
} catch (Exception e) {
responseObserver.onError(e);
}
}
}

// 图片组装器,负责收集和拼接图片分块
private static class ImageAssembler {
private final ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
private ImageChunk firstChunk;

public void addChunk(ImageChunk chunk) {
if (firstChunk == null) {
firstChunk = chunk; // 保存第一个分块
}
byteStream.write(chunk.getData().toByteArray());
}

public byte[] getImageData() {
return byteStream.toByteArray();
}

public ImageChunk getFirstChunk() {
return firstChunk;
}

public boolean hasChunks() {
return firstChunk != null;
}

// 清除数据,释放内存
public void clear() {
byteStream.reset();
firstChunk = null;
}
}

// 图片处理方法,由业务逻辑实现
private Response processImage(byte[] data, ImageChunk firstChunk) {
// 实际的图片处理逻辑
// ...
return Response.newBuilder().build();
}
}