为了在gRPC Java实现中实现零阻塞和少内存消耗,针对客户端流式发送图片分块(ImageChunk)的场景,我们需要确保gRPC线程不阻塞于图片处理操作,并通过高效的内存管理减少内存消耗。以下将分别针对两种返回响应形式(repeated Response和streaming Response)提供实现方案。
共同设计要点
- 非阻塞处理:使用异步线程池处理图片,避免阻塞gRPC线程。
- 内存效率:使用
ByteArrayOutputStream收集分块数据,减少中间对象的内存开销。
- 顺序处理:由于分块顺序发送,每张图片的分块索引从0开始递增,负数表示当前图片结束,
isLast表示整个流结束。
- 错误处理:确保错误处理不会多次调用
onError或onCompleted。
流式响应(streaming Response)实现
服务定义(proto):
1 2 3
| service ImageService { rpc ProcessImages(stream ImageChunk) returns (stream Response); }
|
实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
| import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import io.grpc.stub.StreamObserver; import java.io.ByteArrayOutputStream; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean;
public class ImageServiceStreamingImpl extends ImageServiceGrpc.ImageServiceImplBase {
private final ListeningExecutorService processingExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
@Override public StreamObserver<ImageChunk> processImages(StreamObserver<Response> responseObserver) { return new StreamObserver<ImageChunk>() { private ImageCollector currentImage = null; private final List<ListenableFuture<Response>> futures = new ArrayList<>(); private final AtomicBoolean isErrorCalled = new AtomicBoolean(false);
@Override public void onNext(ImageChunk chunk) { if (currentImage == null) { currentImage = new ImageCollector(chunk); } else { currentImage.addChunk(chunk); }
if (chunk.getIndex() < 0) { byte[] imageData = currentImage.getData(); ImageChunk firstChunk = currentImage.getFirstChunk(); currentImage = null;
ListenableFuture<Response> future = processingExecutor.submit( () -> processImage(imageData, firstChunk)); futures.add(future);
Futures.addCallback(future, new FutureCallback<Response>() { @Override public void onSuccess(Response result) { responseObserver.onNext(result); }
@Override public void onFailure(Throwable t) { if (isErrorCalled.compareAndSet(false, true)) { responseObserver.onError(t); } } }, processingExecutor); }
}
@Override public void onError(Throwable t) { if (isErrorCalled.compareAndSet(false, true)) { responseObserver.onError(t); } }
@Override public void onCompleted() { for (ListenableFuture<Response> future : futures) { try { future.get(); } catch (InterruptedException | ExecutionException e) { } } responseObserver.onCompleted(); } }; }
private Response processImage(byte[] data, ImageChunk firstChunk) { return Response.newBuilder().build(); }
private static class ImageCollector { private final ImageChunk firstChunk; private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
public ImageCollector(ImageChunk firstChunk) { this.firstChunk = firstChunk; addChunk(firstChunk); }
public void addChunk(ImageChunk chunk) { outputStream.write(chunk.getData().toByteArray(), 0, chunk.getData().size()); }
public byte[] getData() { return outputStream.toByteArray(); }
public ImageChunk getFirstChunk() { return firstChunk; } } }
|
非流式响应(repeated Response)实现
服务定义(proto):
1 2 3 4 5 6 7
| service ImageService { rpc ProcessImages(stream ImageChunk) returns (ResponseList); }
message ResponseList { repeated Response responses = 1; }
|
实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
| import io.grpc.stub.StreamObserver; import java.io.ByteArrayOutputStream; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future;
public class ImageServiceRepeatedImpl extends ImageServiceGrpc.ImageServiceImplBase {
private final ExecutorService processingExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
@Override public StreamObserver<ImageChunk> processImages(StreamObserver<ResponseList> responseObserver) { return new StreamObserver<ImageChunk>() { private ImageCollector currentImage = null; private final List<Future<Response>> futures = new ArrayList<>();
@Override public void onNext(ImageChunk chunk) { if (currentImage == null) { currentImage = new ImageCollector(chunk); } else { currentImage.addChunk(chunk); }
if (chunk.getIndex() < 0) { byte[] imageData = currentImage.getData(); ImageChunk firstChunk = currentImage.getFirstChunk(); currentImage = null;
Future<Response> future = processingExecutor.submit( () -> processImage(imageData, firstChunk)); futures.add(future); } }
@Override public void onError(Throwable t) { responseObserver.onError(t); }
@Override public void onCompleted() { List<Response> responses = new ArrayList<>(); for (Future<Response> future : futures) { try { responses.add(future.get()); } catch (InterruptedException | ExecutionException e) { responseObserver.onError(e); return; } } ResponseList responseList = ResponseList.newBuilder() .addAllResponses(responses) .build(); responseObserver.onNext(responseList); responseObserver.onCompleted(); } }; }
private Response processImage(byte[] data, ImageChunk firstChunk) { return Response.newBuilder().build(); }
private static class ImageCollector { private final ImageChunk firstChunk; private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
public ImageCollector(ImageChunk firstChunk) { this.firstChunk = firstChunk; addChunk(firstChunk); }
public void addChunk(ImageChunk chunk) { outputStream.write(chunk.getData().toByteArray(), 0, chunk.getData().size()); }
public byte[] getData() { return outputStream.toByteArray(); }
public ImageChunk getFirstChunk() { return firstChunk; } } }
|
优化说明
- 零阻塞:通过线程池异步处理图片,gRPC线程仅用于接收分块和提交任务,不阻塞。
- 少内存消耗:使用
ByteArrayOutputStream直接累积数据,避免多次复制和中间对象存储。每张图片处理完成后立即释放收集器内存。
- 错误处理:使用原子布尔值确保流式响应中只调用一次
onError,避免非法状态异常。
- 线程池配置:根据实际处理需求调整线程池大小(如使用
Runtime.getRuntime().availableProcessors())。
使用建议
- 根据实际场景选择流式或非流式响应。流式响应可实时返回结果,而非流式响应需等待所有处理完成。
- 如果图片很大,考虑使用磁盘存储或更高效的内存管理(如直接字节缓冲区),但上述方案已优化内存使用。
- 监控线程池队列长度,避免任务堆积导致内存溢出。
以上实现确保了gRPC线程零阻塞和较少的内存消耗,同时处理多张图片的流式请求。