Elastic
參考資源
Elastic Common Schema (ECS)
Elastic Common Schema (ECS) 是一個規範 (Specification),同時這個規範也是 Open Source 的,是在 Elastic 使用者社群支持之下所開發的,主要定義了存放在 Elasticsearch 之中的 Event (事件) 類型資料常用的欄位,這些欄位的型態、描述、使用與呈現方式,而所謂的 Events 即包含就像是 Logs 或是 Metrics 這樣的資料。
ECS 的規範橫的範圍
- Event Sources (事件來源): Event 資料的來源,不論是 Elastic 的產品、第三方的產品或工具、甚至是使用者自己開發的應用程式
- Ingestion Architectures (資料注入的架構): 資料注入 (Ingestion) 的架構,不論有沒有使用 Beats、Logstash、Elasticsearch 的 Ingest Pipeline,都有包含在內。
- Consumers (使用端): 不論是透過 API 查出資料、Kibana Query、Dashboard、應用程式等方式來使用。
Elastic Common Schema 的設計重點
- Core Fields (核心欄位): 指適用於大部份的主要使用情境的欄位,這些欄位也就會在資料分析時、或是製作圖表時,廣泛的被使用在橫跨各種情境的設計之中,這部份的欄位會是比較固定的,不太會隨時間變化。
- Extended Fileds (延伸欄位): 只要不存在 Core Fields 的欄位,就是 Extended Fields,這些欄位只有在特定的使用情境中才會使用,並且隨著時間也較容易發生變化。
ECS 一般準則
- 每個文件必須包含 @timestamp 欄位。
- 需依照 Elasticsearch 的資料型態來定義每個 ECS 的欄位。
- 必須記錄所使用的 ECS 版本號在 ecs.version 的欄位之中。
- 盡可能的將資料對應到 ECS 已定義的欄位之中。
ingest pipLine
功用
Ingest Pipeline (擷取管道) 是一個內建在 Elasticsearch 中,文件在進入 Index 前的資料轉換 (Transformation) 的工具,主要的任務就是針對透過 Indexing RESTful API 傳入的文件,在真正進入 Elasticsearch Indexing 處理之前,先進行前處理
- 將原始的資料豐富化 (enrich),透過查找 Elasticsearch 裡存放在別的 Index 裡的相關資料,加入到文件之中,來豐富原有的文件。
- 將日期格式正確的從文件中的某個欄位擷取出來,讓 Elasticsearch 的 @timestamp 有正確的時間。
- 將 IP 的欄位,透過反查 GeoIP 的資料庫,加入 GeoLocation 的資訊在文件中,以利於之後能使用地圖檢視資料。
- 將文字內容,依照特定的格式,拆解成結構化的 JSON 欄位與值。
- 將 URL 拆解成包含 path、scheme、port、domain、query…各個欄位。
- 將 URL 中 Query String 裡的 Key / Value,轉成結構化的 JSON 欄位與值。
- 當某個條件成立時,填加某個固定值。
運作及使用方式
- Incoming Documents (傳入的文件): 在使用 Indexing API、或是 _bulk API 將文件傳入至 Elasticsearch 準備進行 Indexing 時,可以指定要使用哪一組預先定義好的 Ingest Pipeline。
- Ingest Pipeline (擷取管道): Elasticsearch 在到 Indexing 的請求時,如果有指定 Ingest Pipeline,Coordinator (協調者) Node 會把這個請求,交給 ingest node,透過定義好的 Pipeline 設定,經由當中指定的各種 Processor (處理器) 一步一步的將資料進行處理。
- Target Index (目標索引): Ingest Pipeline 在處理完之後,會將最終的文件,透過 Coordinator 傳送到 Primary Shard 所在的 Node,進行 Indexing 後續的處理。
如何定義ingest pipline
透過 _ingest 的 API 進行設定:
PUT /_ingest/pipeline/<pipeline>
- Pipeline 的名字是 my-pipeline-id。
- version 是提供使用者自己記錄及參考,與 ingest pipeline 本身的運作功能無關,是選用的欄位。
- processors 裡面指定 1 至多個 processors,這部份會是資料 Transform (轉換) 的主要處理,每個 processors 會依照先後順序來執行,一個 processor 做完後會將輸出交給下一個 processor 進行處理,也就是這個功能取名 pipeline 的原因,Elasticsearch 中內建 30 多個 processors,詳細可參考 官方文件 - Ingest Processor Reference。
- _meta 是提供給使用者自己存放自己想要額外加入的資訊所使用的,如果 pipeline 是由程式或其他機制在管理時,可以額外記錄一些參考的資訊
ex
PUT /_ingest/pipeline/my-pipeline-id
{
"version": 1,
"description" : "My optional pipeline description",
"processors" : [
{
"set" : {
"description" : "My optional processor description",
"field": "my-keyword-field",
"value": "foo"
}
}
],
"_meta": {
"reason": "set my-keyword-field to foo",
"serialization": {
"class": "MyPipeline",
"id": 10
}
}
}
模擬測試pipline
api
POST /_ingest/pipeline/my-pipeline-id/_simulate
{
"docs": [
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "bar"
}
},
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "rab"
}
}
]
}
Simulate 指定的 Pipeline 規則 + 測試的文件
POST /_ingest/pipeline/_simulate { "pipeline" : { "description": "_description", "processors": [ { "set" : { "field" : "field2", "value" : "_value" } } ] }, "docs": [ { "_index": "index", "_id": "id", "_source": { "foo": "bar" } }, { "_index": "index", "_id": "id", "_source": { "foo": "rab" } } ] }
Indexing 資料時,使用 Ingest Pipeline 的方法
- Index API ,指定 pipeline 的名字
POST my-data-stream/_doc?pipeline=my-pipeline
{
"@timestamp": "2099-03-07T11:04:05.000Z",
"my-keyword-field": "foo"
}
- Bulk API
PUT my-data-stream/_bulk?pipeline=my-pipeline
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:06.000Z", "my-keyword-field": "foo" }
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:07.000Z", "my-keyword-field": "bar" }
- Update By Query
- Reindex
Ingest Pipeline 常用的 Processor
gork
Grok 在針對『非結構化的文字資料,整理成結構化的資料』會是非常常用的一個重要工具,這邊要特別介紹的是, Elasticsearch 在 Grok 的支援上,有許多內建好的 Patterns 可以直接拿來使用
Dissect
Date
Convert
Fingerprint
GeoIP
Pipeline
Ingest Pipeline enrich
Enrich (充實、使豐富),指的是在 Ingest Pipeline 中,透過其他地方取得相關的資料,並加在原來的資料當中,讓資料更為豐富。 在 Elasticsearch Ingest Pipeline 的處理過程中,有定義一個 Enrich Processor ,就是專門提供資料 Enrich 的處理,接著將介紹這個 Enrich Processor 的運作方式。
Enrich Processor
Ingest Pipeline 例外處理
ignore_failure
on_failure
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"rename": {
"description": "Rename 'provider' to 'cloud.provider'",
"field": "provider",
"target_field": "cloud.provider",
"on_failure": [
{
"set": {
"description": "Set 'error.message'",
"field": "error.message",
"value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
"override": false
}
}
]
}
}
]
}
外層的 pipeline 設定 on_failure
PUT _ingest/pipeline/my-pipeline
{
"processors": [ ... ],
"on_failure": [
{
"set": {
"description": "Index document to 'failed-<index>'",
"field": "_index",
"value": "failed-{{{ _index }}}"
}
}
]
}
Gork設定參考
參考設定 : https://gilesfadai.com/elk-grok/
基礎能力檢核
- 能獨立安裝 Elasticsearch Cluster,並且熟悉基本的設定。
-
了解 Elasticsearch Index, Mapping 的基本觀念。
-
了解 Elasticsearch Cluster, 什麼是 Primary Shard, Replica Shard,Elasticsearch Node 共有哪些角色。
-
知道如何建立 Index、設定 Index Settings、設定 Mapping、單一文件的 CRUD、批次處理的 mget, bulk 的操作。
-
知道 Indexing 一份文件時,文字欄位如何被解析、Analyzer 的處理方式、Inverted Index 如何儲存。
-
能清楚的分辨什麼是 Elasticsearch refresh, Lucene flush, Elasticsearch flush, Lucene commit, Segment file, Field data, Doc values。
-
能解釋 Query, Filter 的差異、並且知道如何做選擇。
-
掌握基本的 Search API 的使用、並且使用 Query DSL 與 Aggregation。
-
處理 indexing & searching 的請求時,Coordinator 是什麼樣的角色,這些請求在執行時,背後做了哪些事? 什麼是 query then fetch? 和 DFS query then fetch 的差異?
-
知道以下功能或設定使用的時機或要注意的事項,以及可以避掉什麼樣的坑:Nested Object, Terms Aggregation, max_result_window。
-
Dynamic Mapping 是做什麼用的? fields 這個欄位型態又是做什麼用的?
-
知道在進入 Production 時,一些基本的 Elasticsearch 的設定要如何設置。
-
Cluster 紅燈、黃燈、綠燈,分別代表什麼樣的狀態? 對於資料的存取會有什麼樣的影響?
fluentd
fluent.conf
<match dev_abc.**> logstash_format true logstash_prefix dev_sof logstash_dateformat %Y%m%d pipeline pipeline_id
相關設定介紹
Q&A : https://github.com/uken/fluent-plugin-elasticsearch#pipeline