Elastic

參考資源

  1. 喬叔帶你上手 Elastic Stack - 探索與實踐 Observability系列

Elastic Common Schema (ECS)

Elastic Common Schema (ECS) 是一個規範 (Specification),同時這個規範也是 Open Source 的,是在 Elastic 使用者社群支持之下所開發的,主要定義了存放在 Elasticsearch 之中的 Event (事件) 類型資料常用的欄位,這些欄位的型態、描述、使用與呈現方式,而所謂的 Events 即包含就像是 Logs 或是 Metrics 這樣的資料。

ECS 的規範橫的範圍

  1. Event Sources (事件來源): Event 資料的來源,不論是 Elastic 的產品、第三方的產品或工具、甚至是使用者自己開發的應用程式
  2. Ingestion Architectures (資料注入的架構): 資料注入 (Ingestion) 的架構,不論有沒有使用 Beats、Logstash、Elasticsearch 的 Ingest Pipeline,都有包含在內。
  3. Consumers (使用端): 不論是透過 API 查出資料、Kibana Query、Dashboard、應用程式等方式來使用。

Elastic Common Schema 的設計重點

  1. Core Fields (核心欄位): 指適用於大部份的主要使用情境的欄位,這些欄位也就會在資料分析時、或是製作圖表時,廣泛的被使用在橫跨各種情境的設計之中,這部份的欄位會是比較固定的,不太會隨時間變化。
  2. Extended Fileds (延伸欄位): 只要不存在 Core Fields 的欄位,就是 Extended Fields,這些欄位只有在特定的使用情境中才會使用,並且隨著時間也較容易發生變化。

ECS 一般準則

  1. 每個文件必須包含 @timestamp 欄位。
  2. 需依照 Elasticsearch 的資料型態來定義每個 ECS 的欄位。
  3. 必須記錄所使用的 ECS 版本號在 ecs.version 的欄位之中。
  4. 盡可能的將資料對應到 ECS 已定義的欄位之中。

ingest pipLine

功用

Ingest Pipeline (擷取管道) 是一個內建在 Elasticsearch 中,文件在進入 Index 前的資料轉換 (Transformation) 的工具,主要的任務就是針對透過 Indexing RESTful API 傳入的文件,在真正進入 Elasticsearch Indexing 處理之前,先進行前處理

  1. 將原始的資料豐富化 (enrich),透過查找 Elasticsearch 裡存放在別的 Index 裡的相關資料,加入到文件之中,來豐富原有的文件。
  2. 將日期格式正確的從文件中的某個欄位擷取出來,讓 Elasticsearch 的 @timestamp 有正確的時間。
  3. 將 IP 的欄位,透過反查 GeoIP 的資料庫,加入 GeoLocation 的資訊在文件中,以利於之後能使用地圖檢視資料。
  4. 將文字內容,依照特定的格式,拆解成結構化的 JSON 欄位與值。
  5. 將 URL 拆解成包含 path、scheme、port、domain、query…各個欄位。
  6. 將 URL 中 Query String 裡的 Key / Value,轉成結構化的 JSON 欄位與值。
  7. 當某個條件成立時,填加某個固定值。

運作及使用方式

  1. Incoming Documents (傳入的文件): 在使用 Indexing API、或是 _bulk API 將文件傳入至 Elasticsearch 準備進行 Indexing 時,可以指定要使用哪一組預先定義好的 Ingest Pipeline。
  2. Ingest Pipeline (擷取管道): Elasticsearch 在到 Indexing 的請求時,如果有指定 Ingest Pipeline,Coordinator (協調者) Node 會把這個請求,交給 ingest node,透過定義好的 Pipeline 設定,經由當中指定的各種 Processor (處理器) 一步一步的將資料進行處理。
  3. Target Index (目標索引): Ingest Pipeline 在處理完之後,會將最終的文件,透過 Coordinator 傳送到 Primary Shard 所在的 Node,進行 Indexing 後續的處理。

如何定義ingest pipline

透過 _ingest 的 API 進行設定:

PUT /_ingest/pipeline/<pipeline>
  1. Pipeline 的名字是 my-pipeline-id。
  2. version 是提供使用者自己記錄及參考,與 ingest pipeline 本身的運作功能無關,是選用的欄位。
  3. processors 裡面指定 1 至多個 processors,這部份會是資料 Transform (轉換) 的主要處理,每個 processors 會依照先後順序來執行,一個 processor 做完後會將輸出交給下一個 processor 進行處理,也就是這個功能取名 pipeline 的原因,Elasticsearch 中內建 30 多個 processors,詳細可參考 官方文件 - Ingest Processor Reference。
  4. _meta 是提供給使用者自己存放自己想要額外加入的資訊所使用的,如果 pipeline 是由程式或其他機制在管理時,可以額外記錄一些參考的資訊

ex

PUT /_ingest/pipeline/my-pipeline-id
{
  "version": 1,
  "description" : "My optional pipeline description",
  "processors" : [
    {
      "set" : {
        "description" : "My optional processor description",
        "field": "my-keyword-field",
        "value": "foo"
      }
    }
  ],
  "_meta": {
    "reason": "set my-keyword-field to foo",
    "serialization": {
      "class": "MyPipeline",
      "id": 10
    }
  }
}

模擬測試pipline

api

POST /_ingest/pipeline/my-pipeline-id/_simulate
{
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    },
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "rab"
      }
    }
  ]
}

Simulate 指定的 Pipeline 規則 + 測試的文件

POST /_ingest/pipeline/_simulate
{
  "pipeline" :
  {
    "description": "_description",
    "processors": [
      {
        "set" : {
          "field" : "field2",
          "value" : "_value"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    },
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "rab"
      }
    }
  ]
}

Indexing 資料時,使用 Ingest Pipeline 的方法

  1. Index API ,指定 pipeline 的名字
POST my-data-stream/_doc?pipeline=my-pipeline
{
  "@timestamp": "2099-03-07T11:04:05.000Z",
  "my-keyword-field": "foo"
}
  1. Bulk API
PUT my-data-stream/_bulk?pipeline=my-pipeline
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:06.000Z", "my-keyword-field": "foo" }
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:07.000Z", "my-keyword-field": "bar" }
  1. Update By Query
  2. Reindex

Ingest Pipeline 常用的 Processor

gork

Grok 在針對『非結構化的文字資料,整理成結構化的資料』會是非常常用的一個重要工具,這邊要特別介紹的是, Elasticsearch 在 Grok 的支援上,有許多內建好的 Patterns 可以直接拿來使用

Dissect

Date

Convert

Fingerprint

GeoIP

Pipeline

Ingest Pipeline enrich

Enrich (充實、使豐富),指的是在 Ingest Pipeline 中,透過其他地方取得相關的資料,並加在原來的資料當中,讓資料更為豐富。 在 Elasticsearch Ingest Pipeline 的處理過程中,有定義一個 Enrich Processor ,就是專門提供資料 Enrich 的處理,接著將介紹這個 Enrich Processor 的運作方式。

Enrich Processor

Ingest Pipeline 例外處理

ignore_failure

on_failure

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "error.message",
              "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
              "override": false
            }
          }
        ]
      }
    }
  ]
}

外層的 pipeline 設定 on_failure

PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Index document to 'failed-<index>'",
        "field": "_index",
        "value": "failed-{{{ _index }}}"
      }
    }
  ]
}

Gork設定參考

參考設定 : https://gilesfadai.com/elk-grok/

基礎能力檢核

  1. 能獨立安裝 Elasticsearch Cluster,並且熟悉基本的設定。
  1. 了解 Elasticsearch Index, Mapping 的基本觀念。

  2. 了解 Elasticsearch Cluster, 什麼是 Primary Shard, Replica Shard,Elasticsearch Node 共有哪些角色。

  3. 知道如何建立 Index、設定 Index Settings、設定 Mapping、單一文件的 CRUD、批次處理的 mget, bulk 的操作。

  4. 知道 Indexing 一份文件時,文字欄位如何被解析、Analyzer 的處理方式、Inverted Index 如何儲存。

  5. 能清楚的分辨什麼是 Elasticsearch refresh, Lucene flush, Elasticsearch flush, Lucene commit, Segment file, Field data, Doc values。

  6. 能解釋 Query, Filter 的差異、並且知道如何做選擇。

  7. 掌握基本的 Search API 的使用、並且使用 Query DSL 與 Aggregation。

  8. 處理 indexing & searching 的請求時,Coordinator 是什麼樣的角色,這些請求在執行時,背後做了哪些事? 什麼是 query then fetch? 和 DFS query then fetch 的差異?

  9. 知道以下功能或設定使用的時機或要注意的事項,以及可以避掉什麼樣的坑:Nested Object, Terms Aggregation, max_result_window。

  10. Dynamic Mapping 是做什麼用的? fields 這個欄位型態又是做什麼用的?

  11. 知道在進入 Production 時,一些基本的 Elasticsearch 的設定要如何設置。

  12. Cluster 紅燈、黃燈、綠燈,分別代表什麼樣的狀態? 對於資料的存取會有什麼樣的影響?

fluentd

fluent.conf

<match dev_abc.**>
  logstash_format true
  logstash_prefix dev_sof
  logstash_dateformat %Y%m%d
  pipeline pipeline_id

相關設定介紹

Q&A : https://github.com/uken/fluent-plugin-elasticsearch#pipeline