Akka 흐름을 입력(`에서`)으로 출력(Out)

0

질문

내가 하려고 쓰기 코드는 다음과 같다:-

  1. 를 읽고 큰 csv 파일에서 원본 다음과 같 s3.
  2. 프로세스가 파일을 기록하여 기록이다.
  3. 알림을 보낼 사용자
  4. 출력을 작성하여 원격 위치

샘플 레코드에서 입력 csv:

recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000

내가 입력한 경우 등을 나타내는 레코드에서 입력 csv:

case class InputRecord(recordId: String, name: String, salary: Long)

샘플에서 기록을 출력한 csv(공유할 수 있습니다):

recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager

출력한 경우 등을 나타내는 레코드에서 입력 csv:

case class OutputRecord(recordId: String, name: String, designation: String)

읽기를 사용하여 레코드 akka 스트림 csv(사용 Alpakka 반응성 s3 https://doc.akka.io/docs/alpakka/current/s3.html):

def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] = 
S3.download(s3Object.bucket, s3Object.path)
      .runWith(Sink.head)
// This is then converted to csv

지금 나는 기능을 처 레코드:

def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer

함수를 작성 OutputRecord csv

def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] = 
S3.multipartUpload(s3Object.bucket,
                       s3Object.path,
                       metaHeaders = MetaHeaders(Map())

능 이메일을 보내 알림:

def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info

모두 함께 바느질

readAsCSV.flatMap { recordSource =>
  recordSource.map { record
    val outputRecord = process(record)
    outputRecord
  }
  .via(notify) //Error: Line 15
  .to(writeOutput) //Error: Line 16
  .run()
}

선 15&16 내가 오류,내가 하나를 추가할 수 있 선 15 또는 선 16 지만 둘 모두 이후 notify & writeOutputoutputRecord. 한 알리라 나는 느슨한 나 outputRecord.

이 있을 수 있는 방법을 모두 추가 notifywriteOutput 동 graph?

자동 병렬 실행을 위해 내가 먼저 전화 notifywriteOutput. 그래서 이것이 유용하지 않습니다. https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing

사용 사례는 것이 매우 간단하지만 일부는 어떻게 나를 찾을 수 없는 깨끗한 솔루션입니다.

akka akka-stream alpakka amazon-s3
2021-11-23 22:36:54
1

최고의 응답

1

의 출력 notifyPushResult하지만 입력 writeOutputByteString. 면을 변경하는 것이 컴파일. 필요한 경우 ByteString을 얻을에서 동일한 OutputRecord.

BTW,샘플 코드에서는 당신을 제공하고 있 비슷한 오류가 있에 readCSVprocess.

2021-11-24 03:36:16

다른 언어로

이 페이지는 다른 언어로되어 있습니다

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................