Concatinating 두 개의 흐름에 스트림 Akka

0

질문

내가 하려고 concat 두 개의 흐름과 저는지 설명할 수 있 출력을 나의 구현합니다.

val source = Source(1 to 10)
val sink = Sink.foreach(println)

val flow1 = Flow[Int].map(s => s + 1)
val flow2 = Flow[Int].map(s => s * 10)

val flowGraph = Flow.fromGraph(
    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val concat = builder.add(Concat[Int](2))
      val broadcast = builder.add(Broadcast[Int](2))

      broadcast ~> flow1 ~> concat.in(0)
      broadcast ~> flow2 ~> concat.in(1)

      FlowShape(broadcast.in, concat.out)
    }
  )

source.via(flowGraph).runWith(sink)

나는 다음 출력에서 이 코드입니다.

2
3
4
.
.
.
11
10
20
.
.
.
100

대신,나만 보"2"되고 인쇄했습니다. 할 수 있는 것은 잘못이 무엇인지 설명해 내 implmentation 어떻게 변화 프로그램으로 원하는 출력을 얻을.

akka akka-stream scala
2021-10-21 17:29:00
2

최고의 응답

3

에서 Akka 스트림의 API 를 문서도구:

Concat:

를 방출할 때에 현재는 요소를 사용할 경우 현재의 입력이 완료되면하려고 다음 중 하나

Broadcast:

를 방출한 경우 출력의 모든지 backpressuring 및 입력이 있 요소 사용 가능

이 두 사업자 사이에 작동하지 않을 것과 함께 있으므로 충돌에서 그들이 어떻게 작동-- Concat 을 당하고 모든 요소 중 하나에서의 Broadcast의 출력을 전환하기 전에 다른 하나는 반면, Broadcast 지 않을 것을 방출하지 않는 한 대한 수요가있다 그것의 모든 출력이 있습니다.

무엇을 위해,당신이 필요할 수 있는 연결을 사용하여 concat 에 의해 제안으로 코멘트:

source.via(flow1).concat(source.via(flow2)).runWith(sink)

거나 동등하게,사용 Source.combine 아래와 같다:

Source.combine(source.via(flow1), source.via(flow2))(Concat[Int](_)).runWith(sink)
2021-10-21 22:34:04
0

GraphDSL는 간소화 버전을 구현하의 소스입니다.결합:

val sg = Source.fromGraph(
  GraphDSL.create(){ implicit builder =>
    import GraphDSL.Implicits._

    val concat = builder.add(Concat[Int](2))

    source ~> flow1 ~> concat
    source ~> flow2 ~> concat

    SourceShape(concat.out)
  }
)

sg.runWith(sink)
2021-10-26 19:23:56

다른 언어로

이 페이지는 다른 언어로되어 있습니다

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................