创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment 会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。
// 初始化环境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment
如果没有设置并行度,会以 flink-conf.yaml 中的配置为准,默认是 1
在一个本地内存中,生成一个集合作为Flink处理的source。
离线处理代码如下:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
object ListSource {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val listDataSet: DataSet[String] = env.fromCollection(List("hadoop spark","hive hbase"))
listDataSet.print()
}
}
实时处理代码如下:
import org.apache.flink.api.scala.{
ExecutionEnvironment, _}
import org.apache.flink.streaming.api.scala.{
DataStream, StreamExecutionEnvironment}
object ListSourceStream {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val listDataStream: DataStream[String] = env.fromCollection(List("hadoop spark","hive hbase"))
listDataStream.print()
env.execute("ListSourceStream is runned")
}
}
导入本地文本数据作为数据源。
离线处理代码如下:
import org.apache.flink.api.scala.{
DataSet, ExecutionEnvironment}
object FileSource {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val fileDataSet = env.readTextFile("C:\\Users\\thinkpad\\Desktop\\words.txt")
fileDataSet.print()
}
}
实时处理代码如下:
import org.apache.flink.streaming.api.scala.{
DataStream, StreamExecutionEnvironment}
object FileSourceStream {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val fileDataStream: DataStream[String] = env.readTextFile("C:\\Users\\thinkpad\\Desktop\\words.txt")
fileDataStream.print()
env.execute("FileSourceStream is runned")
}
}
读取hdfs文件,作为数据源。
离线处理代码如下:
import org.apache.flink.api.scala.{
DataSet, ExecutionEnvironment}
object hdfsSource {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val hdfsDataSet: DataSet[String] = env.readTextFile("hdfs://linux01:9000/a.txt")
hdfsDataSet.print()
}
}
实时处理代码如下:
import org.apache.flink.streaming.api.scala.{
DataStream, StreamExecutionEnvironment}
object hdfsSourceStream {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val hdfsDataStream: DataStream[String] = env.readTextFile("hdfs://linux01:9000/a.txt")
hdfsDataStream.print()
env.execute("hdfsSourceStream is runned")
}
}
处理代码如下:
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{
DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.api.scala._
object kafkaSourceStream {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092")
props.setProperty("group.id", "consumer-group")
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("auto.offset.reset", "latest")
//SimpleStringSchema反序列化工具
val kafkaDataStream: DataStream[String] =
env.addSource(new FlinkKafkaConsumer010[String]("test",new SimpleStringSchema(),props))
kafkaDataStream.print()
env.execute(“kafkaSourceStream is runned”)
}
}
除了以上的source数据来源,我们还可以自定义source,只是继承SourceFunction即可。
自定义source代码如下:
import org.apache.flink.streaming.api.functions.source.SourceFunction
class MySource extends SourceFunction[String] {
//定义标志位用来标记是否正常运行
var running = true
override def cancel(): Unit = {
running = false
}
override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
val data: Range.Inclusive = 1.to(10)
while (running) {
data.foreach(t => {
sourceContext.collect(t.toString)
})
}
}
}
调用自定义source代码如下:
import org.apache.flink.streaming.api.scala.{
DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
object DefineSource {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val defineSource: DataStream[String] = env.addSource(new MySource())
defineSource.print()
env.execute("DefineSource is runned")
}
}
sink 也就是Flink运行完后,最终要将数据输出到哪儿。
将数据最终输出到内存中的集合中。
示例代码如下:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
object listSink {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val listDataSet: DataSet[String] = env.fromCollection(List("hadoop","spark","hive"))
val list: Seq[String] = listDataSet.collect()
list.foreach(println(_))
}
}
1.3.2基于本地文件的sink
将结果输出到本地文件系统中。
示例代码如下:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
object fileSink {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val fileDataSet: DataSet[String] = env.fromCollection(List("hadoop","spark"))
fileDataSet.writeAsText("C:\\Users\\thinkpad\\Desktop\\print.txt")
env.execute("fileSink is runned")
}
}
将结果输出到hdfs文件系统中。
示例代码如下:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
object hdfsSink {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val hdfsDataSet: DataSet[String] = env.fromCollection(List("hadoop","spark"))
hdfsDataSet.writeAsText("hdfs://linux01:9000/hdfsSink")
env.execute("hdfsSink is runned")
}
}
将结果输出到kafka文件系统中,用flink作为kafka的生产者。
示例代码如下:
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{
DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010
object kafkaSink {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
props.setProperty("group.id", "consumer-group")
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("auto.offset.reset", "latest")
val listDataStream: DataStream[String] = env.fromCollection(List("hadoop","spark"))
listDataStream.addSink(new FlinkKafkaProducer010[String]("linux01:9092,linux02:9092,linux03:9092","test",new SimpleStringSchema()))
env.execute("kafkaSink is runned")
}
}
将计算结果存储到关系数据库中,如mysql等。
导入依赖:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
实现MyJdbcSink类,继承RichSinkFunction,用来是实现保存到mysql中调用的命令。
import java.sql
import java.sql.DriverManager
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{
RichSinkFunction, SinkFunction}
//为什么继承的是富函数
class MyJdbcSink extends RichSinkFunction[String] {
//定义连接参数成员属性
var conn: Connection = _
var prepare: PreparedStatement = _
//打开连接
override def open(parameters: Configuration): Unit = {
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata",
"root", "root")
prepare= conn.prepareStatement("INSERT INTO infoTest VALUES (?, ?)")
}
//执行sql语句
override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
prepare.setString(1,value)
prepare.setString(2,value)
prepare.execute()
}
//关闭资源
override def close(): Unit = {
prepare.close()
conn.close()
}
}
将结果写入mysql,调用自定义mysql类,代码如下:
import org.apache.flink.streaming.api.scala.{
DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
object mysqlSInk {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val listDataSream: DataStream[String] = env.fromCollection(List("hadoop","spark"))
listDataSream.addSink(new MyJdbcSink())
env.execute("mysqlSInk is runned")
}
}
将计算结果存储到redis非关系数据库中。
导入flink-redis依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.0</version>
</dependency>
定义一个redis的mapper类,继承RedisMapper类,用于定义保存到 redis时调用的命令,代码如下:
import org.apache.flink.streaming.connectors.redis.common.mapper.{
RedisCommand, RedisCommandDescription, RedisMapper}
class MyRedisMapper extends RedisMapper[String]{
//定义保存到redis中的命令
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.HSET,"redis")
}
override def getKeyFromData(t: String): String = {
t.hashCode.toString
}
override def getValueFromData(t: String): String = {
t
}
}
将结果输入到redis代码如下:
import org.apache.flink.streaming.api.scala.{
DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
object RedisSink {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val listDataStream: DataStream[String] = env.fromCollection(List("hadoop","spark"))
val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).setDatabase(0).build()
listDataStream.addSink(new RedisSink[String](conf,new MyRedisMapper))
env.execute("RedisSink is runned")
}
}
在flink中有类似于spark的一类转换算子,就是transform,在Flink的编程体系中,我们获取到数据源之后,需要经过一系列的处理即transformation操作,再将最终结果输出到目的Sink使数据落地。
常用的transform转换算子如下:
Transformation | 说明 |
---|---|
map | 将DataSet中的每一个元素转换为另外一个元素 |
flatMap | 将DataSet中的每一个元素转换为0…n个元素 |
mapPartition | 将一个分区中的元素转换为另一个元素 |
filter | 过滤出来一些符合条件的元素 |
reduce | 可以对一个dataset或者一个group来进行聚合计算,最终聚合成一个元素 |
reduceGroup | 将一个dataset或者一个group聚合成一个或多个元素 |
aggregate | 按照内置的方式来进行聚合。例如:SUM/MIN/MAX… |
distinct | 去重 |
join | 将两个DataSet按照一定条件连接到一起,形成新的DataSet |
union | 将两个DataSet取并集,并自动进行去重 |
KeyBy | 逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素,在内部以 hash 的形式实现的 |
Split | 根据某些特征把一个 DataStream 拆分成两个或者多个 |
Select | 从一个 SplitStream 中获取一个或者多个 DataStream |
Connect | 连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。 |
CoMap,CoFlatMap | 跟map and flatMap类似,只不过作用在ConnectedStreams上 |
rebalance | 让每个分区的数据均匀分布,避免数据倾斜 |
partitionByHash | 按照指定的key进行hash分区 |
sortPartition | 指定字段对分区中的数据进行排序 |
将DataSet中的每一个元素转换为另外一种形式的元素
示例代码如下:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
object Transform {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val listDataSet: DataSet[Int] = env.fromCollection(List(1,2,3))
val result: DataSet[Int] = listDataSet.map(_*2)
result.print()
}
}
flatMap也是一种类似于遍历循环,是将每一个元素按照特定的标识切分,变成多个元素。
如将集合中每个元素按照空格切分。
示例代码如下:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
object Transform {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val listDataSet: DataSet[String] = env.fromCollection(List("hadoop spark","hive mysql","hbase kafka"))
val result: DataSet[String] = listDataSet.flatMap(_.split(" "))
result.print()
}
}
mapPartition:中的函数是在每个分区运行一次
map :每个元素运行一次
mapPartition是按照分区进行处理数据,传入是一个迭代,是将分区中的元素进行转换,map 和 mapPartition 的效果是一样的,但如果在map的函数中,需要访问一些外部存储。例如:
访问mysql数据库,需要打开连接,此时map效率较低。而使用 mapPartition 可以有效减少连接数,提高效率。
示例代码如下:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
object Transform {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val listDataSet: DataSet[String] = env.fromCollection(List("hadoop spark","hive mysql","hbase kafka"))
val result: DataSet[String] = listDataSet.mapPartition(iter => {
iter.flatMap(_.split(" "))
})
result.print()
}
}
filter是遍历循环dataset中每一个元素,filter中满足表达式的过滤出来,不满足表达式的过滤掉。
示例代码如下:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
object Transform {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val listDataSet: DataSet[String] = env.fromCollection(List("hadoop spark","hive","hbase kafka"))
val result: DataSet[String] = listDataSet.filter(_.length>=5)
result.print()
}
}
reduce是对一个 dataset 或者一个 group 来进行聚合计算,按照表达逻辑最终聚合成一个元素。
示例代码如下:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
object Transform {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val listDataSet: DataSet[Int] = env.fromCollection(List(1,2,3,4))
val result = listDataSet.reduce(_+_)
result.print()
}
}
streaming 流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而 window 是一种切割无限数据为有限块进行处理的手段。
Window 是无限数据流处理的核心,Window 将一个无限的 stream 拆分成有
Window 可以分成两类:CountWindow:按照指定的数据条数生成一个 Window,与时间无关;TimeWindow:按照时间生成 Window。
CountWindow 根据窗口中相同 key 元素的数量来触发执行,执行时只计算元素数量达到窗口大小的 key 对应的结果。
注意:CountWindow 的 window_size 指的是相同 Key 的元素的个数,不是输入的所有元素的总数。
默认的 CountWindow 是一个滚动窗口,只需要指定窗口大小即可,当相同key元素数量达到窗口大小时,就会触发窗口的执行。
object Windows {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val file: DataStream[String] = env.socketTextStream("node01",9999)
val countStream: DataStream[(String, Int)] = file.flatMap(_.split(" "))
.map((_, 1))
.keyBy(0)
.countWindow(2)
.sum(1)
countStream.print()
env.execute("Windows is runned")
}
}
滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是 window_size,一个是 sliding_size。
下面代码中的 sliding_size 设置为了 2,也就是说,每收到两个相同 key 的数据就计算一次,每一次计算的 window 范围是 5 个元素。
object Windows {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val file: DataStream[String] = env.socketTextStream("node01",9999)
val countStream: DataStream[(String, Int)] = file.flatMap(_.split(" "))
.map((_, 1))
.keyBy(0)
.countWindow(5,2)
.sum(1)
countStream.print()
env.execute("Windows is runned")
}
}
对于 TimeWindow,可以根据窗口实现原理的不同分成三类:
将数据依据固定的窗口长度对数据进行切片。
特点:时间对齐,窗口长度固定,没有重叠。所有的数据只能落在一个窗口里面
滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果你指定了一个 5 分钟大小的滚动窗口
适用场景: 适合做 BI 统计等(做每个时间段的聚合计算)。
滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。一次数据统计的时间长度 每次统计移动多长的时间
特点:时间对齐,窗口长度固定,可以有重叠。一个数据可以被统计多次,滑动间隔、窗口长度是某个数值的整数倍
滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。
例如,你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里包含着上个 10 分钟产生的数据
电商网站: 登录一个系统之后,多长时间没有操作,session就失效。
手机银行: 登录一个系统之后,多长时间没有操作,session就失效要求重新登录。
由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。
特点: 时间无对齐。多长时间之内没有收到数据,这个不是人为能规定的。
session 窗口分配器通过 session 活动来对元素进行分组,session 窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那这个窗口就会关闭。一个 session 窗口通过一个 session 间隔来配置,这个 session 间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将被分配到新的 session 窗口中去
window function 定义了要对窗口中收集的数据做的计算操作,主要可以分为两类:
每条数据到来就进行计算,保持一个简单的状态。典型的增量聚合函数有ReduceFunction, AggregateFunction。
先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。ProcessWindowFunction 就是一个全窗口函数。
trigger()
触发器 定义 window 什么时候关闭,触发计算并输出结果
evitor()
移除器 定义移除某些数据的逻辑
allowedLateness()
允许处理迟到的数据
sideOutputLateData()
将迟到的数据放入侧输出流
getSideOutput()
获取侧输出流
Table API是流处理和批处理通用的关系型 API,Table API 可以基于流输入或者批输入来运行而不需要进行任何修改。Table API 是 SQL 语言的超集并专门为 Apache Flink 设计的,Table API 是 Scala 和 Java 语言集成式的 API。与常规 SQL 语言中将查询指定为字符串不同,Table API 查询是以 Java 或 Scala 中的语言嵌入样式来定义的,具有 IDE 支持如:自动完成和语法检测;允许以非常直观的方式组合关系运算符的查询,例如 select,filter 和 join。Flink SQL 的支持是基于实现了SQL标准的 Apache Calcite。无论输入是批输入(DataSet)还是流输入(DataStream),任一接口中指定的查询都具有相同的语义并指定相同的结果。
<!-- flink-table&sql -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table</artifactId>
<version>1.9.1</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.9.1</version>
</dependency>
TableEnvironment 是 Table API 和SQL集成的核心概念,它负责:
创建 TableEnvironment:
// 基于流的tableEnv
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val sTableEnv = StreamTableEnvironment.create(sEnv)
// 基于批的bTableEnv
val bEnv: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val bTableEnv: BatchTableEnvironment = BatchTableEnvironment.create(bEnv)
数据加载通常有两种:一者基于流/批,一者基于TableSource,但是后者在Flink1.11中已经被废弃,所以不建议使用。
case class Student(id:Int,name:String,age:Int,gender:String,course:String,score:Int)
object FlinkBatchTableOps {
def main(args: Array[String]): Unit = {
//构建batch的executionEnvironment
val env = ExecutionEnvironment.getExecutionEnvironment
val bTEnv = BatchTableEnvironment.create(env)
val dataSets: DataSet[Student] = env.readCsvFile[Student]("E:\\data\\student.csv",
//是否忽略文件的第一行数据(主要考虑表头数据)
ignoreFirstLine = true,
//字段之间的分隔符
fieldDelimiter = "|")
//table 就相当于sparksql中的dataset
val table: Table = bTEnv.fromDataSet(dataSets)
//条件查询
val result: Table = table.select("name,age").where("age=25")
//打印输出
bTEnv.toDataSet[Row](result).print()
}
}
case class Goods(id: Int,brand:String,category:String)
object FlinkStreamTableOps {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val sTEnv = StreamTableEnvironment.create(env)
val dataStream: DataStream[Goods] = env.fromElements(
"001|mi|mobile",
"002|mi|mobile",
"003|mi|mobile",
"004|mi|mobile",
"005|huawei|mobile",
"006|huawei|mobile",
"007|huawei|mobile",
"008|Oppo|mobile",
"009|Oppo|mobile",
"010|uniqlo|clothing",
"011|uniqlo|clothing",
"012|uniqlo|clothing",
"013|uniqlo|clothing",
"014|uniqlo|clothing",
"015|selected|clothing",
"016|selected|clothing",
"017|selected|clothing",
"018|Armani|clothing",
"019|lining|sports",
"020|nike|sports",
"021|adidas|sports",
"022|nike|sports",
"023|anta|sports",
"024|lining|sports"
).map(line => {
val fields = line.split("\\|")
Goods(fields(0), fields(1), fields(2))
})
//load data from external system
var table = sTEnv.fromDataStream(dataStream)
// stream table api
table.printSchema()
// 高阶api的操作
table = table.select("category").distinct()
/*
将一个table转化为一个DataStream的时候,有两种选择:
1. toAppendStream :在没有聚合操作的时候使用
2. toRetractStream(缩放的含义) :在进行聚合操作之后使用
*/
sTEnv.toRetractStream[Row](table).print()
env.execute("FlinkStreamTableOps")
}
}
sql仍然是最主要的分析工具,使用dsl当然也能完成业务分析,但是灵活性,简易性上都不及sql。FlinkTable通过sqlQuery来完成sql的查询操作。
object FlinkSQLOps {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val sTEnv = BatchTableEnvironment.create(env)
val dataStream: DataSet[Goods] = env.fromElements(
"001|mi|mobile",
"002|mi|mobile",
"003|mi|mobile",
"004|mi|mobile",
"005|huawei|mobile",
"006|huawei|mobile",
"007|huawei|mobile",
"008|Oppo|mobile",
"009|Oppo|mobile",
"010|uniqlo|clothing",
"011|uniqlo|clothing",
"012|uniqlo|clothing",
"013|uniqlo|clothing",
"014|uniqlo|clothing",
"015|selected|clothing",
"016|selected|clothing",
"017|selected|clothing",
"018|Armani|clothing",
"019|lining|sports",
"020|nike|sports",
"021|adidas|sports",
"022|nike|sports",
"023|anta|sports",
"024|lining|sports"
).map(line => {
val fields = line.split("\\|")
Goods(fields(0), fields(1), fields(2))
})
//load data from external system
sTEnv.registerTable("goods", dataStream)
//sql操作
var sql =
"""
|select
| id,
| brand,
| category
|from goods
|""".stripMargin
sql =
"""
|select
| category,
| count(1) counts
|from goods
|group by category
|order by counts desc
|""".stripMargin
table = sTEnv.sqlQuery(sql)
sTEnv.toDataSet[Row](table).print()
}
}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.Table
import org.apache.flink.types.Row
//基于滚动窗口Table操作
object FlinkTrumblingWindowTableOps {
def main(args: Array[String]): Unit = {
//1、获取流式执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
// 2、获取table执行环境
val tblEnv = StreamTableEnvironment.create(env)
//3、获取数据源
//输入数据:
val ds = env.socketTextStream("node01", 9999)
.map(line => {
val fields = line.split("\t")
UserLogin(fields(0), fields(1), fields(2), fields(3).toInt,
fields(4))
})
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[UserLogin](Time.seconds(2)) {
override def extractTimestamp(userLogin: UserLogin): Long = {
userLogin.dataUnix * 1000
}
}
)
//4、将DataStream转换成table
//引入隐式
//某天每隔2秒的输入记录条数:
import org.apache.flink.table.api.scala._
val table: Table = tblEnv.fromDataStream[UserLogin](ds , 'platform, 'server, 'status, 'ts.rowtime)
// tblEnv.toAppendStream[Row](table).print()
tblEnv.sqlQuery(
s"""
|select
|platform,
|count(1) counts
|from ${
table}
|where status = 'LOGIN'
|group by platform, tumble(ts,interval '2' second)
|""".stripMargin)
.toAppendStream[Row]
.print("每隔2秒不同平台登录用户->")
env.execute()
}
}
/** 用户登录
*
* @param platform 所在平台 id(e.g. H5/IOS/ADR/IOS_YY)
* @param server 所在游戏服 id
* @param uid 用户唯一 id
* @param dataUnix 事件时间/s 时间戳
* @param status 登录动作(LOGIN/LOGOUT)
*/
case class UserLogin(platform: String, server: String, uid: String, dataUnix: Int, status: String)
object FlinkTrumblingWindowTableOps2 {
def main(args: Array[String]): Unit = {
//1、获取流式执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 2、获取table执行环境
val tblEnv = StreamTableEnvironment.create(env)
//3、获取数据源
//输入数据:
val ds = env.socketTextStream("node01", 9999)
.map(line => {
val fields = line.split("\t")
UserLogin(fields(0), fields(1), fields(2), fields(3).toInt, fields(4))
})
//4、将DataStream转换成table
//引入隐式
//某天每隔2秒的输入记录条数:
import org.apache.flink.table.api.scala._
val table: Table = tblEnv.fromDataStream[UserLogin](ds , 'platform, 'server, 'status, 'ts.proctime)
// tblEnv.toAppendStream[Row](table).print()
tblEnv.sqlQuery(
s"""
|select
| platform,
| count(1) counts
|from ${
table}
|where status = 'LOGIN'
|group by platform, tumble(ts,interval '2' second)
|""".stripMargin)
.toAppendStream[Row]
.print("prcotime-每隔2秒不同平台登录用户->")
env.execute()
}
}
自定义标量函数(User Defined Scalar Function)。一行输入一行输出。
某个用户在某个时刻浏览了某个商品,以及商品的价值
{
"userID": 2, "eventTime": "2020-10-01 10:02:00", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
"userID": 1, "eventTime": "2020-10-01 10:02:02", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
"userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
"userID": 1, "eventTime": "2020-10-01 10:02:10", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
"userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
"userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
"userID": 1, "eventTime": "2020-10-01 10:02:12", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
"userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99
{
"userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
"userID": 1, "eventTime": "2020-10-01 10:02:15", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
"userID": 1, "eventTime": "2020-10-01 10:02:16", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
ScalarFunction
抽象类,主要实现eval方法。object FlinkTableUDFOps {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val bTEnv = BatchTableEnvironment.create(env)
val ds = env.fromElements(
"{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:00\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
"{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:02\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
"{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
"{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:10\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
"{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
"{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
"{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:12\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
"{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
"{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
"{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:15\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
"{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:16\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}"
).map(line => {
val jsonObj = new JSONObject(line)
val userID = jsonObj.getInt("userID")
val eventTime = jsonObj.getString("eventTime")
val eventType = jsonObj.getString("eventType")
val productID = jsonObj.getString("productID")
val productPrice = jsonObj.getDouble("productPrice")
UserBrowseLog(userID, eventTime, eventType, productID, productPrice)
})
//自定义udf
bTEnv.registerFunction("to_time", new TimeScalarFunction())
bTEnv.registerFunction("myLen", new LenScalarFunction())
val table = bTEnv.fromDataSet(ds)
val sql =
s"""
|select
| userID,
| eventTime,
| myLen(eventTime) my_len_et,
| to_time(eventTime) timestamps
|from ${
table}
|""".stripMargin
val ret = bTEnv.sqlQuery(sql)
bTEnv.toDataSet[Row](ret).print
}
}
case class UserBrowseLog(
userID: Int,
eventTime: String,
eventType: String,
productID: String,
productPrice: Double
)
/*
自定义类去扩展ScalarFunction 复写其中的方法:eval
at least one method named 'eval' which is public, not
*/
class TimeScalarFunction extends ScalarFunction {
//2020-10-01 10:02:16
private val df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
def eval(eventTime: String): Long = {
df.parse(eventTime).getTime
}
}
class LenScalarFunction extends ScalarFunction {
//2020-10-01 10:02:16
def eval(str: String): Int = {
str.length
}
}
文章浏览阅读462次。视频教程图文教程步骤1:导入素材一、素材1、本视频的素材是使用GoPro Fusion 全景相机进行拍摄的,使用的道具是钢丝棉,在使用钢丝棉的过程中,请注意不要伤到自己与其他人,以及远离可燃物!2、其实拍摄的素材不仅仅可以用GoPro Fusion拍摄,还可以使用GoPro其他的设备选一个超棒的角度,也可以得到意想不到的效果!3、拍摄的素材还可以是星轨、光绘等,等着..._视频转光绘图片
文章浏览阅读545次。类型别名类型别名就是给类型定义一个名字,然后用这个名字来代替这个类型,使用 type 来声明。可以定义对象类型,可以是联合类型,也可以是基元类型,或者其他的类型,一般使用大驼峰命名法。注意:type 定义的只是个类型,后面等于的就是类型的名字,并不是变量来的。对象类型例子type Point = { x: number y: number}function printCoord(pt: Point) { console.log(pt)}printCoord({ _给interface 导出一个别名
文章浏览阅读4.6k次。方案一:SpringBoot配置多数据源配置Mysql多数据库具体可以参考:https://www.w3cschool.cn/article/50807439.html配置Mysql数据库与Neo4j数据库参考:https://gitee.com/baomidou/dynamic-datasource-spring-boot-starter具体如下:1.引入dynamic-datasource-spring-boot-starter。<dependency> <group_集成neo4j知识图谱可视化
文章浏览阅读1.9k次。babel.config.jsmodule.exports = { presets: [ '@vue/app', [ '@babel/preset-env', { corejs: 3, useBuiltIns: 'entry' } ] ], plugins: [ 'lodash' ..._lodashmodulereplacementplugin
文章浏览阅读684次。数据集中有两张图像属于同一类物体但是Scale尺度差异太大(就是图像大小一致,但图像中的物体一个特别大,另一个特别小),会导致系统崩溃。解决办法也很简单,在中间插入无关的一个类别作为分界线,而且最好插入2张以上。..._darknet 数据集制作
文章浏览阅读3.2k次。关键函数:concat(str1,str2,…)描述:连接字符串函数,返回结果为连接参数产生的字符串。如有任何一个参数为NULL,则返回值为NULL。如果参数是数字,则自动转换为字符串示例:数据库表t_player的格式如下图目的:我们要修改列nickname的值为Tourist+列showUID的格式办法:update t_player set nickname = co..._mysql判断列中的字符结果填充另一列
文章浏览阅读2k次。一 :在Mavan的 pom.xml 中导入相关的依赖二: 添加log4j.properties① 该配置文件的位置必须是在Resources 目录下,解析XML文件时会寻找到该文件进行解析② log4j 的配置具体配置内容(这里可以直接 CV 过去)# Set root category priority to INFO and its only appender to CONSOLE.#log4j.rootCategory=INFO, CONSOLE debug _mybatis log4j配置过程
文章浏览阅读1.7k次,点赞3次,收藏12次。数据链路层里的帧结构数据部分最多1500字节,帧最大一共6+6+2+1500+4=1518字节。交换机工作原理原理:收到一个数据帧后,首先学习原MAC地址来形成MAC地址表,然后检查帧中的目标MAC地址,并匹配MAC地址表,如表中又匹配项,则单播转发,若没有,除接受端口外广播转发,MAC地址表的老化时间默认是300秒(可修改)。通过二层设备交换机直连的两台主机之间数据传输假设现在 A 要向 B 发送数据,那么 A 首先要对发送的数据进行封装,在每一层会加上相应的数据头,传输层主要是加上源和目标端_交换机 arp欺骗
文章浏览阅读5.2k次。AudioTrack1. MODE_STATIC 和 MODE_STREAM2. audio buffer3. 应用层AudioTrack的使用关于write()关于StreamTypegetMinBufferSize()4. Framework native层AudioTrack的创建5. AudioTrack的处理几个音频概念transfer_typeAudioT..._audiotrack.write_non_blocking
文章浏览阅读178次。PAGEPAGE 4计算机应用基础公开课教案授课人:袁涛 授课对象:机电工程系2011级学生时间:2011年12月8日 星期四 上午一、二节课题:excel中数据的基本处理一、教学目标:知识与技能1、掌握一些常见函数的使用方法2、会对一组数据排序、筛选(二)过程与方法1、锻炼学生恰当、自如地使用函数的能力;2、培养学生收集、分析、处理数据的能力;3、培养自主探索,合作交流能力..._计算机应用基础公开课教案
文章浏览阅读3.9k次。vue日期范围选择器 vue-mj-daterangepicker (vue-mj-daterangepicker)Vue.js date range picker with multiples ranges and presets (vue 2.x) . Vue.js日期范围选择器,具有多个范围和预设(vue 2.x)。 View demo查看演示 Download Source 下载..._mj-daterange-picker
文章浏览阅读2.1k次,点赞6次,收藏14次。适用于Spring Boot的字典翻译扩展在常见的web应用中,有很多数据库字段会使用字典值,但是在数据查询时,我们需要将存储的字典值转换成对应的字典标签(value>>name),用于展示给用户。常见的转换方式为从数据库查询、逻辑包装等,这样的字段一旦有很多的话,就非常的不方便,所以我就做了这个扩展项目。总述我做的是一个基于Spring Boot的扩展starter,项目代码已经上传到github:dict-traslate-starter ,这篇文档先说一下用法然后讲一下设计思路与_springboot 自动翻译数据字段数据