Im 좀 새로운 서비스를 제공하고 Akka 스트림 및 im trying to get JSON 문자열에서 메시지를 websocket 밀어들게 카프카 주제입니다.
지금 나는 유일한 작업에서"메시지를 얻을에서는 ws"부분입니다.
메시지에서 오는 websocket 다음과 같습니다:
{
"bitcoin":"6389.06534240",
"ethereum":"192.93111286",
"monero":"108.90302506",
"litecoin":"52.25484165"
}
내가 원하는 분이 JSON 메시지를 여러 메시지:
{"coin": "bitcoin", "price": "6389.06534240"}
{"coin": "ethereum", "price": "192.93111286"}
{"coin": "monero", "price": "108.90302506"}
{"coin": "litecoin", "price": "52.25484165"}
를 밀어 이러한 각각의 메시지를 카프카 주제입니다.
여기에 내가 달성 지금까지:
val message_decomposition: Flow[Message, String, NotUsed] = Flow[Message].mapConcat(
msg => msg.toString.replaceAll("[{})(]", "").split(",")
).map( msg => {
val splitted = msg.split(":")
s"{'coin': ${splitted(0)}, 'price': ${splitted(1)}}"
})
val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)
val flow: Flow[Message, Message, Promise[Option[Message]]] =
Flow.fromSinkAndSourceMat(
message_decomposition.to(sink),
Source.maybe[Message])(Keep.right)
val (upgradeResponse, promise) = Http().singleWebSocketRequest(
WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
flow)
일기 예상 출력 Json 메시지 궁금했다면 내가 쓸 수 있는 이 프로듀서상"Akka-틱"스타일을 사용하여 같은 GraphDSL. 그래서 나는 몇 가지 질문이 있:
- 그것은 지속적으로 소비하는 WebSocket 를 사용하여 GraphDSL? 그렇다면,당신을 보여줘 예를 들어 주시기 바랍?
- 그것은 좋은 아이디어를 소비하는 WS 를 사용하여 GraphDSL?
- 어야 나는 분해를 받았 Json 같은 메시지가 전송하기 전에 카프카? 또는 그것의 더 나은 그것을 보낼로 그것은 낮은 대기 시간?
- 을 생성한 후에 메시지를 카프카에,나는 계획하는 소비를 사용하여 아파치 폭풍은 그것이 좋은 생각입니까? 또는 나 가진 지팡 Akka?
에 대한 감사를 읽고,나에게 감사합니다, 타디네