에 docstring, elasticsearch.helpers.async_bulk
자신을 설명
도우미 대한:meth:
~elasticsearch.AsyncElasticsearch.bulk
api 제공하는 더 인간 친화적 인 인터페이스-소비 반복기의 동작과 를 보내 그동 방법에 덩어리. 원
컨텍스트
용 AsyncElasticsearch.bulk()
성공적으로 보내는 팬더 dataframes 일부 ES 인스턴스
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
yield ('{ "index" : { "_index" : "%s" }}' % (self.index))
yield (json.dumps(record, default=int))
async def send_to_elasticsearch(self, df: DataFrame):
logger.info(f"{self.stage_name} sending batch to elastic")
await self.elastic_client.bulk(self._rec_to_actions(df))
문제
그러나,그것은 async_bulk
을,나를 얻 index is missing
오류가 있습니다.
async def send_to_elasticsearch(self, df: DataFrame):
await async_bulk(self.elastic_client, self._rec_to_actions(df))
도 조정 _rec_to_actions()
에서 여러 가지 방법이 효과가 있다.
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
record["index"] = self.index
yield (json.dumps(record, default=int))
난 주요 문제는 나가지 확실히 알 것은 행동의 맥락에서,동 방법. 이 개념은 어디에서나 문서만 있지 않은 명확한 데이터 구조에 대응에서 이 라이브러리 소스 코드 (없음 찾을 수있는,어쨌든)
무엇을 정확하게 작업 하고 어떻게 조정기를 보내 df 의 데이터 self.index
?
환경
- python="3.9.5"
- 동 방법을 제="7.14.1"