ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • NiFi Index to Index Data Pipeline
    NiFi 2024. 11. 20. 21:12

    오늘은 Nifi 데이터 파이프라인을 소개하려고 합니다! 

    NiFi는 Attribute, Content로 이루어진 FlowFile을 활용하며, 직관적으로 파악할 수 있는 UI 제공과 코드 없이도 기존 프로세서를 사용하여 데이터 파이프라인을 개발할 수 있습니다. (기존 프로세서 외에도 Jar, Nar 파일을 프로세서로 실행시킬 수도 있습니다.)


    제가 소개할 NiFi를 통한 목표는 아래와 같습니다.

     

    목표

    인덱스 to 인덱스 데이터 색인 (Reindex 과정)
    - search_after 사용
    - Bulk API 사용
    제약사항
    - Query를 날릴 수 있어야 함
    - 중복 불가

     

    구성 환경

    • NiFi 1.27.0
    • Linux Centos
    필자의 경우 해당 프로세스를 사용하기 전에 SSL 설정, Key를 발급받아 Variables에 저장해둔 상태입니다.

     

    Datapipe line

    구성된 파이프라인의 일부인 SearchElasticsearch -> Update Attribute -> SplitJson 부분을 먼저 보겠습니다.

    SearchElasticsearch (시작)  -> Update Attribute -> SplitJson

    SearchElasticsearch 특징

    • [] JSONArray 형태로 인입
    • Lucene Syntax 쿼리(ex. query_string)는 지원하지 않는다. (DSL 쿼리를 활용해야 함)
    • Paging 처리를 할 때 (search_after, ..) 모든 document를 돌았음에도 중지되지 않는다.

    이러한 SearchElasticsearch Processor의 특징 때문에 Update Attribute, SplitJson을 추가로 해주어 데이터를 처리합니다.

     

    UpdateAttribute

    Update Attribute 프로세서에서는 SearchElasticsearch로 들어온 JSONArray의 길이를 Attribute에 추가해줍니다.

    • '+' 눌러 속성추가 : ${hit.count:gt(0):and(${hit.count:equals(${size})})} 
      (여기서 size 는 해당 프로세서 그룹의 variables에 적용해둔 search_after을 페이징 사이즈입니다.)

    SplitJson

    SplitJson 프로세서에서는 []로 들어온 JSONArray를 JSONObjecty 단위로 분할해 줍니다. 
      
    (이어서) EvaluateJsonPath -> RouteOnAttribute -> PutElasticsearchJson

     

     

    EvaluateJsonPath

    EvaluateJsonPath 프로세서에서는 JSONObject로 쪼개진 FlowFile을 중복 처리를 하기 위해 속성을 추가해줍니다. 

    • '+' 눌러 속성추가 : 해당 FlowFile Content 부분에서 Bulk API의 _id로 쓸 고유값을 가져옵니다.
      $.kw_docid ('$'는 원문을 뜻함)

    RouteOnAttribute

    RouteOnAttribute 프로세서를 통해 "UpdateAttribute" 프로세서에서 설정한 속성값에 따라 분기 처리를 해줍니다.

     

    PutElasticsearch

    이 프로세서를 통해 Bulk API를 날려줍니다. bulk size를 지정할 수 있으며 Identifier Attribute 설정을 통해
    "EvaluateJsonPath"에서 설정한 bulk api의 _id를 지정해줍니다.

    * 주의 사항 * 
    필자는 해당 프로세스를 설정할 때 설정한 _id값이 아닌 랜던값이 설정되었는데요..
    당시 속성 이름을 언더바가 포함된 "_id"로 설정했으며 언더바를 지우니 해결되었습니다.

     


    저의 경우 데이터 색인작업이 끝나면 "InvokeHttp" 프로세서를 통해 해당 데이터 파이프라인(프로세서 그룹)을 정지 시켰습니다.
    NiFi의 설정이나 다른 부분들을 자세하게 다루진 않았습니다..만 혹시 InvokeHttp를 적용하고 싶으시거나 막히는 부분이 있으시다면 댓글 남겨주세요! 추후에 글을 작성할수도 있지만 댓글달아주시면 바로바로 답변드리겠습니다!

Designed by Tistory.