Flink最全面教程(自己总结的)_flink教程-程序员宅基地

技术标签: flink  学习  kafka  大数据  

DataSet/Stream API

1.1 Environment

1.1.1 getExecutionEnvironment

创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment 会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

// 初始化环境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment

如果没有设置并行度,会以 flink-conf.yaml 中的配置为准,默认是 1

1.2 Source

1.2.1 基于本地集合的source

在一个本地内存中,生成一个集合作为Flink处理的source。
离线处理代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object ListSource {
    
  def main(args: Array[String]): Unit = {
    
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[String] = env.fromCollection(List("hadoop spark","hive hbase"))
    listDataSet.print()
  }
}

实时处理代码如下:

import org.apache.flink.api.scala.{
    ExecutionEnvironment, _}
import org.apache.flink.streaming.api.scala.{
    DataStream, StreamExecutionEnvironment}

object ListSourceStream {
    
  def main(args: Array[String]): Unit = {
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val listDataStream: DataStream[String] = env.fromCollection(List("hadoop spark","hive hbase"))
    listDataStream.print()
    env.execute("ListSourceStream is runned")
  }
}

1.2.2 基于本地文件的source

导入本地文本数据作为数据源。
离线处理代码如下:

import org.apache.flink.api.scala.{
    DataSet, ExecutionEnvironment}

object FileSource {
    
  def main(args: Array[String]): Unit = {
    
    val env = ExecutionEnvironment.getExecutionEnvironment
    val fileDataSet = env.readTextFile("C:\\Users\\thinkpad\\Desktop\\words.txt")
fileDataSet.print()
  }
}

实时处理代码如下:

import org.apache.flink.streaming.api.scala.{
    DataStream, StreamExecutionEnvironment}

object FileSourceStream {
    
  def main(args: Array[String]): Unit = {
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val fileDataStream: DataStream[String] = env.readTextFile("C:\\Users\\thinkpad\\Desktop\\words.txt")
    fileDataStream.print()
    env.execute("FileSourceStream is runned")
  }
}

1.2.3 基于HDFS的source

读取hdfs文件,作为数据源。
离线处理代码如下:

import org.apache.flink.api.scala.{
    DataSet, ExecutionEnvironment}

object hdfsSource {
    
  def main(args: Array[String]): Unit = {
    
    val env = ExecutionEnvironment.getExecutionEnvironment
    val hdfsDataSet: DataSet[String] = env.readTextFile("hdfs://linux01:9000/a.txt")
    hdfsDataSet.print()
  }
}

实时处理代码如下:

import org.apache.flink.streaming.api.scala.{
    DataStream, StreamExecutionEnvironment}
object hdfsSourceStream {
    
  def main(args: Array[String]): Unit = {
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val hdfsDataStream: DataStream[String] = env.readTextFile("hdfs://linux01:9000/a.txt")
    hdfsDataStream.print()
    env.execute("hdfsSourceStream is runned")
  }
}

1.2.4 基于 kafka 消息队列的source

处理代码如下:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{
    DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.api.scala._

object kafkaSourceStream {
    
  def main(args: Array[String]): Unit = {
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val props = new Properties()
    props.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092")
    props.setProperty("group.id", "consumer-group")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("auto.offset.reset", "latest")
//SimpleStringSchema反序列化工具
    val kafkaDataStream: DataStream[String] = 
env.addSource(new FlinkKafkaConsumer010[String]("test",new SimpleStringSchema(),props))
    kafkaDataStream.print()
    env.execute(“kafkaSourceStream is runned”)
  }
}

1.2.5 自定义 Source作为数据源

除了以上的source数据来源,我们还可以自定义source,只是继承SourceFunction即可。
自定义source代码如下:

import org.apache.flink.streaming.api.functions.source.SourceFunction

class MySource extends SourceFunction[String] {
    
  //定义标志位用来标记是否正常运行
  var running = true

  override def cancel(): Unit = {
    
    running = false
  }

  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    
    val data: Range.Inclusive = 1.to(10)
    while (running) {
    
      data.foreach(t => {
    
        sourceContext.collect(t.toString)
      })
    }
  }
}

调用自定义source代码如下:

import org.apache.flink.streaming.api.scala.{
    DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

object DefineSource {
    
  def main(args: Array[String]): Unit = {
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val defineSource: DataStream[String] = env.addSource(new MySource())
    defineSource.print()
    env.execute("DefineSource is runned")
  }
}

1.3 Sink

sink 也就是Flink运行完后,最终要将数据输出到哪儿。

1.3.1基于本地内存集合的sink

将数据最终输出到内存中的集合中。
示例代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object listSink {
    
  def main(args: Array[String]): Unit = {
    
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[String] = env.fromCollection(List("hadoop","spark","hive"))
    val list: Seq[String] = listDataSet.collect()
    list.foreach(println(_))
  }
}

1.3.2基于本地文件的sink
将结果输出到本地文件系统中。
示例代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object fileSink {
    
  def main(args: Array[String]): Unit = {
    
    val env = ExecutionEnvironment.getExecutionEnvironment
    val fileDataSet: DataSet[String] = env.fromCollection(List("hadoop","spark"))
    fileDataSet.writeAsText("C:\\Users\\thinkpad\\Desktop\\print.txt")
    env.execute("fileSink is runned")
  }
}

1.3.3基于HDFS文件系统的sink

将结果输出到hdfs文件系统中。
示例代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object hdfsSink {
    
  def main(args: Array[String]): Unit = {
    
    val env = ExecutionEnvironment.getExecutionEnvironment
    val hdfsDataSet: DataSet[String] = env.fromCollection(List("hadoop","spark"))
    hdfsDataSet.writeAsText("hdfs://linux01:9000/hdfsSink")
    env.execute("hdfsSink is runned")
  }
}

1.3.4基于Kafka消息队列的sink

将结果输出到kafka文件系统中,用flink作为kafka的生产者。
示例代码如下:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{
    DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010

object kafkaSink {
    
  def main(args: Array[String]): Unit = {
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val props = new Properties()
    props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    props.setProperty("group.id", "consumer-group")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("auto.offset.reset", "latest")
    val listDataStream: DataStream[String] = env.fromCollection(List("hadoop","spark"))
    listDataStream.addSink(new FlinkKafkaProducer010[String]("linux01:9092,linux02:9092,linux03:9092","test",new SimpleStringSchema()))
    env.execute("kafkaSink is runned")
  }
}

1.3.5基于JDBC自定义sink

将计算结果存储到关系数据库中,如mysql等。
导入依赖:

 <dependency>
     <groupId>mysql</groupId>
     <artifactId>mysql-connector-java</artifactId>
     <version>5.1.47</version>
</dependency>

实现MyJdbcSink类,继承RichSinkFunction,用来是实现保存到mysql中调用的命令。

import java.sql
import java.sql.DriverManager
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{
    RichSinkFunction, SinkFunction}
//为什么继承的是富函数
class MyJdbcSink extends RichSinkFunction[String] {
    
  //定义连接参数成员属性
  var conn: Connection = _
  var prepare: PreparedStatement = _

  //打开连接
  override def open(parameters: Configuration): Unit = {
    
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata",
      "root", "root")
    prepare= conn.prepareStatement("INSERT INTO infoTest VALUES (?, ?)")
  }

  //执行sql语句
  override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
    
    prepare.setString(1,value)
    prepare.setString(2,value)
    prepare.execute()
  }

  //关闭资源
  override def close(): Unit = {
    
    prepare.close()
    conn.close()
  }
}

将结果写入mysql,调用自定义mysql类,代码如下:

import org.apache.flink.streaming.api.scala.{
    DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

object mysqlSInk {
    
  def main(args: Array[String]): Unit = {
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val listDataSream: DataStream[String] = env.fromCollection(List("hadoop","spark"))
    listDataSream.addSink(new MyJdbcSink())
    env.execute("mysqlSInk is runned")
  }
}

1.3.5基于Redis非关系型数据库的sink

将计算结果存储到redis非关系数据库中。
导入flink-redis依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.0</version>
</dependency>

定义一个redis的mapper类,继承RedisMapper类,用于定义保存到 redis时调用的命令,代码如下:

import org.apache.flink.streaming.connectors.redis.common.mapper.{
    RedisCommand, RedisCommandDescription, RedisMapper}

class MyRedisMapper extends RedisMapper[String]{
    
//定义保存到redis中的命令
  override def getCommandDescription: RedisCommandDescription = {
    
    new RedisCommandDescription(RedisCommand.HSET,"redis")
  }

  override def getKeyFromData(t: String): String = {
    
    t.hashCode.toString
  }

  override def getValueFromData(t: String): String = {
    
    t
  }
}

将结果输入到redis代码如下:

import org.apache.flink.streaming.api.scala.{
    DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig

object RedisSink {
    
  def main(args: Array[String]): Unit = {
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val listDataStream: DataStream[String] = env.fromCollection(List("hadoop","spark"))
    val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).setDatabase(0).build()
    listDataStream.addSink(new RedisSink[String](conf,new MyRedisMapper))
    env.execute("RedisSink is runned")
  }
}

1.4 Transform

在flink中有类似于spark的一类转换算子,就是transform,在Flink的编程体系中,我们获取到数据源之后,需要经过一系列的处理即transformation操作,再将最终结果输出到目的Sink使数据落地。

常用的transform转换算子如下:

Transformation 说明
map 将DataSet中的每一个元素转换为另外一个元素
flatMap 将DataSet中的每一个元素转换为0…n个元素
mapPartition 将一个分区中的元素转换为另一个元素
filter 过滤出来一些符合条件的元素
reduce 可以对一个dataset或者一个group来进行聚合计算,最终聚合成一个元素
reduceGroup 将一个dataset或者一个group聚合成一个或多个元素
aggregate 按照内置的方式来进行聚合。例如:SUM/MIN/MAX…
distinct 去重
join 将两个DataSet按照一定条件连接到一起,形成新的DataSet
union 将两个DataSet取并集,并自动进行去重
KeyBy 逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素,在内部以 hash 的形式实现的
Split 根据某些特征把一个 DataStream 拆分成两个或者多个
Select 从一个 SplitStream 中获取一个或者多个 DataStream
Connect 连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
CoMap,CoFlatMap 跟map and flatMap类似,只不过作用在ConnectedStreams上
rebalance 让每个分区的数据均匀分布,避免数据倾斜
partitionByHash 按照指定的key进行hash分区
sortPartition 指定字段对分区中的数据进行排序

1.4.1 map

将DataSet中的每一个元素转换为另外一种形式的元素
示例代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object Transform {
    
  def main(args: Array[String]): Unit = {
    
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[Int] = env.fromCollection(List(1,2,3))
    val result: DataSet[Int] = listDataSet.map(_*2)
    result.print()
  }
}

1.4.2 flatMap

flatMap也是一种类似于遍历循环,是将每一个元素按照特定的标识切分,变成多个元素。
如将集合中每个元素按照空格切分。

示例代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object Transform {
    
  def main(args: Array[String]): Unit = {
    
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[String] = env.fromCollection(List("hadoop spark","hive mysql","hbase kafka"))
    val result: DataSet[String] = listDataSet.flatMap(_.split(" "))
    result.print()
  }
}

1.4.3 mapPartition

mapPartition:中的函数是在每个分区运行一次

map :每个元素运行一次

mapPartition是按照分区进行处理数据,传入是一个迭代,是将分区中的元素进行转换,map 和 mapPartition 的效果是一样的,但如果在map的函数中,需要访问一些外部存储。例如:
访问mysql数据库,需要打开连接,此时map效率较低。而使用 mapPartition 可以有效减少连接数,提高效率。

示例代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object Transform {
    
  def main(args: Array[String]): Unit = {
    
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[String] = env.fromCollection(List("hadoop spark","hive mysql","hbase kafka"))
    val result: DataSet[String] = listDataSet.mapPartition(iter => {
    
      iter.flatMap(_.split(" "))
    })
    result.print()
  }
}

1.4.4 Filter

filter是遍历循环dataset中每一个元素,filter中满足表达式的过滤出来,不满足表达式的过滤掉。

示例代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object Transform {
    
  def main(args: Array[String]): Unit = {
    
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[String] = env.fromCollection(List("hadoop spark","hive","hbase kafka"))
    val result: DataSet[String] = listDataSet.filter(_.length>=5)
    result.print()
  }
}

1.4.5 reduce

reduce是对一个 dataset 或者一个 group 来进行聚合计算,按照表达逻辑最终聚合成一个元素。

示例代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object Transform {
    
  def main(args: Array[String]): Unit = {
    
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[Int] = env.fromCollection(List(1,2,3,4))
    val result = listDataSet.reduce(_+_)
    result.print()
  }
}

Window操作

2.1 Window概述

streaming 流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而 window 是一种切割无限数据为有限块进行处理的手段。
Window 是无限数据流处理的核心,Window 将一个无限的 stream 拆分成有

2.2 Window类型

Window 可以分成两类:CountWindow:按照指定的数据条数生成一个 Window,与时间无关;TimeWindow:按照时间生成 Window。

2.2.1 CountWindow

CountWindow 根据窗口中相同 key 元素的数量来触发执行,执行时只计算元素数量达到窗口大小的 key 对应的结果。

注意:CountWindow 的 window_size 指的是相同 Key 的元素的个数,不是输入的所有元素的总数。

(1) 滚动窗口

默认的 CountWindow 是一个滚动窗口,只需要指定窗口大小即可,当相同key元素数量达到窗口大小时,就会触发窗口的执行。

object Windows {
    
    def main(args: Array[String]): Unit = {
    
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        val file: DataStream[String] = env.socketTextStream("node01",9999)
        val countStream: DataStream[(String, Int)] = file.flatMap(_.split(" "))
            .map((_, 1))
            .keyBy(0)
            .countWindow(2)
            .sum(1)
        countStream.print()
        env.execute("Windows is runned")
    }
}
(2) 滑动窗口

滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是 window_size,一个是 sliding_size。

下面代码中的 sliding_size 设置为了 2,也就是说,每收到两个相同 key 的数据就计算一次,每一次计算的 window 范围是 5 个元素。

object Windows {
    
    def main(args: Array[String]): Unit = {
    
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        val file: DataStream[String] = env.socketTextStream("node01",9999)
        val countStream: DataStream[(String, Int)] = file.flatMap(_.split(" "))
            .map((_, 1))
            .keyBy(0)
            .countWindow(5,2)
            .sum(1)
        countStream.print()
        env.execute("Windows is runned")
    }
}

2.2.2 TimeWindow

对于 TimeWindow,可以根据窗口实现原理的不同分成三类:

  • 滚动窗口(Tumbling Window)

将数据依据固定的窗口长度对数据进行切片。
特点:时间对齐,窗口长度固定,没有重叠。所有的数据只能落在一个窗口里面
滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果你指定了一个 5 分钟大小的滚动窗口

适用场景: 适合做 BI 统计等(做每个时间段的聚合计算)。

  • 滑动窗口(Sliding Window)

滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。一次数据统计的时间长度 每次统计移动多长的时间
特点:时间对齐,窗口长度固定,可以有重叠。一个数据可以被统计多次,滑动间隔、窗口长度是某个数值的整数倍
滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。
例如,你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里包含着上个 10 分钟产生的数据

  • 会话窗口(Session Window)

电商网站: 登录一个系统之后,多长时间没有操作,session就失效。

手机银行: 登录一个系统之后,多长时间没有操作,session就失效要求重新登录。

由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。

特点: 时间无对齐。多长时间之内没有收到数据,这个不是人为能规定的。

session 窗口分配器通过 session 活动来对元素进行分组,session 窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那这个窗口就会关闭。一个 session 窗口通过一个 session 间隔来配置,这个 session 间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将被分配到新的 session 窗口中去

2.2 Window Function

window function 定义了要对窗口中收集的数据做的计算操作,主要可以分为两类:

2.2.1 增量聚合函数(incremental aggregation functions)

每条数据到来就进行计算,保持一个简单的状态。典型的增量聚合函数有ReduceFunction, AggregateFunction。

2.2.2 全窗口函数(full window functions)

先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。ProcessWindowFunction 就是一个全窗口函数。

2.3 Window 其他操作

2.3.1 trigger()

触发器 定义 window 什么时候关闭,触发计算并输出结果

2.3.2 evitor()

移除器 定义移除某些数据的逻辑

2.3.3 allowedLateness()

允许处理迟到的数据

2.3.4 sideOutputLateData()

将迟到的数据放入侧输出流

2.3.5 getSideOutput()

获取侧输出流

Table&SQL

3.1 概述

Table API是流处理和批处理通用的关系型 API,Table API 可以基于流输入或者批输入来运行而不需要进行任何修改。Table API 是 SQL 语言的超集并专门为 Apache Flink 设计的,Table API 是 Scala 和 Java 语言集成式的 API。与常规 SQL 语言中将查询指定为字符串不同,Table API 查询是以 Java 或 Scala 中的语言嵌入样式来定义的,具有 IDE 支持如:自动完成和语法检测;允许以非常直观的方式组合关系运算符的查询,例如 select,filter 和 join。Flink SQL 的支持是基于实现了SQL标准的 Apache Calcite。无论输入是批输入(DataSet)还是流输入(DataStream),任一接口中指定的查询都具有相同的语义并指定相同的结果。

3.2 Table API

3.2.1 依赖

<!-- flink-table&sql -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table</artifactId>
    <version>1.9.1</version>
    <type>pom</type>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.9.1</version>
</dependency>

3.2.2 TableEnvironment

TableEnvironment 是 Table API 和SQL集成的核心概念,它负责:

  • 在内部目录中注册表
  • 注册外部目录
  • 执行SQL查询
  • 注册用户定义的函数
  • DataStream 或 DataSet 转换为 Table
  • 持有 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
    Table总是与特定的TableEnvironment 绑定。不能在同一查询中组合不同 TableEnvironments 的表(例如,union 或 join)。

创建 TableEnvironment:

// 基于流的tableEnv
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val sTableEnv = StreamTableEnvironment.create(sEnv)
// 基于批的bTableEnv
val bEnv: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val bTableEnv: BatchTableEnvironment = BatchTableEnvironment.create(bEnv)

3.2.3 数据加载

数据加载通常有两种:一者基于流/批,一者基于TableSource,但是后者在Flink1.11中已经被废弃,所以不建议使用。

基于批
case class Student(id:Int,name:String,age:Int,gender:String,course:String,score:Int)
object FlinkBatchTableOps {
    
    def main(args: Array[String]): Unit = {
    
        //构建batch的executionEnvironment
        val env = ExecutionEnvironment.getExecutionEnvironment
        val bTEnv = BatchTableEnvironment.create(env)
        val dataSets: DataSet[Student] = env.readCsvFile[Student]("E:\\data\\student.csv",
            //是否忽略文件的第一行数据(主要考虑表头数据)
            ignoreFirstLine = true,
            //字段之间的分隔符
            fieldDelimiter = "|")
        //table 就相当于sparksql中的dataset
        val table: Table = bTEnv.fromDataSet(dataSets)
        //条件查询
        val result: Table = table.select("name,age").where("age=25")
        //打印输出
        bTEnv.toDataSet[Row](result).print()
    }
}
基于流
case class Goods(id: Int,brand:String,category:String)
object FlinkStreamTableOps {
    
    def main(args: Array[String]): Unit = {
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val sTEnv = StreamTableEnvironment.create(env)
        val dataStream: DataStream[Goods] = env.fromElements(
            "001|mi|mobile",
            "002|mi|mobile",
            "003|mi|mobile",
            "004|mi|mobile",
            "005|huawei|mobile",
            "006|huawei|mobile",
            "007|huawei|mobile",
            "008|Oppo|mobile",
            "009|Oppo|mobile",
            "010|uniqlo|clothing",
            "011|uniqlo|clothing",
            "012|uniqlo|clothing",
            "013|uniqlo|clothing",
            "014|uniqlo|clothing",
            "015|selected|clothing",
            "016|selected|clothing",
            "017|selected|clothing",
            "018|Armani|clothing",
            "019|lining|sports",
            "020|nike|sports",
            "021|adidas|sports",
            "022|nike|sports",
            "023|anta|sports",
            "024|lining|sports"
        ).map(line => {
    
            val fields = line.split("\\|")
            Goods(fields(0), fields(1), fields(2))
        })
        //load data from external system
        var table = sTEnv.fromDataStream(dataStream)
        // stream table api
        table.printSchema()
        // 高阶api的操作
        table = table.select("category").distinct()
        /*
            将一个table转化为一个DataStream的时候,有两种选择:
            1. toAppendStream  :在没有聚合操作的时候使用
            2. toRetractStream(缩放的含义) :在进行聚合操作之后使用
         */
        sTEnv.toRetractStream[Row](table).print()
        env.execute("FlinkStreamTableOps")
    }
}

3.2.4 sqlQuery

sql仍然是最主要的分析工具,使用dsl当然也能完成业务分析,但是灵活性,简易性上都不及sql。FlinkTable通过sqlQuery来完成sql的查询操作。

object FlinkSQLOps {
    
    def main(args: Array[String]): Unit = {
    
        val env = ExecutionEnvironment.getExecutionEnvironment
        val sTEnv = BatchTableEnvironment.create(env)
        val dataStream: DataSet[Goods] = env.fromElements(
            "001|mi|mobile",
            "002|mi|mobile",
            "003|mi|mobile",
            "004|mi|mobile",
            "005|huawei|mobile",
            "006|huawei|mobile",
            "007|huawei|mobile",
            "008|Oppo|mobile",
            "009|Oppo|mobile",
            "010|uniqlo|clothing",
            "011|uniqlo|clothing",
            "012|uniqlo|clothing",
            "013|uniqlo|clothing",
            "014|uniqlo|clothing",
            "015|selected|clothing",
            "016|selected|clothing",
            "017|selected|clothing",
            "018|Armani|clothing",
            "019|lining|sports",
            "020|nike|sports",
            "021|adidas|sports",
            "022|nike|sports",
            "023|anta|sports",
            "024|lining|sports"
        ).map(line => {
    
            val fields = line.split("\\|")
            Goods(fields(0), fields(1), fields(2))
        })
        //load data from external system
        sTEnv.registerTable("goods", dataStream)
        //sql操作
        var sql =
            """
              |select
              |   id,
              |   brand,
              |   category
              |from goods
              |""".stripMargin
        sql =
            """
              |select
              |   category,
              |   count(1) counts
              |from goods
              |group by category
              |order by counts desc
              |""".stripMargin
        table = sTEnv.sqlQuery(sql)
        sTEnv.toDataSet[Row](table).print()
    }
}

3.2.5 基于滚动窗口的Table操作

基于EventTIme滑动窗口操作
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.Table
import org.apache.flink.types.Row
//基于滚动窗口Table操作
object FlinkTrumblingWindowTableOps {
    
    def main(args: Array[String]): Unit = {
    
        //1、获取流式执行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)
        // 2、获取table执行环境
        val tblEnv = StreamTableEnvironment.create(env)
        //3、获取数据源
        //输入数据:
        val ds = env.socketTextStream("node01", 9999)
                .map(line => {
    
                    val fields = line.split("\t")
                    UserLogin(fields(0), fields(1), fields(2), fields(3).toInt,
 fields(4))
                })
                .assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor[UserLogin](Time.seconds(2)) {
    
                        override def extractTimestamp(userLogin: UserLogin): Long = {
    
                            userLogin.dataUnix * 1000
                        }
                    }
                )
        //4、将DataStream转换成table
        //引入隐式
        //某天每隔2秒的输入记录条数:
        import org.apache.flink.table.api.scala._
        val table: Table = tblEnv.fromDataStream[UserLogin](ds , 'platform, 'server, 'status, 'ts.rowtime)
//        tblEnv.toAppendStream[Row](table).print()
        tblEnv.sqlQuery(
            s"""
               |select
               |platform,
               |count(1) counts
               |from ${
      table}
               |where status = 'LOGIN'
               |group by platform, tumble(ts,interval '2' second)
               |""".stripMargin)
            .toAppendStream[Row]
            .print("每隔2秒不同平台登录用户->")
        env.execute()
    }
}
/** 用户登录
  *
  * @param platform 所在平台 id(e.g. H5/IOS/ADR/IOS_YY)
  * @param server   所在游戏服 id
  * @param uid      用户唯一 id
  * @param dataUnix 事件时间/s 时间戳
  * @param status   登录动作(LOGIN/LOGOUT)
  */
case class UserLogin(platform: String, server: String, uid: String,  dataUnix: Int, status: String)
基于窗口的processTime
object FlinkTrumblingWindowTableOps2 {
    
    def main(args: Array[String]): Unit = {
    
        //1、获取流式执行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        // 2、获取table执行环境
        val tblEnv = StreamTableEnvironment.create(env)
        //3、获取数据源
        //输入数据:
        val ds = env.socketTextStream("node01", 9999)
                .map(line => {
    
                    val fields = line.split("\t")
                    UserLogin(fields(0), fields(1), fields(2), fields(3).toInt, fields(4))
                })
        //4、将DataStream转换成table
        //引入隐式
        //某天每隔2秒的输入记录条数:
        import org.apache.flink.table.api.scala._
        val table: Table = tblEnv.fromDataStream[UserLogin](ds , 'platform, 'server, 'status, 'ts.proctime)
        // tblEnv.toAppendStream[Row](table).print()
        tblEnv.sqlQuery(
            s"""
               |select
               |  platform,
               |  count(1) counts
               |from ${
      table}
               |where status = 'LOGIN'
               |group by platform, tumble(ts,interval '2' second)
               |""".stripMargin)
            .toAppendStream[Row]
            .print("prcotime-每隔2秒不同平台登录用户->")
        env.execute()
    }
}

3.3 Flink Table UDF

3.3.1 说明

自定义标量函数(User Defined Scalar Function)。一行输入一行输出。

3.3.2 数据

某个用户在某个时刻浏览了某个商品,以及商品的价值

{
    "userID": 2, "eventTime": "2020-10-01 10:02:00", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
    "userID": 1, "eventTime": "2020-10-01 10:02:02", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
    "userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
    "userID": 1, "eventTime": "2020-10-01 10:02:10", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
    "userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
    "userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
    "userID": 1, "eventTime": "2020-10-01 10:02:12", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
    "userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99
{
    "userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
    "userID": 1, "eventTime": "2020-10-01 10:02:15", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
    "userID": 1, "eventTime": "2020-10-01 10:02:16", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}

3.3.3 需求

  • UDF时间转换
  • UDF需要继承ScalarFunction抽象类,主要实现eval方法。
  • 自定义UDF,实现将eventTime转化为时间戳

3.3.4 实现

object FlinkTableUDFOps {
    
    def main(args: Array[String]): Unit = {
    
        val env = ExecutionEnvironment.getExecutionEnvironment
        val bTEnv = BatchTableEnvironment.create(env)
        val ds = env.fromElements(
            "{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:00\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:02\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:10\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:12\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:15\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:16\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}"
        ).map(line => {
    
            val jsonObj = new JSONObject(line)
            val userID = jsonObj.getInt("userID")
            val eventTime = jsonObj.getString("eventTime")
            val eventType = jsonObj.getString("eventType")
            val productID = jsonObj.getString("productID")
            val productPrice = jsonObj.getDouble("productPrice")
            UserBrowseLog(userID, eventTime, eventType, productID, productPrice)
        })
        //自定义udf
        bTEnv.registerFunction("to_time", new TimeScalarFunction())
        bTEnv.registerFunction("myLen", new LenScalarFunction())
        val table = bTEnv.fromDataSet(ds)
        val sql =
            s"""
              |select
              |  userID,
              |  eventTime,
              |  myLen(eventTime) my_len_et,
              |  to_time(eventTime) timestamps
              |from ${
      table}
              |""".stripMargin
        val ret = bTEnv.sqlQuery(sql)

        bTEnv.toDataSet[Row](ret).print
    }
}
case class UserBrowseLog(
    userID: Int,
    eventTime: String,
    eventType: String,
    productID: String,
    productPrice: Double
)

/*
    自定义类去扩展ScalarFunction 复写其中的方法:eval
    at least one method named 'eval' which is public, not
 */
class TimeScalarFunction extends ScalarFunction {
    
    //2020-10-01 10:02:16
    private val df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    def eval(eventTime: String): Long = {
    
        df.parse(eventTime).getTime
    }
}

class LenScalarFunction extends ScalarFunction {
    
    //2020-10-01 10:02:16
    def eval(str: String): Int = {
    
        str.length
    }
}
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/MoLeft/article/details/124613097

智能推荐

输出光绘文件_轻松上手GoPro“光绘残影”视频制作-程序员宅基地

文章浏览阅读462次。视频教程图文教程步骤1:导入素材一、素材1、本视频的素材是使用GoPro Fusion 全景相机进行拍摄的,使用的道具是钢丝棉,在使用钢丝棉的过程中,请注意不要伤到自己与其他人,以及远离可燃物!2、其实拍摄的素材不仅仅可以用GoPro Fusion拍摄,还可以使用GoPro其他的设备选一个超棒的角度,也可以得到意想不到的效果!3、拍摄的素材还可以是星轨、光绘等,等着..._视频转光绘图片

类型别名type、接口interface与文字类型_给interface 导出一个别名-程序员宅基地

文章浏览阅读545次。类型别名类型别名就是给类型定义一个名字,然后用这个名字来代替这个类型,使用 type 来声明。可以定义对象类型,可以是联合类型,也可以是基元类型,或者其他的类型,一般使用大驼峰命名法。注意:type 定义的只是个类型,后面等于的就是类型的名字,并不是变量来的。对象类型例子type Point = { x: number y: number}function printCoord(pt: Point) { console.log(pt)}printCoord({ _给interface 导出一个别名

SpringBoot+Thymleaf+Python+Echarts+Neo4j实现知识图谱可视化_集成neo4j知识图谱可视化-程序员宅基地

文章浏览阅读4.6k次。方案一:SpringBoot配置多数据源配置Mysql多数据库具体可以参考:https://www.w3cschool.cn/article/50807439.html配置Mysql数据库与Neo4j数据库参考:https://gitee.com/baomidou/dynamic-datasource-spring-boot-starter具体如下:1.引入dynamic-datasource-spring-boot-starter。<dependency> <group_集成neo4j知识图谱可视化

vue-cli4 打包优化之 moment 和 lodash_lodashmodulereplacementplugin-程序员宅基地

文章浏览阅读1.9k次。babel.config.jsmodule.exports = { presets: [ '@vue/app', [ '@babel/preset-env', { corejs: 3, useBuiltIns: 'entry' } ] ], plugins: [ 'lodash' ..._lodashmodulereplacementplugin

Yolo3 darknet voc格式数据集制作训练经验_darknet 数据集制作-程序员宅基地

文章浏览阅读684次。数据集中有两张图像属于同一类物体但是Scale尺度差异太大(就是图像大小一致,但图像中的物体一个特别大,另一个特别小),会导致系统崩溃。解决办法也很简单,在中间插入无关的一个类别作为分界线,而且最好插入2张以上。..._darknet 数据集制作

mysql使用某一列的内容赋值给另外一列,concat()函数_mysql判断列中的字符结果填充另一列-程序员宅基地

文章浏览阅读3.2k次。关键函数:concat(str1,str2,…)描述:连接字符串函数,返回结果为连接参数产生的字符串。如有任何一个参数为NULL,则返回值为NULL。如果参数是数字,则自动转换为字符串示例:数据库表t_player的格式如下图目的:我们要修改列nickname的值为Tourist+列showUID的格式办法:update t_player set nickname = co..._mysql判断列中的字符结果填充另一列

随便推点

MyBatis 里面如何配置log4j_mybatis log4j配置过程-程序员宅基地

文章浏览阅读2k次。一 :在Mavan的 pom.xml 中导入相关的依赖二: 添加log4j.properties① 该配置文件的位置必须是在Resources 目录下,解析XML文件时会寻找到该文件进行解析② log4j 的配置具体配置内容(这里可以直接 CV 过去)# Set root category priority to INFO and its only appender to CONSOLE.#log4j.rootCategory=INFO, CONSOLE debug _mybatis log4j配置过程

交换机路由器、ARP欺骗基本知识及实验_交换机 arp欺骗-程序员宅基地

文章浏览阅读1.7k次,点赞3次,收藏12次。数据链路层里的帧结构数据部分最多1500字节,帧最大一共6+6+2+1500+4=1518字节。交换机工作原理原理:收到一个数据帧后,首先学习原MAC地址来形成MAC地址表,然后检查帧中的目标MAC地址,并匹配MAC地址表,如表中又匹配项,则单播转发,若没有,除接受端口外广播转发,MAC地址表的老化时间默认是300秒(可修改)。通过二层设备交换机直连的两台主机之间数据传输假设现在 A 要向 B 发送数据,那么 A 首先要对发送的数据进行封装,在每一层会加上相应的数据头,传输层主要是加上源和目标端_交换机 arp欺骗

Android 系统之 AudioTrack 回顾小结_audiotrack.write_non_blocking-程序员宅基地

文章浏览阅读5.2k次。AudioTrack1. MODE_STATIC 和 MODE_STREAM2. audio buffer3. 应用层AudioTrack的使用关于write()关于StreamTypegetMinBufferSize()4. Framework native层AudioTrack的创建5. AudioTrack的处理几个音频概念transfer_typeAudioT..._audiotrack.write_non_blocking

邻水职中计算机公开课教案,计算机基础公开课教案.doc-程序员宅基地

文章浏览阅读178次。PAGEPAGE 4计算机应用基础公开课教案授课人:袁涛 授课对象:机电工程系2011级学生时间:2011年12月8日 星期四 上午一、二节课题:excel中数据的基本处理一、教学目标:知识与技能1、掌握一些常见函数的使用方法2、会对一组数据排序、筛选(二)过程与方法1、锻炼学生恰当、自如地使用函数的能力;2、培养学生收集、分析、处理数据的能力;3、培养自主探索,合作交流能力..._计算机应用基础公开课教案

vue日期范围选择器_Vue.js日期范围选择器,具有多个范围和预设-程序员宅基地

文章浏览阅读3.9k次。vue日期范围选择器 vue-mj-daterangepicker (vue-mj-daterangepicker)Vue.js date range picker with multiples ranges and presets (vue 2.x) . Vue.js日期范围选择器,具有多个范围和预设(vue 2.x)。 View demo查看演示 Download Source 下载..._mj-daterange-picker

适用于Spring Boot的数据字典翻译Starter_springboot 自动翻译数据字段数据-程序员宅基地

文章浏览阅读2.1k次,点赞6次,收藏14次。适用于Spring Boot的字典翻译扩展在常见的web应用中,有很多数据库字段会使用字典值,但是在数据查询时,我们需要将存储的字典值转换成对应的字典标签(value>>name),用于展示给用户。常见的转换方式为从数据库查询、逻辑包装等,这样的字段一旦有很多的话,就非常的不方便,所以我就做了这个扩展项目。总述我做的是一个基于Spring Boot的扩展starter,项目代码已经上传到github:dict-traslate-starter ,这篇文档先说一下用法然后讲一下设计思路与_springboot 自动翻译数据字段数据

推荐文章

热门文章

相关标签