내가 하려고 쓰기 코드는 다음과 같다:-
- 를 읽고 큰 csv 파일에서 원본 다음과 같 s3.
- 프로세스가 파일을 기록하여 기록이다.
- 알림을 보낼 사용자
- 출력을 작성하여 원격 위치
샘플 레코드에서 입력 csv:
recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000
내가 입력한 경우 등을 나타내는 레코드에서 입력 csv:
case class InputRecord(recordId: String, name: String, salary: Long)
샘플에서 기록을 출력한 csv(공유할 수 있습니다):
recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager
출력한 경우 등을 나타내는 레코드에서 입력 csv:
case class OutputRecord(recordId: String, name: String, designation: String)
읽기를 사용하여 레코드 akka 스트림 csv(사용 Alpakka 반응성 s3 https://doc.akka.io/docs/alpakka/current/s3.html):
def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] =
S3.download(s3Object.bucket, s3Object.path)
.runWith(Sink.head)
// This is then converted to csv
지금 나는 기능을 처 레코드:
def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer
함수를 작성 OutputRecord csv
def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] =
S3.multipartUpload(s3Object.bucket,
s3Object.path,
metaHeaders = MetaHeaders(Map())
능 이메일을 보내 알림:
def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info
모두 함께 바느질
readAsCSV.flatMap { recordSource =>
recordSource.map { record
val outputRecord = process(record)
outputRecord
}
.via(notify) //Error: Line 15
.to(writeOutput) //Error: Line 16
.run()
}
선 15&16 내가 오류,내가 하나를 추가할 수 있 선 15 또는 선 16 지만 둘 모두 이후 notify
& writeOutput
구 outputRecord
. 한 알리라 나는 느슨한 나 outputRecord
.
이 있을 수 있는 방법을 모두 추가 notify
고 writeOutput
동 graph?
자동 병렬 실행을 위해 내가 먼저 전화 notify
만 writeOutput
. 그래서 이것이 유용하지 않습니다. https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing
사용 사례는 것이 매우 간단하지만 일부는 어떻게 나를 찾을 수 없는 깨끗한 솔루션입니다.