0%

ElasticSearch使用painless脚本小记

  • 前几天项目中碰到了一个麻烦的问题,统计Doc中一个List中符合筛选条件的项的值的和(简单说就是Doc是商店为主体,统计该商店的某个时间段下单记录),开始用的是二次查询,但是后来数据量大了以后,第二次查询用的IDs查询传了几千个值,超过了ES的限制,最后只好拿出脚本进行解决。
  • 先说一下Doc的结构,不相干的字段就删掉了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
{
"shopid": 11,
"orderList": [
{
"orderTime":"2015-06-06",
"orderAmount":2254
},
{
"orderTime":"2016-06-06",
"orderAmount":7575.66
},
{
"orderTime":"2017-06-06",
"orderAmount":6533.99
},
{
"orderTime":"2018-06-06",
"orderAmount":7870.20
}
]
}
  • 业务的查询需求是筛选2016-2018年,下单金额累计超过13000的商店(当然还有其他附加条件的查询),orderList还是Nested字段,最开始是进行二次查询,也就是首先将其他条件查询出来,然后对结果进行过滤,在这个日期下的符合条件的取出来ID,进行二次查询(这个条件改成Ids查询),最开始还能正常工作,后来不加上这个条件一下查出来一千条数据的时候就麻烦了,es默认Bool查询条件不能超过1024,硬着头皮改了ES配置,调成了1W,总算是正常工作了,但是不是长久之计,随着订单越来越多,迟早还会有问题,而且查询的时间越来越长了(查出来7000多条数据的时候已经超过10s了。。),跟业务部门沟通了,他们暂时可以接受,但是让我们尽快优化。
  • 网上查找了很多资料,也想使用innerHits解决,但是innerHits结果无法参与aggs,实在没有其他办法,只好决定用脚本来解决。在上家公司用的是ES2.X,脚本groovy默认是关闭的,开启还有重启ES修改配置,觉得麻烦,而且据说有安全隐患,对这个一比较抵触,当下用的是ES5.6,看了一下脚本用的是painless,这个从来没有接触过,看了一下语法,好像也不太复杂,和Java差不太多,那就硬着头皮上。
  • painless什么时候开始支持的没太关注,不过看文档说好像已经是容器内运行,相对比较安全了,而且ES5.6默认已经开启了,尝试过程中发现Nested类型取值只能取到一个内部文档的,list其他对象找不到。冗余出来一个非nested字段,发现日期和金额都是分别排序的,这就很尴尬了。。。最后决定吧订单金额和下单时间拼接成字符串,存到一个数组中去,这样取值的时候,金额和日期就是一一对应的。
  • 修改后的Doc结构如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
{
"shopid": 11,
"orderListStr":[
"2015-06-06@2254",
"2016-06-06@7575.66",
"2017-06-06@6533.99",
"2018-06-06@7870.20"
],
"orderList": [
{
"orderTime":"2015-06-06",
"orderAmount":2254
},
{
"orderTime":"2016-06-06",
"orderAmount":7575.66
},
{
"orderTime":"2017-06-06",
"orderAmount":6533.99
},
{
"orderTime":"2018-06-06",
"orderAmount":7870.20
}
]
}
  • 然后就是对照着文档写painless脚本了,个人水平有限,写的脚本也很低效,不过好赖问题是解决了,附上脚本:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
String fotmat = 'yyyy-MM-dd';
String field = 'orderListStr';
double saleNum = 0.0;
for (int i = 0; i < doc[field].length; ++i) {
def docStr=doc[field][i];
def date = docStr.substring(0,docStr.indexOf('@'));
double value =Double.parseDouble(docStr.substring(docStr.indexOf('@')+1));
SimpleDateFormat sdf = new SimpleDateFormat(fotmat);
if (params.from != null && params.from != '' && params.to != null && params.to != '' ){
if (sdf.parse(params.from).getTime()<=sdf.parse(date).getTime() && sdf.parse(params.to).getTime()>=sdf.parse(date).getTime())
saleNum+=value;
}
else if (params.from != null && params.from != ''){
if( sdf.parse(params.from).getTime()<=sdf.parse(date).getTime())
saleNum+=value;
}
else if (params.to != null && params.to != ''){
if( sdf.parse(params.to).getTime()>=sdf.parse(date).getTime())
saleNum+=value;
}else {
saleNum+=value;
}
}
if (params.inputFrom != null && params.inputTo != null){
return params.inputFrom<=saleNum && saleNum<=params.inputTo;
} else if(params.inputFrom !=null){
return params.inputFrom<=saleNum;
} else if(params.inputTo != null){
return saleNum<=params.inputTo;
}else{
return true;
}
  • 使用脚本解决了时间段下单次数统计和时间段金额统计的查询,时间也从原来的接近10s下降到1s左右(感觉脚本没办法走索引,应该都是全表扫描),这个肯定不是最优方法,但是 现在也想不出来什么其他高效的方法,就先上这个了,最起码目前查询最近5年的订单也是1秒左右,而且不用担心出现bool数量超过限制的问题了。

  • 如果bool查询的查询条件过多会导致TooManyClauses问题:

    1
    2
    "caused_by":{"type":"too_many_clauses","reason":"maxClauseCount is set to 1024"}}}],
    "caused_by":{"type":"query_shard_exception","reason":"failed to create query:
  • 解决方式在配置文件 Elasticsearch.yml中配置:

    • index.query.bool.max_clause_count: 10240
  • 设置最大限制bool查询的条数,过多会导致性能比较慢。

  • 在ElasticSearch 5之后参数有所改动,提示如下:

  • The setting index.query.bool.max_clause_count has been removed. In order to set the maximum number of boolean clauses indices.query.bool.max_clause_count should be used instead.

SQL 优化

负向查询不能使用索引

1
select name from user where id not in (1,3,4);

应该修改为:

1
select name from user where id in (2,5,6);

前导模糊查询不能使用索引

如:

1
select name from user where name like '%zhangsan'

非前导则可以:

1
select name from user where name like 'zhangsan%'

建议可以考虑使用 Lucene 等全文索引工具来代替频繁的模糊查询。

数据区分不明显的不建议创建索引

如 user 表中的性别字段,可以明显区分的才建议创建索引,如身份证等字段。

字段的默认值不要为 null

这样会带来和预期不一致的查询结果。

在字段上进行计算不能命中索引

1
select name from user where FROM_UNIXTIME(create_time) < CURDATE();

应该修改为:

1
select name from user where create_time < FROM_UNIXTIME(CURDATE());

最左前缀问题

如果给 user 表中的 username pwd 字段创建了复合索引那么使用以下SQL 都是可以命中索引:

1
2
3
4
5
select username from user where username='zhangsan' and pwd ='axsedf1sd'

select username from user where pwd ='axsedf1sd' and username='zhangsan'

select username from user where username='zhangsan'

但是使用

1
select username from user where pwd ='axsedf1sd'

是不能命中索引的。

如果明确知道只有一条记录返回

1
select name from user where username='zhangsan' limit 1

可以提高效率,可以让数据库停止游标移动。

不要让数据库帮我们做强制类型转换

1
select name from user where telno=18722222222

这样虽然可以查出数据,但是会导致全表扫描。

需要修改为

1
select name from user where telno='18722222222'

如果需要进行 join 的字段两表的字段类型要相同

不然也不会命中索引。

直接使用AOP的拦截器,调用AfterReturning即可。
废话不多说直接上代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.gs.techpub.filter;

import com.gridsum.techpub.utils.JsonUtil;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
@Aspect
public class ResponseFilter {

private Logger logger = LoggerFactory.getLogger(this.getClass());
@AfterReturning(returning = "ret", pointcut = "execution( * com.gs.techpub.controller.*.*(..))")
public void doAfterReturning(Object ret) {
logger.info("返回值 : " + JsonUtil.getInstance().toJson(ret));
}
}

记得加上依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>

在Filter中读取inputSeream读取一次之后就无法再次读取,解决办法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class LoggerHttpServletRequestWrapper extends HttpServletRequestWrapper {

private final byte[] body;

public LoggerHttpServletRequestWrapper(HttpServletRequest request) throws IOException {
super(request);
body = StreamUtils.readBytes(request.getInputStream());
}

@Override
public BufferedReader getReader() {
return new BufferedReader(new InputStreamReader(getInputStream()));
}

@Override
public ServletInputStream getInputStream() {
final ByteArrayInputStream bais = new ByteArrayInputStream(body);
return new ServletInputStream() {

@Override
public boolean isFinished() {
return false;
}

@Override
public boolean isReady() {
return false;
}

@Override
public void setReadListener(ReadListener readListener) {

}

@Override
public int read() {
return bais.read();
}
};
}

}

调用如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
ServletRequest requestWrapper = null;
if(request instanceof HttpServletRequest) {
requestWrapper = new LoggerHttpServletRequestWrapper((HttpServletRequest) request);
if (((HttpServletRequest) request).getMethod().equals("POST")){
String path = ((HttpServletRequest) request).getServletPath();
String param = StreamUtils.streamToString(requestWrapper.getInputStream());
LoggerFactory.getLogger("filter."+path).info(param);
}else if (((HttpServletRequest) request).getMethod().equals("GET")){
String path = ((HttpServletRequest) request).getServletPath();
String queryString = ((HttpServletRequest) request).getQueryString();
LoggerFactory.getLogger("filter."+path).info(queryString);
}


}
if(requestWrapper == null) {
chain.doFilter(request, response);
} else {
chain.doFilter(requestWrapper, response);
}
}

工具类如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class StreamUtils {

/**
* @param inputStream inputStream
* @return 字符串转换之后的
*/
public static String streamToString(InputStream inputStream) {
try(BufferedReader br =new BufferedReader(new InputStreamReader(inputStream, "UTF-8"))) {
StringBuilder builder = new StringBuilder();
String output;
while((output = br.readLine())!=null){
builder.append(output);
}
return builder.toString();
} catch (IOException e) {
throw new RuntimeException("Http 服务调用失败",e);
}
}



public static byte[] readBytes(ServletInputStream inputStream) {
return streamToString(inputStream).getBytes(Charset.forName("UTF-8"));
}
}

试了一堆插件,最后用的还是 publish over SSH

jenkins基本配置不多说了,就是配置一下git仓储,配置一下gradle执行命令

1
2
clean
bootRepackage

之后执行Send build artifacts over SSH

提前配置好对应的服务器

Send build artifacts over SSH

配置图

麻烦的是执行restart.sh脚本,总是各种奇葩问题,最终结果如下:

1
2
3
4
5
6
7
#/bin/bash
pid=`ps -ef | grep spp.jar | grep -v grep | awk '{print $2}'`
if [ -n "$pid" ]
then
kill -9 $pid
fi
java -jar /data1/javaApp/smartPushPlatform/spp.jar --server.port=30001 > console.log &

这样基本就可以完成启动了,而且可以正常推出

编译日志如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
[Gradle] - Launching build.
[SmartPushPlatform] $ /usr/share/gradle/bin/gradle clean bootRepackage
Starting a Gradle Daemon (subsequent builds will be faster)
:clean
:compileJavaNote: /var/lib/jenkins/workspace/SmartPushPlatform/src/main/java/com/gridsum/techpub/legal/smartpush/service/TagService.java uses unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.

:processResources
:classes
:findMainClass
:jar
:bootRepackage

BUILD SUCCESSFUL in 7s
6 actionable tasks: 6 executed
Build step 'Invoke Gradle script' changed build result to SUCCESS
SSH: Connecting from host [gs-server-3602]
SSH: Connecting with configuration [10.202.81.26] ...
SSH: EXEC: STDOUT/STDERR from command [cd /data1/javaApp/smartPushPlatform
mv SmartPushPlatform-1.1.jar spp.jar
sh restart.sh] ...
SSH: EXEC: completed after 200 ms
SSH: Disconnecting configuration [10.202.81.26] ...
SSH: Transferred 1 file(s)
Finished: SUCCESS

使用Bulk请求进行Index

Bulk请求将产生比单文档index请求有更好的性能。至于Bulk请求中文档数量的大小,建议使用单一节点单一分片进行测试,先试试看100个,然后200个,然后400这样,每次进行翻倍测试,只要速度稳定了,也就是最合适的大小了。但是要注意一下,并不是速度最合适了就OK,因为每次请求总的大小要进行一下控制。并发发送的时候,ES内存压力会很大,一定要避免每次请求超过几十兆,即便是这样插入的性能更好(这个我踩过坑,我这测试超过10M,ES就不接受请求,直接拒绝了)。

使用多个节点或者多线程进行Index

一般来说一个线程,即便是使用了Bulk方式进行Index,也无法达到ES集群的瓶颈,所以为了最大限度的利用集群资源,使用多线程或者多进程的方式进行Index是一个很好的选择。这样不仅最大程度利用了集群资源,还帮助减少了fsync的成本。(这个fsync是什么 意思我暂时也没弄明白,后续补充)。
要注意一下TOO_MANY_REQUESTS (429) 相应(对应Java Client 则是EsRejectedExecutionException), 这说明ES集群已经跟不上你Index的速度了,使用一些适当的方式限制一下速度吧。(官方文档说暂停Index一会或者使用随机指数函数Backoff)。
类似Bulk Index 数量,多线程多进程Index也需要进行人工测试,直到找到一个合适线程数或者进程数。

增加refresh interval

默认的 index.refresh_interval 是1s,在index的时候如果没有实时性检索需求,建议可以设置大一些,比如30S,如果不需要检索,等index完成才进行检索的话,可以设置为-1,也就是禁用,等完成index之后在调整回来。

禁用refresh,降低分片副本数

如果需要一次index大量数据,最好禁用refresh,也就是将refresh_interval设置为-1,同时index.number_of_replicas 设置为0,也就是不需要副本。尽管这样会增加一些风险(真的很小很小),也就是在索引的时候可能导致数据丢失,但是这样可以大幅度增加索引速度,等完成索引后在增加副本,这样也可以保证数据的可靠性。

禁用Swapping

一定确保操作系统禁用了swapping,这对ES性能有很大的提升。

给足够的内存文件系统缓存

你应该分配机器的一半内存给ES使用,用于文件系统的缓存。文件系统缓存用于缓冲I/O操作。

使用系统自动生成id

当你index一个document使用特定的id,ES需要去检查是否在同一个shard存在相同的ID的文档,这是一个相当昂贵的操作,并且随着文档数量的增加,花费呈指数增长。如果使用自动生成id,ES会跳过这个检查,使得Index速度更快。

使用更快的硬件

如果I/O是瓶颈,那么最好考虑为文件系统提供更多内存或者购买更好的服务器。使用SSD硬盘能比一般的硬盘有更好的性能。另外尽量使用本地存储,不要考虑远程存储。也尽可能不要考虑Amazon等虚拟化存储,尽管比较简单的使用,但是性能比本地存储差很多。
还有要尽可能冗余副本,以避免节点故障导致数据丢失。也可以用快照备份还原进一步降低数据出事的风险。

Indexing 缓冲大小

如果节点仅仅是大量Index,确保每个分片 indices.memory.index_buffer_size 大于512M,(尽管大于512M没有什么性能改善)。举个例子,默认值是10%,也是说如果你设置的jvm大小是10G,那么Index缓冲大小是1G,足以支撑2个shard的大量索引。

禁用 _field_names

简单来说,如果你不需要运行exists查询,那么你就可以禁用_field_names。

我使用的是IDEA,直接引入

1
2
3
plugins {
id 'com.github.johnrengelman.shadow' version '1.2.3'
}

放在build.gradle的最上面,然后执行shadowJar即可。

网上说有一种方法

1
2
3
4
5
6
7
8
9
jar {
manifest {
attributes "Main-Class": "$mainClassName"
}

from {
configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
}
}

这种方法确实打入进去了,但是运行的时候报错,异常如下:

1
2
3
4
5
6
7
Exception in thread "main" java.lang.VerifyError: (class: org/jboss/netty/channel/socket/nio/NioWorkerPool, method: newWorker signature: (Ljava/util/concurrent/Executor;)Lorg/jboss/netty/channel/socket/nio/AbstractNioWorker;) Wrong return type in function
at org.elasticsearch.transport.netty.NettyTransport.createClientBootstrap(NettyTransport.java:354)
at org.elasticsearch.transport.netty.NettyTransport.doStart(NettyTransport.java:290)
at org.elasticsearch.common.component.AbstractLifecycleComponent.start(AbstractLifecycleComponent.java:68)
at org.elasticsearch.transport.TransportService.doStart(TransportService.java:182)
at org.elasticsearch.common.component.AbstractLifecycleComponent.start(AbstractLifecycleComponent.java:68)
at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:162)

不知道什么原因,不过用第三方插件暂时可以解决,原因慢慢排查了。(初步判断是jar包冲突导致,用了ES和Zookeeper,好像都依赖Netty,版本还不太一样)

另外还有一种方法可以运行,不过依赖单独放入一个lib目录下,也就是jar和依赖分离的方法:

1
2
3
4
5
6
7
8
9
jar {
String someString = ''
configurations.runtime.each {someString = someString + " lib//"+it.name}
manifest {
attributes 'Main-Class': 'com.gridsum.techpub.legal.etl.App'
attributes 'Class-Path': someString
}

}

以后用得到的时候再说~

单机搭建了2.6.5的伪分布式集群,写了一个tf-idf计算程序,分词用的是结巴分词,使用standalone模式运行没有任何问题,切换到伪分布式模式运行一直报错:

1
hadoop is running beyond virtual memory limits

大概意思就是使用虚拟内存超出了限制。

网上参考了好几篇博客,几乎都是再说更改hadoop-env和mapred-site.xml

hadoop-env直接更改堆大小

1
export HADOOP_HEAPSIZE=1000

mapred-site.xml 更改opts的大小

1
2
3
4
<property> 
<name>mapred.child.java.opts</name>
<value>-Xmx4000m</value>
</property>

我的机器内存是8G,按理说这个程序运行应该是毫无压力的。。

提示说的虚拟内存,这两个估计是不挂钩,反正改了之后运行依旧报错

既然是虚拟内存不足,那就找虚拟内存的事,google一下找到如下配置

1
2
3
4
<property>
<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>15.5</value>
</property>

更改yarn-site.xml

我这之前运行给了5.5G,提示5.7G超过5.5G了,kill掉了container,索性一下给了15G,运行可算是正常了,看来出了问题,还是得从错误日志根源找起。