Akka 스트림에 지속적으로 소비하는 websocket

0

질문

Im 좀 새로운 서비스를 제공하고 Akka 스트림 및 im trying to get JSON 문자열에서 메시지를 websocket 밀어들게 카프카 주제입니다.

지금 나는 유일한 작업에서"메시지를 얻을에서는 ws"부분입니다.

메시지에서 오는 websocket 다음과 같습니다:

{  
   "bitcoin":"6389.06534240",
   "ethereum":"192.93111286",
   "monero":"108.90302506",
   "litecoin":"52.25484165"
}

내가 원하는 분이 JSON 메시지를 여러 메시지:

   {"coin": "bitcoin", "price": "6389.06534240"}
   {"coin": "ethereum", "price": "192.93111286"}
   {"coin": "monero", "price": "108.90302506"}
   {"coin": "litecoin", "price": "52.25484165"}

를 밀어 이러한 각각의 메시지를 카프카 주제입니다.

여기에 내가 달성 지금까지:

val message_decomposition: Flow[Message, String, NotUsed] = Flow[Message].mapConcat(
    msg => msg.toString.replaceAll("[{})(]", "").split(",")
  ).map( msg => {
    val splitted = msg.split(":")
    s"{'coin': ${splitted(0)}, 'price': ${splitted(1)}}"
  })

val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)

val flow: Flow[Message, Message, Promise[Option[Message]]] =
    Flow.fromSinkAndSourceMat(
      message_decomposition.to(sink),
      Source.maybe[Message])(Keep.right)

val (upgradeResponse, promise) = Http().singleWebSocketRequest(
      WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
      flow)

일기 예상 출력 Json 메시지 궁금했다면 내가 쓸 수 있는 이 프로듀서상"Akka-틱"스타일을 사용하여 같은 GraphDSL. 그래서 나는 몇 가지 질문이 있:

  • 그것은 지속적으로 소비하는 WebSocket 를 사용하여 GraphDSL? 그렇다면,당신을 보여줘 예를 들어 주시기 바랍?
  • 그것은 좋은 아이디어를 소비하는 WS 를 사용하여 GraphDSL?
  • 어야 나는 분해를 받았 Json 같은 메시지가 전송하기 전에 카프카? 또는 그것의 더 나은 그것을 보낼로 그것은 낮은 대기 시간?
  • 을 생성한 후에 메시지를 카프카에,나는 계획하는 소비를 사용하여 아파치 폭풍은 그것이 좋은 생각입니까? 또는 나 가진 지팡 Akka?

에 대한 감사를 읽고,나에게 감사합니다, 타디네

akka akka-stream apache-kafka scala
2021-11-20 14:01:02
1

최고의 응답

1

는 코드는 많 Akka-틱: scaladsl 로 Akka 로 GraphDSL 을 구현하고 사용자 지정 GraphStage. 유,IMO/전자,가을 GraphDSL 은 경우 실제적인 모양의 그래프에 쉽게 표현할 수 있에 scaladsl.

나 개인적으로 이동이 경로 정의 CoinPrice 클래스를 모형을 만드는 명시적

case class CoinPrice(coin: String, price: BigDecimal)

다음 Flow[Message, CoinPrice, NotUsed] 을 분석하 1 들어오는 메시지를 영상 CoinPrices. 가(를 사용하여 플레이 JSON 여기)다음과 같:

val toCoinPrices =
  Flow[Message]
    .mapConcat { msg =>
      Json.parse(msg.toString)
        .asOpt[JsObject]
        .toList
        .flatMap { json =>
          json.underlying.flatMap { kv =>
            import scala.util.Try

            kv match {
              case (coin, JsString(priceStr)) =>
                Try(BigDecimal(priceStr)).toOption
                  .map(p => CoinPrice(coin, p))                

              case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
              case _ => None
            }
          }
        }
    }

당신은 수도에 따라 어떤 크기의 JSONs 메시지,휴식하고 싶다는 다른 흐 단계를 허용하는 비동기 사이의 경계 JSON 분석 및 추출으로 CoinPrices. 예를 들어,

Flow[Message]
  .mapConcat { msg =>
    Json.parse(msg.toString).asOpt[JsObject].toList
  }
  .async
  .mapConcat { json =>
    json.underlying.flatMap { kv =>
      import scala.util.Try

      kv match {
        case (coin, JsString(priceStr)) =>
          Try(BigDecimal(priceStr)).toOption
            .map(p => CoinPrice(coin, p))

        case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
        case _ => None
      }
    }
  }

상기에서,단계에서 어느 쪽의 async 경계선에서 실행됩니다 별도의 배우와 이에 따라,가능성이 동시에(있다면 충분히 CPU 코어를 사용할 수 있 etc.), 의 비용으로 추가의 오버헤드를 위해 배우 조정하고 메시지를 주고 받습니다. 는 추가 조정/통신 오버헤드(cf. 군터 그의 보편적 확장성은 법)만 그것을 가치가 있는 경우 JSON 체가 충분히 크고 오는 충분히 빠르(지속적으로 오기 전에 이전 하나 처리를 완료).

는 경우에 당신의 의도를 소비하는 websocket 때까지 이 프로그램을 중지할 수 있습에게 명확하게 사용 Source.never[Message].

2021-11-21 12:42:30

덕분에 당신을 위해 답변,그것은 매우 명확하고,저는 그 질문이 하나 있다. 나는 어떻게 나의 응답으로 다른 stream 단계는? 할 수 있습니 그냥 쇼를 나에게 작은 예를 들어 주시기 바랍? 또는 방향을 적당한 부분의 서류가 있습니까?
Arès

다른 언어로

이 페이지는 다른 언어로되어 있습니다

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................