[1] Helpers - Bulk API 

특정 사항이나  raw API를 요약하는 간단한 Helpers functions 컬렉션입니다.

특정 형식에 대한 요구 사항 및 다른 사항으로 인해 직접 사용하는 경우 번거로울 수 있으므로 Bulk API에는 여러 가지 Helpers가 있습니다.

모든 Bulk Helpers는 Elasticsearch 클래스의 인스턴스와 반복 가능한 작업을 수용합니다 (반복 가능하거나 생성자가 될 수 있음). 대량의 데이터 세트를 메모리에 로드하지 않고도 인덱싱 할 수 있습니다.

[2] 사용 예제

import json
import os
from elasticsearch import Elasticsearch
from elasticsearch import helpers

ES_HOST = '127.0.0.1'  # set es url or host
ES = Elasticsearch(hosts=ES_HOST, port=9200)
n = 10

##############################
# Bulk api 사용법
# bulk api를 활용해서 불안정한 search api 대신,
# 다량의 데이터를 한번에 전송하기 위해 작성된 예제 코드입니다.
# 코드는 아래와 같은 방식으로 구현되었습니다.
# ==========================================
# 먼저, resultList로 부터 json 포맷으로 저장된 결과들을 list형인 result_dict에 저장합니다.
# 여기서, resultDict에 있는 데이터들이 n개 이상일 경우, 엘라스틱서치에 전송됩니다.
# n개를 넘지않을 경우에는 추가 데이터를 받아오거나,
# 코드가 종료되는 시점에서 나머지 데이터를 전송하는 방향으로 처리하시면 됩니다.
# ==========================================
##############################
def collector():
  resultList = [{"target":apple, "taste":"good"}, {"target":banana, "taste": "bad"}]
  resultDict = []
  
  for data in resultList:
  	result = {"_index": ES_index, "_source": data}
  	resultDict.append(result)
    
    if len(resultDict) > n:
      helpers.bulk(ES, resultDict)
      resultDict = []
      
  helpers.bulk(ES, resultDict)

[3] 참고 자료

Python Helpers 모듈, https://elasticsearch-py.readthedocs.io/en/master/helpers.html

 

Helpers — Elasticsearch 7.5.1 documentation

Lets say we have an iterable of data. Lets say a list of words called mywords and we want to index those words into individual documents where the structure of the document is like {"word": " "} . The parallel_bulk() api is a wrapper around the bulk() api

elasticsearch-py.readthedocs.io

 

반응형

elasticsearch의 인덱스에 대한 change 여부를 체크하는 방법에 대한 포스트입니다.

1. 시퀀스 번호를 활용한 추적

시퀀스 번호 체크 방법

http://{ip}:9200/_search?seq_no_primary_term=true

Today this can be solved client side by storing the last sequence number and then polling the shard level stats for the current sequence number; if it is higher, there must have been a change. Closing.

마지막 시퀀스 번호를 저장 한 다음 현재 시퀀스 번호의 샤드 레벨 통계를 폴링하여 클라이언트 측에서 해결할 수 있습니다.

https://github.com/elastic/elasticsearch/issues/13830

Determine if the index has been modified · Issue #13830 · elastic/elasticsearch

Today, it is difficult to determine if any index has been updated (e.g., in mostly read only scenarios where the answer becomes more interesting). If you have a simple, outside cache, then it would...

github.com

2. 스택 모니터링의 indices stats 정보, index stats API  히스토리컬 데이터 확인

http://{ip}:9200/_stats

 

http://{ip}:9200/{index}/_stats

https://www.elastic.co/guide/en/kibana/7.5/elasticsearch-metrics.html#indices-overview-page

https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-stats.html

반응형

이번 포스트는 Elasticsearch를 공부하면서 참고한 자료들을 단순히 주제별로 정리한 포스트입니다.

 

1. ELK 스택

Elasticsearch + Logstash + Kibana 스택 구축 가이드

Digital Ocean - [how to install]

 

2. elasticsearch-dump [엘라스틱서치-덤프]

엘라스틱 서치에 있는 파일들을 json 포맷이나 csv 포맷으로 저장하는 파이썬 모듈 

2.1. Copy

2.2. Backup

 

3.4. merge

curl -XPOST 'localhost:9200/kimchy,elasticsearch/_forcemerge?pretty'

Best way, using reindex API

POST _reindex
{
  "conflicts": "proceed",
  "source": {
  "index": ["twitter", "blog"],
  "type": ["tweet", "post"]
  },
  "dest": {
  "index": "all_together"
  }
}

 

3.5. mapping

PUT post
{
  "mappings":{
    "nulled":{
      "properties":{
        "user":{
        "type":"text"
        },
        "title":{
        "type":"text"
        },
        "detail":{
        "type":"text"
        },
        "url":{
        "type":"text"
        },
        "datetime":{
        "type":"date","format":"YYYY-MM-dd HH:mm"
        }
     }
   }
}



반응형

+ Recent posts