如何为Java 11 HttpRequest创建自定义BodyPublisher

Challe:

我正在尝试创建一个BodyPublisher将反序列化我的JSON对象的自定义我可以在创建请求并使用的ofByteArray方法时反序列化JSON BodyPublishers但我更愿意使用自定义发布者。

public class CustomPublisher implements HttpRequest.BodyPublisher {
    private byte[] bytes;
    
    public CustomPublisher(ObjectNode jsonData) {
        ...
        // Deserialize jsonData to bytes
        ...
    }
    
    @Override
    public long contentLength() {
        if(bytes == null) return 0;
        return bytes.length
    }
    
    @Override
    public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
        CustomSubscription subscription = new CustomSubscription(subscriber, bytes);
        subscriber.onSubscribe(subscription);       
    }

    private CustomSubscription implements Flow.Subscription {
         private final Flow.Subscriber<? super ByteBuffer> subscriber;
         private boolean cancelled;
         private Iterator<Byte> byterator;

         private CustomSubscription(Flow.Subscriber<? super ByteBuffer> subscriber, byte[] bytes) {
             this.subscriber = subscriber;
             this.cancelled = false;
             List<Byte> bytelist = new ArrayList<>();
             for(byte b : bytes) {
                 bytelist.add(b);
             }
             this.byterator = bytelist.iterator();
         }

         @Override
         public void request(long n) {
             if(cancelled) return;
             if(n < 0) {
                 subscriber.onError(new IllegalArgumentException());
             } else if(byterator.hasNext()) {
                 subscriber.onNext(ByteBuffer.wrap(new byte[]{byterator.next()));
             } else {
                 subscriber.onComplete();
             }
         }

         @Override
         public void cancel() {
             this.cancelled = true;
         }
    }
}

此实现有效,但仅当request使用1作为参数调用subscriptions 方法时。但这就是我将其与HttpRequest一起使用时发生的情况。

我很确定这不是创建自定义订阅的任何首选方法或最佳方法,但是我还没有找到更好的方法来使它生效。

如果有人可以带领我走上更好的道路,我将不胜感激。

VGR:

您应该避免使用字节数组,因为这样做会给大型对象造成内存问题。

我不会尝试编写自定义发布者。相反,只需利用工厂方法HttpRequest.BodyPublishers.ofInputStream即可

HttpRequest.BodyPublisher publisher =
    HttpRequest.BodyPublishers.ofInputStream(() ->  {
        PipedInputStream in = new PipedInputStream();

        ForkJoinPool.commonPool().submit(() -> {
            try (PipedOutputStream out = new PipedOutputStream(in)) {
                objectMapper.writeTree(
                    objectMapper.getFactory().createGenerator(out),
                    jsonData);
            }
            return null;
        });

        return in;
    });

如前所述,您可以使用HttpRequest.BodyPublishers.ofByteArray对于相对较小的对象而言,这很好,但是出于习惯,我编写了可伸缩性程序。假设不需要扩展代码的问题是,其他开发人员会假定传递大型对象是安全的,而不会意识到对性能的影响。

编写自己的正文发布者将需要大量工作。subscribe方法继承自Flow.Publisher

subscribe方法的文档以此开头:

如果可能,添加给定的订户。

每次subscribe调用您的方法时,都需要将Subscriber添加到某种形式的集合中,需要创建Flow.Subscription的实现,并且需要立即将其传递给订阅者的onSubscribe方法。您的Subscription实现对象仅需要在request调用Subscription的方法时,通过调用相应的Subscriber(而不是任何Subscriber)的onNext方法来发送一个或多个ByteBuffer ,并且一旦发送完所有数据,就必须调用相同的订阅者onComplete()方法。最重要的是,Subscription实现对象需要处理cancel请求。

通过扩展SubmissionPublisher(这是Flow.Publisher的默认实现),然后向其中添加一个contentLength()方法,可以使很多操作变得更加容易但是,如SubmissionPublisher文档所示,对于最小的工作实现,您仍有大量工作要做。

HttpRequest.BodyPublishers.of…方法将为您完成所有这些工作。ofByteArray对于小物体来说还可以,但是ofInputStream对任何您可以传入的物体都可以使用。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章