Flink SQL Custom Sink

Tags: Flink SQL, Apache Flink, custom SQL sink, work notes, custom Flink sink

1. Background

We are building an internal Flink SQL platform. This post uses a custom Redis sink as an example to show how to define a custom sink for Flink SQL and how to use it once it is defined.
Based on Flink 1.11.

2. Steps

  1. implements DynamicTableSinkFactory
  2. implements DynamicTableSink
  3. create the Redis sink (the SinkFunction that actually writes RowData to Redis)

3. Custom sink code

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import redis.clients.jedis.JedisCluster;

import java.util.*;

import static org.apache.flink.configuration.ConfigOptions.key;


/**
 * @author shengjk1
 * @date 2020/10/16
 */
public class RedisTableSinkFactory implements DynamicTableSinkFactory {
	
	public static final String IDENTIFIER = "redis";
	
	public static final ConfigOption<String> HOST_PORT = key("hostPort")
			.stringType()
			.noDefaultValue()
			.withDescription("redis host and port,");
	
	public static final ConfigOption<String> PASSWORD = key("password")
			.stringType()
			.noDefaultValue()
			.withDescription("redis password");
	
	public static final ConfigOption<Integer> EXPIRE_TIME = key("expireTime")
			.intType()
			.noDefaultValue()
			.withDescription("redis key expire time");
	
	public static final ConfigOption<String> KEY_TYPE = key("keyType")
			.stringType()
			.noDefaultValue()
			.withDescription("redis key type,such as hash,string and so on ");
	
	public static final ConfigOption<String> KEY_TEMPLATE = key("keyTemplate")
			.stringType()
			.noDefaultValue()
			.withDescription("redis key template ");
	
	public static final ConfigOption<String> FIELD_TEMPLATE = key("fieldTemplate")
			.stringType()
			.noDefaultValue()
			.withDescription("redis field template ");
	
	
	public static final ConfigOption<String> VALUE_NAMES = key("valueNames")
			.stringType()
			.noDefaultValue()
			.withDescription("redis value name ");
	
	@Override
	// the factory is only discovered (via SPI) when the 'connector' option in the DDL matches IDENTIFIER
	public String factoryIdentifier() {
		return IDENTIFIER;
	}
	
	@Override
	public Set<ConfigOption<?>> requiredOptions() {
		return new HashSet<>();
	}
	
	@Override
	// every option we declare in the WITH (...) clause is collected here
	public Set<ConfigOption<?>> optionalOptions() {
		Set<ConfigOption<?>> options = new HashSet<>();
		options.add(HOST_PORT);
		options.add(PASSWORD);
		options.add(EXPIRE_TIME);
		options.add(KEY_TYPE);
		options.add(KEY_TEMPLATE);
		options.add(FIELD_TEMPLATE);
		options.add(VALUE_NAMES);
		return options;
	}
	
	@Override
	public DynamicTableSink createDynamicTableSink(Context context) {
		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
		helper.validate();
		ReadableConfig options = helper.getOptions();
		return new RedisSink(
				context.getCatalogTable().getSchema().toPhysicalRowDataType(),
				options);
	}
	
	
	private static class RedisSink implements DynamicTableSink {
		
		private final DataType type;
		private final ReadableConfig options;
		
		private RedisSink(DataType type, ReadableConfig options) {
			this.type = type;
			this.options = options;
		}
		
		@Override
		// declares which row kinds (insert/update/delete) this sink accepts; here we simply accept whatever the query produces
		public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
			return requestedMode;
		}
		
		@Override
		// this is where SQL meets streaming: the runtime provider wraps the user-defined streaming sink function
		public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
			DataStructureConverter converter = context.createDataStructureConverter(type);
			return SinkFunctionProvider.of(new RowDataPrintFunction(converter, options, type));
		}
		
		@Override
		// copy() matters mostly for sources (e.g. during predicate push-down planning); for a sink a plain copy is enough
		public DynamicTableSink copy() {
			return new RedisSink(type, options);
		}
		
		@Override
		public String asSummaryString() {
			return "redis";
		}
	}
	
	/**
	 * Same as a custom sink in the Flink streaming API, except that here the records are RowData.
	 */
	private static class RowDataPrintFunction extends RichSinkFunction<RowData> {
		
		private static final long serialVersionUID = 1L;
		
		private final DataStructureConverter converter;
		private final ReadableConfig options;
		private final DataType type;
		private RowType logicalType;
		private HashMap<String, Integer> fields;
		private JedisCluster jedisCluster;
		
		private RowDataPrintFunction(
				DataStructureConverter converter, ReadableConfig options, DataType type) {
			this.converter = converter;
			this.options = options;
			this.type = type;
		}
		
		@Override
		public void open(Configuration parameters) throws Exception {
			super.open(parameters);
			logicalType = (RowType) type.getLogicalType();
			// map each column name to its position so that ${column} templates can be resolved against the row
			fields = new HashMap<>();
			List<RowType.RowField> rowFields = logicalType.getFields();
			int size = rowFields.size();
			for (int i = 0; i < size; i++) {
				fields.put(rowFields.get(i).getName(), i);
			}
			
			jedisCluster = RedisUtil.getJedisCluster(options.get(HOST_PORT));
		}
		
		@Override
		public void close() throws Exception {
			RedisUtil.closeConn(jedisCluster);
		}
		
		@Override
		/*
		example of the changelog records this sink receives:
		2> +I(1,30017323,1101)
		2> -U(1,30017323,1101)
		2> +U(2,30017323,1101)
		2> -U(2,30017323,1101)
		2> +U(3,30017323,1101)
		2> -U(3,30017323,1101)
		2> +U(4,30017323,1101)
		3> -U(3,980897,3208)
		3> +U(4,980897,3208)
		 */
		public void invoke(RowData rowData, Context context) {
			RowKind rowKind = rowData.getRowKind();
			Row data = (Row) converter.toExternal(rowData);
			// only the latest image of a row is written: inserts and the "after" side of updates
			if (rowKind.equals(RowKind.UPDATE_AFTER) || rowKind.equals(RowKind.INSERT)) {
				
				String keyTemplate = options.get(KEY_TEMPLATE);
				if (Objects.isNull(keyTemplate) || keyTemplate.trim().length() == 0) {
					throw new NullPointerException(" keyTemplate is null or keyTemplate is empty");
				}
				
				// resolve ${column} placeholders, e.g. "test2_${city_id}" with city_id=1101 becomes "test2_1101"
				if (keyTemplate.contains("${")) {
					String[] split = keyTemplate.split("\\$\\{");
					keyTemplate = "";
					for (String s : split) {
						if (s.contains("}")) {
							String fieldName = s.substring(0, s.length() - 1);
							int index = fields.get(fieldName);
							keyTemplate = keyTemplate + data.getField(index).toString();
						} else {
							keyTemplate = keyTemplate + s;
						}
					}
				}
				
				String keyType = options.get(KEY_TYPE);
				String valueNames = options.get(VALUE_NAMES);
				// keyType=hash additionally requires fieldTemplate
				if ("hash".equalsIgnoreCase(keyType)) {
					String fieldTemplate = options.get(FIELD_TEMPLATE);
					if (fieldTemplate.contains("${")) {
						String[] split = fieldTemplate.split("\\$\\{");
						fieldTemplate = "";
						for (String s : split) {
							if (s.contains("}")) {
								String fieldName = s.substring(0, s.length() - 1);
								int index = fields.get(fieldName);
								fieldTemplate = fieldTemplate + data.getField(index).toString();
							} else {
								fieldTemplate = fieldTemplate + s;
							}
						}
					}
					
					// hash field name = fieldTemplate + "_" + valueName
					if (valueNames.contains(",")) {
						HashMap<String, String> map = new HashMap<>();
						String[] fieldNames = valueNames.split(",");
						for (String fieldName : fieldNames) {
							String value = data.getField(fields.get(fieldName)).toString();
							map.put(fieldTemplate + "_" + fieldName, value);
						}
						jedisCluster.hset(keyTemplate, map);
					} else {
						jedisCluster.hset(keyTemplate, fieldTemplate + "_" + valueNames, data.getField(fields.get(valueNames)).toString());
					}
					
				} else if ("set".equalsIgnoreCase(keyType)) {
    
					jedisCluster.set(keyTemplate, data.getField(fields.get(valueNames)).toString());
					
				} else if ("sadd".equalsIgnoreCase(keyType)) {
    
					jedisCluster.sadd(keyTemplate, data.getField(fields.get(valueNames)).toString());
				} else if ("zadd".equalsIgnoreCase(keyType)) {
    
					jedisCluster.sadd(keyTemplate, data.getField(fields.get(valueNames)).toString());
				} else {
    
					throw new IllegalArgumentException(" not find this keyType:" + keyType);
				}
				
				if (Objects.nonNull(options.get(EXPIRE_TIME))) {
    
					jedisCluster.expire(keyTemplate, options.get(EXPIRE_TIME));
				}
			}
		}
	}
}
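
The RedisUtil helper called in open() and close() is not shown in the post. A minimal sketch might look like the following; the class name and method signatures match the calls above, but the hostPort format ("host1:6379,host2:6379") and the absence of pool tuning are assumptions:

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

import java.util.HashSet;
import java.util.Set;

public class RedisUtil {
	
	// hostPort is assumed to look like "host1:6379,host2:6379"
	public static JedisCluster getJedisCluster(String hostPort) {
		Set<HostAndPort> nodes = new HashSet<>();
		for (String hp : hostPort.split(",")) {
			String[] parts = hp.trim().split(":");
			nodes.add(new HostAndPort(parts[0], Integer.parseInt(parts[1])));
		}
		return new JedisCluster(nodes);
	}
	
	public static void closeConn(JedisCluster jedisCluster) {
		if (jedisCluster != null) {
			try {
				jedisCluster.close();
			} catch (Exception e) {
				// ignore close failures in this sketch
			}
		}
	}
}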

4. Using the Redis sink

Because Flink discovers connector factories through Java SPI, the factory has to be registered as a service before it can be used.
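A minimal sketch of that registration file, assuming the factory lives in a package named com.example.connector (the package name is an assumption; use the real one from your project):

# file: src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
com.example.connector.RedisTableSinkFactory

With the service file on the classpath, FactoryUtil can locate the factory whenever a table is declared with 'connector' = 'redis'. The sink can then be used from a job like the following:
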
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

/**
 * @author shengjk1
 * @date 2020/9/25
 */
public class SqlKafka {
	
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		
		EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
		StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings);
		// enable checkpointing
		Configuration configuration = tableEnv.getConfig().getConfiguration();
		configuration.set(
				ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
		configuration.set(
				ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));
		
		String sql = "CREATE TABLE sourcedata (`id` bigint,`status` int,`city_id` bigint,`courier_id` bigint,info_index int,order_id bigint,tableName String" +
				") WITH (" +
				"'connector' = 'kafka','topic' = 'xxx'," +
				"'properties.bootstrap.servers' = 'xxx','properties.group.id' = 'testGroup'," +
				"'format' = 'json','scan.startup.mode' = 'earliest-offset')";
		tableEnv.executeSql(sql);
		
		//15017284 distinct
		Table bigtable = tableEnv.sqlQuery("select distinct a.id,a.courier_id,a.status,a.city_id,b.info_index from (select id,status,city_id,courier_id from sourcedata where tableName = 'orders' and status=60)a join (select " +
				" order_id,max(info_index)info_index from sourcedata  where tableName = 'infos'  group by order_id )b on a.id=b.order_id");

		sql = "CREATE TABLE redis (info_index BIGINT,courier_id BIGINT,city_id BIGINT" +
				") WITH (" +
				"'connector' = 'redis'," +
				"'hostPort'='xxx'," +
				"'keyType'='hash'," +
				"'keyTemplate'='test2_${city_id}'," +
				"'fieldTemplate'='test2_${courier_id}'," +
				"'valueNames'='info_index,city_id'," +
				"'expireTime'='1000')";
			
		tableEnv.executeSql(sql);
		
		Table resultTable = tableEnv.sqlQuery("select sum(info_index)info_index,courier_id,city_id from " + bigtable + " group by city_id,courier_id");
		TupleTypeInfo<Tuple3<Long, Long, Long>> tupleType = new TupleTypeInfo<>(
				Types.LONG(),
				Types.LONG(),
				Types.LONG());
		tableEnv.toRetractStream(resultTable, tupleType).print("===== ");
		tableEnv.executeSql("INSERT INTO redis SELECT info_index,courier_id,city_id FROM " + resultTable);
		env.execute("");
	}
}
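
Concatenating bigtable and resultTable into SQL strings works because Table.toString() registers the table in the catalog under a generated unique name. An equivalent, more explicit variant (a sketch; the view name is my own) would be:

		// register the intermediate result under an explicit name instead of relying on Table.toString()
		tableEnv.createTemporaryView("bigtable", bigtable);
		Table resultTable = tableEnv.sqlQuery(
				"select sum(info_index) info_index, courier_id, city_id from bigtable group by city_id, courier_id");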

5. Detailed explanation

create table test(
    `id` bigint,
    `url` string,
    `day` string,
    `pv` bigint,
    `uv` bigint
) with (
    'connector'='redis',
    'hostPort'='xxx',
    'password'='',
    'expireTime'='100',
    'keyType'='hash',
    'keyTemplate'='test_${id}',
    'fieldTemplate'='${day}',
    'valueNames'='pv,uv'
)

Redis result (assuming id=1, day=20201016, pv=20, uv=20):
    hash
    test_1  20201016_pv 20, 20201016_uv 20
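
A quick way to verify the written hash from Java, assuming a JedisCluster connected to the same "xxx" hostPort as in the DDL and the RedisUtil sketch above (this check is not part of the original post):

import redis.clients.jedis.JedisCluster;

import java.util.Map;

public class CheckRedisResult {
	public static void main(String[] args) {
		JedisCluster jedisCluster = RedisUtil.getJedisCluster("xxx");
		Map<String, String> hash = jedisCluster.hgetAll("test_1");
		System.out.println(hash); // expected: {20201016_pv=20, 20201016_uv=20}
	}
}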

Option reference:
connector      fixed value "redis"
hostPort       address of the Redis cluster
password       Redis password
expireTime     expiration time of the Redis key, in seconds
keyType        type of the Redis key; currently hash, set, sadd and zadd are supported
keyTemplate    expression for the Redis key, e.g. test_${id}; note that id must be a column name of the table
fieldTemplate  required when keyType = hash; same expression rules as keyTemplate
valueNames     name(s) of the value column(s); more than one can be given, comma separated

Before running, remember to register the factory via the SPI service file described in section 4.

6. How it works

(figure: overall translation flow from CatalogTable to runtime, see the Flink documentation linked below)

  1. The overall flow is shown in the figure: CatalogTable —> DynamicTableSource and DynamicTableSink. In this step, DynamicTableSourceFactory and DynamicTableSinkFactory act as the bridge.

  2. The (Source/Sink)Factory is looked up via connector='xxx' (under the hood through SPI). Conceptually it performs three operations (a sketch of the format discovery is given after this list):
    1. validate options
    2. configure encoding/decoding formats (if required)
    3. create a parameterized instance of the table connector
    Formats themselves are discovered via format='xxx'.

  3. DynamicTableSource / DynamicTableSink
    The documentation says they can be regarded as stateful, but whether they actually hold state depends on the concrete source and sink implementation.

  4. Next, the runtime logic is generated. It is implemented through Flink's core connector interfaces (such as InputFormat or SourceFunction); for Kafka, for example, that is FlinkKafkaConsumer. These implementations are in turn wrapped as *Provider instances, which are then executed.

  5. *Provider is the code-level bridge between SQL and the streaming API.
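
For point 2: the Redis sink above does not use a format, but a connector that did could discover it through the same TableFactoryHelper. A minimal sketch, not part of the original connector, assuming the table declares e.g. 'format' = 'json':

		// inside createDynamicTableSink(Context context); requires imports of
		// org.apache.flink.api.common.serialization.SerializationSchema,
		// org.apache.flink.table.connector.format.EncodingFormat and
		// org.apache.flink.table.factories.SerializationFormatFactory
		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
		// discovers the factory registered for 'format' = 'json' via SPI and validates its options
		EncodingFormat<SerializationSchema<RowData>> encodingFormat =
				helper.discoverEncodingFormat(SerializationFormatFactory.class, FactoryUtil.FORMAT);
		helper.validate();
		// the encodingFormat would later produce a SerializationSchema<RowData> inside getSinkRuntimeProvider()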

7. References

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html

Copyright notice: this is an original article by the author, licensed under CC 4.0 BY-SA. Please include the original source link and this notice when reposting.
Original link: https://blog.csdn.net/jsjsjs1789/article/details/109121186
