Elasticsearch Core Concepts and Java Clients in Practice: Building a High-Performance Search Service

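Table of Contents

🎯 First, How ES "Burned" Me
✨ Summary
1. Why Elasticsearch?
   1.1 The Pain of Searching in a Database
   1.2 What Elasticsearch Does Better
2. ES Core Architecture
   2.1 Cluster Architecture
   2.2 Indices and Shards
3. Java Clients in Practice
   3.1 Choosing a Client
   3.2 Configuring RestHighLevelClient
   3.3 Configuring Spring Data Elasticsearch
4. Index Design Best Practices
   4.1 Index Lifecycle Management
   4.2 Mapping Design Tips
5. Query Optimization in Practice
   5.1 Comparing Query Types
   5.2 Performance Tuning Tips
6. Bulk Operations and Real-Time Visibility
   6.1 Bulk Operations
   6.2 Controlling Real-Time Visibility
7. Enterprise Case Studies
   7.1 E-commerce Product Search
   7.2 Log Analysis System
8. Performance Optimization and Monitoring
   8.1 Performance Tuning
   8.2 Monitoring and Alerting
9. Troubleshooting Guide
   9.1 Troubleshooting Common Problems
10. Choosing a Solution, and Summary
   10.1 ES vs. Alternatives
   10.2 My "ES Ground Rules"
11. Final Words
📚 Recommended Reading
   Official Documentation
   Source Code
   Best Practices
   Monitoring Tools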


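🎯 First, How ES "Burned" Me

The first time we used ES for product search in our e-commerce system, it fell over on launch day. Users searched for "手机" (mobile phone) and got "手纸" (toilet paper) back, because the analyzer was misconfigured. Worse, during a big sales promotion the ES cluster hit 100% CPU. The culprit was someone running a wildcard query: `*手机*`.

Last year we built a logging system on ES that ingested several TB a day, and the disk alarms went off. The cause was a bad shard count: a single index had 200 shards, and the cluster's management overhead was enormous.

Last month we used ES vector search for real-time recommendations and found what looked like a memory leak in the Java High Level REST Client. After three days of digging, the real cause turned out to be a BulkProcessor that was never closed properly.

These incidents taught me one thing. A programmer who doesn't understand how ES works is planting mines with a search engine, and sooner or later one of them goes off.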

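✨ Summary

Elasticsearch is a distributed search engine built on Lucene. It uses inverted indices to deliver millisecond-level retrieval. This article covers four areas:

  • How an ES cluster is structured, how shards work, and how queries are executed and optimized.
  • Best practices for the Java clients.
  • A complete e-commerce search example that compares the performance of different query styles.
  • Solutions for index design, query optimization, and cluster monitoring, with enterprise configuration templates, performance tuning data, and a troubleshooting handbook.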

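1. Why Elasticsearch?

1.1 The Pain of Searching in a Database

Here is a typical example of using MySQL for search:

```sql
-- MySQL fuzzy search
SELECT *
FROM products
WHERE name LIKE '%手机%'
   OR description LIKE '%手机%'
   OR tags LIKE '%手机%'
ORDER BY create_time DESC
LIMIT 100 OFFSET 0;
```

Listing 1: MySQL fuzzy search

The problem, visualized:

Figure 1: Problems with MySQL search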

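Pain points of searching with MySQL

  1. A LIKE '%xxx%' pattern with a leading wildcard cannot use a B-tree index, so it forces a full table scan
  2. OR conditions across multiple fields perform very poorly
  3. There is no support for sophisticated relevance scoring and ranking
  4. LIKE offers no word segmentation, synonym, or pinyin search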
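Pain point 1 comes from how a B-tree works. The index keeps keys in sorted order, so it can jump straight to a prefix (`LIKE '手机%'`). A substring in the middle of the key (`LIKE '%手机%'`) gives the sort order nothing to work with, so every key has to be checked. The sketch below models this with `java.util.TreeMap` standing in for the index. That is an assumption for illustration, not how InnoDB is implemented, and `LikeScanDemo` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

final class LikeScanDemo {
    // Stand-in for a B-tree index on products.name: sorted key -> row id
    final TreeMap<String, Integer> nameIndex = new TreeMap<>();
    // How many index entries the last lookup had to touch
    int lastRowsExamined;

    void insert(String name, int rowId) {
        nameIndex.put(name, rowId);
    }

    // LIKE 'prefix%': a range scan over [prefix, prefix + '\uffff')
    List<Integer> likePrefix(String prefix) {
        SortedMap<String, Integer> range = nameIndex.subMap(prefix, prefix + Character.MAX_VALUE);
        lastRowsExamined = range.size();
        return new ArrayList<>(range.values());
    }

    // LIKE '%term%': sorting does not help, so every key must be examined
    List<Integer> likeContains(String term) {
        List<Integer> hits = new ArrayList<>();
        lastRowsExamined = 0;
        for (Map.Entry<String, Integer> e : nameIndex.entrySet()) {
            lastRowsExamined++;
            if (e.getKey().contains(term)) {
                hits.add(e.getValue());
            }
        }
        return hits;
    }
}
```

The prefix lookup touches only the matching range. The contains lookup touches every row, which is exactly the scan the EXPLAIN plan shows for `'%xxx%'`.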

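1.2 What Elasticsearch Does Better

The inverted index is at the core of ES:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Inverted index structure (illustrative)
public class InvertedIndex {
    // A posting records which document contains a term
    // (Lucene also stores term frequencies and positions)
    record Posting(int docId) {}

    // term -> postings list
    Map<String, List<Posting>> index = new HashMap<>();

    // Doc 1: "华为手机很好用" (Huawei phone, works great)
    // Doc 2: "小米手机性价比高" (Xiaomi phone, great value)
    // Inverted index:
    //   "华为"   -> [doc 1]
    //   "手机"   -> [doc 1, doc 2]
    //   "小米"   -> [doc 2]
    //   "很好用" -> [doc 1]
    //   "性价比" -> [doc 2]
}
```

Listing 2: How an inverted index works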
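The structure above becomes useful at query time. Finding the documents that contain all query terms is an intersection of their postings lists, so no document text is scanned. The sketch below is minimal and runnable. Terms are tokenized by hand, standing in for an analyzer such as IK. `TinyInvertedIndex` is a hypothetical class. Real Lucene postings are compressed on-disk structures, not `TreeSet`s.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

final class TinyInvertedIndex {
    // term -> sorted set of document IDs (the postings list)
    private final Map<String, TreeSet<Integer>> postings = new TreeMap<>();

    // Index a document whose terms were already produced by an analyzer
    void add(int docId, List<String> terms) {
        for (String term : terms) {
            postings.computeIfAbsent(term, t -> new TreeSet<>()).add(docId);
        }
    }

    // AND query: intersect the postings lists of all query terms
    List<Integer> searchAll(List<String> queryTerms) {
        TreeSet<Integer> result = null;
        for (String term : queryTerms) {
            TreeSet<Integer> docs = postings.getOrDefault(term, new TreeSet<>());
            if (result == null) {
                result = new TreeSet<>(docs);
            } else {
                result.retainAll(docs);
            }
        }
        return result == null ? List.of() : new ArrayList<>(result);
    }
}
```

A lookup costs roughly the length of the postings lists involved, not the size of the corpus. That difference explains the gap in the benchmark below.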

How the two search flows compare:

Figure 2: MySQL vs. ES search flow

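Performance comparison (10 million products):

| Scenario | MySQL | Elasticsearch | Difference |
|---|---|---|---|
| Single-field fuzzy query | 3200 ms | 45 ms | 71× faster |
| Multi-field OR query | 8500 ms | 65 ms | 130× faster |
| Complex conditions + sorting | 12000 ms | 85 ms | 141× faster |
| Memory usage | 4.2 GB | 1.8 GB | 57% less |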

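2. ES Core Architecture

2.1 Cluster Architecture

Figure 3: ES cluster architecture

Node types

  • Master node: manages cluster state and shard allocation
  • Data node: stores data and executes CRUD and search operations
  • Coordinating node: routes requests and merges the results returned by shards
  • Ingest node: preprocesses documents before they are indexed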
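The coordinating node's merge step works like this. It forwards a search to one copy of every shard. Each shard returns its local top `size` hits (the query phase), and the coordinator keeps only the global top `size` before fetching the documents. The sketch below models that merge in memory. `ScatterGather` and `Hit` are hypothetical types, and the network round trips and the fetch phase are left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class ScatterGather {
    // Hypothetical shard-level hit: document ID plus relevance score
    static final class Hit {
        final String id;
        final float score;
        Hit(String id, float score) { this.id = id; this.score = score; }
    }

    // Merge per-shard top-k lists into the global top-k, highest score first
    static List<Hit> mergeTopK(List<List<Hit>> shardResults, int k) {
        // Min-heap on score: the head is always the weakest hit kept so far
        PriorityQueue<Hit> heap = new PriorityQueue<>(Comparator.comparingDouble((Hit h) -> h.score));
        for (List<Hit> shard : shardResults) {
            for (Hit hit : shard) {
                heap.offer(hit);
                if (heap.size() > k) {
                    heap.poll(); // drop the current weakest hit
                }
            }
        }
        List<Hit> top = new ArrayList<>(heap);
        top.sort(Comparator.comparingDouble((Hit h) -> h.score).reversed());
        return top;
    }
}
```

The sketch also shows why deep pagination is costly. For `from + size` results, every shard must send `from + size` hits and the coordinator must sort all of them.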

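2.2 Indices and Shards

```java
import java.io.IOException;

import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.settings.Settings;
// XContent package as in 7.x up to 7.15 (moved to org.elasticsearch.xcontent in later releases)
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.springframework.context.annotation.Configuration;

// Index creation example (requires the IK and pinyin analysis plugins on every node)
@Slf4j
@Configuration
public class IndexConfig {

    public void createProductIndex(RestHighLevelClient client) throws IOException {
        CreateIndexRequest request = new CreateIndexRequest("products");

        // Index settings
        request.settings(Settings.builder()
                .put("index.number_of_shards", 3)       // primary shards (fixed at creation)
                .put("index.number_of_replicas", 1)     // replicas per primary
                .put("index.refresh_interval", "1s")    // refresh interval
                .put("analysis.analyzer.default.type", "ik_max_word") // default analyzer
                // pinyin_analyzer is referenced by the mapping, so it must be defined here
                // (assumed definition: a custom analyzer over the pinyin plugin's tokenizer)
                .put("analysis.tokenizer.pinyin_tokenizer.type", "pinyin")
                .put("analysis.analyzer.pinyin_analyzer.type", "custom")
                .put("analysis.analyzer.pinyin_analyzer.tokenizer", "pinyin_tokenizer"));

        // Mapping definition (multi-fields are nested inline, not passed as a separate builder)
        XContentBuilder mapping = XContentFactory.jsonBuilder()
            .startObject()
                .startObject("properties")
                    .startObject("id").field("type", "keyword").endObject()
                    .startObject("name")
                        .field("type", "text")
                        .field("analyzer", "ik_max_word")      // fine-grained at index time
                        .field("search_analyzer", "ik_smart")  // coarse-grained at query time
                        .startObject("fields")
                            .startObject("pinyin")
                                .field("type", "text")
                                .field("analyzer", "pinyin_analyzer")
                            .endObject()
                            .startObject("keyword")
                                .field("type", "keyword")
                                .field("ignore_above", 256)
                            .endObject()
                        .endObject()
                    .endObject()
                    .startObject("price").field("type", "double").endObject()
                    .startObject("category_id").field("type", "integer").endObject()
                    .startObject("sales").field("type", "integer").endObject()
                    .startObject("create_time")
                        .field("type", "date")
                        .field("format", "yyyy-MM-dd HH:mm:ss||epoch_millis")
                    .endObject()
                .endObject()
            .endObject();
        request.mapping(mapping);

        CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
        if (response.isAcknowledged()) {
            log.info("Index created");
        }
    }
}
```

Listing 3: Index creation and configuration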

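How sharding works

Figure 4: Shard and replica distribution

Shard design principles

  1. Keep each shard under 50 GB
  2. Number of primary shards = total data volume / 50 GB, rounded up
  3. Replicas: at least 1, and at most (number of data nodes - 1), because a replica is never allocated to the same node as its primary
  4. Avoid over-sharding: every shard is a Lucene index with its own memory, file-handle, and cluster-state overhead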
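Rules 2 and 3 are simple arithmetic, and document routing shows why the primary count is fixed. A document lands on shard `hash(_routing) mod number_of_primary_shards`, with `_routing` defaulting to the document ID. If you change the primary count, most documents map to a different shard. The sketch below is simplified. Elasticsearch hashes with Murmur3 and also uses `routing_num_shards`. `String.hashCode` here is only a stand-in, and `ShardPlanner` is a hypothetical helper.

```java
final class ShardPlanner {
    static final long GB = 1024L * 1024 * 1024;

    // Rule 2: primaries = ceil(total data / target shard size), at least 1
    static int primaryShards(long totalBytes, long maxShardBytes) {
        return (int) Math.max(1, (totalBytes + maxShardBytes - 1) / maxShardBytes);
    }

    // Rule 3: a replica never shares a node with its primary,
    // so more than (nodes - 1) replicas would stay unassigned
    static int maxUsefulReplicas(int dataNodes) {
        return Math.max(0, dataNodes - 1);
    }

    // Simplified routing: shard = hash(routing) mod primaries
    // (String.hashCode is a stand-in for Elasticsearch's Murmur3 hash)
    static int routeToShard(String routing, int primaries) {
        return Math.floorMod(routing.hashCode(), primaries);
    }
}
```

For example, 120 GB of data gives 3 primaries, and a 3-node cluster can hold at most 2 replicas of each shard. To change the primary count you have to reindex, or use the split and shrink APIs.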

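3. Java Clients in Practice

3.1 Choosing a Client

| Client | Pros | Cons | Recommended for |
|---|---|---|---|
| RestHighLevelClient | Official, full-featured | Heavyweight, complex API; deprecated since 7.15 | New projects that need the full feature set |
| Java Low Level REST Client | Lightweight, flexible | You build and parse JSON yourself | Simple queries, performance-sensitive paths |
| Spring Data Elasticsearch | Concise, integrates with Spring | Version compatibility issues | Spring Boot projects |
| JestClient | Simple to use | No longer maintained | Not recommended for new projects |

Our choice: RestHighLevelClient for plain new projects, and Spring Data Elasticsearch for Spring Boot projects. This applies to 7.x clusters. RestHighLevelClient is deprecated as of 7.15, and 8.x projects should use the Elasticsearch Java API Client instead.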

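3.2 Configuring RestHighLevelClient

```java
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@Configuration
@EnableScheduling // required for the @Scheduled health check below
@Slf4j
public class ElasticsearchConfig {

    @Value("${elasticsearch.hosts:localhost:9200}")
    private String hosts;
    @Value("${elasticsearch.username:}")
    private String username;
    @Value("${elasticsearch.password:}")
    private String password;

    // AlertService is the project's own alerting component (not shown here)
    private final AlertService alertService;

    public ElasticsearchConfig(AlertService alertService) {
        this.alertService = alertService;
    }

    @Bean(destroyMethod = "close") // release the connection pool on shutdown
    public RestHighLevelClient restHighLevelClient() {
        // Parse the "host:port,host:port" list
        String[] hostArray = hosts.split(",");
        HttpHost[] httpHosts = new HttpHost[hostArray.length];
        for (int i = 0; i < hostArray.length; i++) {
            String[] hostPort = hostArray[i].trim().split(":");
            httpHosts[i] = new HttpHost(hostPort[0].trim(), Integer.parseInt(hostPort[1].trim()), "http");
        }

        RestClientBuilder builder = RestClient.builder(httpHosts)
            .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
                .setConnectTimeout(5000)             // TCP connect timeout: 5 s
                .setSocketTimeout(60000)             // socket read timeout: 60 s
                .setConnectionRequestTimeout(1000))  // wait at most 1 s for a pooled connection
            .setHttpClientConfigCallback(httpClientBuilder -> {
                // Connection pool
                httpClientBuilder.setMaxConnTotal(100)                    // max connections overall
                    .setMaxConnPerRoute(50)                               // max connections per node
                    .setKeepAliveStrategy((response, context) -> 60000);  // keep-alive 60 s
                // Basic authentication, only when credentials are configured
                if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) {
                    CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                    credentialsProvider.setCredentials(AuthScope.ANY,
                        new UsernamePasswordCredentials(username, password));
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                }
                return httpClientBuilder;
            })
            .setFailureListener(new RestClient.FailureListener() {
                @Override
                public void onFailure(Node node) {
                    log.error("ES node connection failed: {}", node.getHost());
                }
            });
        return new RestHighLevelClient(builder);
    }

    // Health check every 30 s. Inside a @Configuration class, calling the @Bean method
    // returns the singleton client, so no new client is created on each run.
    @Scheduled(fixedDelay = 30000)
    public void checkClusterHealth() {
        try {
            ClusterHealthResponse response = restHighLevelClient().cluster()
                .health(new ClusterHealthRequest(), RequestOptions.DEFAULT);
            ClusterHealthStatus status = response.getStatus();
            if (status != ClusterHealthStatus.GREEN) {
                log.warn("ES cluster status abnormal: {}, details: {}", status, response);
                // Send an alert
                alertService.sendAlert("ES cluster status abnormal",
                    "Status: " + status + ", details: " + response);
            }
        } catch (Exception e) {
            log.error("ES cluster health check failed", e);
        }
    }
}
```

Listing 4: ES client configuration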
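The host-parsing loop in Listing 4 assumes every entry has an explicit `:port`. A value such as `localhost` alone throws `ArrayIndexOutOfBoundsException` at startup. The sketch below is dependency-free and fills in defaults: scheme `http`, port 9200. `EsHostParser` and `HostSpec` are hypothetical names. In the real config, each `HostSpec` would become `new HttpHost(host, port, scheme)`.

```java
import java.util.ArrayList;
import java.util.List;

final class EsHostParser {
    // Hypothetical value type; each one maps to new HttpHost(host, port, scheme)
    static final class HostSpec {
        final String scheme;
        final String host;
        final int port;
        HostSpec(String scheme, String host, int port) {
            this.scheme = scheme;
            this.host = host;
            this.port = port;
        }
    }

    // Parses e.g. "es1:9200, https://es2:9243, es3" and applies defaults (http, 9200).
    // IPv6 literals are not handled in this sketch.
    static List<HostSpec> parse(String hosts) {
        List<HostSpec> result = new ArrayList<>();
        for (String raw : hosts.split(",")) {
            String entry = raw.trim();
            if (entry.isEmpty()) {
                continue; // tolerate trailing commas
            }
            String scheme = "http";
            int sep = entry.indexOf("://");
            if (sep >= 0) {
                scheme = entry.substring(0, sep);
                entry = entry.substring(sep + 3);
            }
            int colon = entry.lastIndexOf(':');
            String host = colon >= 0 ? entry.substring(0, colon) : entry;
            int port = colon >= 0 ? Integer.parseInt(entry.substring(colon + 1)) : 9200;
            result.add(new HostSpec(scheme, host, port));
        }
        return result;
    }
}
```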

3.3 Configuring Spring Data Elasticsearch

```java
// Three files, shown together. Imports: org.springframework.data.elasticsearch.annotations.*,
// org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate,
// org.springframework.data.elasticsearch.core.geo.GeoPoint,
// org.springframework.data.elasticsearch.repository.ElasticsearchRepository,
// org.springframework.data.annotation.Id, org.springframework.data.domain.*, lombok.*, java.util.*

// ---- SpringDataESConfig.java ----
@Configuration
@EnableElasticsearchRepositories(basePackages = "com.example.repository")
public class SpringDataESConfig {

    // Reuse the RestHighLevelClient bean from ElasticsearchConfig instead of building a second one.
    // Repositories look up a bean named "elasticsearchTemplate" by default.
    @Bean
    public ElasticsearchRestTemplate elasticsearchTemplate(RestHighLevelClient client) {
        return new ElasticsearchRestTemplate(client);
    }
}

// ---- Product.java (a top-level class, so Spring Data can instantiate it) ----
@Document(indexName = "products", createIndex = false)
@Setting(settingPath = "/settings/product-settings.json")
@Mapping(mappingPath = "/mappings/product-mapping.json")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Product {
    @Id
    private String id;

    // Use @MultiField on its own; adding @Field to the same property is redundant
    @MultiField(
        mainField = @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart"),
        otherFields = {
            @InnerField(suffix = "pinyin", type = FieldType.Text, analyzer = "pinyin"),
            @InnerField(suffix = "keyword", type = FieldType.Keyword)
        })
    private String name;

    @Field(type = FieldType.Double)
    private Double price;

    // name = ... maps camelCase properties to the snake_case fields used in the index
    @Field(name = "category_id", type = FieldType.Integer)
    private Integer categoryId;

    @Field(type = FieldType.Integer)
    private Integer sales;

    // Must match the index's date format "yyyy-MM-dd HH:mm:ss" (Spring Data Elasticsearch 4.x syntax)
    @Field(name = "create_time", type = FieldType.Date, format = DateFormat.custom, pattern = "yyyy-MM-dd HH:mm:ss")
    private Date createTime;

    // Geo field
    @GeoPointField
    private GeoPoint location;
}

// ---- ProductRepository.java (in com.example.repository, so the scan above finds it) ----
public interface ProductRepository extends ElasticsearchRepository<Product, String> {
    // Queries derived from method names
    List<Product> findByName(String name);
    List<Product> findByPriceBetween(Double minPrice, Double maxPrice);
    Page<Product> findByCategoryId(Integer categoryId, Pageable pageable);

    // @Query annotation
    @Query("{\"match\": {\"name\": \"?0\"}}")
    List<Product> findByNameCustom(String name);

    // Raw bool query with positional parameters
    @Query("{\"bool\": {\"must\": ["
        + "{\"match\": {\"name\": \"?0\"}},"
        + "{\"range\": {\"price\": {\"gte\": ?1, \"lte\": ?2}}}"
        + "]}}")
    List<Product> searchByNameAndPriceRange(String name, Double minPrice, Double maxPrice);
}
```

Listing 5: Spring Data ES configuration

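4. Index Design Best Practices

4.1 Index Lifecycle Management

```java
// Imports: org.elasticsearch.client.*, org.elasticsearch.client.indices.*,
// org.elasticsearch.action.admin.indices.alias.*, org.elasticsearch.action.support.master.AcknowledgedResponse,
// org.elasticsearch.cluster.metadata.AliasMetadata, java.time.*, java.util.*
@Component
@Slf4j
public class IndexLifecycleManager {

    private final RestHighLevelClient client;

    public IndexLifecycleManager(RestHighLevelClient client) {
        this.client = client;
    }

    // 1. Time-based index names, e.g. logs-2024.06.30
    public String getDailyIndex(String indexPrefix) {
        String date = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy.MM.dd"));
        return indexPrefix + "-" + date;
    }

    // 2. Create an index and attach an alias to it
    public void createIndexWithAlias(String indexName, String alias) throws IOException {
        CreateIndexRequest request = new CreateIndexRequest(indexName);
        request.settings(getIndexSettings());
        CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
        if (response.isAcknowledged()) {
            IndicesAliasesRequest aliasRequest = new IndicesAliasesRequest();
            aliasRequest.addAliasAction(AliasActions.add().index(indexName).alias(alias));
            client.indices().updateAliases(aliasRequest, RequestOptions.DEFAULT);
            log.info("Created index {} with alias {}", indexName, alias);
        }
    }

    // 3. Rollover check, every day at midnight
    @Scheduled(cron = "0 0 0 * * ?")
    public void rolloverIndex() throws IOException {
        String todayIndex = getDailyIndex("logs");
        String alias = "logs-current";

        // Which index does the alias point to now?
        GetAliasesResponse response = client.indices()
            .getAlias(new GetAliasesRequest(alias), RequestOptions.DEFAULT);
        Map<String, Set<AliasMetadata>> aliases = response.getAliases();

        if (aliases.isEmpty()) {
            // First run
            createIndexWithAlias(todayIndex, alias);
            return;
        }
        String currentIndex = aliases.keySet().iterator().next();
        // Roll over when the current index exceeds 50 GB or is older than 7 days
        if (getIndexSizeInBytes(currentIndex) > 50L * 1024 * 1024 * 1024
                || isIndexOlderThan(currentIndex, 7)) {
            client.indices().create(new CreateIndexRequest(todayIndex).settings(getIndexSettings()),
                RequestOptions.DEFAULT);
            // Move the alias in one atomic request, so it never points at zero or two indices
            IndicesAliasesRequest swap = new IndicesAliasesRequest()
                .addAliasAction(AliasActions.remove().index(currentIndex).alias(alias))
                .addAliasAction(AliasActions.add().index(todayIndex).alias(alias));
            client.indices().updateAliases(swap, RequestOptions.DEFAULT);
            // Don't close currentIndex here: it still holds recent data that must stay searchable.
            // Closing is left to the retention rules in manageIndexLifecycle().
        }
    }

    // 4. Close and delete old indices
    public void manageIndexLifecycle() throws IOException {
        closeIndicesOlderThan(30);      // close indices older than 30 days
        deleteIndicesOlderThan(90);     // delete indices older than 90 days
        forceMergeIndicesOlderThan(7);  // force-merge segments of indices no longer written to
    }

    // 5. Index template (legacy _template API)
    public void createIndexTemplate() throws IOException {
        PutIndexTemplateRequest request = new PutIndexTemplateRequest("logs-template");
        request.patterns(Arrays.asList("logs-*"));
        request.settings(getIndexSettings());
        request.mapping(getIndexMapping());
        request.alias(new Alias("logs-all")); // every matching index joins this alias
        request.order(1);                     // priority among templates
        request.version(1);
        AcknowledgedResponse response = client.indices().putTemplate(request, RequestOptions.DEFAULT);
        if (response.isAcknowledged()) {
            log.info("Index template created");
        }
    }

    // getIndexSettings(), getIndexMapping(), getIndexSizeInBytes(), isIndexOlderThan(),
    // closeIndicesOlderThan(), deleteIndicesOlderThan() and forceMergeIndicesOlderThan()
    // are project helpers not shown here.
}
```

Listing 6: Index lifecycle management

On 6.6+ clusters, the built-in index lifecycle management (ILM) policies and the Rollover API can replace this scheduled job.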
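The retention rules in `manageIndexLifecycle()` (force-merge after 7 days, close after 30, delete after 90) come down to parsing the date out of each daily index name. The sketch below decides an action per index and leaves the API calls to you. `RetentionPlanner` is hypothetical, and it assumes the `prefix-yyyy.MM.dd` naming from step 1.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class RetentionPlanner {
    enum Action { KEEP, FORCE_MERGE, CLOSE, DELETE }

    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    // Maps names like "logs-2024.06.30" to an action, using the 7/30/90-day thresholds
    static Map<String, Action> plan(List<String> indices, String prefix, LocalDate today) {
        Map<String, Action> plan = new TreeMap<>();
        for (String index : indices) {
            if (!index.startsWith(prefix + "-")) {
                continue; // not a daily index of this family
            }
            LocalDate day = LocalDate.parse(index.substring(prefix.length() + 1), DAY);
            long age = ChronoUnit.DAYS.between(day, today);
            Action action;
            if (age > 90) {
                action = Action.DELETE;
            } else if (age > 30) {
                action = Action.CLOSE;
            } else if (age > 7) {
                action = Action.FORCE_MERGE;
            } else {
                action = Action.KEEP;
            }
            plan.put(index, action);
        }
        return plan;
    }
}
```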

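4.2 Mapping Design Tips

```json
{
  "mappings": {
    "dynamic": "strict",
    "_source": {
      "enabled": true,
      "excludes": ["big_field"]
    },
    "properties": {
      "id": { "type": "keyword", "ignore_above": 256 },
      "title": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart",
        "fields": {
          "pinyin": { "type": "text", "analyzer": "pinyin_analyzer" },
          "keyword": { "type": "keyword", "ignore_above": 256 },
          "completion": { "type": "completion", "analyzer": "simple" }
        }
      },
      "price": { "type": "scaled_float", "scaling_factor": 100 },
      "location": { "type": "geo_point" },
      "tags": { "type": "text", "analyzer": "whitespace" },
      "attributes": {
        "type": "nested",
        "properties": {
          "key": { "type": "keyword" },
          "value": { "type": "keyword" }
        }
      },
      "vector": { "type": "dense_vector", "dims": 128 }
    }
  }
}
```

Listing 7: Mapping design example

JSON does not allow comments, so the key choices are noted here:

  • "dynamic": "strict" disallows dynamic fields: a document containing an unmapped field is rejected. big_field must therefore also be declared under properties before it can be excluded from _source.
  • "attributes" uses the nested type, so each key/value pair is indexed as its own hidden document and a key stays paired with its value.
  • "vector" is a dense_vector field with 128 dimensions.
  • pinyin_analyzer must be defined in the index's analysis settings.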
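Why `scaled_float` with `scaling_factor: 100` suits prices: the Elasticsearch docs describe the type as a `long` scaled by a fixed factor. A price is therefore stored as whole cents, which compresses well and keeps range queries exact. Anything finer than the factor is rounded away. The sketch below shows the encoding. Using `Math.round` for the rounding step is an assumption that mirrors the documented behavior, not Elasticsearch's source, and `ScaledFloatDemo` is a hypothetical name.

```java
final class ScaledFloatDemo {
    // Stored value: the price multiplied by the scaling factor, rounded to a long
    static long encode(double value, double scalingFactor) {
        return Math.round(value * scalingFactor);
    }

    // Value seen by queries and aggregations
    static double decode(long stored, double scalingFactor) {
        return stored / scalingFactor;
    }
}
```

With a factor of 100, 19.99 is stored as 1999 and reads back as 19.99. A value of 0.125 is stored as 13 and reads back as 0.13, so pick a factor that matches the smallest unit you care about.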

5. Query Optimization in Practice

5.1 Comparing Query Types

```java
// Imports: org.elasticsearch.action.search.*, org.elasticsearch.client.RequestOptions,
// org.elasticsearch.index.query.*, org.elasticsearch.search.aggregations.*,
// org.elasticsearch.search.builder.SearchSourceBuilder, org.elasticsearch.search.sort.SortBuilders,
// org.elasticsearch.search.suggest.*, org.elasticsearch.common.unit.Fuzziness
@Service
@Slf4j
public class ProductSearchService {

    // client (RestHighLevelClient) and executeSearch(SearchSourceBuilder), which runs
    // the request against "products", are assumed to be defined in this class.

    // 1. Match query (the most common)
    public SearchResponse searchByMatch(String keyword) throws IOException {
        SearchRequest request = new SearchRequest("products");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.matchQuery("name", keyword)
            .analyzer("ik_smart")          // search-time analyzer
            .operator(Operator.OR)         // terms are OR-ed together...
            .minimumShouldMatch("75%"));   // ...but at least 75% of them must match
        // With Operator.AND every term is required and minimumShouldMatch has no effect
        sourceBuilder.from(0);
        sourceBuilder.size(20);
        sourceBuilder.sort(SortBuilders.scoreSort());
        request.source(sourceBuilder);
        return client.search(request, RequestOptions.DEFAULT);
    }

    // 2. MultiMatch query (several fields, "field^boost" syntax for weights)
    public SearchResponse searchByMultiMatch(String keyword) throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.multiMatchQuery(keyword, "name^3", "name.pinyin^2", "description")
            .type(MultiMatchQueryBuilder.Type.BEST_FIELDS) // score of the best-matching field...
            .tieBreaker(0.3f));                            // ...plus 0.3 x the other matching fields
        return executeSearch(sourceBuilder);
    }

    // 3. Term query (exact match; the input is not analyzed, so target a keyword field)
    public SearchResponse searchByTerm(String category) throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.termQuery("category.keyword", category));
        return executeSearch(sourceBuilder);
    }

    // 4. Bool query (combining clauses)
    public SearchResponse searchByBool(String keyword, Double minPrice, Double maxPrice, String category)
            throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        // MUST: has to match, contributes to the score
        if (StringUtils.isNotBlank(keyword)) {
            boolQuery.must(QueryBuilders.matchQuery("name", keyword));
        }
        // FILTER: has to match, not scored (cacheable)
        if (minPrice != null || maxPrice != null) {
            RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("price");
            if (minPrice != null) rangeQuery.gte(minPrice);
            if (maxPrice != null) rangeQuery.lte(maxPrice);
            boolQuery.filter(rangeQuery);
        }
        // SHOULD: optional, only boosts the score when a must clause exists
        // (minimumShouldMatch(1) here would turn it into a hard requirement)
        if (StringUtils.isNotBlank(category)) {
            boolQuery.should(QueryBuilders.termQuery("category.keyword", category));
        }
        // MUST_NOT: must not match
        boolQuery.mustNot(QueryBuilders.termQuery("status", "deleted"));
        sourceBuilder.query(boolQuery);
        return executeSearch(sourceBuilder);
    }

    // 5. Aggregations
    public SearchResponse searchWithAggregation() throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.matchAllQuery()); // all documents
        sourceBuilder.size(0);                              // no hits, aggregations only
        // Price ranges
        AggregationBuilder priceAgg = AggregationBuilders.range("price_ranges")
            .field("price")
            .addRange(0, 100)
            .addRange(100, 500)
            .addRange(500, 1000)
            .addRange(1000, 5000)
            .addUnboundedFrom(5000);                        // 5000 and above
        // Categories
        AggregationBuilder categoryAgg = AggregationBuilders.terms("categories")
            .field("category.keyword")
            .size(10);
        // Nested aggregation over the attributes objects
        AggregationBuilder nestedAgg = AggregationBuilders.nested("attributes", "attributes")
            .subAggregation(AggregationBuilders.terms("attr_keys").field("attributes.key").size(10));
        sourceBuilder.aggregation(priceAgg);
        sourceBuilder.aggregation(categoryAgg);
        sourceBuilder.aggregation(nestedAgg);
        return executeSearch(sourceBuilder);
    }

    // 6. Search suggestions
    public Suggest searchSuggest(String prefix) throws IOException {
        SearchRequest request = new SearchRequest("products");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.size(0);
        // Completion suggester on the name.completion sub-field, typo-tolerant
        CompletionSuggestionBuilder suggestion = SuggestBuilders.completionSuggestion("name.completion")
            .prefix(prefix, Fuzziness.AUTO)
            .skipDuplicates(true)
            .size(10);
        SuggestBuilder suggestBuilder = new SuggestBuilder();
        suggestBuilder.addSuggestion("product_suggest", suggestion);
        sourceBuilder.suggest(suggestBuilder);
        request.source(sourceBuilder);
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        return response.getSuggest();
    }
}
```

Listing 8: Implementations of the different query types
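The `minimumShouldMatch("75%")` in `searchByMatch` depends on how many terms the analyzer produces. The Elasticsearch reference defines the rules: a percentage is rounded down, and a negative value says how many optional clauses may be missing. For example, if `ik_smart` splits a query into 3 terms, "75%" requires 2 of them, not 3. The sketch below implements those rules. The class name is hypothetical, clamping to [0, N] is a simplification, and combination specs such as `3<90%` are not supported.

```java
final class MinimumShouldMatch {
    // Required number of matching optional clauses for specs like "2", "-1", "75%", "-25%"
    static int required(int optionalClauses, String spec) {
        String s = spec.trim();
        int value;
        if (s.endsWith("%")) {
            int pct = Integer.parseInt(s.substring(0, s.length() - 1));
            // percentage of the clause count, rounded down (integer division)
            int computed = optionalClauses * Math.abs(pct) / 100;
            // negative percentage: that many clauses may be missing
            value = pct < 0 ? optionalClauses - computed : computed;
        } else {
            int n = Integer.parseInt(s);
            // negative integer: that many clauses may be missing
            value = n < 0 ? optionalClauses + n : n;
        }
        // never more than the number of clauses, never negative
        return Math.max(0, Math.min(optionalClauses, value));
    }
}
```

Some worked values: with 4 terms, "75%" requires 3. With 3 terms it requires only 2, and "-25%" requires all 3, because 25% of 3 rounds down to 0 missing clauses.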

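5.2 Performance Tuning Tips

```java
// Imports as in Listing 8, plus org.elasticsearch.client.HttpAsyncResponseConsumerFactory,
// org.elasticsearch.common.unit.TimeValue, java.util.concurrent.*, java.util.stream.Collectors
@Component
@Slf4j
public class QueryOptimizer {

    // client, executor (an ExecutorService), executeSearch() and searchByMatch()
    // are assumed to be available in this class.

    // 1. Pagination
    // from/size makes every shard collect from + size hits, so deep pages get slower and slower:
    //   sourceBuilder.from((page - 1) * size).size(size);
    // Recommended: search_after, which resumes from the sort values of the previous page's last hit
    public SearchResponse searchWithPagination(String keyword, Object[] lastSortValues, int size)
            throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.matchQuery("name", keyword));
        sourceBuilder.size(size);
        // search_after needs a deterministic sort that ends in a unique tiebreaker.
        // Sorting on _id is discouraged, so use the "id" keyword field instead.
        sourceBuilder.sort(SortBuilders.fieldSort("create_time").order(SortOrder.DESC));
        sourceBuilder.sort(SortBuilders.fieldSort("id").order(SortOrder.ASC));
        if (lastSortValues != null) {
            // = getSortValues() of the last hit on the previous page
            sourceBuilder.searchAfter(lastSortValues);
        }
        return executeSearch(sourceBuilder);
    }

    // 2. Query rewriting
    public SearchResponse optimizedSearch(String keyword) throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // Before: a wildcard with a leading * (very slow, it walks the whole term dictionary)
        //   QueryBuilders.wildcardQuery("name", "*" + keyword + "*");

        // Option 1: match on an ngram-analyzed sub-field (name.ngram must exist in the mapping)
        QueryBuilder query = QueryBuilders.matchQuery("name.ngram", keyword);
        // Option 2: prefix query on the keyword sub-field, for short inputs
        //   if (keyword.length() < 10) query = QueryBuilders.prefixQuery("name.keyword", keyword);
        // Option 3: match on an edge_ngram sub-field (search-as-you-type)
        //   query = QueryBuilders.matchQuery("name.edge_ngram", keyword);

        sourceBuilder.query(query); // attach the chosen query to the request
        return executeSearch(sourceBuilder);
    }

    // 3. Filter cache
    public SearchResponse searchWithFilterCache() throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        // Frequently repeated conditions go in filter: not scored, and the results can be cached
        boolQuery.filter(QueryBuilders.termQuery("status", "active"));
        boolQuery.filter(QueryBuilders.rangeQuery("price").gte(100).lte(1000));
        // Full-text condition (scored)
        boolQuery.must(QueryBuilders.matchQuery("name", "手机"));
        sourceBuilder.query(boolQuery);
        return executeSearch(sourceBuilder);
    }

    // 4. Sorting with doc values
    public SearchResponse searchWithFieldData() throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // Don't sort on a text field (it needs fielddata, which is disabled by default):
        //   sourceBuilder.sort(SortBuilders.fieldSort("name")); // wrong
        // Sort on the keyword sub-field, which has doc values
        sourceBuilder.sort(SortBuilders.fieldSort("name.keyword"));
        // Doc values can also be returned directly instead of reading _source
        sourceBuilder.docValueField("name.keyword");
        return executeSearch(sourceBuilder);
    }

    // 5. Timeouts
    public SearchResponse searchWithTimeout(String keyword, long timeoutMs) throws IOException {
        SearchRequest request = new SearchRequest("products");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // Server-side, best-effort timeout: when it expires, ES returns what it has
        // collected so far and sets timed_out = true in the response
        sourceBuilder.timeout(new TimeValue(timeoutMs, TimeUnit.MILLISECONDS));
        sourceBuilder.query(QueryBuilders.matchQuery("name", keyword));
        request.source(sourceBuilder);
        // Not a timeout: this caps the client's response buffer at 30 MB (the default is 100 MB).
        // The client-side timeout is the socket timeout configured in Listing 4.
        RequestOptions.Builder options = RequestOptions.DEFAULT.toBuilder();
        options.setHttpAsyncResponseConsumerFactory(
            new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
        return client.search(request, options.build());
    }

    // 6. Parallel queries
    public List<SearchResponse> parallelSearch(List<String> keywords)
            throws InterruptedException, ExecutionException {
        List<CompletableFuture<SearchResponse>> futures = new ArrayList<>();
        for (String keyword : keywords) {
            futures.add(CompletableFuture.supplyAsync(() -> {
                try {
                    return searchByMatch(keyword);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }, executor));
        }
        // Wait for all queries to finish
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }
}
```

Listing 9: Query performance optimization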
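The reason `search_after` needs a unique tiebreaker becomes clear when it is simulated in memory. Each page starts strictly after the sort key of the previous page's last hit. If two documents shared that key, one of them could be skipped. The sketch below reproduces the `create_time desc, id asc` ordering from Listing 9 on a plain list. `SearchAfterDemo` and `Doc` are hypothetical, and a real cluster does this per shard, not over one list.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class SearchAfterDemo {
    // Hypothetical document: sort field plus a unique tiebreaker
    static final class Doc {
        final String id;
        final long createTime;
        Doc(String id, long createTime) { this.id = id; this.createTime = createTime; }
    }

    // Same order as sort(create_time desc, id asc)
    static final Comparator<Doc> ORDER =
        Comparator.comparingLong((Doc d) -> d.createTime).reversed()
                  .thenComparing(d -> d.id);

    // Next page strictly after `after` (null = first page)
    static List<Doc> page(List<Doc> all, Doc after, int size) {
        List<Doc> sorted = new ArrayList<>(all);
        sorted.sort(ORDER);
        List<Doc> out = new ArrayList<>();
        for (Doc d : sorted) {
            if (after != null && ORDER.compare(d, after) <= 0) {
                continue; // already returned on an earlier page
            }
            out.add(d);
            if (out.size() == size) {
                break;
            }
        }
        return out;
    }
}
```

Documents "a" and "b" below share a `createTime`. Because `id` breaks the tie, page 2 starts cleanly after "b", and nothing is skipped or repeated.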

6. Bulk Operations and Real-Time Visibility

6.1 Bulk Operations

```java
// Imports: org.elasticsearch.action.bulk.*, org.elasticsearch.action.index.IndexRequest,
// org.elasticsearch.action.update.UpdateRequest, org.elasticsearch.action.delete.DeleteRequest,
// org.elasticsearch.action.get.*, org.elasticsearch.common.unit.*, org.elasticsearch.common.xcontent.XContentType
@Component
@Slf4j
public class BulkOperationService {

    // client (RestHighLevelClient), JsonUtils and retryFailedItem() are assumed project helpers.

    // 1. Plain bulk request
    public void bulkIndexProducts(List<Product> products) throws IOException {
        BulkRequest request = new BulkRequest();
        for (Product product : products) {
            request.add(new IndexRequest("products")
                .id(product.getId())
                .source(JsonUtils.toJson(product), XContentType.JSON));
        }
        BulkResponse response = client.bulk(request, RequestOptions.DEFAULT);
        if (response.hasFailures()) {
            // A bulk call can partially succeed, so inspect every item
            log.error("Bulk request had failures: {}", response.buildFailureMessage());
            for (BulkItemResponse item : response.getItems()) {
                if (item.isFailed()) {
                    log.error("Document {} failed: {}", item.getId(), item.getFailureMessage());
                    retryFailedItem(item); // retry logic
                }
            }
        } else {
            log.info("Bulk request succeeded, {} documents", products.size());
        }
    }

    // 2. BulkProcessor with a listener.
    // In production, build one BulkProcessor per application and close it on shutdown;
    // a processor that is never closed keeps its flush scheduler and buffered requests alive.
    public void bulkWithListener(List<Product> products) throws InterruptedException {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                log.info("Executing bulk with {} actions", request.numberOfActions());
            }
            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    log.error("Bulk failed: {}", response.buildFailureMessage());
                } else {
                    log.info("Bulk succeeded in {} ms", response.getTook().getMillis());
                }
            }
            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                log.error("Bulk threw an exception", failure);
            }
        };

        BulkProcessor processor = BulkProcessor.builder(
                (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                listener)
            .setBulkActions(1000)                               // flush every 1000 actions
            .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // or every 5 MB
            .setFlushInterval(TimeValue.timeValueSeconds(5))    // or every 5 s
            .setConcurrentRequests(2)                           // bulks in flight at once
            // retry rejected bulks up to 3 times, starting at 100 ms
            .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
            .build();

        try {
            for (Product product : products) {
                processor.add(new IndexRequest("products")
                    .id(product.getId())
                    .source(JsonUtils.toJson(product), XContentType.JSON));
            }
        } finally {
            // Flush what is left and close the processor, even if adding failed
            if (!processor.awaitClose(30, TimeUnit.SECONDS)) {
                log.warn("BulkProcessor did not finish within 30 s");
            }
        }
    }

    // 3. Updates
    public void bulkUpdateProducts(List<Product> products) throws IOException {
        BulkRequest request = new BulkRequest();
        for (Product product : products) {
            request.add(new UpdateRequest("products", product.getId())
                .doc(JsonUtils.toJson(product), XContentType.JSON)
                .docAsUpsert(true)); // insert if the document doesn't exist
        }
        client.bulk(request, RequestOptions.DEFAULT);
    }

    // 4. Deletes
    public void bulkDeleteProducts(List<String> ids) throws IOException {
        BulkRequest request = new BulkRequest();
        for (String id : ids) {
            request.add(new DeleteRequest("products", id));
        }
        client.bulk(request, RequestOptions.DEFAULT);
    }

    // 5. Multi-get
    public MultiGetResponse bulkGetProducts(List<String> ids) throws IOException {
        MultiGetRequest request = new MultiGetRequest();
        for (String id : ids) {
            request.add(new MultiGetRequest.Item("products", id));
        }
        return client.mget(request, RequestOptions.DEFAULT);
    }
}
```

Listing 10: Bulk operations
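BulkProcessor flushes when either the action count or the byte size is reached. You can apply the same policy yourself if you send plain `BulkRequest`s, as in method 1. The sketch below splits serialized documents into batches that respect both limits. `BulkBatcher` is a hypothetical helper. Unlike BulkProcessor, it counts only the document bytes and ignores action metadata, and a single document larger than the byte limit gets a batch of its own.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class BulkBatcher {
    // Splits serialized documents into batches capped by action count and total bytes
    static List<List<String>> batches(List<String> docs, int maxActions, long maxBytes) {
        List<List<String>> result = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long currentBytes = 0;
        for (String doc : docs) {
            long size = doc.getBytes(StandardCharsets.UTF_8).length;
            boolean full = current.size() >= maxActions || currentBytes + size > maxBytes;
            if (full && !current.isEmpty()) {
                result.add(current); // flush the current batch
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(doc);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            result.add(current); // final partial batch
        }
        return result;
    }
}
```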

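6.2 Controlling Real-Time Visibility

```java
// Imports: org.elasticsearch.action.support.WriteRequest, org.elasticsearch.action.admin.indices.refresh.*,
// org.elasticsearch.action.admin.indices.forcemerge.*, org.elasticsearch.action.support.master.AcknowledgedResponse,
// org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest, org.elasticsearch.common.settings.Settings
@Component
@Slf4j
public class RealtimeControlService {

    // client, JsonUtils and searchByMatch() are assumed to be available in this class.

    // 1. Immediate refresh: the document is searchable when the call returns,
    //    but a refresh per write creates many tiny segments, so use it sparingly
    public void indexWithRefresh(Product product) throws IOException {
        IndexRequest request = new IndexRequest("products")
            .id(product.getId())
            .source(JsonUtils.toJson(product), XContentType.JSON)
            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        client.index(request, RequestOptions.DEFAULT);
    }

    // 2. Wait for refresh: the call blocks until the next scheduled refresh makes the document visible
    public void indexWithWaitFor(Product product) throws IOException {
        IndexRequest request = new IndexRequest("products")
            .id(product.getId())
            .source(JsonUtils.toJson(product), XContentType.JSON)
            .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        client.index(request, RequestOptions.DEFAULT);
    }

    // 3. Manual refresh
    public void manualRefresh() throws IOException {
        RefreshResponse response = client.indices()
            .refresh(new RefreshRequest("products"), RequestOptions.DEFAULT);
        log.info("Manual refresh done, shards: {}", response.getTotalShards());
    }

    // 4. Force merge: run it only on indices that are no longer being written to
    public void forceMerge() throws IOException {
        ForceMergeRequest request = new ForceMergeRequest("products");
        request.maxNumSegments(1); // merge down to 1 segment per shard
        // onlyExpungeDeletes(true) is a separate mode (only purge deleted documents);
        // don't combine it with maxNumSegments, newer versions reject requests that set both
        ForceMergeResponse response = client.indices().forcemerge(request, RequestOptions.DEFAULT);
        log.info("Force merge done, shards: {}", response.getTotalShards());
    }

    // 5. Change the refresh interval (e.g. raise it during bulk loads)
    public void updateRefreshInterval(int seconds) throws IOException {
        UpdateSettingsRequest request = new UpdateSettingsRequest("products");
        request.settings(Settings.builder().put("index.refresh_interval", seconds + "s").build());
        AcknowledgedResponse response = client.indices().putSettings(request, RequestOptions.DEFAULT);
        if (response.isAcknowledged()) {
            log.info("Refresh interval updated to {} s", seconds);
        }
    }

    // 6. "Real-time" search workaround: retry once after roughly one refresh interval.
    //    Prefer WAIT_UNTIL on the write side when read-your-writes matters.
    public SearchResponse realtimeSearch(String keyword) throws IOException {
        SearchResponse response = searchByMatch(keyword);
        if (response.getHits().getTotalHits().value == 0) {
            try {
                Thread.sleep(1000); // wait 1 s
                response = searchByMatch(keyword);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return response;
    }
}
```

Listing 11: Controlling real-time visibility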
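All of these options revolve around the refresh. An indexed document sits in an in-memory buffer until a refresh opens a new segment, and only then can searches see it. The toy model below captures just that visibility rule. It ignores the translog, flushes, and merges. `NrtShardSim` is a hypothetical class, not an Elasticsearch API.

```java
import java.util.ArrayList;
import java.util.List;

final class NrtShardSim {
    private final List<String> buffer = new ArrayList<>();     // indexed, not yet searchable
    private final List<String> searchable = new ArrayList<>(); // visible after a refresh

    // Like a write with the default RefreshPolicy.NONE: accepted, but invisible until the next refresh
    void index(String doc) {
        buffer.add(doc);
    }

    // Like RefreshPolicy.IMMEDIATE: write, then refresh right away
    void indexImmediate(String doc) {
        index(doc);
        refresh();
    }

    // A refresh (every refresh_interval, or on demand) makes everything buffered searchable
    void refresh() {
        searchable.addAll(buffer);
        buffer.clear();
    }

    boolean search(String doc) {
        return searchable.contains(doc);
    }
}
```

`WAIT_UNTIL` corresponds to blocking the writer until the next `refresh()` runs, and raising `refresh_interval` means `refresh()` is called less often.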

7. Enterprise Case Studies

7.1 E-commerce Product Search

```java
// Imports as in Listing 8, plus org.elasticsearch.script.Script, org.elasticsearch.search.sort.*,
// org.elasticsearch.search.fetch.subphase.highlight.*, org.apache.lucene.search.join.ScoreMode,
// org.elasticsearch.common.geo.GeoPoint
@Service
@Slf4j
public class ECommerceSearchService {

    // executeSearch(), getSuggestions(), processAggregations() and the DTOs
    // (ProductSearchRequest, SearchResult, AttributeFilter) are project code not shown here.
    // Product is assumed to carry transient score/highlightName fields for display.

    // Product search
    public SearchResult<Product> searchProducts(ProductSearchRequest request) throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();

        // Keyword search (scored)
        if (StringUtils.isNotBlank(request.getKeyword())) {
            handleKeywordSearch(boolQuery, request.getKeyword());
        }
        // Category filter
        if (request.getCategoryId() != null) {
            boolQuery.filter(QueryBuilders.termQuery("category_id", request.getCategoryId()));
        }
        // Price range
        if (request.getMinPrice() != null || request.getMaxPrice() != null) {
            RangeQueryBuilder priceRange = QueryBuilders.rangeQuery("price");
            if (request.getMinPrice() != null) priceRange.gte(request.getMinPrice());
            if (request.getMaxPrice() != null) priceRange.lte(request.getMaxPrice());
            boolQuery.filter(priceRange);
        }
        // Brand filter
        if (CollectionUtils.isNotEmpty(request.getBrandIds())) {
            boolQuery.filter(QueryBuilders.termsQuery("brand_id", request.getBrandIds()));
        }
        // Attribute filters
        if (CollectionUtils.isNotEmpty(request.getAttributes())) {
            handleAttributesFilter(boolQuery, request.getAttributes());
        }
        // Geo-distance filter
        if (request.getLocation() != null && request.getDistance() != null) {
            handleGeoFilter(boolQuery, request.getLocation(), request.getDistance());
        }
        sourceBuilder.query(boolQuery);

        // Sorting
        handleSorting(sourceBuilder, request.getSortBy(), request.getSortOrder());
        // Pagination (from + size is capped by index.max_result_window, 10,000 by default)
        sourceBuilder.from((request.getPage() - 1) * request.getSize()).size(request.getSize());

        // Highlighting
        if (StringUtils.isNotBlank(request.getKeyword())) {
            HighlightBuilder highlightBuilder = new HighlightBuilder();
            highlightBuilder.field("name")
                .preTags("<em>")
                .postTags("</em>")
                .fragmentSize(200)
                .numOfFragments(3);
            sourceBuilder.highlighter(highlightBuilder);
        }
        // Aggregations (facets)
        addAggregations(sourceBuilder);

        SearchResponse response = executeSearch(sourceBuilder);
        return processSearchResult(response, request);
    }

    private void handleKeywordSearch(BoolQueryBuilder boolQuery, String keyword) {
        // Several matching strategies combined as should clauses with different boosts
        BoolQueryBuilder keywordQuery = QueryBuilders.boolQuery();
        keywordQuery.should(QueryBuilders.matchPhraseQuery("name", keyword).boost(3.0f));  // 1. phrase match (highest)
        keywordQuery.should(QueryBuilders.matchQuery("name.pinyin", keyword).boost(2.0f)); // 2. pinyin match
        keywordQuery.should(QueryBuilders.matchQuery("name", keyword).boost(1.0f));        // 3. analyzed match
        keywordQuery.should(QueryBuilders.matchQuery("description", keyword).boost(0.5f)); // 4. description
        keywordQuery.should(QueryBuilders.matchQuery("tags", keyword).boost(0.3f));        // 5. tags
        boolQuery.must(keywordQuery);
    }

    private void handleAttributesFilter(BoolQueryBuilder boolQuery, List<AttributeFilter> attributes) {
        for (AttributeFilter attr : attributes) {
            // nested query: key and value must match inside the same attributes object
            NestedQueryBuilder nestedQuery = QueryBuilders.nestedQuery("attributes",
                QueryBuilders.boolQuery()
                    .must(QueryBuilders.termQuery("attributes.key", attr.getKey()))
                    .must(QueryBuilders.termsQuery("attributes.value", attr.getValues())),
                ScoreMode.None);
            boolQuery.filter(nestedQuery);
        }
    }

    private void handleGeoFilter(BoolQueryBuilder boolQuery, GeoPoint location, String distance) {
        GeoDistanceQueryBuilder geoQuery = QueryBuilders.geoDistanceQuery("location")
            .point(location.getLat(), location.getLon())
            .distance(distance);
        boolQuery.filter(geoQuery);
    }

    private void handleSorting(SearchSourceBuilder sourceBuilder, String sortBy, String sortOrder) {
        if ("price".equals(sortBy)) {
            sourceBuilder.sort("price", "desc".equals(sortOrder) ? SortOrder.DESC : SortOrder.ASC);
        } else if ("sales".equals(sortBy)) {
            sourceBuilder.sort("sales", SortOrder.DESC);
        } else if ("score".equals(sortBy)) {
            sourceBuilder.sort(SortBuilders.scoreSort());
        } else {
            // Blended ranking: score * 0.6 + sales * 0.3 + (10000 - price) * 0.1.
            // In a sort script the relevance score is _score (it is not a doc value);
            // the price term assumes prices below 10000.
            Script script = new Script(
                "_score * 0.6 + doc['sales'].value * 0.3 + (10000 - doc['price'].value) * 0.1");
            // Script sorts default to ascending, so ask for the highest blended value first
            sourceBuilder.sort(SortBuilders.scriptSort(script, ScriptSortBuilder.ScriptSortType.NUMBER)
                .order(SortOrder.DESC));
        }
    }

    private void addAggregations(SearchSourceBuilder sourceBuilder) {
        // Price ranges
        AggregationBuilder priceAgg = AggregationBuilders.range("price_agg").field("price")
            .addRange(0, 100).addRange(100, 300).addRange(300, 500)
            .addRange(500, 1000).addRange(1000, 5000);
        // Brands
        AggregationBuilder brandAgg = AggregationBuilders.terms("brand_agg").field("brand_id").size(20);
        // Categories
        AggregationBuilder categoryAgg = AggregationBuilders.terms("category_agg").field("category_id").size(10);
        // Attributes: keys, then values per key
        AggregationBuilder attributeAgg = AggregationBuilders.nested("attributes_agg", "attributes")
            .subAggregation(AggregationBuilders.terms("attribute_keys").field("attributes.key").size(10)
                .subAggregation(AggregationBuilders.terms("attribute_values").field("attributes.value").size(10)));
        sourceBuilder.aggregation(priceAgg);
        sourceBuilder.aggregation(brandAgg);
        sourceBuilder.aggregation(categoryAgg);
        sourceBuilder.aggregation(attributeAgg);
    }

    private SearchResult<Product> processSearchResult(SearchResponse response, ProductSearchRequest request) {
        SearchResult<Product> result = new SearchResult<>();
        // Total hit count
        result.setTotal(response.getHits().getTotalHits().value);
        // Documents on this page
        List<Product> products = new ArrayList<>();
        for (SearchHit hit : response.getHits()) {
            Product product = JsonUtils.fromJson(hit.getSourceAsString(), Product.class);
            product.setScore(hit.getScore());
            // Use the first highlighted fragment when present
            HighlightField highlight = hit.getHighlightFields().get("name");
            if (highlight != null && highlight.getFragments().length > 0) {
                product.setHighlightName(highlight.getFragments()[0].string());
            }
            products.add(product);
        }
        result.setData(products);
        // Aggregation results (facets)
        processAggregations(result, response.getAggregations());
        // Offer suggestions when a keyword search found nothing
        if (StringUtils.isNotBlank(request.getKeyword()) && products.isEmpty()) {
            result.setSuggestions(getSuggestions(request.getKeyword()));
        }
        return result;
    }
}
```

Listing 12: E-commerce product search system
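The default blended sort mixes quantities on very different scales. Relevance scores are usually single digits, while raw sales run into the thousands, so the sales term can swamp relevance entirely. The sketch below evaluates the article's formula in plain Java. Next to it is one common alternative, which is an assumption here and not the article's method: damp sales with `log10(1 + sales)`, in the spirit of `function_score`'s `field_value_factor` with the `log1p` modifier. `CompositeScore` is a hypothetical class.

```java
final class CompositeScore {
    // The script-sort formula from Listing 12, in plain Java
    static double original(double score, long sales, double price) {
        return score * 0.6 + sales * 0.3 + (10000 - price) * 0.1;
    }

    // Alternative (assumption): multiply relevance by a log-damped popularity factor
    static double damped(double score, long sales) {
        return score * (1 + Math.log10(1 + sales) * 0.1);
    }
}
```

Take two products at the same price. One matches the query well (score 10, 100 sales) and one barely matches (score 1, 3000 sales). The original formula ranks the weak match first (1600.6 vs. 736.0). The damped version keeps the strong match on top.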

7.2 日志分析系统

```java
@Service
@Slf4j
public class LogAnalysisService {

    // Log search
    public LogSearchResult searchLogs(LogSearchRequest request) throws IOException {
        String index = getLogIndex(request.getStartTime(), request.getEndTime());
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

        // Time-range filter; the format applies to the start/end values passed in
        RangeQueryBuilder timeRange = QueryBuilders.rangeQuery("@timestamp")
                .gte(request.getStartTime())
                .lte(request.getEndTime())
                .format("yyyy-MM-dd HH:mm:ss");

        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
                .filter(timeRange);

        // Keyword search on message, level and logger_name; every term must match.
        // query_string rejects malformed syntax, so raw user input may be safer with simple_query_string.
        if (StringUtils.isNotBlank(request.getKeyword())) {
            boolQuery.must(QueryBuilders.queryStringQuery(request.getKeyword())
                    .field("message")
                    .field("level")
                    .field("logger_name")
                    .defaultOperator(Operator.AND));
        }

        // Filter by log level
        if (CollectionUtils.isNotEmpty(request.getLevels())) {
            boolQuery.filter(QueryBuilders.termsQuery("level", request.getLevels()));
        }

        // Filter by application
        if (StringUtils.isNotBlank(request.getApplication())) {
            boolQuery.filter(QueryBuilders.termQuery("application", request.getApplication()));
        }

        sourceBuilder.query(boolQuery);

        // Newest entries first
        sourceBuilder.sort(SortBuilders.fieldSort("@timestamp").order(SortOrder.DESC));

        // Pagination (1-based page); from + size may not exceed index.max_result_window (10,000 by default)
        sourceBuilder.from((request.getPage() - 1) * request.getSize())
                .size(request.getSize());

        // Aggregations
        addLogAggregations(sourceBuilder, request);

        // Execute the search
        SearchResponse response = executeSearch(index, sourceBuilder);
        return processLogSearchResult(response, request);
    }

    // Error statistics
    public ErrorStats getErrorStats(Date startTime, Date endTime, String application) throws IOException {
        String index = getLogIndex(startTime, endTime);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

        // Time range
        RangeQueryBuilder timeRange = QueryBuilders.rangeQuery("@timestamp")
                .gte(startTime)
                .lte(endTime);

        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
                .filter(timeRange)
                .filter(QueryBuilders.termQuery("level", "ERROR"));
        if (StringUtils.isNotBlank(application)) {
            boolQuery.filter(QueryBuilders.termQuery("application", application));
        }

        sourceBuilder.query(boolQuery);
        sourceBuilder.size(0); // aggregations only, no hits

        // Number of distinct traces that logged an error (cardinality is approximate, HyperLogLog-based)
        AggregationBuilder countAgg = AggregationBuilders
                .cardinality("error_count")
                .field("trace_id");

        // Group by application (top 20)
        AggregationBuilder appAgg = AggregationBuilders
                .terms("by_application")
                .field("application")
                .size(20)
                .subAggregation(AggregationBuilders
                        .cardinality("app_error_count")
                        .field("trace_id"));

        // Hourly buckets; minDocCount(0) + extendedBounds keep empty hours across the whole range
        AggregationBuilder timeAgg = AggregationBuilders
                .dateHistogram("by_time")
                .field("@timestamp")
                .calendarInterval(DateHistogramInterval.HOUR)
                .format("yyyy-MM-dd HH:00")
                .minDocCount(0)
                .extendedBounds(new LongBounds(startTime.getTime(), endTime.getTime()))
                .subAggregation(AggregationBuilders
                        .cardinality("hour_error_count")
                        .field("trace_id"));

        sourceBuilder.aggregation(countAgg);
        sourceBuilder.aggregation(appAgg);
        sourceBuilder.aggregation(timeAgg);

        SearchResponse response = executeSearch(index, sourceBuilder);
        return processErrorStats(response);
    }

    // Slow-query analysis
    public SlowQueryStats analyzeSlowQueries(Date startTime, Date endTime) throws IOException {
        String index = getLogIndex(startTime, endTime);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

        // Slow-query log entries in the range that took at least 1 second (duration in milliseconds)
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
                .filter(QueryBuilders.rangeQuery("@timestamp")
                        .gte(startTime)
                        .lte(endTime))
                .filter(QueryBuilders.termQuery("logger_name", "slow_query"))
                .filter(QueryBuilders.rangeQuery("duration").gte(1000));

        sourceBuilder.query(boolQuery);
        sourceBuilder.size(0);

        // Group by SQL type (top 10): average, maximum and latency percentiles
        AggregationBuilder sqlTypeAgg = AggregationBuilders
                .terms("by_sql_type")
                .field("sql_type")
                .size(10)
                .subAggregation(AggregationBuilders.avg("avg_duration").field("duration"))
                .subAggregation(AggregationBuilders.max("max_duration").field("duration"))
                .subAggregation(AggregationBuilders
                        .percentiles("duration_percentiles")
                        .field("duration")
                        .percentiles(50, 90, 95, 99));

        // Group by table (top 20): average duration
        AggregationBuilder tableAgg = AggregationBuilders
                .terms("by_table")
                .field("table_name")
                .size(20)
                .subAggregation(AggregationBuilders.avg("avg_duration").field("duration"));

        sourceBuilder.aggregation(sqlTypeAgg);
        sourceBuilder.aggregation(tableAgg);

        SearchResponse response = executeSearch(index, sourceBuilder);
        return processSlowQueryStats(response);
    }
}
```

Code Listing 13: Log analysis system
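The listing calls `getLogIndex(...)` without showing it. Assuming logs are written to one index per day (the `logs-yyyy.MM.dd` naming below is a hypothetical scheme, not something the listing specifies), a minimal sketch of such a helper maps the requested time range onto the daily indices it covers, so a search only touches the relevant indices:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: maps a date range onto daily log indices named <prefix>yyyy.MM.dd. */
final class LogIndexNames {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    private LogIndexNames() {}

    /** Returns one index name per calendar day in [start, end], both ends inclusive. */
    static List<String> dailyIndices(String prefix, LocalDate start, LocalDate end) {
        if (end.isBefore(start)) {
            throw new IllegalArgumentException("end must not be before start");
        }
        List<String> names = new ArrayList<>();
        for (LocalDate day = start; !day.isAfter(end); day = day.plusDays(1)) {
            names.add(prefix + day.format(DAY));
        }
        return names;
    }
}
```

For example, `dailyIndices("logs-", LocalDate.of(2024, 1, 30), LocalDate.of(2024, 2, 1))` returns `[logs-2024.01.30, logs-2024.01.31, logs-2024.02.01]`, which can be passed to `new SearchRequest(names.toArray(new String[0]))`. If some days may have no index, lenient index options such as `IndicesOptions.lenientExpandOpen()` keep missing indices from failing the search.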

8. Performance Optimization and Monitoring

8.1 Performance Tuning

```java
@Component
@Slf4j
public class PerformanceTuner {

    @Autowired
    private RestHighLevelClient client;

    // 1. JVM tuning: these flags belong in config/jvm.options, not in application code
    public void tuneJvm() {
        // Typical ES JVM settings:
        // -Xms4g -Xmx4g                        // heap: min = max, keep it under 32GB (compressed oops)
        // -XX:+UseG1GC                         // G1 garbage collector
        // -XX:MaxGCPauseMillis=200
        // -XX:G1ReservePercent=25
        // -XX:InitiatingHeapOccupancyPercent=30
    }

    // 2. Index settings
    public Settings getOptimizedSettings() {
        return Settings.builder()
                .put("index.number_of_shards", 3)                   // size by data volume; fixed at index creation
                .put("index.number_of_replicas", 1)
                .put("index.refresh_interval", "30s")              // raise for write-heavy indices
                .put("index.translog.durability", "async")         // async fsync: a crash can lose the last sync_interval of writes
                .put("index.translog.sync_interval", "5s")
                .put("index.translog.flush_threshold_size", "512mb")
                .put("index.merge.scheduler.max_thread_count", 1)  // for spinning disks
                .put("index.merge.scheduler.max_merge_count", 6)
                .put("index.unassigned.node_left.delayed_timeout", "5m")
                .build();
    }

    // 3. Query tuning
    public SearchRequest tuneSearch(String index) {
        SearchSourceBuilder builder = new SearchSourceBuilder()
                .query(QueryBuilders.boolQuery()
                        .must(QueryBuilders.matchQuery("name", "手机"))          // scored full-text clause ("mobile phone")
                        .filter(QueryBuilders.termQuery("status", "active"))      // filter context: no scoring, cacheable
                        .filter(QueryBuilders.termQuery("category", "electronics")))
                .timeout(TimeValue.timeValueSeconds(5))                           // partial results if the timeout expires
                .fetchSource(new String[]{"id", "name", "price"}, null);         // return only the needed fields
        // The shard request cache is a per-request flag on SearchRequest, not SearchSourceBuilder;
        // without it, only size=0 requests are cached
        return new SearchRequest(index)
                .source(builder)
                .requestCache(true);
    }

    // 4. Metrics collection every 60s (requires @EnableScheduling).
    // Note: depending on the client version, RestHighLevelClient may not expose nodes/indices stats;
    // in that case GET /_nodes/stats and /_stats can be sent through client.getLowLevelClient().
    @Scheduled(fixedDelay = 60000)
    public void collectMetrics() throws IOException {
        // Cluster health
        ClusterHealthRequest healthRequest = new ClusterHealthRequest();
        ClusterHealthResponse healthResponse = client.cluster()
                .health(healthRequest, RequestOptions.DEFAULT);

        // Node stats
        NodesStatsRequest nodesRequest = new NodesStatsRequest();
        nodesRequest.indices(true);
        nodesRequest.os(true);
        nodesRequest.jvm(true);
        nodesRequest.threadPool(true);
        NodesStatsResponse nodesResponse = client.nodes()
                .stats(nodesRequest, RequestOptions.DEFAULT);

        // Index stats
        IndicesStatsRequest indicesRequest = new IndicesStatsRequest();
        indicesRequest.all();
        IndicesStatsResponse indicesResponse = client.indices()
                .stats(indicesRequest, RequestOptions.DEFAULT);

        // Push to the monitoring system
        sendToMonitoringSystem(healthResponse, nodesResponse, indicesResponse);

        // Decide whether the cluster needs to scale out
        checkAndScale(healthResponse, nodesResponse, indicesResponse);
    }

    // 5. Hot-shard detection
    public void identifyHotShards() throws IOException {
        IndicesStatsRequest request = new IndicesStatsRequest();
        request.all();
        IndicesStatsResponse response = client.indices()
                .stats(request, RequestOptions.DEFAULT);

        Map<String, IndexStats> indices = response.getIndices();
        for (Map.Entry<String, IndexStats> entry : indices.entrySet()) {
            String index = entry.getKey();
            for (ShardStats shardStat : entry.getValue().getShards()) {
                // query_total is cumulative since the shard copy started, so a fixed threshold is only a rough signal
                long queryCount = shardStat.getStats().getSearch().getTotal().getQueryCount();
                if (queryCount > 10000) { // example threshold
                    log.warn("Hot shard: {}/{}, query count: {}",
                            index, shardStat.getShardRouting().getId(), queryCount);
                    // Trigger shard relocation
                    rerouteHotShard(index, shardStat);
                }
            }
        }
    }
}
```

Code Listing 14: Performance tuning
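The JVM comments in the listing only say the heap should stay under 32GB. A common rule of thumb is to give the heap at most half of physical RAM (Lucene relies on the OS page cache for the rest) and to stay just below the compressed-oops threshold. The sketch below encodes that rule; the 31GB cap is a conservative assumption, not an official constant:

```java
/** Sketch of a heap-sizing rule of thumb for an Elasticsearch node (assumption, not an official formula). */
final class HeapSizing {
    // Assumed cap that keeps the JVM below the compressed-oops threshold (just under 32GB).
    static final int MAX_HEAP_GB = 31;

    private HeapSizing() {}

    /** Half of physical RAM, leaving the rest to the OS page cache, capped at MAX_HEAP_GB. */
    static int recommendedHeapGb(int physicalRamGb) {
        if (physicalRamGb <= 1) {
            throw new IllegalArgumentException("need at least 2GB of RAM");
        }
        return Math.min(physicalRamGb / 2, MAX_HEAP_GB);
    }

    /** Renders matching -Xms/-Xmx flags; equal values avoid heap resizing at runtime. */
    static String jvmFlags(int physicalRamGb) {
        int heap = recommendedHeapGb(physicalRamGb);
        return "-Xms" + heap + "g -Xmx" + heap + "g";
    }
}
```

With 8GB of RAM it produces `-Xms4g -Xmx4g`, the values used in the listing; on machines with 64GB of RAM or more the 31GB cap takes over.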

8.2 Monitoring and Alerting

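```yaml
# prometheus.yml
# Assumes the prometheus-community elasticsearch_exporter runs next to the cluster
# (default port 9114, metrics at /metrics); the metric names below are the ones it exports.
rule_files:
  - "elasticsearch_alerts.yml"

scrape_configs:
  - job_name: 'elasticsearch'
    static_configs:
      - targets: ['localhost:9114']

# elasticsearch_alerts.yml: alert rules live in a separate rule file, grouped under "groups"
groups:
  - name: elasticsearch
    rules:
      - alert: ClusterHealthRed
        expr: elasticsearch_cluster_health_status{color="red"} == 1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "ES cluster health is RED"

      - alert: HighCpuUsage
        # process CPU is a gauge in percent (0-100), so compare it directly instead of using rate()
        expr: elasticsearch_process_cpu_percent > 80
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "ES node CPU usage is too high"

      - alert: HighHeapUsage
        expr: elasticsearch_jvm_memory_used_bytes{area="heap"} / elasticsearch_jvm_memory_max_bytes{area="heap"} > 0.8
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "ES node heap usage is too high"

      - alert: HighDiskUsage
        expr: 1 - elasticsearch_filesystem_data_available_bytes / elasticsearch_filesystem_data_size_bytes > 0.8
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "ES node disk usage is too high"

      - alert: HighSearchLatency
        # average query time per search over 5 minutes, in seconds (the exporter exposes counters, not a histogram)
        expr: rate(elasticsearch_indices_search_query_time_seconds[5m]) / rate(elasticsearch_indices_search_query_total[5m]) > 1
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "ES search latency is too high"

      - alert: UnassignedShards
        expr: elasticsearch_cluster_health_unassigned_shards > 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "ES has unassigned shards"
```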

Code Listing 15: Monitoring and alerting configuration
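The HighDiskUsage alert fires at 80%, which leaves headroom before Elasticsearch itself reacts to disk pressure. Its default disk watermarks are 85% (low: no more shards are allocated to the node, primaries of new indices excepted), 90% (high: shards are relocated away) and 95% (flood stage: indices with a shard on the node get a read-only-allow-delete block). A small sketch, assuming those defaults have not been overridden in the cluster settings, classifies a node's disk usage the same way:

```java
/** Sketch: classify disk usage against the default disk watermarks (85% / 90% / 95%). */
final class DiskWatermarks {
    enum Level { OK, LOW, HIGH, FLOOD_STAGE }

    // Defaults of cluster.routing.allocation.disk.watermark.{low,high,flood_stage}; adjust if overridden.
    static final double LOW_WATERMARK = 0.85;
    static final double HIGH_WATERMARK = 0.90;
    static final double FLOOD_STAGE_WATERMARK = 0.95;

    private DiskWatermarks() {}

    static Level classify(long totalBytes, long availableBytes) {
        if (totalBytes <= 0) {
            throw new IllegalArgumentException("totalBytes must be positive");
        }
        double used = (double) (totalBytes - availableBytes) / totalBytes;
        if (used > FLOOD_STAGE_WATERMARK) return Level.FLOOD_STAGE; // indices on the node become read-only
        if (used > HIGH_WATERMARK) return Level.HIGH;               // shards get relocated away
        if (used > LOW_WATERMARK) return Level.LOW;                 // no new shard allocations to the node
        return Level.OK;
    }
}
```

Feeding it the total and available bytes of each data path (as reported by node filesystem stats) shows how close a node is to the point where ES starts moving or blocking shards.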

9. Troubleshooting Guide

9.1 Diagnosing Common Problems

```java
// Note: depending on the client version, RestHighLevelClient may not expose nodes stats, indices stats
// or allocation explain; those endpoints can also be called through client.getLowLevelClient().
@Component
@Slf4j
public class TroubleshootingGuide {

    @Autowired
    private RestHighLevelClient client;

    // 1. Slow searches: rerun the query with profiling enabled
    public void diagnoseSlowSearch(String index, String query) throws IOException {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.matchQuery("name", query));
        sourceBuilder.profile(true);

        SearchRequest request = new SearchRequest(index);
        request.source(sourceBuilder);
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);

        // Profile results are keyed by shard
        Map<String, ProfileShardResult> profileResults = response.getProfileResults();
        for (Map.Entry<String, ProfileShardResult> entry : profileResults.entrySet()) {
            log.info("Profile for shard {}:", entry.getKey());
            for (QueryProfileShardResult queryProfile : entry.getValue().getQueryProfileResults()) {
                // Each ProfileResult is a node of the rewritten Lucene query; getTime() is in nanoseconds
                for (ProfileResult profileResult : queryProfile.getQueryResults()) {
                    log.info("Query type: {}, took: {}ms, details: {}",
                            profileResult.getQueryName(),
                            profileResult.getTime() / 1_000_000.0,
                            profileResult.getLuceneDescription());
                }
            }
        }
    }

    // 2. Slow indexing
    public void diagnoseSlowIndexing(String index) throws IOException {
        // Refresh interval; includeDefaults returns the 1s default when it was never set explicitly
        GetSettingsRequest settingsRequest = new GetSettingsRequest()
                .indices(index)
                .names("index.refresh_interval")
                .includeDefaults(true);
        GetSettingsResponse settingsResponse = client.indices()
                .getSettings(settingsRequest, RequestOptions.DEFAULT);
        log.info("Refresh interval: {}", settingsResponse.getSetting(index, "index.refresh_interval"));

        // Indexing, translog, document, store and search statistics
        IndicesStatsRequest statsRequest = new IndicesStatsRequest()
                .indices(index)
                .clear()
                .docs(true)
                .store(true)
                .indexing(true)
                .translog(true)
                .search(true);
        IndicesStatsResponse statsResponse = client.indices()
                .stats(statsRequest, RequestOptions.DEFAULT);
        IndexStats indexStats = statsResponse.getIndex(index);
        log.info("Index stats: {}", indexStats);
    }

    // 3. High memory usage
    public void diagnoseHighMemory() throws IOException {
        NodesStatsRequest request = new NodesStatsRequest();
        request.jvm(true);
        request.indices(true); // required for the fielddata stats below
        NodesStatsResponse response = client.nodes()
                .stats(request, RequestOptions.DEFAULT);

        for (NodeStats nodeStats : response.getNodes()) {
            JvmStats jvmStats = nodeStats.getJvm();
            JvmStats.Mem mem = jvmStats.getMem();
            log.info("Node {} heap used: {}/{}",
                    nodeStats.getNode().getName(), mem.getHeapUsed(), mem.getHeapMax());
            // GC collectors are an array, not a map
            for (JvmStats.GarbageCollector gc : jvmStats.getGc().getCollectors()) {
                log.info("  GC {}: {} collections, total time {}",
                        gc.getName(), gc.getCollectionCount(), gc.getCollectionTime());
            }
            // Fielddata held on the heap
            if (nodeStats.getIndices() != null && nodeStats.getIndices().getFieldData() != null) {
                log.info("Fielddata memory: {}", nodeStats.getIndices().getFieldData().getMemorySize());
            }
        }
    }

    // 4. Unassigned shards
    public void diagnoseUnassignedShards() throws IOException {
        // Without parameters, the explain API picks an arbitrary unassigned shard (and errors if there is none)
        ClusterAllocationExplainRequest request = new ClusterAllocationExplainRequest();
        ClusterAllocationExplainResponse response = client.cluster()
                .allocationExplain(request, RequestOptions.DEFAULT);
        log.info("Allocation explanation: {}", response.getExplanation());

        // Disk space per data path
        NodesStatsRequest nodesRequest = new NodesStatsRequest();
        nodesRequest.fs(true);
        NodesStatsResponse nodesResponse = client.nodes()
                .stats(nodesRequest, RequestOptions.DEFAULT);
        for (NodeStats nodeStats : nodesResponse.getNodes()) {
            FsInfo fsInfo = nodeStats.getFs();
            for (FsInfo.Path path : fsInfo) {
                // getTotal()/getAvailable() return ByteSizeValue, so compute with raw bytes
                long total = path.getTotal().getBytes();
                long available = path.getAvailable().getBytes();
                long usedPercent = total > 0 ? (total - available) * 100 / total : 0;
                log.info("Node {} disk: {} available of {} ({}% used)",
                        nodeStats.getNode().getName(), path.getAvailable(), path.getTotal(), usedPercent);
            }
        }
    }

    // 5. Hot nodes
    public void diagnoseHotNodes() throws IOException {
        NodesStatsRequest request = new NodesStatsRequest();
        request.indices(true);
        request.os(true);
        NodesStatsResponse response = client.nodes()
                .stats(request, RequestOptions.DEFAULT);

        for (NodeStats nodeStats : response.getNodes()) {
            // Search and indexing counts are cumulative since node start, not per-interval rates
            long queryCount = nodeStats.getIndices().getSearch().getTotal().getQueryCount();
            long indexCount = nodeStats.getIndices().getIndexing().getTotal().getIndexCount();
            // Current OS CPU usage in percent
            double cpuPercent = nodeStats.getOs().getCpu().getPercent();

            log.info("Node {} load - queries: {}, indexed docs: {}, CPU: {}%",
                    nodeStats.getNode().getName(), queryCount, indexCount, cpuPercent);

            if (cpuPercent > 80 || queryCount > 10000) { // example thresholds
                log.warn("Node {} may be a hot node", nodeStats.getNode().getName());
            }
        }
    }
}
```

Code Listing 16: Troubleshooting tools
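The query counts that `identifyHotShards` and `diagnoseHotNodes` compare against 10000 are cumulative since the shard or node started, so a long-running node eventually crosses any fixed threshold. One way around that, sketched below under the assumption that the stats are polled periodically (as `collectMetrics` does every 60 seconds), is to turn two samples into a per-second rate; the class and method names are illustrative only:

```java
/** Sketch: per-second rate from two samples of a cumulative counter (e.g. search query_total). */
final class CounterRate {
    private CounterRate() {}

    /**
     * Events per second between a previous and a current sample.
     * Returns 0 when the counter went backwards (e.g. node restart) or no time has passed.
     */
    static double perSecond(long previousCount, long previousMillis, long currentCount, long currentMillis) {
        long deltaCount = currentCount - previousCount;
        long deltaMillis = currentMillis - previousMillis;
        if (deltaCount < 0 || deltaMillis <= 0) {
            return 0.0; // counter reset or clock issue: do not report a bogus rate
        }
        return deltaCount * 1000.0 / deltaMillis;
    }
}
```

For example, 10,000 queries at t=0 and 16,000 queries 60 seconds later give 100 queries per second. Returning 0 after a reset is a simplification; a production collector might simply skip that interval.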

10. Choosing a Solution and Summary

10.1 ES vs. Other Options

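| Option | Strengths | Weaknesses | Best for |
|---|---|---|---|
| Elasticsearch | Feature-rich, strong ecosystem, excellent performance | Resource-hungry, complex to operate | Full-text search, log analytics |
| Solr | Mature and stable, rich feature set | Declining community activity, weaker real-time behavior | Document search, enterprise search |
| OpenSearch | Open-source fork of ES, backed by AWS | Smaller ecosystem than ES | AWS environments, strict open-source requirements |
| MeiliSearch | Lightweight, fast, easy to use | Comparatively limited features | Small applications, simple search |
| PostgreSQL | Transactions, SQL queries | Weak search features, lower performance | Teams already on PG with simple search needs |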

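10.2 My "ES Ground Rules"

  1. Size shards sensibly: keep every shard under 50GB
  2. Be strict with mappings: disable dynamic mapping and declare field types explicitly
  3. Optimize queries: avoid wildcard queries, use filter context
  4. Monitor everything: cluster health, performance metrics, business metrics
  5. Plan capacity: plan scale-out ahead of time and set disk watermarks
  6. Back up regularly: take snapshots on a schedule and test restores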
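Rule 1 turns into simple arithmetic when creating an index: divide the expected primary data size by the per-shard cap and round up. A sketch, assuming the 50GB cap from the rule and a size estimate that already includes growth until the index is rolled over or reindexed (replicas are not counted, since they only copy primaries):

```java
/** Sketch of rule 1: choose a primary shard count so no shard is expected to exceed the size cap. */
final class ShardPlanner {
    // Per-shard cap from the rule above.
    static final long MAX_SHARD_GB = 50;

    private ShardPlanner() {}

    /** Ceiling division of expected primary data by the cap, with at least one shard. */
    static int primaryShards(long expectedPrimaryDataGb) {
        if (expectedPrimaryDataGb < 0) {
            throw new IllegalArgumentException("size must be non-negative");
        }
        long shards = (expectedPrimaryDataGb + MAX_SHARD_GB - 1) / MAX_SHARD_GB;
        return (int) Math.max(1, shards);
    }
}
```

For 120GB of expected data this yields 3 primary shards, while 50GB still fits in a single shard. Since `index.number_of_shards` is fixed at creation, the estimate matters more than the formula.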

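11. Final Words

Elasticsearch is a powerful search engine, but it is not a silver bullet. You get the most out of it only by understanding how it works, designing carefully, and monitoring continuously.

I have seen too many teams trip over it: some chose the wrong shard count, some never optimized their queries, and some had no monitoring until an outage hit.

Remember: ES is a tool, not magic. Design a solution that fits your business, then monitor and optimize it. That is the right path.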

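📚 Recommended Reading

Official documentation

  1. Elasticsearch official documentation - the most complete ES reference
  2. Java client documentation - detailed docs for the Java clients

Source code

  1. Elasticsearch source code - the official repository
  2. Lucene source code - the underlying search library

Best practices

  1. Elasticsearch best practices - official recommendations
  2. E-commerce search architecture - designing search for e-commerce

Monitoring tools

  1. Kibana monitoring - the official ES monitoring tool
  2. Prometheus monitoring - metrics monitoring

Final advice: start with simple scenarios and move to complex designs only once you understand the fundamentals. Set up monitoring, choose sensible shard and replica counts, and revisit your queries regularly. Remember: search optimization is an ongoing process, not a one-off task.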
