elasticsearch作为平台搜索在项目中的使用

elasticsearch介绍:

Elasticsearch(ES)是一个基于Lucene构建的开源、分布式、RESTful接口的全文搜索引擎。Elasticsearch还是一个分布式文档数据库,其中每个字段均可被索引,而且每个字段的数据均可被搜索,ES能够横向扩展至数以百计的服务器存储以及处理PB级的数据。可以在极短的时间内存储、搜索和分析大量的数据。

项目使用es作为全平台搜索,考虑的是elasticsearch的良好的搜索性,支持中文分词、高亮显示、搜索排序算法、富文本标签过滤处理,大数据量搜索等的特性。

elasticsearch重要概念介绍:

  • index

index是索引,相似文档的集合。项目使用中把mysql中的表的作为各自的索引,这么使用的好处一个是业务逻辑清晰,还有相同文档放在一个index中,无论是同步和mapping都非常便捷,这个后续会提到。

  • document

文档,在项目使用中对应mysql的一行数据。

  • type

类型,这个要注意,在7.x之前版本type是可以有多个的,7.x之后版本,type只有一个,是_doc,所以在接口搜索上面会有区别。

  • Shards & Replicas

分片和副本,分片和副本是es分布式和高可用的体现。控制分片所在节点和控制分片大小来优化es集群,提高es吞吐能力。

  • field

字段,类似mysql中字段的概念。一个document中可以多个字段,字段有不用的类型:long、int、date、string等。

  • mapping

类似mysql的表结构定义。对index进行mapping,可以对document的field类型进行定义,使用的分词器定义、是否对字段进行索引 等功能。
目前5.x 、6.x 7.x的mapping一经定义,是不可以动态修改的。所以需要重新创建index才可以重新定义mapping。

elasticsearch项目中的运用

线上环境使用的是es6.4.3版本,自己测试的单机版本是最近的7.3.0版本,但是7.x版本包括spring-data-elasticsearch的api并不兼容,而且7.x取消了type可以设置多个,所以很多api或者查询并以一致,这里面有很多坑,要踩。

说下项目中的运用,之前是直接通过springboot-data-elasticsearch的api,来创建document,然后搜索也是调用springboot-data-elasticsearch的api来进行搜索。但是有几个问题存在:

  • es侵入到了业务代码中,比如个人文件从回收站回收要重新记录到es里面,但是协同工作,功能的更改,可以要修改es代码,未来维护起来比较麻烦。
  • 历史数据的导入问题,重新创建es索引,或者数据库本身就存在的历史数据、或者项目迁移,es环境更换,都会导致历史数据不能同步的问题

所以是用来logstash-input-jdbc的同步方案。

logstash-input-jdbc同步方案介绍

logstash-input-jdbc是logstash对接关系型数据库的插件,轻量的定时同步工具,logstash天然的和elasticsearch匹配的便利性。支持增量同步和全量同步。但是在实际的项目运行中遇到了一些相对麻烦需要研究的地方:

同步是根据sql来进行,所以设计到复杂数据结构和复杂的sql查询数据处理会比较复杂,要通过logstash的filter的各种插件来进行数据的转化,具体项目中使用的filter后续说明。

logstash-input-jdbc部署

  1. 安装logstash

  2. 安装logstash-input-jdbc插件

    cd bin
    ./logstash-plugin install logstash-input-jdbc
    
  1. 在logstash文件夹下创建:

    mkdir config-mysql
    cd config-mysql
    vim mysql.conf 
    
  1. 输入:(这个只是案例)

    input {
        stdin {
        }
        jdbc {
          # 数据库
          jdbc_connection_string => "jdbc:mysql://localhost:3306/test01"
          # 用户名密码(你的数据库的用户名和密码)
          jdbc_user => "root"
          jdbc_password => "password"
          # jar包的位置
          jdbc_driver_library => "/usr/local/logstash-5.6.3/mysql-connector-java-5.1.41.jar"
          # mysql的Driver
          jdbc_driver_class => "com.mysql.jdbc.Driver"
          jdbc_paging_enabled => "true"
          jdbc_page_size => "50000"
          # statement_filepath代码sql文件,statement是sql,二选一就可以
          #statement_filepath => "config-mysql/test02.sql"
          statement => "select * from test02"
          # schedule 定时器分别代表“分 小时 天  月 年”,最小的单位是1分钟
          schedule => "* * * * *"
          #索引的类型,在filter和output进行逻辑判断时候使用
          type => "test02"
        }
    }
    
    filter {
        json {
            source => "message"
            remove_field => ["message"]
        }
    }
    
    output {
        elasticsearch {
            hosts => "127.0.0.1:9200"
            # index名
            index => "test01"
            # 需要关联的数据库中有有一个id字段,对应索引的id号
            document_id => "%{id}"
        }
        stdout {
            codec => json_lines
        }
    }
    
  2. 启动:

    ./logstash -f config-mysql/mysql.conf
    

项目中对es Mapping的使用

项目最开始创建mapping是通过使用springboot-data-elasticsearch的注解方式来创建mapping

@Document(indexName = "article",type = "docs")
public class EsArticlesEntity {

    /**
     * id
     */
    @Id
    private Long id;

    /**
     * GUID
     */
    @Field(type=FieldType.Text)
    private String guid;

    /**
     * 标题
     */
    @Field(type=FieldType.Text, analyzer="ik_max_word",store=true)
    private String title;
}

但是在通过es的查询mapping 接口,发现只有index和field创建成功了,但是字段的类型并没有生效,而是自动生成的,spring-boot-elasticsearch api并没有完全紧跟es的更新步伐,可能是注解接口没有生效导致的。

这里使用了ik_max_word的analyzer(分析器),是通过es安装的ik分词器,ik有很好的中文分词效果,ik_max_word是把一段话用最小的颗粒度来进行分词,ik_smart是智能分词,颗粒度较大。

es安装ik:

cd elasticsearch-7.3.0/bin/
./elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.3.0/elasticsearch-analysis-ik-7.3.0.zip
cd analysis-ik/
vim IKAnalyzer.cfg.xml

修改:

<!--用户可以在这里配置自己的扩展字典 -->
<entry key="ext_dict">extra_single_word_full.dic</entry>

重启即可

自定义analyzer

项目实际中有富文本,所以会有大量的标签页,对这些HTML标签需要过滤,否则会影响搜索的结果。所以需要使用自定义的分析器,analyzer由几部分构成:从文档中提取词元(Token)的算法称为分词器(Tokenizer),在分词前预处理的算法称为字符过滤器(Character Filter),进一步处理词元的算法称为词元过滤器(Token Filter),最后得到词(Term)。这整个分析算法称为分析器(Analyzer)。使用html_strip的过滤器可以对html标签进行过滤,不过这里在6.x版本需要注意,使用的时候需要先停用索引,创建好analyzer在开启索引

curl -XPOST 'localhost:9200/article/_close'

curl -XPUT "http://localhost:9200/article/_settings" -H 'Content-Type: application/json' -d'
{
  "index": {
    "analysis": {
      "analyzer": {
        "ik-html": {
          "char_filter": [ "html_strip" ],
          "tokenizer": "ik_max_word"
        }
      }
    }
  }
}'

curl -XPOST 'localhost:9200/article/_open'

ik-html是自己定义的名称,之后可以在创建mapping中可以使用ik-html这个analyzer了。

项目实际运用的重点代码梳理

概述

重要字段:

字段 | 类型 |  描述  
-    |-    |  -
id | long |  |
guid | text |  |
title | text |标题  |
subTitle | text |副标题  |
createTime | long |  |
content | text | 内容 |
organizationIds | long |可见组织(鉴权),是数组  |
hasOpen | integer |是否可见(鉴权)  |
validityDate | long | 有效期限(文章可见的条件)  |
validityDateType | integer | 是否永久 |
creatorName | text |  |

搜索需要根据title、subtitle、content这3个字段的来进行字符串搜索,通过organizationIds、hasopen、validitydate和validitydatetype字段来控制搜索范围,具有一定权限的和在有效期限的document才可以被搜索到。

springboot-data-elasticsearch重要代码

//用户所在组织是否有读取权限,获取用户所在组织id,组织builder
BoolQueryBuilder organizationBuilder = QueryBuilders.boolQuery();
Long[] organizations = null ;
if (StringUtils.isNotBlank(user.getOrganizations())) {
    String[] orgIds = user.getOrganizations().split(",");
    organizations = (Long[]) ConvertUtils.convert(orgIds, Long.class);
}
if(organizations!=null&&organizations.length>0){
    for(int i = 0;i<organizations.length;i++){
        organizationBuilder.should(QueryBuilders.termQuery("organizationIds",organizations[i]));
    }
}
organizationBuilder.should(QueryBuilders.termQuery("hasOpen",1));
organizationBuilder.should(QueryBuilders.termQuery("userIds",user.getUserId()));

// 判断文件是否在有效期内
BoolQueryBuilder validityTimeBuilder = QueryBuilders.boolQuery();
validityTimeBuilder.should(QueryBuilders.termQuery("validityDateType",1));
validityTimeBuilder.should(QueryBuilders.rangeQuery("validityDate").gt(new Date().getTime()));
//生成查询builder
QueryBuilder boolBuilder = QueryBuilders.boolQuery().must(organizationBuilder).must(validityTimeBuilder);

// 查询q,搜索title、内容、subtitle
if(StringUtils.isNotBlank(form.getQ())){
    //QueryBuilder stringBuilder = QueryBuilders.queryStringQuery(form.getQ()).field("title").field("content").field("subTitle");

    MultiMatchQueryBuilder matchQuery = QueryBuilders.multiMatchQuery(form.getQ(),"title","content","subTitle");

    ((BoolQueryBuilder) boolBuilder).must(matchQuery);
}

es搜索should相当于mysql的or,must相当于mysql的and

term搜索是精确查询,这里用来对权限和有效期间进行过滤,match是模糊查询,用来查询title、content、subtitle等字段

SearchQuery searchQuery = new NativeSearchQueryBuilder()
                //设置查询条件
                .withIndices(str)
                .withQuery(boolBuilder)
                // 高亮设置
                .withHighlightFields(new HighlightBuilder.Field("content")
                                .preTags("<em>")
                                .postTags("</em>"),
                        new HighlightBuilder.Field("title")
                                .preTags("<em>")
                                .postTags("</em>"),
                        new HighlightBuilder.Field("subTitle")
                                .preTags("<em>")
                                .postTags("</em>")
                )
                //设置分页信息
                .withPageable(pageable)
                //创建SearchQuery对象
                .build();

构建searchquery,对需要的高亮字段进行处理。

创建mapping

这里以article索引为例:

curl -XDELETE "http://localhost:9200/article"

curl -XPUT "http://localhost:9200/article/"

curl -XPOST 'localhost:9200/article/_close'

curl -XPUT "http://localhost:9200/article/_settings" -H 'Content-Type: application/json' -d'
{
  "index": {
    "analysis": {
      "analyzer": {
        "ik-html": {
          "char_filter": [ "html_strip" ],
          "tokenizer": "ik_max_word"
        }
      }
    }
  }
}'

curl -XPOST 'localhost:9200/article/_open'

curl -XPOST "http://localhost:9200/article/_mapping" -H 'Content-Type: application/json' -d'
{
    "properties" : {
          "content" : {
            "type" : "text",
            "analyzer": "ik-html",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "createTime" : {
            "type" : "date"
          },
     "validityDate" : {
            "type" : "date"
          },
          "creatorName" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "guid" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "hasOpen" : {
            "type" : "long"
          },
          "id" : {
            "type" : "long"
          },
          "subTitle" : {
            "type" : "text",
            "analyzer": "ik_max_word",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "title" : {
            "type" : "text",
            "analyzer": "ik_max_word",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "validityDateType" : {
            "type" : "long"
          }
        }
}'

logstash-input-jdbc重要片段

在mysql.conf的input jdbc中需要加上这段话,否者默认会有小写转化

lowercase_column_names => false

logstash配置的mysql.conf 文件,organizationIds需要把“11,22”的字符串转化成[11,22]的整形数组;

letterType存放json格式的文件,这里需要转化下,把1转成{“letterType”:1}的形式

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
    if[type] == "document"{
      mutate {
        split => ["organizationIds",","]  #organizationIds把“11,22”的字符串转化成[“11”,“22”]的字符串数组
      }
      mutate {
        convert => {"organizationIds" => "integer"}  #这里直接通过convert可以将字符串数组[“11”,“22”]转成整形数组[11,22]
      }
    }
    if[type] == "letter"{
      mutate {
        update => { "letterType" => "{\"letterType\":%{letterType}}"}
        split => ["userIds",","]
      }
      mutate {
        convert => {"userIds" => "integer"}
      }
    }
    。。。。。省略
}

mysql同步文件,最后一句通过和sql_last_value的比较进行增量查询,如果去掉的话,就是全量查询

SELECT
        oa_documents.document_id as id,
        document_guid as guid,
        document_name as title,
        oa_documents.create_time as createTime,
        expire_infinity as validityDateType,
        expire_time as validityDate,
                IF(b.organizationIds = 0, 1, 0) as hasOpen,
                b.organizationIds
FROM
        oa_documents,
        (
SELECT
        oa_documents.document_id,
        group_concat(oa_document_organization_rss.organization_id) organizationIds
FROM
        oa_documents
        LEFT JOIN oa_document_organization_rss ON oa_documents.document_id = oa_document_organization_rss.document_id
GROUP BY
        oa_documents.document_id
        ) b
WHERE
        oa_documents.document_id IN ( b.document_id ) and oa_documents.has_directory=2 and
        oa_articles.create_time >= :sql_last_value

后续

通过项目es的基本使用掌握差不多了,后续通过项目进展有2点改进方向:

  1. 搜索结果的优化:搜索结果的优化方式很多,可以通过调整分片、调整field得分比重等多个方面根据业务进行优化
  2. 联想搜索:对用户输入进行联想搜索,提高用户体验