等待最后一个元素时 Flux 阻塞

倒霉:

我想通过rsocket连接两个应用程序。一个是用GO编写的,另一个是用Kotlin编写的。我想实现这样的连接:客户端发送数据流,服务器发送确认响应。

问题在于等待所有元素:如果服务器不使用BlockLast(ctx),则会读取整个流,但是会在所有条目到达之前发送响应。如果添加了BlockLast(ctx),则服务器(GoLang)会卡住。

我也用Kotlin编写了客户端,在这种情况下,整个通信都工作正常。

有人能帮忙吗?

GO服务器:

package main

import (
"context"
"github.com/golang/protobuf/proto"
"github.com/rsocket/rsocket-go"
"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx"
"github.com/rsocket/rsocket-go/rx/flux"
"rsocket-go-rpc-test/proto"
)

// main starts an RSocket server on tcp://127.0.0.1:8081 whose request-channel
// handler prints the Content of every incoming Chunk and then answers with a
// single UploadStatus payload. Whatever Serve returns is passed to panic.
func main() {
addr := "tcp://127.0.0.1:8081"
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := rsocket.Receive().
    Fragment(1024).
    Resume().
    Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
        return rsocket.NewAbstractSocket(
            rsocket.RequestChannel(func(payloads rx.Publisher) flux.Flux {
                println("START")

                // Decode and print each incoming payload. The error returned by
                // proto.Unmarshal is discarded.
                // NOTE(review): BlockLast(ctx) is called here, inside the handler,
                // before the response flux below is built and returned. Per the
                // question text, this is the call on which the server gets stuck.
                payloads.(flux.Flux).
                    DoOnNext(func(input payload.Payload) {
                        chunk := &pl_dwojciechowski_proto.Chunk{}
                        proto.Unmarshal(input.Data(), chunk)
                        println(string(chunk.Content))
                    }).BlockLast(ctx)

                // Response: one payload built from a marshaled UploadStatus
                // (Message "OK", Code 0) with a 1-byte slice as the second
                // argument, followed by completion. The Marshal error is discarded.
                return flux.Create(func(i context.Context, sink flux.Sink) {
                    status, _ := proto.Marshal(&pl_dwojciechowski_proto.UploadStatus{
                        Message: "OK",
                        Code:    0,
                    })

                    sink.Next(payload.New(status, make([]byte, 1)))
                    sink.Complete()
                    println("SENT")
                })
            }),
        ), nil
    }).
    Transport(addr).
    Serve(ctx)
panic(err)

}

Kotlin客户端:

/**
 * Connects to the RSocket server on TCP port 8081, streams ten [Chunk] messages
 * ("sending -> 1" .. "sending -> 10") through [FileServiceClient.send] and prints
 * the `message` field of the status the server replies with.
 *
 * The socket is disposed in a `finally` block, so the connection is released even
 * when the call fails or the server completes without a reply. The dispose still
 * happens before the reply is printed.
 *
 * @throws Exception if the reply Mono completes without emitting a status.
 */
private fun clientCall() {
    val rSocket = RSocketFactory.connect()
        .transport(TcpClientTransport.create(8081))
        .start()
        .block()
    val client = FileServiceClient(rSocket)

    val requests: Flux<Chunk> = Flux.range(1, 10)
        .map { i: Int -> "sending -> $i" }
        .map<Chunk> { text ->
            Chunk.newBuilder()
                .setContent(ByteString.copyFrom(text.toByteArray()))
                .build()
        }

    val response = try {
        client.send(requests).block() ?: throw Exception("")
    } finally {
        // Always release the connection, including on the failure paths above.
        rSocket.dispose()
    }
    println(response.message)
}

以及与GO服务器等效的Kotlin实现:

    // Wrap the service implementation in the generated server stub; the two
    // optional constructor arguments are left empty.
    val serviceServer = FileServiceServer(DefaultService(), Optional.empty(), Optional.empty())
    val serverTransport = TcpServerTransport.create(8081)

    // Every accepted connection is answered with a RequestHandlingRSocket that
    // delegates to serviceServer; the setup payload and sending socket are unused.
    val closeableChannel = RSocketFactory.receive()
        .acceptor { setup: ConnectionSetupPayload?, sendingSocket: RSocket? ->
            Mono.just(RequestHandlingRSocket(serviceServer))
        }
        .transport(serverTransport)
        .start()
        .block()

    // Block the calling thread until the server channel is closed.
    closeableChannel.onClose().block()

/**
 * [FileService] implementation that prints every received chunk and replies with
 * a fixed status once the incoming stream has completed.
 */
class DefaultService : FileService {
    /**
     * Prints the UTF-8 content of each chunk in [messages], then emits a single
     * `UploadStatus` with code `Ok` and message "test". [metadata] is not read.
     */
    override fun send(messages: Publisher<Service.Chunk>?, metadata: ByteBuf?): Mono<Service.UploadStatus> {
        // Group into windows of up to 10 items / 500 s and flatten them again.
        val chunks = Flux.from(messages)
            .windowTimeout(10, Duration.ofSeconds(500))
            .flatMap(Function.identity())

        val reply = Service.UploadStatus.newBuilder()
            .setCode(Service.UploadStatusCode.Ok)
            .setMessage("test")
            .build()

        return chunks
            .doOnNext { chunk -> println(chunk.content.toStringUtf8()) }
            .then(Mono.just(reply))
    }
}

服务器输出:

START
sending -> 1
倒霉:

解决方案如下:

package main
import (
   "context"
   "github.com/golang/protobuf/proto"
   "github.com/rsocket/rsocket-go"
   "github.com/rsocket/rsocket-go/payload"
   "github.com/rsocket/rsocket-go/rx"
   "github.com/rsocket/rsocket-go/rx/flux"
   "rsocket-go-rpc-test/proto"
)
// TestService embeds pl_dwojciechowski_proto.FileService and adds an int field
// named totals.
// NOTE(review): nothing in this listing references TestService or totals —
// confirm whether the type is still needed.
type TestService struct {
   totals int
pl_dwojciechowski_proto.FileService
}
// Pre-built UploadStatus replies used by the request-channel handler in main.
// Both carry the message "code" and differ only in their status code.
var (
	// statusOK is sent when the incoming stream completed normally.
	statusOK = &pl_dwojciechowski_proto.UploadStatus{
		Code:    pl_dwojciechowski_proto.UploadStatusCode_Ok,
		Message: "code",
	}

	// statusErr is sent when the incoming stream terminated with an error.
	statusErr = &pl_dwojciechowski_proto.UploadStatus{
		Code:    pl_dwojciechowski_proto.UploadStatusCode_Failed,
		Message: "code",
	}
)
// main starts an RSocket server on tcp://127.0.0.1:8081. Its request-channel
// handler drains the client's stream through a Go channel instead of blocking
// inside the handler, concatenates the Content of every Chunk, and replies with
// one UploadStatus payload once the stream has terminated: statusOK when the
// stream completed and every chunk decoded, statusErr otherwise.
// Whatever Serve returns is passed to panic.
func main() {
	addr := "tcp://127.0.0.1:8081"
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	err := rsocket.Receive().
		Fragment(1024).
		Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
			return rsocket.NewAbstractSocket(
				rsocket.RequestChannel(func(msgs rx.Publisher) flux.Flux {
					// Buffered with capacity 1 so the terminal callback can
					// record the outcome without waiting for a reader.
					dataReceivedChan := make(chan bool, 1)
					// The error channel returned by ToChan is not read; stream
					// failure is reported through DoOnError instead.
					toChan, _ := flux.Clone(msgs).
						DoOnError(func(e error) {
							dataReceivedChan <- false
						}).
						DoOnComplete(func() {
							dataReceivedChan <- true
						}).
						ToChan(ctx, 1)
					// NOTE: the ctx parameter below shadows the server ctx.
					fluxResponse := flux.Create(func(ctx context.Context, s flux.Sink) {
						// Length 0, capacity 1024: make([]byte, 1024) would put
						// 1024 zero bytes in front of the appended content.
						gluedContent := make([]byte, 0, 1024)
						decodedOK := true
						for c := range toChan {
							chunk := pl_dwojciechowski_proto.Chunk{}
							if err := proto.Unmarshal(c.Data(), &chunk); err != nil {
								// Keep draining so the sender is not stalled,
								// but report the failure in the reply.
								decodedOK = false
								continue
							}
							gluedContent = append(gluedContent, chunk.Content...)
						}
						// Always receive the stream outcome first, then combine
						// it with the decode result.
						status := statusErr
						if <-dataReceivedChan && decodedOK {
							status = statusOK
						}
						// Marshal error ignored: both statuses are fixed
						// package-level messages.
						marshal, _ := proto.Marshal(status)
						s.Next(payload.New(marshal, nil))
						s.Complete()
					})
					return fluxResponse
				}),
			), nil
		}).
		Transport(addr).
		Serve(ctx)
	panic(err)
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章