
In today's world of distributed systems and microservice architectures, log analysis, data retrieval, and visual monitoring have become core needs for development and operations teams. The ELK Stack (Elasticsearch, Logstash, Kibana) is the most mature open-source solution for log and data analytics. Thanks to its scalability, near-real-time behavior, and ease of use, it is widely used by companies such as Alibaba, Tencent, and ByteDance for log collection, business search, and operations monitoring.
This article walks through the ELK Stack from the underlying concepts to hands-on practice. It is written against the stable releases Elasticsearch 8.15.0, Logstash 8.15.0, and Kibana 8.15.0, and the examples were verified under JDK 17 and MySQL 8.0. Whether you are a developer, an operator, or a newcomer to data visualization, the goal is to build a solid foundation and solve real problems.
The ELK Stack is an open-source data processing and visualization suite from Elastic, composed of three core components, each with a distinct role but designed to work together:

Workflow breakdown:
# Create a dedicated user
useradd elk
# Set its password
passwd elk
# Grant passwordless sudo
echo "elk ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers
sudo systemctl stop firewalld
sudo systemctl disable firewalld
sudo sed -i 's/SELINUX=enforcing/SELINUX=disabled/' /etc/selinux/config
# Apply immediately (the config file change itself takes effect after a reboot)
sudo setenforce 0
# Edit the kernel parameter file
sudo vim /etc/sysctl.conf
# Add the following
vm.max_map_count=262144 # minimum virtual memory map count required by ES
fs.file-max=65536 # system-wide file handle limit
# Apply the changes
sudo sysctl -p
# Edit the resource limits file
sudo vim /etc/security/limits.conf
# Add the following (for the elk user)
elk soft nofile 65536
elk hard nofile 65536
elk soft nproc 4096
elk hard nproc 4096
# Switch to the elk user
su - elk
# Download ES 8.15.0 (download page: https://www.elastic.co/cn/downloads/elasticsearch)
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.15.0-linux-x86_64.tar.gz
# Extract
tar -zxvf elasticsearch-8.15.0-linux-x86_64.tar.gz
# Rename for convenience
mv elasticsearch-8.15.0 elasticsearch
Edit the ES configuration file elasticsearch/config/elasticsearch.yml:
# Cluster name (default "elasticsearch"; all nodes in a cluster must use the same name)
cluster.name: elk-cluster
# Node name (anything for a single node; must be unique within a cluster)
node.name: node-1
# Data directory (ideally a dedicated disk so the system disk does not fill up)
path.data: /home/elk/elasticsearch/data
# Log directory
path.logs: /home/elk/elasticsearch/logs
# Bind address (0.0.0.0 allows external access; fine for development, bind a specific IP in production)
network.host: 0.0.0.0
# HTTP port (default 9200)
http.port: 9200
# Initial master nodes (just this node for a single-node setup)
cluster.initial_master_nodes: ["node-1"]
# Disable security and HTTPS (development only; enable both in production)
xpack.security.enabled: false
xpack.security.http.ssl.enabled: false
# Allow CORS (convenient for frontend/Kibana access)
http.cors.enabled: true
http.cors.allow-origin: "*"
# Enter the ES directory
cd /home/elk/elasticsearch
# Start in the background
./bin/elasticsearch -d
# Call the ES health endpoint
curl http://localhost:9200/_cat/health?v
Sample output on success (status green means healthy):
epoch timestamp cluster status node.total node.data shards pri relo init unassign pending_tasks max_task_wait_time active_shards_percent
1699876532 10:35:32 elk-cluster green 1 1 0 0 0 0 0 0 - 100.0%
# Download Logstash 8.15.0
wget https://artifacts.elastic.co/downloads/logstash/logstash-8.15.0-linux-x86_64.tar.gz
# Extract
tar -zxvf logstash-8.15.0-linux-x86_64.tar.gz
# Rename
mv logstash-8.15.0 logstash
Logstash revolves around pipeline configuration (input → filter → output). Configuration files live under logstash/config; create mysql-to-es.conf:
# Input plugin: pull data from MySQL
input {
  jdbc {
    # Path to the MySQL JDBC driver (download mysql-connector-java-8.0.33.jar into logstash/lib first)
    jdbc_driver_library => "/home/elk/logstash/lib/mysql-connector-java-8.0.33.jar"
    # JDBC driver class
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    # MySQL connection string (database: demo, charset: utf8mb4)
    jdbc_connection_string => "jdbc:mysql://localhost:3306/demo?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8mb4"
    # MySQL credentials
    jdbc_user => "root"
    jdbc_password => "root123456"
    # Enable paged queries
    jdbc_paging_enabled => true
    # Page size
    jdbc_page_size => 1000
    # Schedule: every 10 minutes (cron syntax: minute hour day month weekday)
    schedule => "*/10 * * * *"
    # Query: fetch only rows with id greater than the last recorded value (incremental sync)
    statement => "SELECT id, username, age, email, create_time FROM user WHERE id > :sql_last_value"
    # Track a column value instead of the last run timestamp
    use_column_value => true
    # Column used as the incremental marker (id is the auto-increment primary key)
    tracking_column => "id"
    # Marker type (numeric or timestamp)
    tracking_column_type => "numeric"
    # File that stores the last tracked value (on the first run, all rows with id > 0 are fetched)
    last_run_metadata_path => "/home/elk/logstash/data/sql_last_value.txt"
  }
}
# Filter plugins: clean and transform the data
filter {
  # Type conversion (cast age from string to integer)
  mutate {
    convert => { "age" => "integer" }
  }
  # Date parsing (convert the MySQL datetime into the ISO8601 form ES expects;
  # note the date filter's match option takes an array of [field, format...])
  date {
    match => ["create_time", "yyyy-MM-dd HH:mm:ss"]
    target => "@timestamp" # overwrite the default @timestamp field
  }
  # Drop fields we don't need (such as the auto-added @version)
  mutate {
    remove_field => ["@version"]
  }
}
# Output plugin: write the processed data to ES
output {
  elasticsearch {
    # ES address
    hosts => ["http://localhost:9200"]
    # Target index for the user data
    index => "user-index"
    # Use the MySQL id as the ES document _id (prevents duplicates)
    document_id => "%{id}"
  }
  # Debug output: print processed events to the console
  stdout {
    codec => rubydebug
  }
}
Download the MySQL driver mysql-connector-java-8.0.33.jar into logstash/lib (the driver major version must match MySQL; MySQL 8.0 pairs with the 8.x driver).
cd /home/elk/logstash
# Start with the pipeline configuration
./bin/logstash -f config/mysql-to-es.conf
Create the demo database and the user table in MySQL, then insert some test data:
CREATE DATABASE IF NOT EXISTS demo DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE demo;
CREATE TABLE IF NOT EXISTS user (
  id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT 'Primary key',
  username VARCHAR(50) NOT NULL COMMENT 'Username',
  age INT COMMENT 'Age',
  email VARCHAR(100) COMMENT 'Email',
  create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time'
) COMMENT 'User table';
-- Insert test data
INSERT INTO user (username, age, email) VALUES ('zhangsan', 25, 'zhangsan@demo.com'), ('lisi', 30, 'lisi@demo.com');
Watch the Logstash console output. If you can see the processed user rows, verify that the index exists in ES:
curl http://localhost:9200/user-index/_search?q=*
If the inserted users come back, Logstash data collection and sync are working.
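The incremental bookkeeping behind `:sql_last_value` can be sketched in a few lines of Python. This is a simulation of the mechanism, not Logstash's implementation; the `fetch_since` function is a hypothetical stand-in for the JDBC query:

```python
# In-memory stand-in for the MySQL user table
rows = [
    {"id": 1, "username": "zhangsan"},
    {"id": 2, "username": "lisi"},
]

def fetch_since(last_value):
    """Stand-in for: SELECT ... FROM user WHERE id > :sql_last_value"""
    return [r for r in rows if r["id"] > last_value]

last_value = 0                      # first run starts from the initial value
batch = fetch_since(last_value)     # fetches both existing rows
if batch:
    # Logstash persists this to last_run_metadata_path between runs
    last_value = max(r["id"] for r in batch)

rows.append({"id": 3, "username": "wangwu"})  # a new row arrives
batch = fetch_since(last_value)               # next scheduled run
print([r["id"] for r in batch])               # only the new row is synced
```

Because only rows beyond the tracked id are fetched, each scheduled run ships just the delta, and `document_id => "%{id}"` makes re-runs idempotent on the ES side.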
# Download Kibana 8.15.0
wget https://artifacts.elastic.co/downloads/kibana/kibana-8.15.0-linux-x86_64.tar.gz
# Extract
tar -zxvf kibana-8.15.0-linux-x86_64.tar.gz
# Rename
mv kibana-8.15.0 kibana
Edit the Kibana configuration file kibana/config/kibana.yml:
# Server port (default 5601)
server.port: 5601
# Bind address (0.0.0.0 allows external access)
server.host: "0.0.0.0"
# ES addresses (must match the ES configuration)
elasticsearch.hosts: ["http://localhost:9200"]
# UI language (Chinese)
i18n.locale: "zh-CN"
cd /home/elk/kibana
# Start in the background
nohup ./bin/kibana &
Open http://<server IP>:5601. If the Kibana UI appears (with ES security disabled in this dev setup, there is no login step), the startup succeeded.
Elasticsearch owes its fast retrieval to the inverted index, a structure that maps keywords to the documents containing them, the reverse of the forward (document to keywords) index used by traditional databases.
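To make the idea concrete, here is a minimal, illustrative Python sketch of an inverted index (not how ES implements it internally; real ES runs a full analyzer instead of `split`, and stores compressed postings lists):

```python
from collections import defaultdict

# Forward view: document id -> text
docs = {
    1: "elasticsearch is a search engine",
    2: "logstash ships logs to elasticsearch",
    3: "kibana visualizes elasticsearch data",
}

# Inverted view: term -> set of document ids (the "postings list")
inverted = defaultdict(set)
for doc_id, text in docs.items():
    for term in text.split():   # ES would run an analyzer here
        inverted[term].add(doc_id)

def search(term):
    """Term lookup goes straight to the postings list instead of scanning every document."""
    return sorted(inverted.get(term, set()))

print(search("elasticsearch"))  # [1, 2, 3]
print(search("logs"))           # [2]
```

A query therefore costs one dictionary lookup plus a postings-list merge, independent of how many documents do not contain the term; this is why full-text search stays fast as the corpus grows.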

Key concepts:
- Every document has an _id (manually assigned or auto-generated); since 8.x the mapping type is fixed to _doc.
- Common field types: text (analyzed, for full-text search) and keyword (not analyzed, for exact matching and aggregations); the numeric types integer, long, float, double; date (multiple formats, ISO8601 by default); plus boolean, ip, geo_point (geo coordinates), and others.
Create an index with explicit settings and mappings (JSON does not allow comments, so the field explanations follow the command):
curl -X PUT "http://localhost:9200/product-index" -H "Content-Type: application/json" -d '
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "product_id": { "type": "long", "index": true },
      "product_name": { "type": "text", "analyzer": "ik_max_word" },
      "category": { "type": "keyword" },
      "price": { "type": "double" },
      "create_time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" }
    }
  }
}'
Notes: 3 primary shards, 1 replica each; product_id is a long; product_name is analyzed text using the IK analyzer (it must be installed first; ik_max_word is its finest-grained mode, covered later); category is a keyword for exact matching and aggregations; create_time uses an explicit date format.
curl -X PUT "http://localhost:9200/product-index/_doc/1001" -H "Content-Type: application/json" -d '
{
"product_id": 1001,
"product_name": "华为Mate 60 Pro",
"category": "手机",
"price": 6999.00,
"create_time": "2024-01-15 10:30:00"
}'
curl -X GET "http://localhost:9200/product-index/_doc/1001"
curl -X GET "http://localhost:9200/product-index/_search" -H "Content-Type: application/json" -d '
{
"query": {
"match": {
"product_name": "华为"
}
}
}'
The following aggregation groups documents by category; "size": 0 suppresses the document hits so only the aggregation buckets are returned:
curl -X GET "http://localhost:9200/product-index/_search" -H "Content-Type: application/json" -d '
{
  "size": 0,
  "aggs": {
    "category_count": {
      "terms": {
        "field": "category"
      }
    }
  }
}'
curl -X DELETE "http://localhost:9200/product-index"
Elastic recommends the elasticsearch-java client (the replacement for the old TransportClient), which supports both synchronous and asynchronous calls. Hands-on examples follow.
<dependencies>
<!-- Elasticsearch Java client -->
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>8.15.0</version>
</dependency>
<!-- Jackson (the client uses Jackson for JSON) -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
<!-- Java 8 date/time support for Jackson (needed to serialize LocalDateTime) -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.15.2</version>
</dependency>
<!-- Lombok (logging, getters/setters) -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<!-- Spring Boot Test (for tests) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>3.1.5</version>
<scope>test</scope>
</dependency>
</dependencies>
package com.jam.demo.es;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.springframework.util.ObjectUtils;
import java.io.Closeable;
import java.io.IOException;
/**
 * Elasticsearch client utility.
 * Creates and manages the ElasticsearchClient instance.
 * @author ken
 */
@Slf4j
public class EsClientUtil implements Closeable {
    private static ElasticsearchClient esClient;
    private static RestClient restClient;
    private static ElasticsearchTransport transport;
    /**
     * Initialize the Elasticsearch client.
     * @param host ES host
     * @param port ES port
     * @return the ElasticsearchClient instance
     */
    public static ElasticsearchClient initClient(String host, int port) {
        if (ObjectUtils.isEmpty(esClient)) {
            // 1. Create the low-level RestClient
            restClient = RestClient.builder(new HttpHost(host, port)).build();
            // 2. Create the transport layer (Jackson for JSON; register JavaTimeModule
            //    so LocalDateTime fields serialize instead of throwing)
            ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule());
            transport = new RestClientTransport(restClient, new JacksonJsonpMapper(mapper));
            // 3. Create the ElasticsearchClient
            esClient = new ElasticsearchClient(transport);
            log.info("Elasticsearch client initialized, host: {}, port: {}", host, port);
        }
        return esClient;
    }
    /**
     * Release client resources.
     */
    @Override
    public void close() throws IOException {
        if (!ObjectUtils.isEmpty(transport)) {
            transport.close();
        }
        if (!ObjectUtils.isEmpty(restClient)) {
            restClient.close();
        }
        log.info("Elasticsearch client resources closed");
    }
}
package com.jam.demo.es.entity;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import java.time.LocalDateTime;
/**
 * Product entity (maps to the product-index index in ES)
 * @author ken
 */
@Data
public class Product {
    /**
     * Product ID (productId field of the ES document)
     */
    private Long productId;
    /**
     * Product name (productName field)
     */
    private String productName;
    /**
     * Category (category field)
     */
    private String category;
    /**
     * Price (price field)
     */
    private Double price;
    /**
     * Creation time (createTime field; the pattern must match the index mapping's date format)
     */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime createTime;
}
package com.jam.demo.es;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.query_dsl.MatchQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.CreateOperation;
import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest;
import com.jam.demo.es.entity.Product;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Product ES service.
 * Implements index creation, document CRUD, search, and aggregation for products.
 * @author ken
 */
@Slf4j
public class EsProductService {
    private final ElasticsearchClient esClient;
    private static final String PRODUCT_INDEX = "product-index";
    /**
     * Constructor (initializes the ES client).
     * @param host ES host
     * @param port ES port
     */
    public EsProductService(String host, int port) {
        this.esClient = EsClientUtil.initClient(host, port);
    }
    /**
     * Create the product index with an explicit mapping.
     * @return whether creation succeeded
     * @throws IOException on ES errors
     */
    public boolean createProductIndex() throws IOException {
        // Index settings and mapping as raw JSON
        String mapping = "{\n" +
                "  \"settings\": {\n" +
                "    \"number_of_shards\": 3,\n" +
                "    \"number_of_replicas\": 1\n" +
                "  },\n" +
                "  \"mappings\": {\n" +
                "    \"properties\": {\n" +
                "      \"productId\": { \"type\": \"long\", \"index\": true },\n" +
                "      \"productName\": { \"type\": \"text\", \"analyzer\": \"ik_max_word\" },\n" +
                "      \"category\": { \"type\": \"keyword\" },\n" +
                "      \"price\": { \"type\": \"double\" },\n" +
                "      \"createTime\": { \"type\": \"date\", \"format\": \"yyyy-MM-dd HH:mm:ss\" }\n" +
                "    }\n" +
                "  }\n}";
        // withJson expects a Reader or InputStream, not a byte array
        CreateIndexRequest request = CreateIndexRequest.of(builder -> builder
                .index(PRODUCT_INDEX)
                .withJson(new StringReader(mapping))
        );
        CreateIndexResponse response = esClient.indices().create(request);
        log.info("Create product index result: {}, index: {}", response.acknowledged(), PRODUCT_INDEX);
        return response.acknowledged();
    }
    /**
     * Insert a single product document.
     * @param product product entity
     * @param docId ES document ID
     * @return whether the insert succeeded
     * @throws IOException on ES errors
     */
    public boolean insertProductDoc(Product product, String docId) throws IOException {
        if (ObjectUtils.isEmpty(product) || !StringUtils.hasText(docId)) {
            log.error("Insert product failed: product or document ID is empty");
            return false;
        }
        IndexRequest<Product> request = IndexRequest.of(builder -> builder
                .index(PRODUCT_INDEX)
                .id(docId)
                .document(product)
        );
        IndexResponse response = esClient.index(request);
        log.info("Insert product result: {}, document ID: {}", response.result(), docId);
        return "created".equals(response.result().jsonValue()) || "updated".equals(response.result().jsonValue());
    }
    /**
     * Bulk-insert product documents.
     * @param productList products to insert
     * @return number of successfully inserted documents
     * @throws IOException on ES errors
     */
    public int bulkInsertProductDocs(List<Product> productList) throws IOException {
        if (CollectionUtils.isEmpty(productList)) {
            log.error("Bulk insert failed: product list is empty");
            return 0;
        }
        // Build the list of bulk operations
        List<BulkOperation> operations = productList.stream()
                .map(product -> {
                    // Use the product ID as the document ID
                    String docId = product.getProductId().toString();
                    return BulkOperation.of(builder -> builder
                            .create(CreateOperation.of(cBuilder -> cBuilder
                                    .index(PRODUCT_INDEX)
                                    .id(docId)
                                    .document(product)
                            ))
                    );
                })
                .collect(Collectors.toList());
        BulkRequest request = BulkRequest.of(builder -> builder.operations(operations));
        BulkResponse response = esClient.bulk(request);
        if (response.errors()) {
            log.error("Bulk insert contains failed items");
            response.items().forEach(item -> {
                if (item.error() != null) {
                    log.error("Document ID: {}, insert failed: {}", item.id(), item.error().reason());
                }
            });
        }
        int successCount = (int) response.items().stream()
                .filter(item -> item.error() == null)
                .count();
        log.info("Bulk insert finished, total: {}, succeeded: {}", productList.size(), successCount);
        return successCount;
    }
    /**
     * Fetch a product by document ID.
     * @param docId document ID
     * @return the product, or null if not found
     * @throws IOException on ES errors
     */
    public Product getProductDocById(String docId) throws IOException {
        if (!StringUtils.hasText(docId)) {
            log.error("Get product failed: document ID is empty");
            return null;
        }
        GetRequest request = GetRequest.of(builder -> builder
                .index(PRODUCT_INDEX)
                .id(docId)
        );
        GetResponse<Product> response = esClient.get(request, Product.class);
        if (response.found()) {
            log.info("Get product succeeded, document ID: {}", docId);
            return response.source();
        } else {
            log.info("No product found for document ID: {}", docId);
            return null;
        }
    }
    /**
     * Full-text search over product names.
     * @param productName keyword for the product name
     * @return matching products
     * @throws IOException on ES errors
     */
    public List<Product> searchProductByTitle(String productName) throws IOException {
        if (!StringUtils.hasText(productName)) {
            log.error("Search failed: product name keyword is empty");
            return null;
        }
        // Build a match query (full-text search)
        Query query = MatchQuery.of(mBuilder -> mBuilder
                .field("productName")
                .query(productName)
        )._toQuery();
        SearchRequest request = SearchRequest.of(sBuilder -> sBuilder
                .index(PRODUCT_INDEX)
                .query(query)
        );
        SearchResponse<Product> response = esClient.search(request, Product.class);
        List<Product> productList = response.hits().hits().stream()
                .map(hit -> hit.source())
                .collect(Collectors.toList());
        log.info("Search finished, keyword: {}, hits: {}", productName, productList.size());
        return productList;
    }
    /**
     * Delete the product index.
     * @return whether deletion succeeded
     * @throws IOException on ES errors
     */
    public boolean deleteProductIndex() throws IOException {
        DeleteIndexRequest request = DeleteIndexRequest.of(builder -> builder.index(PRODUCT_INDEX));
        esClient.indices().delete(request);
        log.info("Product index deleted, index: {}", PRODUCT_INDEX);
        return true;
    }
}
package com.jam.demo.es;
import com.jam.demo.es.entity.Product;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.*;
/**
 * Tests for the product ES service (requires a running ES instance)
 * @author ken
 */
public class EsProductServiceTest {
    private EsProductService esProductService;
    private EsClientUtil esClientUtil;
    @Before
    public void init() {
        // Initialize the ES client and service (local ES on port 9200)
        esClientUtil = new EsClientUtil();
        esProductService = new EsProductService("localhost", 9200);
    }
    @Test
    public void testProductIndexOperation() throws IOException {
        // 1. Create the product index
        boolean createResult = esProductService.createProductIndex();
        assertTrue(createResult);
        // 2. Insert a single product document
        Product product1 = new Product();
        product1.setProductId(1001L);
        product1.setProductName("华为Mate 60 Pro");
        product1.setCategory("手机");
        product1.setPrice(6999.00);
        product1.setCreateTime(LocalDateTime.of(2024, 1, 15, 10, 30, 0));
        boolean insertSingleResult = esProductService.insertProductDoc(product1, "1001");
        assertTrue(insertSingleResult);
        // 3. Bulk-insert product documents
        Product product2 = new Product();
        product2.setProductId(1002L);
        product2.setProductName("苹果iPhone 15");
        product2.setCategory("手机");
        product2.setPrice(7999.00);
        product2.setCreateTime(LocalDateTime.of(2024, 2, 20, 14, 15, 0));
        Product product3 = new Product();
        product3.setProductId(1003L);
        product3.setProductName("小米笔记本Pro");
        product3.setCategory("笔记本电脑");
        product3.setPrice(5999.00);
        product3.setCreateTime(LocalDateTime.of(2024, 3, 10, 9, 45, 0));
        List<Product> productList = Arrays.asList(product2, product3);
        int bulkInsertCount = esProductService.bulkInsertProductDocs(productList);
        assertEquals(2, bulkInsertCount);
        // 4. Fetch a product by ID
        Product queryProduct = esProductService.getProductDocById("1001");
        assertNotNull(queryProduct);
        assertEquals("华为Mate 60 Pro", queryProduct.getProductName());
        // 5. Full-text search
        List<Product> searchProducts = esProductService.searchProductByTitle("手机");
        assertNotNull(searchProducts);
        assertEquals(2, searchProducts.size());
        // 6. Delete the index (cleanup)
        boolean deleteResult = esProductService.deleteProductIndex();
        assertTrue(deleteResult);
    }
    @After
    public void destroy() throws IOException {
        // Release client resources
        esClientUtil.close();
    }
}
The default analyzers in ES (such as the Standard Analyzer) handle Chinese poorly: they split Chinese text into individual characters (e.g. "华为手机" becomes "华", "为", "手", "机"), which is useless for Chinese full-text search. The IK analyzer is purpose-built for Chinese: it supports custom dictionaries and segments Chinese text into proper words.
# Enter the ES plugin directory
cd /home/elk/elasticsearch/plugins
# Create the ik directory
mkdir ik
# Download the IK analyzer (releases: https://github.com/medcl/elasticsearch-analysis-ik/releases)
wget https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v8.15.0/elasticsearch-analysis-ik-8.15.0.zip
# Unzip into the ik directory
unzip elasticsearch-analysis-ik-8.15.0.zip -d ik
# Remove the archive
rm -rf elasticsearch-analysis-ik-8.15.0.zip
# Restart Elasticsearch, then list installed plugins
/home/elk/elasticsearch/bin/elasticsearch-plugin list
# "analysis-ik" in the output means the install succeeded
The IK analyzer offers two segmentation modes:
- ik_max_word: finest-grained segmentation (splits the text into as many words as possible);
- ik_smart: coarsest-grained segmentation (splits the text into the most natural words).
Test the segmentation via the REST API:
# ik_max_word mode
curl -X POST "http://localhost:9200/_analyze" -H "Content-Type: application/json" -d '
{
"analyzer": "ik_max_word",
"text": "华为Mate 60 Pro手机"
}'
# ik_smart mode
curl -X POST "http://localhost:9200/_analyze" -H "Content-Type: application/json" -d '
{
"analyzer": "ik_smart",
"text": "华为Mate 60 Pro手机"
}'
Create elasticsearch/plugins/ik/config/custom.dic and add your custom terms, one per line:
华为Mate 60 Pro
小米笔记本Pro
Then edit elasticsearch/plugins/ik/config/IKAnalyzer.cfg.xml to register the custom dictionary (restart ES afterwards for it to take effect):
<properties>
<comment>IK Analyzer extended configuration</comment>
<!-- Custom extension dictionary -->
<entry key="ext_dict">custom.dic</entry>
<!-- Custom stopword dictionary -->
<entry key="ext_stopwords">stopword.dic</entry>
</properties>
curl -X POST "http://localhost:9200/_analyze" -H "Content-Type: application/json" -d '
{
"analyzer": "ik_max_word",
"text": "华为Mate 60 Pro手机"
}'
The output should now include "华为Mate 60 Pro" as a single token.
Indexing and query performance tips:
- Disable indexing for fields that are never searched (index: false);
- Choose the smallest sufficient numeric type (integer rather than long);
- During bulk imports, disable refresh (index.refresh_interval: -1) and restore it afterwards (the default is 1s): curl -X PUT "http://localhost:9200/product-index/_settings" -H "Content-Type: application/json" -d '{"index.refresh_interval": "-1"}';
- Use _source filtering to return only the fields you need instead of the full document;
- Limit result size with the size parameter to avoid shipping large payloads;
- Avoid deep pagination (from + size must not exceed 10,000); use search_after or scroll instead;
- Tune the query cache when appropriate (index.queries.cache.size);
- Set the JVM heap in elasticsearch/config/jvm.options, e.g. -Xms4g -Xmx4g (adjust to available memory);
- Disable swap: comment out the swap partition in /etc/fstab and reboot.
Logstash is built around pipelines. Each pipeline consists of three stages that together form the data-processing flow:
Pipeline workflow (flow chart):
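The three-stage contract can also be simulated in a few lines of Python. This toy sketch only illustrates how an event passes through input → filter → output; it is not Logstash code:

```python
def input_stage():
    # Input plugins (file / kafka / jdbc) emit raw events
    yield {"message": "GET /index.html 200"}

def filter_stage(event):
    # Filter plugins (grok / mutate / date) reshape and enrich the event
    method, path, status = event["message"].split()
    return {"method": method, "path": path, "status": int(status)}

def output_stage(event):
    # Output plugins (elasticsearch / kafka / file) ship the event downstream
    return event

processed = [output_stage(filter_stage(e)) for e in input_stage()]
print(processed)  # [{'method': 'GET', 'path': '/index.html', 'status': 200}]
```

Each stage is pluggable and independent, which is why the same filter chain can serve a file input today and a Kafka input tomorrow.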

input {
  file {
    path => ["/var/log/application/*.log"] # log file paths (globs supported)
    start_position => "beginning" # read from the start (default "end": tail only)
    sincedb_path => "/home/elk/logstash/data/sincedb" # remembers read offsets (avoids re-reading)
    stat_interval => 1 # seconds between checks for file changes
    codec => "json" # log format (JSON here; omit for plain text)
  }
}
input {
  kafka {
    bootstrap_servers => "kafka-node1:9092,kafka-node2:9092" # Kafka brokers
    topics => ["app-log-topic"] # topics to consume
    group_id => "logstash-consumer-group" # consumer group ID
    auto_offset_reset => "latest" # start from the latest message when no offset exists
    consumer_threads => 5 # number of consumer threads
  }
}
Grok patterns follow the syntax %{PATTERN:field_name:type}. Commonly used patterns:
- IP: an IP address;
- NUMBER: a number (a target type can be appended, e.g. NUMBER:int or NUMBER:float);
- DATA: arbitrary text, matched non-greedily;
- TIMESTAMP_ISO8601: an ISO8601 timestamp;
- HTTP_METHOD: an HTTP method (GET, POST, ...);
- URIPATH: a URI path.
Core example (parsing Nginx access logs):
filter {
  grok {
    # Matches the Nginx log format: $remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for" $request_time
    match => { "message" => "%{IP:client_ip} - %{DATA:remote_user} \[%{HTTPDATE:request_time}\] \"%{HTTP_METHOD:request_method} %{URIPATH:request_path}(%{URIPARAM:request_params})? HTTP/%{NUMBER:http_version}\" %{NUMBER:status:int} %{NUMBER:body_bytes_sent:int} \"%{DATA:http_referer}\" \"%{DATA:user_agent}\" \"%{DATA:http_x_forwarded_for}\" %{NUMBER:request_duration:float}" }
    # Tag events that fail to parse
    tag_on_failure => ["grok_parse_failure"]
    # Drop the raw message once it is parsed
    remove_field => ["message"]
  }
}
To validate Grok patterns, use the Grok Debugger under Kibana's Dev Tools: paste a log line and a Grok expression and inspect the parsed result live.
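Under the hood, Grok patterns expand into named regular expressions. The sketch below parses a simplified access-log line with Python's re module to show the same idea; the pattern here is hand-written for illustration, not the real %{IP}/%{HTTPDATE} definitions:

```python
import re

line = '192.168.1.100 - - [15/Jan/2024:10:30:00 +0800] "GET /index.html HTTP/1.1" 200 512'

# Named groups play the role of grok's %{PATTERN:field} captures
pattern = re.compile(
    r'(?P<client_ip>\d+\.\d+\.\d+\.\d+) - (?P<remote_user>\S+) '
    r'\[(?P<request_time>[^\]]+)\] '
    r'"(?P<request_method>\S+) (?P<request_path>\S+) HTTP/(?P<http_version>\S+)" '
    r'(?P<status>\d+) (?P<body_bytes_sent>\d+)'
)

m = pattern.match(line)
event = m.groupdict()
event["status"] = int(event["status"])  # what grok's %{NUMBER:status:int} does
print(event["client_ip"], event["status"])  # 192.168.1.100 200
```

Grok's value over raw regex is the library of named, reusable, pre-tested patterns; the matching mechanism is the same.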
filter {
  mutate {
    # 1. Type conversion (status to integer, request_duration to float)
    convert => { "status" => "integer" "request_duration" => "float" }
    # 2. Rename a field (http_x_forwarded_for becomes forwarded_ip)
    rename => { "http_x_forwarded_for" => "forwarded_ip" }
    # 3. Add constant fields
    add_field => { "service_name" => "nginx" "env" => "prod" }
    # 4. Drop unused fields
    remove_field => ["remote_user", "http_referer"]
    # 5. String substitution (replace spaces in user_agent with "-")
    gsub => [ "user_agent", " ", "-" ]
  }
}
The date filter normalizes the event time into the @timestamp field (or a custom target field), which later time-range queries rely on. Core example:
filter {
  date {
    # request_time is the Nginx timestamp extracted by grok (format: dd/MMM/yyyy:HH:mm:ss Z);
    # the match option takes an array of [field, format...]
    match => ["request_time", "dd/MMM/yyyy:HH:mm:ss Z"]
    # Target field (overwrites the default @timestamp)
    target => "@timestamp"
    # Time zone (avoids offset errors)
    timezone => "Asia/Shanghai"
  }
}
Note: if parsing fails, the event is tagged with _dateparsefailure and @timestamp keeps the time the event was ingested.
Common format tokens: yyyy (4-digit year), MM (2-digit month), dd (day of month), HH (24-hour clock hour), mm (minute), ss (second), Z (time zone, e.g. +0800), MMM (abbreviated English month, e.g. Jan, Feb).
The drop filter discards events you do not want to keep:
filter {
  # Drop logs with status code 404
  if [status] == 404 {
    drop { }
  }
  # Drop logs from the test environment
  if [env] == "test" {
    drop { }
  }
}
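What the date filter shown earlier performs can be reproduced with Python's datetime: it turns the Nginx-style timestamp into the ISO8601 form ES stores in @timestamp. This is only an illustration of the conversion, not Logstash internals:

```python
from datetime import datetime

raw = "15/Jan/2024:10:30:00 +0800"   # corresponds to dd/MMM/yyyy:HH:mm:ss Z
dt = datetime.strptime(raw, "%d/%b/%Y:%H:%M:%S %z")
print(dt.isoformat())                # 2024-01-15T10:30:00+08:00
```

Keeping the offset in the parsed value is exactly why the filter's timezone option matters: without it, ambiguous timestamps would shift by the zone difference.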
filter {
  geoip {
    # IP field to resolve (client_ip extracted by grok)
    source => "client_ip"
    # Field that receives the geo data (default "geoip")
    target => "geoip"
    # Database path (the plugin bundles a GeoLite2 database; update it periodically)
    database => "/home/elk/logstash/config/GeoLite2-City.mmdb"
    # Geo fields to keep (defaults to all; restricting them trims the event)
    fields => ["country_name", "region_name", "city_name", "location"]
  }
}
Note: the GeoLite2 database is available for free from MaxMind; refresh it regularly to keep IP resolution accurate.
Output plugins write the filtered, structured data to downstream systems. Several outputs can run at once (ES plus a file, ES plus Kafka, and so on).
output {
  elasticsearch {
    hosts => ["http://localhost:9200"] # ES addresses (comma-separated for a cluster)
    index => "nginx-access-log-%{+YYYY.MM.dd}" # daily indices (per-day logs are easier to manage)
    document_id => "%{[@timestamp]}-%{client_ip}-%{request_path}" # custom document ID to deduplicate
    # Batching to ES is automatic in current plugin versions; tune throughput via
    # pipeline.batch.size and pipeline.workers in logstash.yml rather than per-output options
    # Authentication (enable when ES security is on, as it should be in production)
    user => "elastic" # default ES admin user
    password => "elastic123" # ES user password
    # Index template auto-loading (pre-defines the mapping)
    template => "/home/elk/logstash/config/nginx-template.json"
    template_name => "nginx-access-template"
    template_overwrite => true # overwrite an existing template
  }
}
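The %{+YYYY.MM.dd} in the index name is expanded from each event's @timestamp. The equivalent logic in Python (illustrative only; Logstash uses Joda-style date tokens, not strftime):

```python
from datetime import datetime, timezone

def daily_index(ts: datetime, prefix: str = "nginx-access-log") -> str:
    # Mirrors: index => "nginx-access-log-%{+YYYY.MM.dd}"
    return f"{prefix}-{ts.strftime('%Y.%m.%d')}"

print(daily_index(datetime(2024, 1, 15, tzinfo=timezone.utc)))
# nginx-access-log-2024.01.15
```

Because the suffix comes from the event time rather than the wall clock, late-arriving logs still land in the index for the day they occurred, and old days can be expired by simply deleting whole indices.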
Index template example (nginx-template.json). Defining the mapping up front avoids the wrong field types that dynamic mapping can produce (JSON does not allow comments, so the notes follow the template):
{
  "index_patterns": ["nginx-access-log-*"],
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "client_ip": { "type": "ip" },
      "request_time": { "type": "date", "format": "dd/MMM/yyyy:HH:mm:ss Z" },
      "request_method": { "type": "keyword" },
      "request_path": { "type": "keyword" },
      "status": { "type": "integer" },
      "request_duration": { "type": "float" },
      "user_agent": { "type": "text", "analyzer": "ik_smart" },
      "geoip": {
        "properties": {
          "country_name": { "type": "keyword" },
          "region_name": { "type": "keyword" },
          "city_name": { "type": "keyword" },
          "location": { "type": "geo_point" }
        }
      }
    }
  }
}
Notes: index_patterns matches every daily nginx-access-log index, and location is mapped as geo_point, which enables map visualizations.
output {
  kafka {
    bootstrap_servers => "kafka-node1:9092,kafka-node2:9092" # Kafka brokers
    topic_id => "processed-nginx-log" # target topic
    # Partitioning strategy; keying messages by client_ip keeps each IP's logs in one partition
    partitioner => "default"
    message_key => "%{client_ip}"
    # Batch tuning
    batch_size => 16384 # batch size in bytes
    linger_ms => 5 # wait up to 5 ms to fill a batch before sending
    # Serialize events as JSON
    codec => "json"
  }
}
output {
  file {
    path => "/home/elk/logstash/output/nginx-log-%{+YYYY.MM.dd}.log" # one backup file per day
    codec => "json_lines" # one JSON object per line
    flush_interval => 5 # flush the buffer every 5 seconds
    gzip => true # gzip-compress to save disk space
  }
}
output {
  # Send an email alert when a request takes longer than 5 seconds
  if [request_duration] > 5 {
    email {
      to => "admin@demo.com" # alert recipient
      from => "logstash-alert@demo.com" # sender address
      subject => "[Alert] Slow Nginx request" # subject line
      body => "Client IP: %{client_ip}\nRequest path: %{request_path}\nResponse time: %{request_duration}s\nOccurred at: %{[@timestamp]}" # message body
      address => "smtp.demo.com" # SMTP server
      port => 465 # SMTP port (SSL)
      username => "logstash-alert@demo.com" # SMTP username
      password => "alert123" # SMTP password
      use_tls => true # enable TLS
    }
  }
}
# Input: read the Nginx access log
input {
  file {
    path => ["/var/log/nginx/access.log"]
    start_position => "beginning"
    sincedb_path => "/home/elk/logstash/data/sincedb_nginx"
    stat_interval => 1
  }
}
# Filter: parse, clean, and transform
filter {
  # 1. Grok-parse the Nginx log fields
  grok {
    match => { "message" => "%{IP:client_ip} - %{DATA:remote_user} \[%{HTTPDATE:request_time}\] \"%{HTTP_METHOD:request_method} %{URIPATH:request_path}(%{URIPARAM:request_params})? HTTP/%{NUMBER:http_version}\" %{NUMBER:status:int} %{NUMBER:body_bytes_sent:int} \"%{DATA:http_referer}\" \"%{DATA:user_agent}\" \"%{DATA:forwarded_ip}\" %{NUMBER:request_duration:float}" }
    tag_on_failure => ["grok_parse_failure"]
    remove_field => ["message", "remote_user", "http_referer"]
  }
  # 2. Normalize the event time
  date {
    match => ["request_time", "dd/MMM/yyyy:HH:mm:ss Z"]
    target => "@timestamp"
    timezone => "Asia/Shanghai"
  }
  # 3. Field shaping
  mutate {
    rename => { "forwarded_ip" => "x_forwarded_ip" }
    add_field => { "service" => "nginx" "env" => "prod" }
  }
  # 4. Geo lookup
  geoip {
    source => "client_ip"
    target => "geoip"
    fields => ["country_name", "region_name", "city_name", "location"]
  }
  # 5. Drop 404 logs
  if [status] == 404 {
    drop { }
  }
}
# Output: write to ES (daily indices)
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "nginx-access-log-%{+YYYY.MM.dd}"
    document_id => "%{[@timestamp]}-%{client_ip}-%{request_path}"
    template => "/home/elk/logstash/config/nginx-template.json"
    template_name => "nginx-access-template"
    template_overwrite => true
  }
  # Debug output to the console (comment out in production)
  stdout {
    codec => rubydebug
  }
}
sudo chmod 644 /var/log/nginx/access.log
sudo chown elk:elk /var/log/nginx/access.log
cd /home/elk/logstash
./bin/logstash -f config/nginx-log-to-es.conf
# Query today's Nginx log index
curl http://localhost:9200/nginx-access-log-$(date +%Y.%m.%d)/_search?q=*
If documents come back containing the parsed fields (client_ip, request_method, geoip, and so on), the configuration works.
In Kibana, create an index pattern for nginx-access-log-*, then browse the logs in Discover, filtering by time range, client IP, and other conditions.
Logstash tuning tips:
- File input: rely on sincedb to avoid re-reading files; for the Kafka input, raise consumer_threads (ideally equal to the topic's partition count);
- Batch processing: tune pipeline.batch.size and pipeline.workers to reduce the number of HTTP round-trips to ES;
- Put drop filters first so invalid events are discarded before heavier processing;
- If the source logs are already JSON, parse them with codec => "json" instead of Grok;
- Use remove_field to strip unused fields and cut transfer and storage overhead.
To run several pipelines in one Logstash process, declare them in config/pipelines.yml:
- pipeline.id: nginx-log-pipeline
  path.config: "/home/elk/logstash/config/nginx-log-to-es.conf"
  pipeline.workers: 4       # worker threads (ideally the CPU core count)
  pipeline.batch.size: 1000 # events per worker batch
- pipeline.id: app-log-pipeline
  path.config: "/home/elk/logstash/config/app-log-to-es.conf"
  pipeline.workers: 4
  pipeline.batch.size: 1000
Start Logstash without -f and it loads the multi-pipeline configuration automatically.
Edit logstash/config/jvm.options and size the heap to your server (4-8 GB is typical, and no more than 50% of physical RAM):
-Xms4g
-Xmx4g
Key settings recap: pipeline.workers should match the CPU core count (e.g. 8 on an 8-core machine); a pipeline.batch.size of 1000-2000 is a reasonable starting point (watch memory usage to avoid out-of-memory errors).
Kibana's modules cover the whole workflow: data discovery, visualization, monitoring, and alerting. The core modules are described below.
An index pattern is the bridge between Kibana and ES indices; it tells Kibana which indices to work with (wildcards supported). Steps:
1. Enter an index pattern (e.g. nginx-access-log-* to match all Nginx access-log indices) and click "Next step";
2. Pick the time field (@timestamp) used for time-range filtering, then click "Create index pattern";
3. The new pattern (nginx-access-log-*) is now available in Discover.
In the Discover search bar you can filter with expressions such as client_ip : 192.168.1.100 (logs from one IP) or request_duration > 3 (requests slower than 3 seconds). Selecting fields on the left (client_ip, request_path, status) restricts the columns shown on the right.
For complex conditions, switch to the Query DSL, e.g. status 200 combined with a response time above 2 seconds:
{
"query": {
"bool": {
"must": [
{ "match": { "status": 200 } },
{ "range": { "request_duration": { "gt": 2 } } }
]
}
}
}
Take a bar chart of Nginx response-time distribution and a map of visits by province as examples: both are built in the Visualize module with the nginx-access-log-* index pattern as their data source.
Combining several visualizations into one dashboard gives a single view of the core Nginx access metrics:
Alert goal: when requests slower than 5 seconds occur 10 times within one minute, send an email alert.
Rule setup: choose the nginx-access-log-* index pattern; set the query to request_duration > 5 (responses over 5 seconds); set the recipient to admin@demo.com; use a message template such as "{{context.value}} Nginx requests exceeded 5 seconds in the last minute - please investigate". To verify the rule, simulate slow requests (for example, append matching records to the Nginx log) and confirm that the alert email arrives.
Prefer wildcard index patterns (e.g. nginx-access-log-*) over adding indices one at a time. Edit kibana/config/kibana.yml to adjust resource limits:
server.maxPayloadBytes: 10485760 # maximum request payload size (10 MB)
elasticsearch.requestTimeout: 30000 # ES request timeout (30 seconds)
For higher concurrency, deploy several Kibana nodes and spread requests with a load balancer such as Nginx.
Goal: a Java application (Spring Boot 3.x) writes logs; Logstash collects and parses them into ES; Kibana visualizes them. On top of that, we implement full-text search over application logs and alerting on exceptions.
<dependencies>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>3.1.5</version>
</dependency>
<!-- MyBatis-Plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.4</version>
</dependency>
<!-- MySQL driver -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.0.33</version>
<scope>runtime</scope>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<!-- Fastjson2 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.32</version>
</dependency>
<!-- Swagger3 -->
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>2.1.0</version>
</dependency>
<!-- Spring Boot Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>3.1.5</version>
<scope>test</scope>
</dependency>
</dependencies>
spring:
  application:
    name: elk-demo-app
  datasource:
    url: jdbc:mysql://localhost:3306/demo?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8mb4
    username: root
    password: root123456
    driver-class-name: com.mysql.cj.jdbc.Driver
mybatis-plus:
  mapper-locations: classpath:mapper/*.xml
  type-aliases-package: com.jam.demo.entity
  configuration:
    map-underscore-to-camel-case: true # snake_case columns to camelCase properties
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # print MyBatis SQL logs
# Logging (JSON-formatted file logs, easy for Logstash to parse)
logging:
  file:
    name: /var/log/elk-demo-app/app.log # log file path
  pattern:
    file: '{"timestamp":"%d{yyyy-MM-dd HH:mm:ss.SSS}","level":"%p","thread":"%t","class":"%c{1}","message":"%m","exception":"%ex{full}"}' # JSON layout
  level:
    root: info
    com.jam.demo: debug
# Swagger 3 (springdoc)
springdoc:
  api-docs:
    path: /api-docs
  swagger-ui:
    path: /swagger-ui.html
    operationsSorter: method
  packages-to-scan: com.jam.demo.controller
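Because the file pattern above emits one JSON object per line, Logstash can parse it with codec => "json" and needs no Grok at all. A quick Python check of what such a line yields (the sample line simply follows the pattern's fields):

```python
import json

line = ('{"timestamp":"2024-01-15 10:30:00.123","level":"INFO",'
        '"thread":"http-nio-8080-exec-1","class":"UserService",'
        '"message":"add user ok","exception":""}')

event = json.loads(line)
print(event["level"], event["class"])  # INFO UserService
```

Every key becomes a structured field in the event, so level, class, and thread are immediately filterable in Kibana without any parsing rules.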
package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;
/**
 * User entity
 * @author ken
 */
@Data
@TableName("user")
public class User {
    /**
     * Primary key
     */
    @TableId(type = IdType.AUTO)
    private Long id;
    /**
     * Username
     */
    private String username;
    /**
     * Age
     */
    private Integer age;
    /**
     * Email
     */
    private String email;
    /**
     * Creation time
     */
    private LocalDateTime createTime;
}
package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.User;
import org.springframework.stereotype.Repository;
/**
 * User Mapper interface
 * @author ken
 */
@Repository
public interface UserMapper extends BaseMapper<User> {
}
package com.jam.demo.service;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.jam.demo.entity.User;
import com.jam.demo.mapper.UserMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.time.LocalDateTime;
/**
 * User service
 * @author ken
 */
@Slf4j
@Service
public class UserService extends ServiceImpl<UserMapper, User> {
    /**
     * Add a user.
     * @param user user data
     * @return true on success, false on failure
     */
    public boolean addUser(User user) {
        // Parameter validation
        if (ObjectUtils.isEmpty(user)) {
            log.error("Add user failed: user is null");
            return false;
        }
        if (!StringUtils.hasText(user.getUsername())) {
            log.error("Add user failed: username must not be empty");
            return false;
        }
        // Set the creation time
        user.setCreateTime(LocalDateTime.now());
        boolean saveResult = save(user);
        if (saveResult) {
            log.info("User added: username={}, id={}", user.getUsername(), user.getId());
        } else {
            log.error("Add user failed: username={}", user.getUsername());
        }
        return saveResult;
    }
    /**
     * Look up a user by ID.
     * @param id user ID
     * @return the user, or null if not found
     */
    public User getUserById(Long id) {
        if (ObjectUtils.isEmpty(id)) {
            log.error("Get user failed: id is null");
            return null;
        }
        User user = getById(id);
        if (ObjectUtils.isEmpty(user)) {
            log.warn("User not found: id={}", id);
        } else {
            log.info("User found: id={}, username={}", id, user.getUsername());
        }
        return user;
    }
}
package com.jam.demo.controller;

import com.jam.demo.entity.User;
import com.jam.demo.service.UserService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.Resource; // Spring Boot 3 / JDK 17 uses jakarta, not javax
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

/**
 * User controller
 * @author ken
 */
@Slf4j
@RestController
@RequestMapping("/user")
@Tag(name = "User management", description = "Endpoints for creating and querying users")
public class UserController {
    @Resource
    private UserService userService;

    /**
     * Create a user.
     * @param user user payload
     * @return creation result
     */
    @PostMapping("/add")
    @Operation(summary = "Create user", description = "Creates a user from the posted username, age and email")
    @ApiResponse(responseCode = "200", description = "Created", content = @Content(schema = @Schema(implementation = Boolean.class)))
    public ResponseEntity<Boolean> addUser(@RequestBody User user) {
        boolean result = userService.addUser(user);
        return new ResponseEntity<>(result, HttpStatus.OK);
    }

    /**
     * Look up a user by ID.
     * @param id user ID
     * @return the user details
     */
    @GetMapping("/get/{id}")
    @Operation(summary = "Get user by ID", description = "Returns the details of the user with the given ID")
    @ApiResponse(responseCode = "200", description = "Found", content = @Content(schema = @Schema(implementation = User.class)))
    public ResponseEntity<User> getUserById(
            @Parameter(description = "user ID", required = true) @PathVariable Long id
    ) {
        User user = userService.getUserById(id);
        return new ResponseEntity<>(user, HttpStatus.OK);
    }
}
package com.jam.demo;

import com.baomidou.mybatisplus.annotation.DbType;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

/**
 * Application entry point
 * @author ken
 */
@SpringBootApplication
@MapperScan("com.jam.demo.mapper")
public class ElkDemoAppApplication {
    public static void main(String[] args) {
        SpringApplication.run(ElkDemoAppApplication.class, args);
    }

    /**
     * MyBatis-Plus pagination plugin
     * @return pagination interceptor
     */
    @Bean
    public MybatisPlusInterceptor mybatisPlusInterceptor() {
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
        interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
        return interceptor;
    }
}
Create the Logstash config file app-log-to-es.conf:
# Input: read the Java application's JSON-formatted log file
input {
  file {
    path => ["/var/log/elk-demo-app/app.log"]
    start_position => "beginning"
    sincedb_path => "/home/elk/logstash/data/sincedb_app"
    stat_interval => 1
    codec => "json"  # the lines are already JSON, so no Grok is needed
  }
}

# Filter: clean and enrich events
filter {
  # Normalize time: parse the "timestamp" field into @timestamp.
  # Note: the date filter's match option takes an array [field, pattern],
  # not a hash.
  date {
    match => ["timestamp", "yyyy-MM-dd HH:mm:ss.SSS"]
    target => "@timestamp"
    timezone => "Asia/Shanghai"
    remove_field => ["timestamp"]  # drop the original timestamp field
  }
  # Field enrichment
  mutate {
    add_field => { "service_name" => "elk-demo-app" "env" => "prod" }
  }
  # Conditionals must wrap plugin blocks; they are not valid inside mutate.
  if [level] == "ERROR" {
    mutate { add_tag => ["error_log"] }
  }
}

# Output: write to ES, one index per day
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "app-log-%{+YYYY.MM.dd}"
    document_id => "%{[@timestamp]}-%{thread}-%{class}"
    # flush_size / idle_flush_time were removed from this plugin long before 8.x;
    # batching is tuned via pipeline.batch.size / pipeline.batch.delay instead.
    template => "/home/elk/logstash/config/app-template.json"
    template_name => "app-log-template"
    template_overwrite => true
  }
  # Debug output (comment out in production)
  stdout {
    codec => rubydebug
  }
}
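The date filter's Joda-style pattern yyyy-MM-dd HH:mm:ss.SSS corresponds to Python's %Y-%m-%d %H:%M:%S.%f, and the daily index name is expanded from the event's @timestamp. A quick sketch of both mappings (the sample timestamp is made up):

```python
from datetime import datetime

# Joda "yyyy-MM-dd HH:mm:ss.SSS" ≈ this strptime format
# (%f accepts 1-6 fractional digits, so 3-digit millis parse fine).
ts = datetime.strptime("2024-10-01 12:00:00.123", "%Y-%m-%d %H:%M:%S.%f")
assert ts.microsecond == 123000

# "app-log-%{+YYYY.MM.dd}" expands from the event's @timestamp
index = ts.strftime("app-log-%Y.%m.%d")
assert index == "app-log-2024.10.01"
```

One subtlety worth knowing: Logstash expands `%{+YYYY.MM.dd}` from `@timestamp` in UTC, so events written shortly after midnight Asia/Shanghai land in the previous UTC day's index.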
Application-log index template (app-template.json):
{
"index_patterns": ["app-log-*"],
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"level": { "type": "keyword" },
"thread": { "type": "keyword" },
"class": { "type": "keyword" },
"message": { "type": "text", "analyzer": "ik_smart" },
"exception": { "type": "text", "analyzer": "ik_smart" },
"service_name": { "type": "keyword" },
"env": { "type": "keyword" }
}
}
}
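A quick back-of-envelope on what this template implies for cluster sizing, assuming a 30-day retention (the retention figure is an assumption, not from the article):

```python
# Each daily index created from the template above holds
# primaries × (1 + replicas) shard copies.
primaries, replicas = 3, 1
shards_per_index = primaries * (1 + replicas)
assert shards_per_index == 6

# With one index per day, a 30-day retention keeps:
retention_days = 30
total = shards_per_index * retention_days
assert total == 180  # 180 shard copies to spread across the cluster
```

Numbers like these are worth checking against any per-node shard caps configured in production (see the cluster.routing.allocation.total_shards_per_node setting later in this article).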
Build and start the application:
mvn clean package -Dmaven.test.skip=true
java -jar target/elk-demo-app-0.0.1-SNAPSHOT.jar
Open http://localhost:8080/swagger-ui.html and call the /user/add and /user/get/{id} endpoints to generate application logs, then start Logstash:
cd /home/elk/logstash
./bin/logstash -f config/app-log-to-es.conf
In Kibana, create an index pattern for app-log-*; filtering on level:ERROR shows the error logs. To alert on errors (send an email when ERROR-level entries reach 5 within 1 minute), follow the steps in section 5.4 with level:ERROR as the core filter condition.
Suited to larger data volumes (100 GB+ of logs per day) and strict high-availability requirements; the architecture is as follows:

Core components:
Suited to smaller data volumes (under 10 GB of logs per day) and simple deployments; the architecture is as follows:

# elasticsearch.yml
cluster.name: elk-prod-cluster
node.name: node-1
path.data: /data/elasticsearch/data
path.logs: /var/log/elasticsearch
network.host: 192.168.1.101
http.port: 9200
# Cluster node discovery
discovery.seed_hosts: ["192.168.1.101", "192.168.1.102", "192.168.1.103"]
# Initial master-eligible nodes
cluster.initial_master_nodes: ["node-1", "node-2", "node-3"]
# Enable security (mandatory in production)
xpack.security.enabled: true
xpack.security.http.ssl.enabled: true
# Lock memory so the ES heap is never swapped out to disk
bootstrap.memory_lock: true
# Cap the number of shards allocated to any single node
cluster.routing.allocation.total_shards_per_node: 20
# jvm.options (each non-comment line must be a bare JVM flag;
# inline trailing comments are not allowed in this file)
# Heap: 16 GB, about half of a 32 GB machine's physical RAM
-Xms16g
-Xmx16g
# Use the G1 garbage collector with a 200 ms max-pause target
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
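The 16 GB heap above follows the common ES sizing rule: roughly half of physical RAM, capped below ~32 GB so the JVM keeps compressed object pointers. A sketch of that rule (the exact 31 GB cap is a conservative choice, not an official constant):

```python
def es_heap_gb(physical_ram_gb: int) -> int:
    """Rule of thumb: half of RAM, capped at 31 GB to stay safely
    under the compressed-oops threshold (~32 GB)."""
    return min(physical_ram_gb // 2, 31)

assert es_heap_gb(32) == 16   # matches the -Xms16g/-Xmx16g above
assert es_heap_gb(128) == 31  # never exceed the compressed-oops cap
```

The remaining RAM is not wasted: the OS page cache uses it to serve Lucene segment reads, which is why giving ES the whole machine's memory as heap is counterproductive.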
# pipelines.yml
- pipeline.id: main
  path.config: "/home/elk/logstash/config/*.conf"
  pipeline.workers: 8            # one worker per CPU core on an 8-core host
  pipeline.batch.size: 2000      # events per worker batch
  pipeline.batch.delay: 5        # wait at most 5 ms when filling a batch
  queue.type: persisted          # persistent queue, survives a crash
  queue.max_bytes: 10gb          # queue capacity cap
  queue.checkpoint.writes: 1000  # checkpoint every 1000 writes
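As a rough capacity check, pipeline.workers × pipeline.batch.size events are in flight at once, which drives Logstash heap sizing. The sketch below estimates the memory those events occupy under an assumed average event size of 1 KB (the 1 KB figure is an assumption, not from the article):

```python
# In-flight events across the pipeline at any moment
workers = 8        # pipeline.workers
batch_size = 2000  # pipeline.batch.size
in_flight = workers * batch_size
assert in_flight == 16000

# Rough heap pressure from in-flight events at ~1 KB per event (assumed)
approx_mb = in_flight * 1 / 1024
assert round(approx_mb, 3) == 15.625
```

The real footprint is several times larger once event objects and filter copies are counted, so large batch sizes should be paired with a correspondingly larger Logstash heap.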
# kibana.yml
server.port: 5601
server.host: "192.168.1.104"
elasticsearch.hosts: ["https://192.168.1.101:9200", "https://192.168.1.102:9200", "https://192.168.1.103:9200"]
# Authentication
elasticsearch.username: "kibana_system"
elasticsearch.password: "kibana123"
# Session idle timeout (Kibana 8.x expects a duration string)
xpack.security.session.idleTimeout: "24h"
# Stack Monitoring UI (the old xpack.monitoring.* names were removed in 8.x)
monitoring.ui.enabled: true
monitoring.ui.container.elasticsearch.enabled: true
# 0. Prerequisite: every ES node must whitelist the repository path in
#    elasticsearch.yml, e.g.  path.repo: ["/mnt/elk_backup"]
# 1. Register the backup repository
curl -X PUT "https://localhost:9200/_snapshot/elk_backup" -H "Content-Type: application/json" -u elastic:elastic123 -d '
{
  "type": "fs",
  "settings": {
    "location": "/mnt/elk_backup",
    "compress": true
  }
}'
# 2. Take a snapshot (all indices by default)
curl -X PUT "https://localhost:9200/_snapshot/elk_backup/snapshot_$(date +%Y%m%d)" -u elastic:elastic123
# 3. Schedule a daily backup via crontab
0 0 * * * curl -X PUT "https://localhost:9200/_snapshot/elk_backup/snapshot_$(date +%Y%m%d)" -u elastic:elastic123
yellow status (primaries allocated, replica shards unassigned)
Symptom: curl http://localhost:9200/_cat/health?v outputs status=yellow.
Common causes: a single-node cluster with replicas > 0, or too few nodes/disk for replicas to be placed.
Fixes:
On a single-node cluster, replicas can never be allocated (a replica must live on a different node than its primary), so set replicas to 0 to clear the yellow status:
curl -X PUT "http://localhost:9200/_all/_settings" -H "Content-Type: application/json" -d '
{
  "index.number_of_replicas": 0
}'
To find out why a specific shard is unassigned, ask the allocation explain API:
curl -X GET "http://localhost:9200/_cluster/allocation/explain" -H "Content-Type: application/json" -d '
{
  "index": "nginx-access-log-2024.10.01",
  "shard": 0,
  "primary": false
}'
red status (primary shards unassigned, risk of data loss)
Symptom: curl http://localhost:9200/_cat/health?v outputs status=red and some indices are unavailable.
Common causes: node failures that took every copy of a primary shard offline, or corrupted shard data on disk.
Fixes:
If a snapshot exists, restore the affected index from backup:
curl -X POST "http://localhost:9200/_snapshot/elk_backup/snapshot_20241001/_restore" -H "Content-Type: application/json" -d '
{
  "indices": "nginx-access-log-2024.10.01",
  "ignore_unavailable": true,
  "include_global_state": false
}'
If no backup exists, delete the red index to bring the cluster back to health (its data is lost):
curl -X DELETE "http://localhost:9200/nginx-access-log-2024.10.01"
Symptom: Logstash starts without errors, but the expected index or data never appears in ES.
Common causes: the input is not reading anything, the filter stage is dropping events, or the output is misconfigured.
Fixes:
Check the Logstash log (the key diagnostic step):
tail -f /home/elk/logstash/logs/logstash-plain.log
Keywords such as grok_parse_failure (Grok match failed), connection refused, or permission denied pinpoint the specific problem.
Verify the input plugin:
For the file input: check that the path is correct, the file exists, and Logstash has read permission:
ls -l /var/log/nginx/access.log # confirm the file exists
sudo chmod 644 /var/log/nginx/access.log # grant read permission
For the JDBC input: check the connection settings, run the SQL statement by hand to confirm it returns rows, and make sure the driver JAR has been placed under logstash/lib (or referenced via jdbc_driver_library).
Verify the filter stage: temporarily remove any drop plugin and add stdout { codec => rubydebug } to the output section, then watch the console to confirm events are not being filtered out by mistake.
Verify the output stage: check that the ES address is correct and reachable:
curl http://localhost:9200/_cat/indices?v
If ES has authentication enabled, confirm the output plugin is configured with the correct user and password.
Symptom: Kibana shows "Kibana server is not ready yet" or cannot connect to Elasticsearch.
Common causes: elasticsearch.hosts does not match the actual ES address; wrong credentials; a blocked network path; an unhealthy ES cluster.
Fixes:
Check the Kibana config file:
cat /home/elk/kibana/config/kibana.yml | grep elasticsearch.hosts
Confirm the configured ES address is correct (e.g. http://192.168.1.101:9200); for an ES cluster, list every node.
Verify the ES credentials: if ES security is enabled, confirm Kibana is configured with the correct account:
elasticsearch.username: "kibana_system" # Kibana's built-in service user
elasticsearch.password: "kibana123" # the matching password
Verify the username/password directly:
curl -u kibana_system:kibana123 http://localhost:9200/_cluster/health
Check network connectivity: from the Kibana node, verify that ES port 9200 is reachable:
telnet 192.168.1.101 9200 # replace with the ES node's IP and port
If the connection fails, open the port in the firewall:
sudo firewall-cmd --permanent --add-port=9200/tcp
sudo firewall-cmd --reload
Check ES cluster health: on an ES node, confirm the cluster status is green:
curl -u elastic:elastic123 http://localhost:9200/_cat/health?v
If it is red/yellow, fix the ES cluster first (see section 7.4.1).
Symptom: queries in Kibana take more than 10 seconds, or fail with query_shard_exception or timeout errors.
Common causes: too many or oversized shards; inefficient field mappings; deep paging; unbounded time ranges; an undersized heap.
Fixes:
In production, keep each primary shard around 50-100 GB and no more than 3 primary shards per node;
For an existing index, move the data into an index with a better shard layout. Note that _reindex does not accept settings in its dest clause; create the target index with the desired settings first, then copy:
curl -X PUT "http://localhost:9200/new-index" -H "Content-Type: application/json" -d '
{
  "settings": { "number_of_shards": 3, "number_of_replicas": 1 }
}'
curl -X POST "http://localhost:9200/_reindex" -H "Content-Type: application/json" -d '
{
  "source": { "index": "old-index" },
  "dest": { "index": "new-index" }
}'
Use the keyword type instead of text for fields that are only filtered, sorted, or aggregated on, and set index: false on fields that are never queried;
Avoid deep paging: use search_after instead of from+size for scrolling queries:
{
  "query": { "match": { "service_name": "nginx" } },
  "sort": [ { "@timestamp": "asc" } ],
  "search_after": [ "2024-10-01T00:00:00Z" ],
  "size": 100
}
(search_after takes the sort value of the last hit on the previous page; strict JSON does not allow inline comments, so the note is moved here.)
Limit the query time range: add a range condition so only recent data is scanned;
Put filter conditions first: place filters (status code, service name, etc.) in bool.filter so ES can cache them and skip scoring.
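The search_after pattern amounts to feeding the previous page's last sort values into the next request body. A minimal sketch of that step as a pure function, using a bool.filter query as recommended above (the function name and sample query are illustrative, not a real client API):

```python
import copy

def next_page_body(body: dict, last_hit_sort: list) -> dict:
    """Build the next search_after request from the previous request body
    and the 'sort' values of the last hit on the current page."""
    nxt = copy.deepcopy(body)
    nxt["search_after"] = last_hit_sort
    return nxt

first = {
    "query": {"bool": {"filter": [{"term": {"service_name": "nginx"}}]}},
    "sort": [{"@timestamp": "asc"}],
    "size": 100,
}
second = next_page_body(first, ["2024-10-01T00:00:00Z"])
assert second["search_after"] == ["2024-10-01T00:00:00Z"]
assert "search_after" not in first  # the original body is left untouched
```

Unlike from+size, each page costs the same regardless of how deep you are, because ES only has to collect hits sorting after the supplied values.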
Increase the ES heap: edit elasticsearch/config/jvm.options and set the heap to 50% of physical RAM (never above 32 GB);
Check for resource bottlenecks with top (CPU) and iostat -x 1 (disk I/O); if one exists, upgrade the hardware or migrate the node;
Split indices by day (e.g. nginx-access-log-2024.10.01) and query only the indices covering the needed time range to reduce the amount of data scanned.
Symptom: the application node's log file contains data, but nothing corresponding reaches Logstash/ES.
Common causes: a wrong Filebeat input path or output address; events lost on crash without a persistent queue; log rotation handled in a way Filebeat does not track.
Fixes:
Check the Filebeat log:
tail -f /var/log/filebeat/filebeat.log
Keywords such as connection refused or file not found pinpoint the problem.
Verify the Filebeat config:
Check that the output address (Logstash/Kafka) is correct and that the log path glob matches the files (e.g. /var/log/app/*.log):
# filebeat.yml — core settings
filebeat.inputs:
  - type: log
    paths:
      - /var/log/app/*.log  # correct log path
output.logstash:
  hosts: ["192.168.1.105:5044"]  # Logstash address (5044 is the beats input's default port)
Enable Filebeat's disk queue so events survive a crash (only one queue type is active at a time; configuring the disk queue replaces the default in-memory queue):
queue.disk:
  path: /var/lib/filebeat/queue  # on-disk queue directory
  max_size: 10GB                 # disk queue capacity
Handle log rotation:
Make sure the application rotates logs via "create a new file + rename the old one" (e.g. app.log → app.log.20241001), which Filebeat supports out of the box because it tracks files by inode. If logs are instead truncated in place, tune how Filebeat tracks and closes files:
filebeat.inputs:
  - type: log
    paths:
      - /var/log/app/app.log
    harvester_limit: 1000
    close_inactive: 5m
    scan_frequency: 10s
    tail_files: false
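Filebeat distinguishes "rename and create" rotation from in-place truncation by the file's inode. The sketch below reproduces that observation with plain file operations in a temporary directory:

```python
import tempfile
from pathlib import Path

d = Path(tempfile.mkdtemp())
log = d / "app.log"

log.write_text("old line\n")
ino_before = log.stat().st_ino

# "Create new + rename old" rotation: the old file keeps its inode under
# the new name, and app.log is recreated with a fresh inode, so an
# inode-keyed tracker recognizes it as a brand-new file.
log.rename(d / "app.log.20241001")
log.write_text("new line\n")
ino_after = log.stat().st_ino

assert ino_before != ino_after  # rotation shows up as an inode change
```

Truncation, by contrast, keeps the same inode while the file size drops to zero, which is why it needs special handling on the collector side.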
This article started from ELK Stack's core concepts and worked through environment setup, each component's principles and hands-on usage, and finally Java application integration and production deployment. By now you should have a solid grasp of the following:
All examples target the latest stable release (8.15.0) and were verified against JDK 17 and MySQL 8.0, so they can be applied to real projects directly.
With the fundamentals and the advanced topics covered here, you should be able to apply ELK Stack to log analysis, business search, operations monitoring, and other scenarios, and give data-driven decision making a solid foundation.