Elasticsearch核心概念与Java客户端实战 构建高性能搜索服务

Elasticsearch核心概念与Java客户端实战 构建高性能搜索服务

目录

🎯 先说说我被ES"虐惨"的经历

✨ 摘要

1. 为什么选择Elasticsearch?

1.1 从数据库的痛苦说起

1.2 Elasticsearch的优势

2. ES核心架构解析

2.1 集群架构

2.2 索引与分片

3. Java客户端实战

3.1 客户端选型对比

3.2 RestHighLevelClient配置

3.3 Spring Data Elasticsearch配置

4. 索引设计最佳实践

4.1 索引生命周期管理

4.2 映射设计技巧

5. 查询优化实战

5.1 查询类型对比

5.2 性能优化技巧

6. 批量操作与实时性

6.1 Bulk批量操作

6.2 实时性控制

7. 企业级实战案例

7.1 电商商品搜索系统

7.2 日志分析系统

8. 性能优化与监控

8.1 性能调优

8.2 监控告警

9. 故障排查指南

9.1 常见问题排查

10. 选型与总结

10.1 ES vs 其他方案对比

10.2 我的"ES军规"

11. 最后的话

📚 推荐阅读

官方文档

源码学习

最佳实践

监控工具


🎯 先说说我被ES"虐惨"的经历

我们第一次在电商系统用ES做商品搜索,上线第一天就崩了。用户搜"手机",结果返回了"手纸",分词器配错了。更绝的是,有次大促,ES集群CPU 100%,排查发现是有人用了wildcard查询:"手机"。

去年搞日志系统,用ES存日志,一天几个TB,结果磁盘报警。发现是分片数设错了,一个索引200个分片,集群管理开销巨大。

上个月做实时推荐,用ES做向量搜索,结果发现Java High Level Client内存泄漏,查了三天是BulkProcessor没正确关闭。

这些事让我明白:不懂ES原理的程序员,就是在用搜索引擎埋雷,早晚要炸

✨ 摘要

Elasticsearch是基于Lucene的分布式搜索引擎,通过倒排索引实现毫秒级检索。本文深度解析ES集群架构、分片原理、查询优化机制,揭秘Java客户端的最佳实践。通过完整电商搜索实战,对比不同查询方式的性能差异,提供索引设计、查询优化、集群监控等核心问题的解决方案。包含企业级配置模板、性能调优数据和故障排查手册。

1. 为什么选择Elasticsearch?

1.1 从数据库的痛苦说起

先看个MySQL做搜索的典型问题:

-- MySQL模糊查询 SELECT * FROM products WHERE name LIKE '%手机%' OR description LIKE '%手机%' OR tags LIKE '%手机%' ORDER BY create_time DESC LIMIT 100 OFFSET 0;

代码清单1:MySQL模糊查询

用图表示这个问题:

图1:MySQL搜索的问题

MySQL搜索的痛点

  1. LIKE '%xxx%'导致全表扫描
  2. 多字段OR查询性能极差
  3. 无法支持复杂评分排序
  4. 分词、同义词、拼音搜索不支持

1.2 Elasticsearch的优势

ES的倒排索引(Inverted Index)是核心:

// 倒排索引结构示例 public class InvertedIndex { // 词项 -> 文档列表 Map<String, List<Posting>> index = new HashMap<>(); // 文档1: "华为手机很好用" // 文档2: "小米手机性价比高" // 倒排索引: // "华为" -> [文档1] // "手机" -> [文档1, 文档2] // "小米" -> [文档2] // "很好用" -> [文档1] // "性价比" -> [文档2] }

代码清单2:倒排索引原理

搜索过程对比:

图2:MySQL vs ES搜索流程对比

性能对比测试(1000万商品数据):

场景

MySQL

Elasticsearch

性能差距

单字段模糊查询

3200ms

45ms

71倍

多字段OR查询

8500ms

65ms

130倍

复杂条件+排序

12000ms

85ms

141倍

内存占用

4.2GB

1.8GB

57%

2. ES核心架构解析

2.1 集群架构

图3:ES集群架构

节点类型

  • 主节点(Master):管理集群状态、分片分配
  • 数据节点(Data):存储数据、执行CRUD
  • 协调节点(Coordinating):路由请求、聚合结果
  • 摄取节点(Ingest):数据预处理

2.2 索引与分片

// 索引创建示例 @Configuration public class IndexConfig { public void createProductIndex(RestHighLevelClient client) throws IOException { CreateIndexRequest request = new CreateIndexRequest("products"); // 索引设置 request.settings(Settings.builder() .put("index.number_of_shards", 3) // 主分片数 .put("index.number_of_replicas", 1) // 副本数 .put("index.refresh_interval", "1s") // 刷新间隔 .put("analysis.analyzer.default.type", "ik_max_word")); // 分词器 // Mapping定义 XContentBuilder mapping = JsonXContent.contentBuilder() .startObject() .startObject("properties") .startObject("id") .field("type", "keyword") .endObject() .startObject("name") .field("type", "text") .field("analyzer", "ik_max_word") .field("search_analyzer", "ik_smart") .field("fields", JsonXContent.contentBuilder() .startObject() .startObject("pinyin") .field("type", "text") .field("analyzer", "pinyin_analyzer") .endObject() .startObject("keyword") .field("type", "keyword") .field("ignore_above", 256) .endObject() .endObject()) .endObject() .startObject("price") .field("type", "double") .endObject() .startObject("category_id") .field("type", "integer") .endObject() .startObject("sales") .field("type", "integer") .endObject() .startObject("create_time") .field("type", "date") .field("format", "yyyy-MM-dd HH:mm:ss||epoch_millis") .endObject() .endObject() .endObject(); request.mapping(mapping); CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT); if (response.isAcknowledged()) { log.info("索引创建成功"); } } }

代码清单3:索引创建配置

分片原理

图4:分片与副本分布

分片设计原则

  1. 单个分片不超过50GB
  2. 分片数 = 数据总量 / 50GB
  3. 副本数 = 节点数 - 1(至少1个)
  4. 避免过度分片(每个分片有开销)

3. Java客户端实战

3.1 客户端选型对比

客户端

优点

缺点

推荐场景

RestHighLevelClient

官方维护,功能全

笨重,API复杂

新项目,需要完整功能

Java Low Level Client

轻量,灵活

需要手动处理JSON

简单查询,性能敏感

Spring Data Elasticsearch

简洁,集成Spring

版本兼容问题

Spring Boot项目

JestClient

简单易用

已停止维护

不推荐新项目

我们的选择:新项目用RestHighLevelClient,Spring Boot项目用Spring Data Elasticsearch。

3.2 RestHighLevelClient配置

@Configuration @Slf4j public class ElasticsearchConfig { @Value("${elasticsearch.hosts:localhost:9200}") private String hosts; @Value("${elasticsearch.username:}") private String username; @Value("${elasticsearch.password:}") private String password; @Bean public RestHighLevelClient restHighLevelClient() { // 解析主机列表 String[] hostArray = hosts.split(","); HttpHost[] httpHosts = new HttpHost[hostArray.length]; for (int i = 0; i < hostArray.length; i++) { String[] hostPort = hostArray[i].split(":"); httpHosts[i] = new HttpHost( hostPort[0].trim(), Integer.parseInt(hostPort[1].trim()), "http"); } // 配置Builder RestClientBuilder builder = RestClient.builder(httpHosts) .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder .setConnectTimeout(5000) // 连接超时5秒 .setSocketTimeout(60000) // Socket超时60秒 .setConnectionRequestTimeout(1000)) // 请求超时1秒 .setHttpClientConfigCallback(httpClientBuilder -> { // 连接池配置 httpClientBuilder.setMaxConnTotal(100) // 最大连接数 .setMaxConnPerRoute(50) // 每路由最大连接数 .setKeepAliveStrategy((response, context) -> 60000); // 保活60秒 // 认证配置 if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) { CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials( AuthScope.ANY, new UsernamePasswordCredentials(username, password)); httpClientBuilder.setDefaultCredentialsProvider( credentialsProvider); } return httpClientBuilder; }) .setFailureListener(new RestClient.FailureListener() { @Override public void onFailure(Node node) { log.error("ES节点连接失败: {}", node.getHost()); } }); return new RestHighLevelClient(builder); } // 健康检查 @Scheduled(fixedDelay = 30000) public void checkClusterHealth() { try { ClusterHealthRequest request = new ClusterHealthRequest(); ClusterHealthResponse response = restHighLevelClient().cluster().health(request, RequestOptions.DEFAULT); ClusterHealthStatus status = response.getStatus(); if (status != ClusterHealthStatus.GREEN) { log.warn("ES集群状态异常: {}, 详情: {}", status, response.toString()); // 发送告警 alertService.sendAlert("ES集群状态异常", "状态: " + status + ", 详情: " + response.toString()); } } catch (Exception e) { log.error("ES集群健康检查失败", e); } } }

代码清单4:ES客户端配置

3.3 Spring Data Elasticsearch配置

@Configuration @EnableElasticsearchRepositories(basePackages = "com.example.repository") @Slf4j public class SpringDataESConfig { @Value("${elasticsearch.hosts:localhost:9200}") private String hosts; @Bean public RestHighLevelClient elasticsearchClient() { // 同上面配置 return restHighLevelClient(); } @Bean public ElasticsearchRestTemplate elasticsearchTemplate() { return new ElasticsearchRestTemplate(elasticsearchClient()); } // 实体类映射 @Document(indexName = "products", createIndex = false) @Setting(settingPath = "/settings/product-settings.json") @Mapping(mappingPath = "/mappings/product-mapping.json") @Data @NoArgsConstructor @AllArgsConstructor public class Product { @Id private String id; @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart") @MultiField(mainField = @Field(type = FieldType.Text, analyzer = "ik_max_word"), otherFields = { @InnerField(suffix = "pinyin", type = FieldType.Text, analyzer = "pinyin"), @InnerField(suffix = "keyword", type = FieldType.Keyword) }) private String name; @Field(type = FieldType.Double) private Double price; @Field(type = FieldType.Integer) private Integer categoryId; @Field(type = FieldType.Integer) private Integer sales; @Field(type = FieldType.Date, format = DateFormat.date_hour_minute_second) private Date createTime; // Geo字段 @GeoPointField private GeoPoint location; } // Repository接口 public interface ProductRepository extends ElasticsearchRepository<Product, String> { // 方法名查询 List<Product> findByName(String name); List<Product> findByPriceBetween(Double minPrice, Double maxPrice); Page<Product> findByCategoryId(Integer categoryId, Pageable pageable); // @Query注解 @Query("{\"match\": {\"name\": \"?0\"}}") List<Product> findByNameCustom(String name); // 原生查询 @Query("{\"bool\": {\"must\": [" + "{\"match\": {\"name\": \"?0\"}}," + "{\"range\": {\"price\": {\"gte\": ?1, \"lte\": ?2}}}" + "]}}") List<Product> searchByNameAndPriceRange(String name, Double minPrice, Double maxPrice); } }

代码清单5:Spring Data ES配置

4. 索引设计最佳实践

4.1 索引生命周期管理

@Component @Slf4j public class IndexLifecycleManager { // 1. 按时间滚动索引 public String getDailyIndex(String indexPrefix) { String date = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy.MM.dd")); return indexPrefix + "-" + date; } // 2. 创建带别名的索引 public void createIndexWithAlias(String indexName, String alias) throws IOException { CreateIndexRequest request = new CreateIndexRequest(indexName); // 索引设置 request.settings(getIndexSettings()); // 创建索引 CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT); if (response.isAcknowledged()) { // 添加别名 IndicesAliasesRequest aliasRequest = new IndicesAliasesRequest(); AliasActions aliasAction = new AliasActions( AliasActions.Type.ADD) .index(indexName) .alias(alias); aliasRequest.addAliasAction(aliasAction); client.indices().updateAliases(aliasRequest, RequestOptions.DEFAULT); log.info("创建索引成功: {},别名: {}", indexName, alias); } } // 3. 索引滚动策略 @Scheduled(cron = "0 0 0 * * ?") // 每天零点执行 public void rolloverIndex() throws IOException { String todayIndex = getDailyIndex("logs"); String alias = "logs-current"; // 检查别名指向的索引 GetAliasesRequest request = new GetAliasesRequest(alias); GetAliasesResponse response = client.indices().getAlias(request, RequestOptions.DEFAULT); Map<String, Set<AliasMetadata>> aliases = response.getAliases(); if (aliases.isEmpty()) { // 第一次创建 createIndexWithAlias(todayIndex, alias); } else { // 检查当前索引大小 String currentIndex = aliases.keySet().iterator().next(); IndexStats stats = getIndexStats(currentIndex); // 如果超过50GB或创建超过7天,创建新索引 if (stats.getStoreSizeInBytes() > 50L * 1024 * 1024 * 1024 || isIndexOlderThan(currentIndex, 7)) { // 创建新索引 createIndexWithAlias(todayIndex, alias); // 切换别名 switchAlias(alias, currentIndex, todayIndex); // 关闭旧索引 closeOldIndex(currentIndex); } } } // 4. 索引关闭和删除 public void manageIndexLifecycle() throws IOException { // 关闭30天前的索引 closeIndicesOlderThan(30); // 删除90天前的索引 deleteIndicesOlderThan(90); // 强制合并segments forceMergeIndicesOlderThan(7); } // 5. 索引模板 public void createIndexTemplate() throws IOException { PutIndexTemplateRequest request = new PutIndexTemplateRequest( "logs-template"); request.patterns(Arrays.asList("logs-*")); request.settings(getIndexSettings()); request.mapping(getIndexMapping()); // 别名配置 request.alias(new Alias("logs-all")); // 优先级 request.order(1); // 版本 request.version(1); PutIndexTemplateResponse response = client.indices() .putTemplate(request, RequestOptions.DEFAULT); if (response.isAcknowledged()) { log.info("索引模板创建成功"); } } }

代码清单6:索引生命周期管理

4.2 映射设计技巧

{ "mappings": { "dynamic": "strict", // 禁止动态字段 "_source": { "enabled": true, "excludes": ["big_field"] }, "properties": { "id": { "type": "keyword", "ignore_above": 256 }, "title": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_smart", "fields": { "pinyin": { "type": "text", "analyzer": "pinyin_analyzer" }, "keyword": { "type": "keyword", "ignore_above": 256 }, "completion": { "type": "completion", "analyzer": "simple" } } }, "price": { "type": "scaled_float", "scaling_factor": 100 }, "location": { "type": "geo_point" }, "tags": { "type": "text", "analyzer": "whitespace" }, "attributes": { "type": "nested", // 嵌套类型 "properties": { "key": { "type": "keyword" }, "value": { "type": "keyword" } } }, "vector": { "type": "dense_vector", // 向量字段 "dims": 128 } } } }

代码清单7:映射设计示例

5. 查询优化实战

5.1 查询类型对比

@Service @Slf4j public class ProductSearchService { // 1. Match查询(最常用) public SearchResponse searchByMatch(String keyword) throws IOException { SearchRequest request = new SearchRequest("products"); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query(QueryBuilders.matchQuery("name", keyword) .analyzer("ik_smart") // 指定搜索分词器 .operator(Operator.AND) // 必须包含所有词 .minimumShouldMatch("75%")); // 至少匹配75% sourceBuilder.from(0); sourceBuilder.size(20); sourceBuilder.sort(SortBuilders.scoreSort()); request.source(sourceBuilder); return client.search(request, RequestOptions.DEFAULT); } // 2. MultiMatch查询(多字段) public SearchResponse searchByMultiMatch(String keyword) throws IOException { SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query(QueryBuilders.multiMatchQuery(keyword) .fields("name^3", "name.pinyin^2", "description") // 权重设置 .type(MultiMatchQueryBuilder.Type.BEST_FIELDS) // 最佳字段策略 .tieBreaker(0.3f)); // 其他字段权重 return executeSearch(sourceBuilder); } // 3. Term查询(精确匹配) public SearchResponse searchByTerm(String category) throws IOException { SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query(QueryBuilders.termQuery("category.keyword", category)); return executeSearch(sourceBuilder); } // 4. Bool查询(组合查询) public SearchResponse searchByBool(String keyword, Double minPrice, Double maxPrice, String category) throws IOException { SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); // MUST: 必须匹配 if (StringUtils.isNotBlank(keyword)) { boolQuery.must(QueryBuilders.matchQuery("name", keyword)); } // FILTER: 过滤,不计算分数 if (minPrice != null || maxPrice != null) { RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("price"); if (minPrice != null) { rangeQuery.gte(minPrice); } if (maxPrice != null) { rangeQuery.lte(maxPrice); } boolQuery.filter(rangeQuery); } // SHOULD: 应该匹配(影响分数) if (StringUtils.isNotBlank(category)) { boolQuery.should(QueryBuilders.termQuery("category.keyword", category)) .minimumShouldMatch(1); } // MUST_NOT: 必须不匹配 boolQuery.mustNot(QueryBuilders.termQuery("status", "deleted")); sourceBuilder.query(boolQuery); return executeSearch(sourceBuilder); } // 5. 聚合查询 public SearchResponse searchWithAggregation() throws IOException { SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); // 查询所有 sourceBuilder.query(QueryBuilders.matchAllQuery()); sourceBuilder.size(0); // 不返回hits,只返回聚合 // 价格区间聚合 AggregationBuilder priceAgg = AggregationBuilders .range("price_ranges") .field("price") .addRange(0, 100) .addRange(100, 500) .addRange(500, 1000) .addRange(1000, 5000) .addRange(5000, Double.MAX_VALUE); // 品类聚合 AggregationBuilder categoryAgg = AggregationBuilders .terms("categories") .field("category.keyword") .size(10); // 嵌套聚合 AggregationBuilder nestedAgg = AggregationBuilders .nested("attributes", "attributes") .subAggregation(AggregationBuilders .terms("attr_keys") .field("attributes.key") .size(10)); sourceBuilder.aggregation(priceAgg); sourceBuilder.aggregation(categoryAgg); sourceBuilder.aggregation(nestedAgg); return executeSearch(sourceBuilder); } // 6. 搜索建议 public SuggestResponse searchSuggest(String prefix) throws IOException { SearchRequest request = new SearchRequest("products"); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.size(0); // Completion Suggester CompletionSuggestionBuilder suggestion = SuggestBuilders.completionSuggestion("name.completion") .prefix(prefix, Fuzziness.AUTO) .skipDuplicates(true) .size(10); SuggestBuilder suggestBuilder = new SuggestBuilder(); suggestBuilder.addSuggestion("product_suggest", suggestion); sourceBuilder.suggest(suggestBuilder); request.source(sourceBuilder); SearchResponse response = client.search(request, RequestOptions.DEFAULT); return response.getSuggest(); } }

代码清单8:各种查询方式实现

5.2 性能优化技巧

@Component @Slf4j public class QueryOptimizer { // 1. 分页优化 public SearchResponse searchWithPagination(String keyword, int page, int size) throws IOException { SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); // 常规分页(深度分页性能差) // sourceBuilder.from((page - 1) * size).size(size); // 推荐:Search After分页 if (page > 1) { // 获取上一页最后一条的sort值 Object[] searchAfter = getSearchAfterValues(page - 1, size); sourceBuilder.searchAfter(searchAfter); } sourceBuilder.size(size); sourceBuilder.sort(SortBuilders.fieldSort("_id")); // 必须有排序 return executeSearch(sourceBuilder); } // 2. 查询重写 public SearchResponse optimizedSearch(String keyword) throws IOException { SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); // 原来的wildcard查询(性能极差) // QueryBuilders.wildcardQuery("name", "*" + keyword + "*"); // 优化方案1:使用ngram QueryBuilders.matchQuery("name.ngram", keyword); // 优化方案2:使用前缀查询 if (keyword.length() < 10) { QueryBuilders.prefixQuery("name.keyword", keyword); } // 优化方案3:使用edge ngram QueryBuilders.matchQuery("name.edge_ngram", keyword); return executeSearch(sourceBuilder); } // 3. 过滤器缓存 public SearchResponse searchWithFilterCache() throws IOException { SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); // 经常过滤的条件用filter,可以被缓存 boolQuery.filter(QueryBuilders.termQuery("status", "active")); boolQuery.filter(QueryBuilders.rangeQuery("price") .gte(100).lte(1000)); // 搜索条件 boolQuery.must(QueryBuilders.matchQuery("name", "手机")); sourceBuilder.query(boolQuery); return executeSearch(sourceBuilder); } // 4. 字段数据加载 public SearchResponse searchWithFieldData() throws IOException { SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); // 避免在text字段上排序 // sourceBuilder.sort(SortBuilders.fieldSort("name")); // 错误 // 使用keyword子字段排序 sourceBuilder.sort(SortBuilders.fieldSort("name.keyword")); // 或者使用doc values sourceBuilder.docValueField("name.keyword"); return executeSearch(sourceBuilder); } // 5. 查询超时设置 public SearchResponse searchWithTimeout(String keyword, long timeoutMs) throws IOException { SearchRequest request = new SearchRequest("products"); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.timeout(new TimeValue(timeoutMs, TimeUnit.MILLISECONDS)); sourceBuilder.query(QueryBuilders.matchQuery("name", keyword)); request.source(sourceBuilder); // 设置请求超时 RequestOptions.Builder options = RequestOptions.DEFAULT.toBuilder(); options.setHttpAsyncResponseConsumerFactory( new HttpAsyncResponseConsumerFactory .HeapBufferedResponseConsumerFactory(30 * 1024 * 1024)); return client.search(request, options.build()); } // 6. 并行查询 public List<SearchResponse> parallelSearch(List<String> keywords) throws InterruptedException, ExecutionException { List<CompletableFuture<SearchResponse>> futures = new ArrayList<>(); for (String keyword : keywords) { CompletableFuture<SearchResponse> future = CompletableFuture.supplyAsync(() -> { try { return searchByMatch(keyword); } catch (IOException e) { throw new RuntimeException(e); } }, executor); futures.add(future); } // 等待所有查询完成 CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); allFutures.get(); // 等待完成 return futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); } }

代码清单9:查询性能优化

6. 批量操作与实时性

6.1 Bulk批量操作

@Component @Slf4j public class BulkOperationService { // 1. 简单的Bulk操作 public void bulkIndexProducts(List<Product> products) throws IOException { BulkRequest request = new BulkRequest(); for (Product product : products) { IndexRequest indexRequest = new IndexRequest("products") .id(product.getId()) .source(JsonUtils.toJson(product), XContentType.JSON); request.add(indexRequest); } BulkResponse response = client.bulk(request, RequestOptions.DEFAULT); if (response.hasFailures()) { log.error("Bulk操作失败: {}", response.buildFailureMessage()); // 处理失败项 for (BulkItemResponse item : response.getItems()) { if (item.isFailed()) { log.error("文档{}失败: {}", item.getId(), item.getFailureMessage()); // 重试逻辑 retryFailedItem(item); } } } else { log.info("Bulk操作成功,处理{}个文档", products.size()); } } // 2. 带回调的Bulk public void bulkWithListener(List<Product> products) { BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { log.info("开始执行Bulk,包含{}个请求", request.numberOfActions()); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { if (response.hasFailures()) { log.error("Bulk执行失败: {}", response.buildFailureMessage()); } else { log.info("Bulk执行成功,耗时{}ms", response.getTook().getMillis()); } } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { log.error("Bulk执行异常", failure); } }; BulkProcessor.Builder builder = BulkProcessor.builder( (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener); // 配置 builder.setBulkActions(1000) // 每1000个请求执行一次 .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // 每5MB执行一次 .setFlushInterval(TimeValue.timeValueSeconds(5)) // 每5秒执行一次 .setConcurrentRequests(2) // 并发数 .setBackoffPolicy(BackoffPolicy .exponentialBackoff(TimeValue.timeValueMillis(100), 3)); BulkProcessor processor = builder.build(); // 添加文档 for (Product product : products) { IndexRequest request = new IndexRequest("products") .id(product.getId()) .source(JsonUtils.toJson(product), XContentType.JSON); processor.add(request); } // 关闭processor processor.awaitClose(30, TimeUnit.SECONDS); } // 3. 更新操作 public void bulkUpdateProducts(List<Product> products) throws IOException { BulkRequest request = new BulkRequest(); for (Product product : products) { UpdateRequest updateRequest = new UpdateRequest("products", product.getId()) .doc(JsonUtils.toJson(product), XContentType.JSON) .docAsUpsert(true); // 如果不存在则插入 request.add(updateRequest); } client.bulk(request, RequestOptions.DEFAULT); } // 4. 删除操作 public void bulkDeleteProducts(List<String> ids) throws IOException { BulkRequest request = new BulkRequest(); for (String id : ids) { DeleteRequest deleteRequest = new DeleteRequest("products", id); request.add(deleteRequest); } client.bulk(request, RequestOptions.DEFAULT); } // 5. 批量查询 public MultiGetResponse bulkGetProducts(List<String> ids) throws IOException { MultiGetRequest request = new MultiGetRequest(); for (String id : ids) { request.add(new MultiGetRequest.Item("products", id)); } return client.mget(request, RequestOptions.DEFAULT); } }

代码清单10:批量操作实现

6.2 实时性控制

@Component @Slf4j public class RealtimeControlService { // 1. 立即刷新 public void indexWithRefresh(Product product) throws IOException { IndexRequest request = new IndexRequest("products") .id(product.getId()) .source(JsonUtils.toJson(product), XContentType.JSON) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); // 立即刷新 client.index(request, RequestOptions.DEFAULT); } // 2. 等待刷新 public void indexWithWaitFor(Product product) throws IOException { IndexRequest request = new IndexRequest("products") .id(product.getId()) .source(JsonUtils.toJson(product), XContentType.JSON) .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); // 等待刷新 client.index(request, RequestOptions.DEFAULT); } // 3. 手动刷新 public void manualRefresh() throws IOException { RefreshRequest request = new RefreshRequest("products"); RefreshResponse response = client.indices().refresh(request, RequestOptions.DEFAULT); log.info("手动刷新完成,分片数: {}", response.getTotalShards()); } // 4. 强制合并 public void forceMerge() throws IOException { ForceMergeRequest request = new ForceMergeRequest("products"); request.maxNumSegments(1); // 合并为1个segment request.onlyExpungeDeletes(true); // 只清理删除的文档 ForceMergeResponse response = client.indices().forcemerge(request, RequestOptions.DEFAULT); log.info("强制合并完成,分片数: {}", response.getTotalShards()); } // 5. 刷新间隔设置 public void updateRefreshInterval(int seconds) throws IOException { UpdateSettingsRequest request = new UpdateSettingsRequest("products"); Settings settings = Settings.builder() .put("index.refresh_interval", seconds + "s") .build(); request.settings(settings); AcknowledgedResponse response = client.indices() .putSettings(request, RequestOptions.DEFAULT); if (response.isAcknowledged()) { log.info("更新刷新间隔成功: {}秒", seconds); } } // 6. 实时搜索方案 public SearchResponse realtimeSearch(String keyword) throws IOException { // 先搜索 SearchResponse response = searchByMatch(keyword); // 如果没结果,等待并重试 if (response.getHits().getTotalHits().value == 0) { try { Thread.sleep(1000); // 等待1秒 response = searchByMatch(keyword); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } return response; } }

代码清单11:实时性控制

7. 企业级实战案例

7.1 电商商品搜索系统

@Service @Slf4j public class ECommerceSearchService { // 商品搜索 public SearchResult<Product> searchProducts(ProductSearchRequest request) throws IOException { SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); // 构建Bool查询 BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); // 关键词搜索 if (StringUtils.isNotBlank(request.getKeyword())) { handleKeywordSearch(boolQuery, request.getKeyword()); } // 分类过滤 if (request.getCategoryId() != null) { boolQuery.filter(QueryBuilders.termQuery("category_id", request.getCategoryId())); } // 价格区间 if (request.getMinPrice() != null || request.getMaxPrice() != null) { RangeQueryBuilder priceRange = QueryBuilders.rangeQuery("price"); if (request.getMinPrice() != null) { priceRange.gte(request.getMinPrice()); } if (request.getMaxPrice() != null) { priceRange.lte(request.getMaxPrice()); } boolQuery.filter(priceRange); } // 品牌过滤 if (CollectionUtils.isNotEmpty(request.getBrandIds())) { boolQuery.filter(QueryBuilders.termsQuery("brand_id", request.getBrandIds())); } // 属性过滤 if (CollectionUtils.isNotEmpty(request.getAttributes())) { handleAttributesFilter(boolQuery, request.getAttributes()); } // 地理位置过滤 if (request.getLocation() != null && request.getDistance() != null) { handleGeoFilter(boolQuery, request.getLocation(), request.getDistance()); } sourceBuilder.query(boolQuery); // 排序 handleSorting(sourceBuilder, request.getSortBy(), request.getSortOrder()); // 分页 sourceBuilder.from((request.getPage() - 1) * request.getSize()) .size(request.getSize()); // 高亮 if (StringUtils.isNotBlank(request.getKeyword())) { HighlightBuilder highlightBuilder = new HighlightBuilder(); highlightBuilder.field("name") .preTags("<em>") .postTags("</em>") .fragmentSize(200) .numOfFragments(3); sourceBuilder.highlighter(highlightBuilder); } // 聚合 addAggregations(sourceBuilder); // 执行搜索 SearchResponse response = executeSearch(sourceBuilder); // 处理结果 return processSearchResult(response, request); } private void handleKeywordSearch(BoolQueryBuilder boolQuery, String keyword) { // 多种搜索方式组合 BoolQueryBuilder keywordQuery = QueryBuilders.boolQuery(); // 1. 精确匹配(最高权重) keywordQuery.should(QueryBuilders.matchPhraseQuery("name", keyword) .boost(3.0f)); // 2. 拼音匹配 keywordQuery.should(QueryBuilders.matchQuery("name.pinyin", keyword) .boost(2.0f)); // 3. 分词匹配 keywordQuery.should(QueryBuilders.matchQuery("name", keyword) .boost(1.0f)); // 4. 描述匹配 keywordQuery.should(QueryBuilders.matchQuery("description", keyword) .boost(0.5f)); // 5. 标签匹配 keywordQuery.should(QueryBuilders.matchQuery("tags", keyword) .boost(0.3f)); boolQuery.must(keywordQuery); } private void handleAttributesFilter(BoolQueryBuilder boolQuery, List<AttributeFilter> attributes) { for (AttributeFilter attr : attributes) { NestedQueryBuilder nestedQuery = QueryBuilders.nestedQuery( "attributes", QueryBuilders.boolQuery() .must(QueryBuilders.termQuery("attributes.key", attr.getKey())) .must(QueryBuilders.termsQuery("attributes.value", attr.getValues())), ScoreMode.None); boolQuery.filter(nestedQuery); } } private void handleGeoFilter(BoolQueryBuilder boolQuery, GeoPoint location, String distance) { GeoDistanceQueryBuilder geoQuery = QueryBuilders.geoDistanceQuery( "location") .point(location.getLat(), location.getLon()) .distance(distance); boolQuery.filter(geoQuery); } private void handleSorting(SearchSourceBuilder sourceBuilder, String sortBy, String sortOrder) { if ("price".equals(sortBy)) { sourceBuilder.sort("price", "desc".equals(sortOrder) ? SortOrder.DESC : SortOrder.ASC); } else if ("sales".equals(sortBy)) { sourceBuilder.sort("sales", SortOrder.DESC); } else if ("score".equals(sortBy)) { sourceBuilder.sort(SortBuilders.scoreSort()); } else { // 综合排序:分数*0.6 + 销量*0.3 + 价格*0.1 Script script = new Script( "doc['_score'].value * 0.6 + " + "doc['sales'].value * 0.3 + " + "(10000 - doc['price'].value) * 0.1"); sourceBuilder.sort(SortBuilders.scriptSort(script, ScriptSortBuilder.ScriptSortType.NUMBER)); } } private void addAggregations(SearchSourceBuilder sourceBuilder) { // 价格区间聚合 AggregationBuilder priceAgg = AggregationBuilders .range("price_agg") .field("price") .addRange(0, 100) .addRange(100, 300) .addRange(300, 500) .addRange(500, 1000) .addRange(1000, 5000); // 品牌聚合 AggregationBuilder brandAgg = AggregationBuilders .terms("brand_agg") .field("brand_id") .size(20); // 分类聚合 AggregationBuilder categoryAgg = AggregationBuilders .terms("category_agg") .field("category_id") .size(10); // 属性聚合 AggregationBuilder attributeAgg = AggregationBuilders .nested("attributes_agg", "attributes") .subAggregation(AggregationBuilders .terms("attribute_keys") .field("attributes.key") .size(10) .subAggregation(AggregationBuilders .terms("attribute_values") .field("attributes.value") .size(10))); sourceBuilder.aggregation(priceAgg); sourceBuilder.aggregation(brandAgg); sourceBuilder.aggregation(categoryAgg); sourceBuilder.aggregation(attributeAgg); } private SearchResult<Product> processSearchResult(SearchResponse response, ProductSearchRequest request) { SearchResult<Product> result = new SearchResult<>(); // 总条数 result.setTotal(response.getHits().getTotalHits().value); // 当前页数据 List<Product> products = new ArrayList<>(); for (SearchHit hit : response.getHits()) { Product product = JsonUtils.fromJson(hit.getSourceAsString(), Product.class); product.setScore(hit.getScore()); // 高亮处理 if (hit.getHighlightFields() != null) { HighlightField highlight = hit.getHighlightFields().get("name"); if (highlight != null && highlight.getFragments().length > 0) { product.setHighlightName(highlight.getFragments()[0].string()); } } products.add(product); } result.setData(products); // 聚合结果 processAggregations(result, response.getAggregations()); // 搜索建议 if (StringUtils.isNotBlank(request.getKeyword()) && products.isEmpty()) { result.setSuggestions(getSuggestions(request.getKeyword())); } return result; } }

代码清单12:电商商品搜索系统

7.2 日志分析系统

@Service @Slf4j public class LogAnalysisService { // 日志搜索 public LogSearchResult searchLogs(LogSearchRequest request) throws IOException { String index = getLogIndex(request.getStartTime(), request.getEndTime()); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); // 时间范围查询 RangeQueryBuilder timeRange = QueryBuilders.rangeQuery("@timestamp") .gte(request.getStartTime()) .lte(request.getEndTime()) .format("yyyy-MM-dd HH:mm:ss"); BoolQueryBuilder boolQuery = QueryBuilders.boolQuery() .filter(timeRange); // 关键词搜索 if (StringUtils.isNotBlank(request.getKeyword())) { boolQuery.must(QueryBuilders.queryStringQuery(request.getKeyword()) .field("message") .field("level") .field("logger_name") .defaultOperator(Operator.AND)); } // 级别过滤 if (CollectionUtils.isNotEmpty(request.getLevels())) { boolQuery.filter(QueryBuilders.termsQuery("level", request.getLevels())); } // 应用过滤 if (StringUtils.isNotBlank(request.getApplication())) { boolQuery.filter(QueryBuilders.termQuery("application", request.getApplication())); } sourceBuilder.query(boolQuery); // 排序 sourceBuilder.sort(SortBuilders.fieldSort("@timestamp") .order(SortOrder.DESC)); // 分页 sourceBuilder.from((request.getPage() - 1) * request.getSize()) .size(request.getSize()); // 聚合 addLogAggregations(sourceBuilder, request); // 执行搜索 SearchResponse response = executeSearch(index, sourceBuilder); return processLogSearchResult(response, request); } // 错误日志统计 public ErrorStats getErrorStats(Date startTime, Date endTime, String application) throws IOException { String index = getLogIndex(startTime, endTime); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); // 时间范围 RangeQueryBuilder timeRange = QueryBuilders.rangeQuery("@timestamp") .gte(startTime) .lte(endTime); BoolQueryBuilder boolQuery = QueryBuilders.boolQuery() .filter(timeRange) .filter(QueryBuilders.termQuery("level", "ERROR")); if (StringUtils.isNotBlank(application)) { boolQuery.filter(QueryBuilders.termQuery("application", application)); } sourceBuilder.query(boolQuery); sourceBuilder.size(0); // 错误数量聚合 AggregationBuilder countAgg = AggregationBuilders .cardinality("error_count") .field("trace_id"); // 按应用分组 AggregationBuilder appAgg = AggregationBuilders .terms("by_application") .field("application") .size(20) .subAggregation(AggregationBuilders .cardinality("app_error_count") .field("trace_id")); // 按时间分组 AggregationBuilder timeAgg = AggregationBuilders .dateHistogram("by_time") .field("@timestamp") .calendarInterval(DateHistogramInterval.HOUR) .format("yyyy-MM-dd HH:00") .minDocCount(0) .extendedBounds( new LongBounds(startTime.getTime(), endTime.getTime())) .subAggregation(AggregationBuilders .cardinality("hour_error_count") .field("trace_id")); sourceBuilder.aggregation(countAgg); sourceBuilder.aggregation(appAgg); sourceBuilder.aggregation(timeAgg); SearchResponse response = executeSearch(index, sourceBuilder); return processErrorStats(response); } // 慢查询分析 public SlowQueryStats analyzeSlowQueries(Date startTime, Date endTime) throws IOException { String index = getLogIndex(startTime, endTime); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); // 查找慢查询日志 BoolQueryBuilder boolQuery = QueryBuilders.boolQuery() .filter(QueryBuilders.rangeQuery("@timestamp") .gte(startTime) .lte(endTime)) .filter(QueryBuilders.termQuery("logger_name", "slow_query")) .filter(QueryBuilders.rangeQuery("duration").gte(1000)); // 超过1秒 sourceBuilder.query(boolQuery); sourceBuilder.size(0); // 按SQL类型分组 AggregationBuilder sqlTypeAgg = AggregationBuilders .terms("by_sql_type") .field("sql_type") .size(10) .subAggregation(AggregationBuilders .avg("avg_duration") .field("duration")) .subAggregation(AggregationBuilders .max("max_duration") .field("duration")) .subAggregation(AggregationBuilders .percentiles("duration_percentiles") .field("duration") .percentiles(50, 90, 95, 99)); // 按表分组 AggregationBuilder tableAgg = AggregationBuilders .terms("by_table") .field("table_name") .size(20) .subAggregation(AggregationBuilders .avg("avg_duration") .field("duration")); sourceBuilder.aggregation(sqlTypeAgg); sourceBuilder.aggregation(tableAgg); SearchResponse response = executeSearch(index, sourceBuilder); return processSlowQueryStats(response); } }

代码清单13:日志分析系统

8. 性能优化与监控

8.1 性能调优

@Component @Slf4j public class PerformanceTuner { // 1. JVM调优 public void tuneJvm() { // ES推荐JVM配置 // -Xms4g -Xmx4g // 堆内存,不超过32GB // -XX:+UseG1GC // G1垃圾回收器 // -XX:MaxGCPauseMillis=200 // -XX:G1ReservePercent=25 // -XX:InitiatingHeapOccupancyPercent=30 } // 2. 索引设置优化 public Settings getOptimizedSettings() { return Settings.builder() .put("index.number_of_shards", 3) // 根据数据量调整 .put("index.number_of_replicas", 1) .put("index.refresh_interval", "30s") // 写入频繁时调大 .put("index.translog.durability", "async") // 异步translog .put("index.translog.sync_interval", "5s") .put("index.translog.flush_threshold_size", "512mb") .put("index.merge.scheduler.max_thread_count", 1) // 机械硬盘 .put("index.merge.scheduler.max_merge_count", 6) .put("index.unassigned.node_left.delayed_timeout", "5m") .build(); } // 3. 查询调优 public SearchSourceBuilder tuneSearch(SearchSourceBuilder builder) { // 启用查询缓存 builder.query(QueryBuilders.boolQuery() .must(QueryBuilders.matchQuery("name", "手机")) .filter(QueryBuilders.termQuery("status", "active"))) .requestCache(true); // 开启查询缓存 // 设置超时 builder.timeout(TimeValue.timeValueSeconds(5)); // 限制返回字段 builder.fetchSource(new String[]{"id", "name", "price"}, null); // 禁用评分 builder.query(QueryBuilders.constantScoreQuery( QueryBuilders.termQuery("category", "electronics"))); return builder; } // 4. 监控指标收集 @Scheduled(fixedDelay = 60000) public void collectMetrics() throws IOException { // 集群健康 ClusterHealthRequest healthRequest = new ClusterHealthRequest(); ClusterHealthResponse healthResponse = client.cluster() .health(healthRequest, RequestOptions.DEFAULT); // 节点状态 NodesStatsRequest nodesRequest = new NodesStatsRequest(); nodesRequest.indices(true); nodesRequest.os(true); nodesRequest.jvm(true); nodesRequest.threadPool(true); NodesStatsResponse nodesResponse = client.nodes() .stats(nodesRequest, RequestOptions.DEFAULT); // 索引状态 IndicesStatsRequest indicesRequest = new IndicesStatsRequest(); indicesRequest.all(); IndicesStatsResponse indicesResponse = client.indices() .stats(indicesRequest, RequestOptions.DEFAULT); // 发送到监控系统 sendToMonitoringSystem(healthResponse, nodesResponse, indicesResponse); // 检查是否需要扩容 checkAndScale(healthResponse, nodesResponse, indicesResponse); } // 5. 热点分片识别 public void identifyHotShards() throws IOException { IndicesStatsRequest request = new IndicesStatsRequest(); request.all(); IndicesStatsResponse response = client.indices() .stats(request, RequestOptions.DEFAULT); Map<String, IndicesStats> indices = response.getIndices(); for (Map.Entry<String, IndicesStats> entry : indices.entrySet()) { String index = entry.getKey(); IndicesStats stats = entry.getValue(); ShardStats[] shardStats = stats.getShards(); for (ShardStats shardStat : shardStats) { // 检查查询负载 if (shardStat.getStats().getSearch().getTotal().getQueryCount() > 10000) { // 阈值 log.warn("热点分片: {}/{},查询次数: {}", index, shardStat.getShardRouting().getId(), shardStat.getStats().getSearch().getTotal().getQueryCount()); // 触发分片重分配 rerouteHotShard(index, shardStat); } } } } }

代码清单14:性能调优

8.2 监控告警

# prometheus配置 scrape_configs: - job_name: 'elasticsearch' static_configs: - targets: ['localhost:9200'] metrics_path: '/_prometheus/metrics' # 告警规则 alerting_rules: - alert: ClusterHealthRed expr: elasticsearch_cluster_health_status{color="red"} == 1 for: 5m labels: severity: critical annotations: summary: "ES集群状态为RED" - alert: HighCpuUsage expr: rate(elasticsearch_process_cpu_percent[5m]) > 0.8 for: 2m labels: severity: warning annotations: summary: "ES节点CPU使用率过高" - alert: HighHeapUsage expr: elasticsearch_jvm_memory_used_bytes / elasticsearch_jvm_memory_max_bytes > 0.8 for: 2m labels: severity: warning annotations: summary: "ES节点堆内存使用率过高" - alert: HighDiskUsage expr: elasticsearch_filesystem_data_used_bytes / elasticsearch_filesystem_data_size_bytes > 0.8 for: 5m labels: severity: warning annotations: summary: "ES节点磁盘使用率过高" - alert: HighSearchLatency expr: histogram_quantile(0.95, rate(elasticsearch_indices_search_query_time_seconds_bucket[5m])) > 1 for: 2m labels: severity: warning annotations: summary: "ES搜索延迟过高" - alert: UnassignedShards expr: elasticsearch_cluster_health_unassigned_shards > 0 for: 5m labels: severity: critical annotations: summary: "ES有未分配的分片"

代码清单15:监控告警配置

9. 故障排查指南

9.1 常见问题排查

@Component @Slf4j public class TroubleshootingGuide { // 1. 搜索慢 public void diagnoseSlowSearch(String index, String query) throws IOException { // 启用profile SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query(QueryBuilders.matchQuery("name", query)); sourceBuilder.profile(true); SearchRequest request = new SearchRequest(index); request.source(sourceBuilder); SearchResponse response = client.search(request, RequestOptions.DEFAULT); // 分析profile结果 Map<String, ProfileShardResult> profileResults = response.getProfileResults(); for (Map.Entry<String, ProfileShardResult> entry : profileResults.entrySet()) { log.info("分片{}的profile结果:", entry.getKey()); for (QueryProfileShardResult queryProfile : entry.getValue().getQueryProfileResults()) { log.info("查询类型: {}, 耗时: {}ms", queryProfile.getQueryName(), queryProfile.getTime()); // 打印详细信息 for (ProfileResult profileResult : queryProfile.getQueryResults()) { log.info("详细信息: {}", profileResult); } } } } // 2. 写入慢 public void diagnoseSlowIndexing(String index) throws IOException { // 检查refresh间隔 GetSettingsRequest settingsRequest = new GetSettingsRequest() .indices(index) .names("index.refresh_interval"); GetSettingsResponse settingsResponse = client.indices() .getSettings(settingsRequest, RequestOptions.DEFAULT); log.info("refresh间隔: {}", settingsResponse.getSetting(index, "index.refresh_interval")); // 检查translog IndicesStatsRequest statsRequest = new IndicesStatsRequest() .indices(index) .clear() .docs(true) .store(true) .indexing(true) .search(true); IndicesStatsResponse statsResponse = client.indices() .stats(statsRequest, RequestOptions.DEFAULT); IndexStats indexStats = statsResponse.getIndex(index); log.info("索引统计: {}", indexStats); } // 3. 内存高 public void diagnoseHighMemory() throws IOException { NodesStatsRequest request = new NodesStatsRequest(); request.jvm(true); NodesStatsResponse response = client.nodes() .stats(request, RequestOptions.DEFAULT); for (NodeStats nodeStats : response.getNodes()) { JvmStats jvmStats = nodeStats.getJvm(); JvmStats.Mem mem = jvmStats.getMem(); log.info("节点{}内存使用: {}/{}, GC次数: {}", nodeStats.getNode().getName(), mem.getUsed(), mem.getHeapMax(), jvmStats.getGc().getCollectors().get("young").getCollectionCount()); // 检查fielddata if (nodeStats.getIndices().getFieldData() != null) { log.info("fielddata内存: {}", nodeStats.getIndices().getFieldData().getMemorySize()); } } } // 4. 分片未分配 public void diagnoseUnassignedShards() throws IOException { ClusterAllocationExplainRequest request = new ClusterAllocationExplainRequest(); ClusterAllocationExplainResponse response = client.cluster() .allocationExplain(request, RequestOptions.DEFAULT); log.info("分片分配解释: {}", response.getExplanation()); // 检查磁盘空间 NodesStatsRequest nodesRequest = new NodesStatsRequest(); nodesRequest.fs(true); NodesStatsResponse nodesResponse = client.nodes() .stats(nodesRequest, RequestOptions.DEFAULT); for (NodeStats nodeStats : nodesResponse.getNodes()) { FsInfo fsInfo = nodeStats.getFs(); for (FsInfo.Path path : fsInfo) { log.info("节点{}磁盘使用: {}/{} ({}%)", nodeStats.getNode().getName(), path.getAvailable(), path.getTotal(), (path.getTotal() - path.getAvailable()) * 100 / path.getTotal()); } } } // 5. 热点节点 public void diagnoseHotNodes() throws IOException { NodesStatsRequest request = new NodesStatsRequest(); request.indices(true); request.os(true); NodesStatsResponse response = client.nodes() .stats(request, RequestOptions.DEFAULT); for (NodeStats nodeStats : response.getNodes()) { // 检查查询负载 long queryCount = nodeStats.getIndices().getSearch() .getTotal().getQueryCount(); // 检查索引负载 long indexCount = nodeStats.getIndices().getIndexing() .getTotal().getIndexCount(); // 检查CPU double cpuPercent = nodeStats.getOs().getCpu().getPercent(); log.info("节点{}负载 - 查询: {}, 索引: {}, CPU: {}%", nodeStats.getNode().getName(), queryCount, indexCount, cpuPercent); if (cpuPercent > 80 || queryCount > 10000) { log.warn("节点{}可能是热点节点", nodeStats.getNode().getName()); } } } }

代码清单16:故障排查工具

10. 选型与总结

10.1 ES vs 其他方案对比

方案

优点

缺点

适用场景

Elasticsearch

功能全,生态好,性能优秀

资源消耗大,运维复杂

全文搜索、日志分析

Solr

成熟稳定,功能丰富

社区活跃度下降,实时性差

文档搜索、企业搜索

OpenSearch

ES开源分支,AWS支持

生态不如ES

AWS环境,需要完全开源

MeiliSearch

轻量快速,简单易用

功能相对简单

小型应用,简单搜索

PostgreSQL

事务支持,SQL查询

搜索功能弱,性能差

已有PG,简单搜索需求

10.2 我的"ES军规"

  1. 分片设计要合理:单个分片不超过50GB
  2. 映射设计要严谨:禁用动态映射,明确字段类型
  3. 查询要优化:避免wildcard,善用filter
  4. 监控要全面:集群健康、性能指标、业务指标
  5. 容量要规划:提前规划扩容,设置水位线
  6. 备份要定期:定期快照,测试恢复

11. 最后的话

Elasticsearch是强大的搜索引擎,但不是银弹。理解原理,合理设计,持续监控,才能用好这个强大的工具。

我见过太多团队在这上面栽跟头:有的分片数不合理,有的查询没优化,有的没监控导致故障。

记住:ES是工具,不是魔法。结合业务特点,设计合适方案,做好监控和优化,才是正道。

📚 推荐阅读

官方文档

  1. Elasticsearch官方文档​ - 最全的ES文档
  2. Java客户端文档​ - Java客户端详细文档

源码学习

  1. Elasticsearch源码​ - 官方源码
  2. Lucene源码​ - 底层搜索引擎

最佳实践

  1. Elasticsearch最佳实践​ - 官方最佳实践
  2. 电商搜索架构​ - 电商搜索架构设计

监控工具

  1. Kibana监控​ - ES官方监控工具
  2. Prometheus监控​ - 指标监控

最后建议:从简单场景开始,理解原理后再尝试复杂方案。做好监控,设置合理的分片和副本,定期优化查询。记住:搜索优化是个持续的过程,不是一次性的任务

Read more

Openclaw ubuntu 22.04部署,超详细,对接百炼模型(中文社区版)

一、安装要求 1、node版本必须>=22.0 node下载网址:https://nodejs.org/en/download 2、linux系统版本大于centos7,推荐用centos8或者ubuntu22或更高版本 3、提前准备好对接的AI平台的ApiKey秘钥,例如百炼,Kimi,MiniMax,openai等 4、安装openclaw的机器可访问公网 5、参考文档 官网:https://openclaw.ai/ 中文社区官网:https://clawd.org.cn/ 二、安装步骤 1、安装git sudo apt update && sudo apt install git -y git

By Ne0inhk
时序数据库选型指南:用工程视角理解 Apache IoTDB

时序数据库选型指南:用工程视角理解 Apache IoTDB

摘要:在工业物联网(IIoT)数据爆发式增长的今天,通用数据库已难以应对海量测点的高频写入与复杂聚合查询。本文将从工程落地的角度出发,探讨时序数据库(TSDB)的选型关键维度,并深入解析 Apache IoTDB 在架构设计、数据模型及端边云协同方面的技术特性。 文章目录 * 一、 引言:为什么我们需要专用的时序数据库? * 二、 选型核心维度与 IoTDB 的设计哲学 * 2.1 数据模型:树形结构 vs 标签模型 * 2.2 存储引擎:LSM Tree 与 TsFile 的深度优化 * 核心技术拆解 * 架构流程图:IoTDB 写入与压缩流程 * 2.3 分布式架构:MPP 与 共识协议 * 三、 实战演练:从定义到分析 * 3.

By Ne0inhk

告别VNC!Ubuntu 22.04原生RDP远程桌面配置全攻略(含高分屏适配技巧)

告别VNC!Ubuntu 22.04原生RDP远程桌面配置全攻略(含高分屏适配技巧) 如果你和我一样,长期在Windows和Linux双系统之间切换,或者需要远程管理一台Ubuntu桌面服务器,那么“远程桌面”这个需求一定不陌生。过去,我们通常会选择VNC方案,比如TigerVNC、RealVNC,但体验过的人都知道,VNC在跨平台、网络带宽占用、尤其是高分屏支持上,总是差那么点意思——画面卡顿、色彩失真、缩放模糊,这些问题在需要精细操作的设计或开发工作中尤为恼人。 好消息是,从Ubuntu 22.04 LTS “Jammy Jellyfish”开始,GNOME桌面环境正式集成了对微软RDP(Remote Desktop Protocol) 协议的原生支持。这意味着,你现在可以直接使用Windows系统自带的“远程桌面连接”(mstsc)或macOS上的Microsoft Remote Desktop,像连接另一台Windows电脑一样,无缝接入你的Ubuntu桌面。这不仅仅是换了个协议那么简单,它带来的是更低的延迟、更好的图形压缩效率、原生剪贴板共享、

By Ne0inhk
ARM Linux 驱动开发篇--- Linux 并发与竞争实验(自旋锁实现 LED 设备互斥访问)--- Ubuntu20.04自旋锁实验

ARM Linux 驱动开发篇--- Linux 并发与竞争实验(自旋锁实现 LED 设备互斥访问)--- Ubuntu20.04自旋锁实验

🎬 渡水无言:个人主页渡水无言 ❄专栏传送门: 《linux专栏》《嵌入式linux驱动开发》《linux系统移植专栏》 ❄专栏传送门: 《freertos专栏》《STM32 HAL库专栏》 ⭐️流水不争先,争的是滔滔不绝  📚博主简介:第二十届中国研究生电子设计竞赛全国二等奖 |国家奖学金 | 省级三好学生 | 省级优秀毕业生获得者 | ZEEKLOG新星杯TOP18 | 半导纵横专栏博主 | 211在读研究生 在这里主要分享自己学习的linux嵌入式领域知识;有分享错误或者不足的地方欢迎大佬指导,也欢迎各位大佬互相三连 目录 前言 一、实验基础说明 1.1、自旋锁简介 1.2 本次实验设计思路 二、硬件原理分析(看过之前博客的可以忽略) 三、实验程序编写 3.1 自旋锁 LED 驱动代码(spinlock.c) 3.2、驱动代码分段解析 3.2.

By Ne0inhk