Akka Alppaka Producer如何与Akka HTTP集成

ehsan sha

嗨,大家好,我如何通过带有Alpakka Kafka连接器的http-akka广告获取数据发送到kafka,这是我的信息来源:

object WebTrack extends App  with  Directives with LazyLogging {
val host = "localhost"
val port = 7070

val authorization = "Authorization"

val route = withSizeLimit(96239727) {
  post {
    headerValueByName("Authorization") { auth =>
      entity(as[data]) { trans =>
        //************************** I need send data to Alpakka Kafka connector ********************************

        complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))
      }
    }
  }
}

val bindingFuture = Http().bindAndHandleAsync(Route.asyncHandler(route), host, port)
.onComplete {
  case Success(_) => {
    logger.debug("This is very convenient ;-)")
    println(s"Server online at http://localhost:7070 \\nPress RETURN to stop...")
  }
  case Failure(e) => {
    println("Error Bind Http().bindAndHandleAsync")
  }
}

}

丹尼尔·伊诺乔萨(Daniel Hinojosa)

您可以使用Akka Streams做很多不同的事情

Producer根据文档https://doc.akka.io/docs/akka-stream-kafka/current/producer.html这只是一个使用接收器的示例请务必检查出passThrough它可以执行的操作:

  implicit val system: ActorSystem = ActorSystem("my-system")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  case class User(firstName:String, lastName:String)
  object UserJsonSupport extends DefaultJsonProtocol with SprayJsonSupport {
    implicit val PortofolioFormats = jsonFormat2(User)
  }

  import UserJsonSupport._
  val config = system.settings.config.getConfig("akka.kafka.producer")
  val producerSettings = ProducerSettings(config, new StringSerializer, new StringSerializer)

  val route = withSizeLimit(96239727) {
    post {
      headerValueByName("Authorization") { auth =>
        entity(as[User]) { user: User =>
          Source.single(user)
            .map(user => new ProducerRecord[String, String]("last_names_topic", user.lastName, user.firstName))
            .runWith(Producer.plainSink(producerSettings))
          complete(HttpEntity(ContentTypes.`text/html(UTF-8)`,
            "<h1>Say hello to akka-http</h1>"))
        }
      }
    }
  }

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章