Flink 实时写入数据到 ElasticSearch 性能调优-程序员宅基地

背景说明

线上业务反应使用 Flink 消费上游 kafka topic 里的轨迹数据出现 backpressure,数据积压严重。单次 bulk 的写入量为:3000/50mb/30s,并行度为 48。针对该问题,为了避免影响线上业务申请了一个与线上集群配置相同的 ES 集群。本着复现问题进行优化就能解决的思路进行调优测试。


测试环境

  • Elasticsearch 2.3.3

  • Flink 1.6.3

  • flink-connector-elasticsearch 2_2.11

  • 八台 SSD,56 核 :3 主 5 从


Rally 分布式压测 ES 集群

640?wx_fmt=jpeg 

  • 从压测结果来看,集群层面的平均写入性能大概在每秒 10 w+ 的 doc。


Flink 写入测试

  • 配置文件

1config.put("cluster.name", ConfigUtil.getString(ES_CLUSTER_NAME, "flinktest"));	
2config.put("bulk.flush.max.actions", ConfigUtil.getString(ES_BULK_FLUSH_MAX_ACTIONS, "3000"));	
3config.put("bulk.flush.max.size.mb", ConfigUtil.getString(ES_BULK_FLUSH_MAX_SIZE_MB, "50"));	
4config.put("bulk.flush.interval.ms", ConfigUtil.getString(ES_BULK_FLUSH_INTERVAL, "3000"));
  • 执行代码片段

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();	
 initEnv(env);	
 Properties properties = ConfigUtil.getProperties(CONFIG_FILE_PATH);	
 //从kafka中获取轨迹数据	
 FlinkKafkaConsumer010<String> flinkKafkaConsumer010 =	
     new FlinkKafkaConsumer010<>(properties.getProperty("topic.name"), new SimpleStringSchema(), properties);	
 //从checkpoint最新处消费	
 flinkKafkaConsumer010.setStartFromLatest();	
 DataStreamSource<String> streamSource = env.addSource(flinkKafkaConsumer010);	
10//Sink2ES	
streamSource.map(s -> JSONObject.parseObject(s, Trajectory.class))	
    .addSink(EsSinkFactory.createSinkFunction(new TrajectoryDetailEsSinkFunction())).name("esSink");	
env.execute("flinktest");
  • 运行时配置

任务容器数为 24 个 container,一共 48 个并发。savepoint 为 15 分钟:

640?wx_fmt=jpeg

  • 运行现象

(1)source 和 Map 算子均出现较高的反压

640?wx_fmt=jpeg

(2)ES 集群层面,目标索引写入速度写入陡降

平均 QPS 为:12 k 左右。

(3)对比取消 sink 算子后的 QPS


640?wx_fmt=jpeg

平均QPS为:116 k 左右。

有无sink参照实验的结论:

取消 sink 2 ES 的操作后,QPS 达到 110 k,是之前 QPS 的十倍。由此可以基本判定: ES 集群写性能导致的上游反压


优化方向

  • 索引字段类型调整

640?wx_fmt=jpeg

bulk 失败的原因是由于集群 dynamic mapping 自动监测,部分字段格式被识别为日期格式而遇到空字符串无法解析报错。

解决方案:关闭索引自动检测。

640?wx_fmt=jpeg

效果: ES 集群写入性能明显提高但 Flink operator 依然存在反压:

640?wx_fmt=jpeg

  • 降低副本数


  • 提高 refresh_interval

针对这种 ToB、日志型、实时性要求不高的场景,我们不需要查询的实时性,通过加大甚至关闭 refresh_interval 的参数提高写入性能。


  • 检查集群各个节点 CPU 核数

在 Flink 执行时,通过 Grafana 观测各个节点 CPU 使用率以及通过 Linux 命令查看各个节点 CPU 核数。发现 CPU 使用率高的节点 CPU 核数比其余节点少。为了排除这个短板效应,我们将在这个节点中的索引 shard 移动到 CPU 核数多的节点。

curl -XPOST {集群地址}/_cluster/reroute  -d'{"commands":[{"move":{"index":"{索引名称}","shard":5,"from_node":"源node名称","to_node":"目标node名称"}}]}' -H "Content-Type:application/json"

以上优化的效果:

640?wx_fmt=jpeg

经过以上的优化,我们发现写入性能提升有限。因此,需要深入查看写入的瓶颈点。

  • 在 CPU 使用率高的节点使用 Arthas 观察线程

640?wx_fmt=jpeg


  • 打印阻塞的线程堆栈

 "elasticsearch[ES-077-079][bulk][T#3]" Id=247 WAITING on java.util.concurrent.LinkedTransferQueue@369223fa	
   at sun.misc.Unsafe.park(Native Method)	
     -  waiting on java.util.concurrent.LinkedTransferQueue@369223fa	
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)	
    at java.util.concurrent.LinkedTransferQueue.awaitMatch(LinkedTransferQueue.java:737)	
    at java.util.concurrent.LinkedTransferQueue.xfer(LinkedTransferQueue.java:647)	
    at java.util.concurrent.LinkedTransferQueue.take(LinkedTransferQueue.java:1269)	
    at org.elasticsearch.common.util.concurrent.SizeBlockingQueue.take(SizeBlockingQueue.java:161)	
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)	
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)	
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)	
    at java.lang.Thread.run(Thread.java:745)

从上面的线程堆栈我们可以看出线程处于等待状态。

关于这个问题的讨论详情查看 https://discuss.elastic.co/t/thread-selection-and-locking/26051/3,这个 issue 讨论大致意思是:节点数不够,需要增加节点。于是我们又增加节点并通过设置索引级别的 total_shards_per_node 参数将索引 shard 的写入平均到各个节点上。

  • 线程队列优化

ES 是将不同种类的操作(index、search…)交由不同的线程池执行,主要的线程池有三:index、search 和 bulk thread_pool。线程池队列长度配置按照官网默认值,我觉得增加队列长度而集群本身没有很高的处理能力线程还是会 await(事实上实验结果也是如此在此不必赘述),因为实验节点机器是 56 核,对照官网:

640?wx_fmt=jpeg

因此修改 size 数值为 56。

640?wx_fmt=jpeg

经过以上的优化,我们发现在 kafka 中的 topic 积压有明显变少的趋势:

640?wx_fmt=jpeg

  • index buffer size 的优化

参照官网:

640?wx_fmt=jpeg


  • translog 优化

索引写入 ES 的基本流程是:

  • 数据写入 buffer 缓冲和 translog; 

  • 每秒 buffer 的数据生成 segment 并进入内存,此时 segment 被打开并供 search 使用查询; 

  • buffer 清空并重复上述步骤 ;

  • buffer 不断添加、清空 translog 不断累加,当达到某些条件触发 commit 操作,刷到磁盘;

ES 默认的刷盘操作为 Request 但容易部分操作比较耗时,在日志型集群、允许数据在刷盘过程中少量丢失可以改成异步 async。

另外一次 commit 操作是在 translog 达到某个阈值执行的,可以把大小(flush_threshold_size )调大,刷新间隔调大。

index.translog.durability : async	
index.translog.flush_threshold_size : 1gb	
index.translog.sync_interval : 30s

效果:

  • Flink 反压从打满 100% 降到 40%(output buffer usage):

640?wx_fmt=jpeg

  • kafka 消费组里的积压明显减少:

640?wx_fmt=jpeg


总结

当 ES 写入性能遇到瓶颈时,我总结的思路应该是这样:

  • 看日志,是否有字段类型不匹配,是否有脏数据。

  • 看 CPU 使用情况,集群是否异构

  • 客户端是怎样的配置?使用的 bulk 还是单条插入

  • 查看线程堆栈,查看耗时最久的方法调用

  • 确定集群类型:ToB 还是 ToC,是否允许有少量数据丢失?

  • 针对 ToB 等实时性不高的集群减少副本增加刷新时间

  • index buffer 优化 translog 优化,滚动重启集群

作者: 张刘毅
原文链接:
https://blog.csdn.net/dtzly/article/details/101006064


▼ Flink 社区推荐 ▼ 

Apache Flink 及大数据领域盛会 Flink Forward Asia 2019 将于 11月28-30日在北京举办,阿里、腾讯、美团、字节跳动、百度、英特尔、DellEMC、Lyft、Netflix 及 Flink 创始团队等近 30 家知名企业资深技术专家齐聚国际会议中心,与全球开发者共同探讨大数据时代核心技术与开源生态。点击「阅读原文」了解更多精彩议程。

▼ 

▼ 
▼ 

点击图片可查看 Flink Forward Asia 2019 详情
你也「 在看 」吗?
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_44904816/article/details/102597950

智能推荐

前端开发之vue-grid-layout的使用和实例-程序员宅基地

文章浏览阅读1.1w次,点赞7次,收藏34次。vue-grid-layout的使用、实例、遇到的问题和解决方案_vue-grid-layout

Power Apps-上传附件控件_powerapps点击按钮上传附件-程序员宅基地

文章浏览阅读218次。然后连接一个数据源,就会在下面自动产生一个添加附件的组件。把这个控件复制粘贴到页面里,就可以单独使用来上传了。插入一个“编辑”窗体。_powerapps点击按钮上传附件

C++ 面向对象(Object-Oriented)的特征 & 构造函数& 析构函数_"object(cnofd[\"ofdrender\"])十条"-程序员宅基地

文章浏览阅读264次。(1) Abstraction (抽象)(2) Polymorphism (多态)(3) Inheritance (继承)(4) Encapsulation (封装)_"object(cnofd[\"ofdrender\"])十条"

修改node_modules源码,并保存,使用patch-package打补丁,git提交代码后,所有人可以用到修改后的_修改 node_modules-程序员宅基地

文章浏览阅读133次。删除node_modules,重新npm install看是否成功。在 package.json 文件中的 scripts 中加入。修改你的第三方库的bug等。然后目录会多出一个目录文件。_修改 node_modules

【】kali--password:su的 Authentication failure问题,&sudo passwd root输入密码时Sorry, try again._password: su: authentication failure-程序员宅基地

文章浏览阅读883次。【代码】【】kali--password:su的 Authentication failure问题,&sudo passwd root输入密码时Sorry, try again._password: su: authentication failure

整理5个优秀的微信小程序开源项目_微信小程序开源模板-程序员宅基地

文章浏览阅读1w次,点赞13次,收藏97次。整理5个优秀的微信小程序开源项目。收集了微信小程序开发过程中会使用到的资料、问题以及第三方组件库。_微信小程序开源模板

随便推点

Centos7最简搭建NFS服务器_centos7 搭建nfs server-程序员宅基地

文章浏览阅读128次。Centos7最简搭建NFS服务器_centos7 搭建nfs server

Springboot整合Mybatis-Plus使用总结(mybatis 坑补充)_mybaitis-plus ruledataobjectattributemapper' and '-程序员宅基地

文章浏览阅读1.2k次,点赞2次,收藏3次。前言mybatis在持久层框架中还是比较火的,一般项目都是基于ssm。虽然mybatis可以直接在xml中通过SQL语句操作数据库,很是灵活。但正其操作都要通过SQL语句进行,就必须写大量的xml文件,很是麻烦。mybatis-plus就很好的解决了这个问题。..._mybaitis-plus ruledataobjectattributemapper' and 'com.picc.rule.management.d

EECE 1080C / Programming for ECESummer 2022 Laboratory 4: Global Functions Practice_eece1080c-程序员宅基地

文章浏览阅读325次。EECE 1080C / Programming for ECESummer 2022Laboratory 4: Global Functions PracticePlagiarism will not be tolerated:Topics covered:function creation and call statements (emphasis on global functions)Objective:To practice program development b_eece1080c

洛谷p4777 【模板】扩展中国剩余定理-程序员宅基地

文章浏览阅读53次。被同机房早就1年前就学过的东西我现在才学,wtcl。设要求的数为\(x\)。设当前处理到第\(k\)个同余式,设\(M = LCM ^ {k - 1} _ {i - 1}\) ,前\(k - 1\)个的通解就是\(x + i * M\)。那么其实第\(k\)个来说,其实就是求一个\(y\)使得\(x + y * M ≡ a_k(mod b_k)\)转化一下就是\(y * M ...

android 退出应用没有走ondestory方法,[Android基础论]为何Activity退出之后,系统没有调用onDestroy方法?...-程序员宅基地

文章浏览阅读1.3k次。首先,问题是如何出现的?晚上复查代码,发现一个activity没有调用自己的ondestroy方法我表示非常的费解,于是我检查了下代码。发现再finish代码之后接了如下代码finish();System.exit(0);//这就是罪魁祸首为什么这样写会出现问题System.exit(0);////看一下函数的原型public static void exit (int code)//Added ..._android 手动杀死app,activity不执行ondestroy

SylixOS快问快答_select函数 导致堆栈溢出 sylixos-程序员宅基地

文章浏览阅读894次。Q: SylixOS 版权是什么形式, 是否分为<开发版税>和<运行时版税>.A: SylixOS 是开源并免费的操作系统, 支持 BSD/GPL 协议(GPL 版本暂未确定). 没有任何的运行时版税. 您可以用她来做任何 您喜欢做的项目. 也可以修改 SylixOS 的源代码, 不需要支付任何费用. 当然笔者希望您可以将使用 SylixOS 开发的项目 (不需要开源)或对 SylixOS 源码的修改及时告知笔者.需要指出: SylixOS 本身仅是笔者用来提升自己水平而开发的_select函数 导致堆栈溢出 sylixos

推荐文章

热门文章

相关标签