我正在尝试创建一个BodyPublisher
将反序列化我的JSON对象的自定义。我可以在创建请求并使用的ofByteArray
方法时反序列化JSON ,BodyPublishers
但我更愿意使用自定义发布者。
public class CustomPublisher implements HttpRequest.BodyPublisher {
private byte[] bytes;
public CustomPublisher(ObjectNode jsonData) {
...
// Deserialize jsonData to bytes
...
}
@Override
public long contentLength() {
if(bytes == null) return 0;
return bytes.length
}
@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
CustomSubscription subscription = new CustomSubscription(subscriber, bytes);
subscriber.onSubscribe(subscription);
}
private CustomSubscription implements Flow.Subscription {
private final Flow.Subscriber<? super ByteBuffer> subscriber;
private boolean cancelled;
private Iterator<Byte> byterator;
private CustomSubscription(Flow.Subscriber<? super ByteBuffer> subscriber, byte[] bytes) {
this.subscriber = subscriber;
this.cancelled = false;
List<Byte> bytelist = new ArrayList<>();
for(byte b : bytes) {
bytelist.add(b);
}
this.byterator = bytelist.iterator();
}
@Override
public void request(long n) {
if(cancelled) return;
if(n < 0) {
subscriber.onError(new IllegalArgumentException());
} else if(byterator.hasNext()) {
subscriber.onNext(ByteBuffer.wrap(new byte[]{byterator.next()));
} else {
subscriber.onComplete();
}
}
@Override
public void cancel() {
this.cancelled = true;
}
}
}
此实现有效,但仅当request
使用1作为参数调用subscriptions 方法时。但这就是我将其与HttpRequest一起使用时发生的情况。
我很确定这不是创建自定义订阅的任何首选方法或最佳方法,但是我还没有找到更好的方法来使它生效。
如果有人可以带领我走上更好的道路,我将不胜感激。
您应该避免使用字节数组,因为这样做会给大型对象造成内存问题。
我不会尝试编写自定义发布者。相反,只需利用工厂方法HttpRequest.BodyPublishers.ofInputStream即可。
HttpRequest.BodyPublisher publisher =
HttpRequest.BodyPublishers.ofInputStream(() -> {
PipedInputStream in = new PipedInputStream();
ForkJoinPool.commonPool().submit(() -> {
try (PipedOutputStream out = new PipedOutputStream(in)) {
objectMapper.writeTree(
objectMapper.getFactory().createGenerator(out),
jsonData);
}
return null;
});
return in;
});
如前所述,您可以使用HttpRequest.BodyPublishers.ofByteArray
。对于相对较小的对象而言,这很好,但是出于习惯,我编写了可伸缩性程序。假设不需要扩展代码的问题是,其他开发人员会假定传递大型对象是安全的,而不会意识到对性能的影响。
编写自己的正文发布者将需要大量工作。其subscribe
方法继承自Flow.Publisher。
该subscribe
方法的文档以此开头:
如果可能,添加给定的订户。
每次subscribe
调用您的方法时,都需要将Subscriber添加到某种形式的集合中,需要创建Flow.Subscription的实现,并且需要立即将其传递给订阅者的onSubscribe
方法。您的Subscription实现对象仅需要在request
调用Subscription的方法时,通过调用相应的Subscriber(而不是任何Subscriber)的onNext方法来发送一个或多个ByteBuffer ,并且一旦发送完所有数据,就必须调用相同的订阅者onComplete()
方法。最重要的是,Subscription实现对象需要处理cancel
请求。
通过扩展SubmissionPublisher(这是Flow.Publisher的默认实现),然后向其中添加一个contentLength()
方法,可以使很多操作变得更加容易。但是,如SubmissionPublisher文档所示,对于最小的工作实现,您仍有大量工作要做。
HttpRequest.BodyPublishers.of…方法将为您完成所有这些工作。ofByteArray
对于小物体来说还可以,但是ofInputStream
对任何您可以传入的物体都可以使用。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句