flink内存模型-程序员宅基地

技术标签: flink  java  

jvm中java对象模型缺陷

基于 JVM 的数据分析引擎都需要面对将大量数据存到内存中,当然也包括flink,这就不得不面对JVM内存管理存在的几个问题:
1.有效数据密度低
java对象由三部分构成,对齐填充仅起到占位符的作用,例如:一个只包含 boolean 属性的对象占用了16个字节内存:对象头占了8个,boolean 属性占了1个,对齐填充占了7个。而实际上只需要一个bit(1/8字节)就够了。
在这里插入图片描述

2.垃圾回收机制
海量数据导致空间不足,可能出现秒级甚至分钟级的Full GC,不仅影响效率,其引起的中断可能导致心跳超时被踢出集群

3.OOM问题
当执行Full GC后空间仍然不足,则抛出OOM导致JVM崩溃,影响分布式框架的健壮性和性能

4.缓存未命中问题
从 L1/L2/L3 缓存读取数据的速度比从主内存读取数据的速度快好几个量级。通过性能分析可以发现,CPU时间中的很大一部分都是浪费在等待数据从主内存过来上。如果这些数据可以从 L1/L2/L3 缓存过来,那么这些等待时间可以极大地降低。
CPU将经常访问的数据及其下一步可能访问的附近的数据搬运到缓存中,以便下次访问,但由于java对象在堆中不连续,所以搬运的附近数据往往不是下一步计算需要的,这就是缓存未命中,导致执行效率降低。
在这里插入图片描述

flink自主管理内存来解决jvm的几个问题

所以目前,越来越多的大数据项目开始自己管理JVM内存了,像 Spark、Flink、HBase,为的就是获得像 C 一样的性能以及避免 OOM 的发生。

早期flink内存模型

在flink中,使用固定长度的内存块(默认32KB)来管理java对象(而不是将大量对象存在堆上),即memorySegment,它是flink中内存分配的最小单元。一个java对象序列化为二进制数据流后可能占用1个或多个memorySegment。flink给这样的内存块提供了非常高效的读写方法(它的 DBMS 风格的排序和连接算法尽可能多地对这个二进制数据进行操作,以此将序列化和反序列化开销降到最低,这部分内容暂时不详述)。
在这里插入图片描述

早期时候flink还未使用堆外内存,memorySegement的都是放在堆上的,基于这样的内存管理模式可以改善jvm的部分问题:

  1. 针对有效数据密度低的问题:
    因为只存储实际数据的二进制内容,避免了对齐填充等占位符,节省了内存空间。

  2. 针对Full GC的问题:
    从GC的角度讲,数据以二进制的形式存在memorySegment中一直呆在老年代不会被GC回收,而其他的数据对象基本上是由用户代码生成的短生命周期对象,这部分内存可以看成新生代,可以被Minor GC快速回收,让对象尽量在minor GC时被回收不要进入老年代,老年代空间充足的话就可以降低发生Full GC的概率

  3. 针对OOM的问题:
    分配的内存段的数量是固定的,因此监控剩余的内存资源是非常简单的,在内存吃紧的情况下,算法(sort/join等)会高效地将一大批内存块写到磁盘,之后再读回来,防止OOM。

  4. 针对缓存未命中的问题:
    该二进制形式数据把将算法中需要操作的数据(如sort中的key)连续存储,容易被缓存到L1/L2/L3中,可以减少CPU等待的时间,获得性能的提升。

当前flink内存模型

由于早期flink仅使用堆上内存,所以仍存在一些问题:

  1. JVM堆上内存过大导致JVM启动很慢
  2. 仍然可能发生Full GC,且由于数据大量存储在堆上,所以一旦发生Full GC 可达分钟级
  3. 当task需要与其他task通信时,堆内数据传输至少需要一次拷贝,导致传输性能不高

所以后来flink引入堆外内存,对象也可以存储到堆外了:

  1. 将数据分一些给堆外,减小JVM内存,加速JVM的启动
  2. 对象存储到堆外,保证堆内空间的充裕,进一步降低Full GC的概率,减小GC压力
  3. 当task需要与外界进行数据传输时,提前将二进制数据对象存储在堆外可以实现zero-copy,提高传输效率

但引入堆外内存也有一些缺点:

  1. 分配生命周期短的对象,比起堆内内存,在堆外内存上分配开销更高。
  2. 堆外内存出错时排错更为复杂。
  3. 在flink的测试中,部分操作在堆外内存上会比堆上内存更慢。

所以flink既可以使用堆内内存又可以使用堆外内存,Flink用通过ByteBuffer.allocateDirect(numBytes)来申请堆外内存,用 sun.misc.Unsafe 来操作堆外内存。flink在选择堆内或堆外内存大小上给了用户较大的自由度。

基于以上分析,给出flink现在的内存布局:
在这里插入图片描述

可以看到,flink的内存模型中:

  • flink框架使用了堆上内存和堆外内存,不计入slot资源
  • taks执行的内存也使用了堆上和堆外内存
  • 网路缓冲内存是多个slot共享的,不隔离,放在堆外,可以实现zero-copy

一.flink框架内存

  1. 用途?flink框架本身占用的内存
  2. 内存大小?通常情况下,不建议对框架堆内存和框架堆外内存进行调整,这部分的默认值是框架测试时不调度任务空跑通过测量得到的一个数据。

二.task内存

  1. 用途?userCode和libraies
  2. 内存大小?当使用HeapStateBackend时尤其需要使用Task Heap堆上内存。
  3. 各slot之间时严格隔离的。
  4. task堆上内存由taskmanager.memory.task.heap.size参数配置
  5. 用户申请的堆外内存上限可由taskmanager.memory.task.off-heap.size参数配置

三.网络缓冲内存

  1. 用途?网络传输
  2. 内存大小?网络缓冲内存大小由作业拓扑决定
  3. 内存大小的影响?若内存不足会导致运行失败。
  4. 各slot之间不隔离。

四.托管内存

  1. 用途?托管内存有专门的用途,例如批处理的sort/join算子用来存储中间结果,或者流处理使用的是RocksDBStateBackend时就会使用这部分托管内存,如果作业中不涉及到这些功能,建议把托管内存设为0,避免浪费,省下来的内存可以用于task用户代码,减少GC时间
  2. 内存大小的影响?大或小都能跑,不会运行失败,顶多就是小了会导致性能低。
  3. 各slot之间时严格隔离的。
  4. 由taskmanager.memory.managed.size 参数指定

五. JVM元空间

  1. 用途?存放JVM加载的类的元数据
  2. 内存大小的影响?加载的类越多,需要的空间越大,当作业需要加载大量第三方库,建议调大Metaspace。若不足会出现metaspaceOOM。

六. JVM开销

  1. 用途?线程栈,编译缓存
  2. 大小?这部分内存的上限不受JVM限制,可作为预留空间。

内存相关参数:
-XmX = 框架堆上内存+task堆上内存,达到上限时触发GC,GC后空间仍然不足则触发OOM异常并退出。OutOfMemoryError:Java heap space.

-XX: MaxDirectMemorySize = 框架堆外内存(部分)+task堆外内存(部分)+网络缓冲内存,达到上限时触发GC,GC后空间仍不足则触发OOM异常并退出。OutOfMemoryError:Direct buffer memory.

-XX: MaxMetaspaceSize = JVM元空间内存,达到上限时触发GC,GC后空间仍不足则触发OOM异常并退出。OutOfMemoryError: Metaspace

而剩下的nativeMemory = 框架堆外内存(部分)+task堆外内存(部分)+ managedMemory + JVM Overhead ,这部分内存的上限是不受JVM严格控制的,特别是managedMemory,它的用量上限是flink控制的。

为什么已经在堆外了还是要触发GC后才释放,因为不论是DirectMemory还是Metaspace,它们在堆内都有一个相应buffer对象与其对应,只有堆内这个对应的对象释放后,堆外的才会相应的释放,所以实际上内存释放还是依赖这个GC的。(虽然堆外内存本身不受垃圾回收算法的管辖,但是因为其是由ByteBuffer所创造出来的,因此这个buffer自身作为一个实例化的对象,其自身的信息(例如堆外内存在主存中的起始地址等信息)必须存储在堆内内存中)
在这里插入图片描述

补充内容

flink将对象序列化存储会不会存在高昂的序列化和反序列化代价?

目前 Java 生态圈提供了众多的序列化框架:Java serialization, Kryo, Apache Avro 等等,但是 Flink 实现了自己的序列化框架,对自己更友好的序列化框架,量身定制。
该序列化框架有以下几点考虑因素:

  1. 因为在 Flink 中处理的数据流通常是同一类型,由于数据集对象的类型固定,对于数据集可以只保存一份对象Schema信息,节省大量的存储空间。
  2. 对于固定大小的类型,可通过固定的偏移位置存取。当我们需要访问某个对象成员变量的时候,通过定制的序列化工具,并不需要反序列化整个Java对象,而是可以直接通过偏移量,只是反序列化特定的对象成员变量。如果对象的成员变量较多时,能够大大减少Java对象的创建开销,以及内存数据的拷贝大小。

Flink支持任意的Java或是Scala类型。Flink 在数据类型上有很大的进步,不需要实现一个特定的接口(像Hadoop中的org.apache.hadoop.io.Writable),Flink 能够自动识别数据类型。Flink 通过 Java Reflection 框架分析基于 Java 的 Flink 程序 UDF (User Define Function)的返回类型的类型信息,通过 Scala Compiler 分析基于 Scala 的 Flink 程序 UDF 的返回类型的类型信息。类型信息由 TypeInformation 类表示,TypeInformation 支持以下几种类型:

  • BasicTypeInfo: 任意Java 基本类型(装箱的)或 String 类型。 BasicArrayTypeInfo:
  • 任意Java基本类型数组(装箱的)或 String 数组。
  • WritableTypeInfo: 任意 Hadoop Writable接口的实现类。 、
  • TupleTypeInfo: 任意的 Flink Tuple 类型(支持Tuple1 to Tuple25)。Flink tuples 是固定长度固定类型的Java Tuple实现。
  • CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples)。
  • PojoTypeInfo: 任意的 POJO (Java or Scala),例如,Java对象的所有成员变量,要么是 public 修饰符定义,要么有 getter/setter 方法。
  • GenericTypeInfo: 任意无法匹配之前几种类型的类。

前六种数据类型基本上可以满足绝大部分的Flink程序,针对前六种类型数据集,Flink皆可以自动生成对应的TypeSerializer,能非常高效地对数据集进行序列化和反序列化。对于最后一种数据类型,Flink会使用Kryo进行序列化和反序列化。每个TypeInformation中,都包含了serializer,类型会自动通过serializer进行序列化,然后用Java Unsafe接口写入MemorySegments。对于可以用作key的数据类型,Flink还同时自动生成TypeComparator,用来辅助直接对序列化后的二进制数据进行compare、hash等操作。对于 Tuple、CaseClass、POJO 等组合类型,其TypeSerializer和TypeComparator也是组合的,序列化和比较时会委托给对应的serializers和comparators。如下图展示 一个内嵌型的Tuple3<Integer,Double,Person> 对象的序列化过程。
在这里插入图片描述
可以看出这种序列化方式存储密度是相当紧凑的。其中 int 占4字节,double 占8字节,POJO多个一个字节的header,PojoSerializer只负责将header序列化进去,并委托每个字段对应的serializer对字段进行序列化。

Flink 的类型系统可以很轻松地扩展出自定义的TypeInformation、Serializer以及Comparator,来提升数据类型在序列化和比较时的性能。

flink如何直接操作二进制数据进行计算的?

Flink 提供了如 group、sort、join 等操作,这些操作都需要访问海量数据。这里,我们以sort为例,这是一个在 Flink 中使用非常频繁的操作。

首先,Flink 会从 MemoryManager 中申请一批 MemorySegment,我们把这批 MemorySegment 称作 sort buffer,用来存放排序的数据。
在这里插入图片描述
我们会把 sort buffer 分成两块区域。一个区域是用来存放所有对象完整的二进制数据。另一个区域用来存放指向完整二进制数据的指针以及定长的序列化后的key(key+pointer)。如果需要序列化的key是个变长类型,如String,则会取其前缀序列化。如上图所示,当一个对象要加到 sort buffer 中时,它的二进制数据会被加到第一个区域,指针(可能还有key)会被加到第二个区域。

将实际的数据和指针加定长key分开存放有两个目的。第一,交换定长块(key+pointer)更高效,不用交换真实的数据也不用移动其他key和pointer。第二,这样做是缓存友好的,因为key都是连续存储在内存中的,可以大大减少 cache miss。

排序的关键是比大小和交换。Flink 中,会先用 key 比大小,这样就可以直接用二进制的key比较而不需要反序列化出整个对象。因为key是定长的,所以如果key相同(或者没有提供二进制key),那就必须将真实的二进制数据反序列化出来,然后再做比较。之后,只需要交换key+pointer就可以达到排序的效果,真实的数据不用移动。
在这里插入图片描述
最后,访问排序后的数据,可以沿着排好序的key+pointer区域顺序访问,通过pointer找到对应的真实数据,并写到内存或外部。(操作二进制数据的性能,参考 https://blog.csdn.net/gw5205566/article/details/99673187 )

flink怎样提升cpu缓存命中的?

在上面讨论中我们谈到的,Flink 通过定制的序列化框架将算法中需要操作的数据(如sort中的key)连续存储,而完整数据存储在其他地方。因为对于完整的数据来说,key+pointer更容易装进缓存,这大大提高了缓存命中率,从而提高了基础算法的效率。这对于上层应用是完全透明的,可以充分享受缓存友好带来的性能提升。

flink的MemorySegment的两个子类?

基于 Flink 优秀的设计,实现堆外内存是很方便的。Flink 将原来的 MemorySegment 变成了抽象类,并生成了两个子类。HeapMemorySegment 和 HybridMemorySegment。从字面意思上也很容易理解,前者是用来分配堆内存的,后者是用来分配堆外内存和堆内存的。是的,你没有看错,后者既可以分配堆外内存又可以分配堆内存。为什么要这样设计呢?

首先假设HybridMemorySegment只提供分配堆外内存。在上述堆外内存的不足中的第二点谈到,Flink 有时需要分配短生命周期的 buffer,这些buffer用HeapMemorySegment会更高效。那么当使用堆外内存时,为了也满足堆内存的需求,我们需要同时加载两个子类。这就涉及到了 JIT 编译优化的问题。因为以前 MemorySegment 是一个单独的 final 类,没有子类。JIT 编译时,所有要调用的方法都是确定的,所有的方法调用都可以被去虚化(de-virtualized)和内联(inlined),这可以极大地提高性能(MemroySegment的使用相当频繁)。然而如果同时加载两个子类,那么 JIT 编译器就只能在真正运行到的时候才知道是哪个子类,这样就无法提前做优化。实际测试的性能差距在 2.7 被左右。

Flink 使用了两种方案:

方案1:只能有一种 MemorySegment 实现被加载
代码中所有的短生命周期和长生命周期的MemorySegment都实例化其中一个子类,另一个子类根本没有实例化过(使用工厂模式来控制)。那么运行一段时间后,JIT 会意识到所有调用的方法都是确定的,然后会做优化。

方案2:提供一种实现能同时处理堆内存和堆外内存

这里 Flink 优雅地实现了一份代码能同时操作堆和堆外内存。这主要归功于 sun.misc.Unsafe提供的一系列方法,如getLong方法:

sun.misc.Unsafe.getLong(Object reference, long offset)

如果reference不为空,则会取该对象的地址,加上后面的offset,从相对地址处取出8字节并得到 long。这对应了堆内存的场景。
如果reference为空,则offset就是要操作的绝对地址,从该地址处取出数据。这对应了堆外内存的场景。

总结:

对于堆外内存,使用 HybridMemorySegment 能同时用来代表堆和堆外内存。这样只需要一个类就能代表长生命周期的堆外内存和短生命周期的堆内存。既然HybridMemorySegment已经这么全能,为什么还要方案1呢?因为我们需要工厂模式来保证只有一个子类被加载(为了更高的性能),而且HeapMemorySegment比heap模式的HybridMemorySegment要快。

flink的状态后端

StateBackend的意思是状态后端。

状态后端定义了流式应用程序状态如何存储和checkpoint的。不同的状态后端以不同的方式来存储其状态,并且使用不同的数据结构来保存正在运行的应用程序的状态。

MemoryStateBackend:
1 基于内存的状态管理器,聚合类算子的状态会存储在JobManager的内存中

2 单次状态大小默认最大被限制为5MB,可以通过构造函数来指定状态初始化内存大小。无论单次状态大小最大被限制为多少,都不可大于akka的frame大小(1.5MB,JobManager和TaskManager之间传输数据的最大消息容量)。状态的总大小不能超过 JobManager 的内存。

3 是Flink默认的后端状态管理器,默认是异步的

4 主机内存中的数据可能会丢失,任务可能无法恢复

5 将工作state保存在TaskManager的内存中,并将checkpoint数据存储在JobManager的内存中

适用:

本地开发和调试
状态比较少的作业

FsStateBackend:
1 基于文件系统的状态管理器

2 如果使用,默认是异步

3 比较稳定,3个副本,比较安全。不会出现任务无法恢复等问题

4 状态大小受磁盘容量限制

5 将工作state保存在TaskManager的内存中,并将checkpoint数据存储在文件系统中

适用:

状态比较大,窗口比较长,大的KV状态

RocksDBStateBackend:
1 状态数据先写入RocksDB,然后异步的将状态数据写入文件系统。

2 正在进行计算的热数据存储在RocksDB,长时间才更新的数据写入磁盘中(文件系统)存储,体量比较小的元数据状态写入JobManager内存中(将工作state保存在RocksDB中,并且默认将checkpoint数据存在文件系统中)

3 支持的单 key 和单 value 的大小最大为每个 2^31 字节(2GB)

4 RocksDBStateBackend是目前唯一支持incremental的checkpoints的策略

5 如果使用,默认是异步

适用:

非常大的状态,长窗口,大的KV状态
增量checkpoint
性能:MemoryStateBackend>RocksDBStateBackend>FsStateBackend

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_43691132/article/details/123812201

智能推荐

java ee项目案后台模板_12个非常不错的免费HTML后台管理模板-程序员宅基地

文章浏览阅读481次。下面介绍的这些免费后端管理HTML模板,都非常不错。建议您收藏。Charisma是一个响应式管理模板,基于Twitter Bootstrap构建。拥有9种外观主题,包括几乎所有的东西,如表单、图表、按纽、表格、文件管理器、相册等。可到GitHub link下载该模板。INADMIN是一个基于jQuery开发的管理模板,拥有头+顶部导航,提供了tables、forms、messages等样式,还..._java后端查询内容的数据赋值定制的html 模板

vue-amap官网文档链接,vue-amap画圆-程序员宅基地

文章浏览阅读1.7k次。1.4.3 圆 - vue-amap 中文文档 - 文江博客基础示例 组件 | vue-amap

几何光学学习笔记(1)- 1.1 几何光学的基本概念和定律-程序员宅基地

文章浏览阅读5.8k次,点赞11次,收藏33次。几何光学学习笔记(1)1.1 几何光学的基本概念和定律1.绪论2. 几何光学的基本概念3.几何光学的基本规律1.绪论几何光学:直线传播;互不影响;折射反射。物理光学:波动光学,量子光学几何光学和物理光学在一定条件下可以统一起来。例如:几何光学认为光源通过光学系统会成像为一个几何点;而物理光学则认为会成像为一个黑白相间的衍射斑。其中,第一个亮斑的半径为y=1.22λDy=\frac {1.22\lambda} Dy=D1.22λ​当λ→0\lambda\to0λ→0时,y也为0,此时几何光

ios python 越狱_如何使用Frida绕过iOS应用程序中的越狱检测!!!-程序员宅基地

文章浏览阅读685次。注意 注意 请注意:一、Frida介绍Frida是一个可以hook App的动态代码工具包,可以向Windows、macOS、Linux、iOS、Android和QNX的本机应用程序中注入JavaScript或自己的库代码。最开始的时候,它是基于谷歌的V8 Javascript运行,但是从版本9开始,Frida已经开始使用其内部的Duktape运行。列举一些Frida的使用场景:1、hookin..._python hook ios app

shiro权限缓存_shiro-activesessioncache-程序员宅基地

文章浏览阅读935次。在每次操作的时候都会查询一遍权限,说实话,确实有点压力,所以可以考虑采用一下缓存来缓解一下压力Ehcache本地缓存的实现1、<!-- shiro缓存管理器 --> <dependency> <groupId>org.apache.shiro</groupId> <artifactId>shiro-ehcache</artifactId> <vers_shiro-activesessioncache

好久没有写CSDN上的博客了-程序员宅基地

文章浏览阅读325次。今天搜索关于谷歌所使用的技术的时候看到CSDN上的文章,随便看了看自己的,发现自己有快两年没有写过了,真是汗颜啊,所以决定以后关于技术的全放到程序员宅基地上,同时保留163博客上的内容 还希望各位朋友多多捧场

随便推点

notepad++快捷键_notepad加粗快捷键-程序员宅基地

文章浏览阅读1.6k次,点赞2次,收藏2次。notepad++是一款在windows上表现很不错的编辑器,学习成本低,支持的语言多,可拓展性高。 我原本打算用notepad++做为主编辑器,但是发现他对markdown的支持并不好,况且优秀的编辑器有很多。我打算去探索新的编辑器。 目前,notepad++依然有用武之地,因为通过notepadstarterplugin插件能完全取代记事本,即一些默认打开记事本的程序,将会打开notepad++,非常_notepad加粗快捷键

HarmonyOS —— MD5 摘要计算_harmony md5-程序员宅基地

文章浏览阅读760次,点赞8次,收藏7次。计算 MD5 摘要需要导入HarmonyOs提供的统一的密码算法库加解密相关接口。_harmony md5

Windows 解决cmd/dos窗口中文乱码问题_cmd中文乱码解决方法-程序员宅基地

文章浏览阅读6.5w次,点赞10次,收藏55次。Windows 解决cmd/dos窗口中文乱码问题,本文提供了多种解决方案,总有一款适合你!_cmd中文乱码解决方法

FireFox与IE中CSS兼容技术集绵整理-程序员宅基地

文章浏览阅读87次。1.css在不同浏览器下显示效果不同firefox和IE对某些css样式的认定有不少区别,包括:· ul和ol的默认padding值是不一样的,在Firefox中,padding-left默认值为40px左右,而IE中为0,一般设置ul{margin:0;padding:0;}就能解决大部分问题· ..._火狐 p height 多了 0.45

Scala finally块_scala finally-程序员宅基地

文章浏览阅读657次。finally块用于在异常时释放资源。资源可能是文件,网络连接,数据库连接等,finally块执行代码运行保证。以下程序说明了finally块的用法。Scala finally块示例class ExceptionExample{ def divide(a:Int, b:Int) = { try{ a/b ..._scala finally

QNX学习笔记-Neutrino-QNX-boot启动流程分析_qnx启动时序图-程序员宅基地

文章浏览阅读2.8k次。嵌入式系统的启动都是类似的,先启动一个boot程序,然后又boot控制系统的进一步加载运行_qnx启动时序图

推荐文章

热门文章

相关标签