登 录
注 册
< 大 数 据
Flink
Hadoop
Spark
Hive
HBase
Kafka
其他框架
Flink本地程序开发
Flink实战-数据准备
Flink实战-自定义Sink
Flink实战-实时计算
Flink实战-部署上线
文件映射为Table
FlinkTable自定义函数
Flink API JOIN总结
Flink SQL JOIN总结
3分钟搭建Flink SQL测试环境
热门推荐>>>
中台架构
中台建设与架构
Hadoop
源码分析-NN启动(三)
HBase
HBased对接Hive
Linux
Nginx高可用
Python
数据导出工具
Kafka
Kafka对接Flume
深度学习
卷积神经网络
数据结构与算法
选择合适的算法
MySQL
数据备份恢复
计算机系统
信号量同步线程
Hive
Hive调优参数大全
其他框架
Azkaban Flow1.0与2.0
ClickHouse
表引擎-其他类型
技术成长
最好的职业建议
精选书单
技术成长书单—机器学习
技术资讯
数据在线:计算将成为公共服务
开发工具
IntelliJ IDEA 20年发展回顾(二)
系统工具
Mac命令行工具
虚拟化
内存虚拟化概述
云原生
云原生构建现代化应用
云服务
一文搞懂公有云、私有云...
Java
Spring Boot依赖注入与Runners
Go
Go函数与方法
SQL
SQL模板
安全常识
一文读懂SSO
当前位置:
首页
>>
Flink
>>
Flink实战-自定义Sink
Flink实战-自定义Sink
2020-10-08 16:47:43 星期四 阅读:1867
创建Java项目并添加依赖 创建项目的教程见之前文章:[Flink本地程序开发](https://libins.cn/detail/moduleName=flink&articleId=108787089760b00872fa91fe69ea21b7 "Flink本地程序开发"),这里还需要增加如下maven依赖: ``` <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.0.0</version> </dependency> <!-- flink kafka连接器 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>${flink.version}</version> </dependency> <!-- flink客户端的依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>${flink.version}</version> </dependency> <!-- 用来解析json --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.48</version> </dependency> <!-- 连接MySQL --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.11</version> </dependency> ``` MySQL Sink类需要自定义,因为Flink官方不提供 ``` package cn.libins; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; //继承Flink的Rich接口,不用每插入一条打开一个MySQL连接 public class MysqlSink extends RichSinkFunction<Tuple3<String, Integer, String>> { private Connection connection = null; private PreparedStatement preparedStatement = null; private String userName = null; private String password = null; private String driverName = null; private String DBUrl = null; public MysqlSink(String userName, String passowrd, String DBUrl) { this.userName = userName; this.password = passowrd; this.driverName = "com.mysql.jdbc.Driver"; this.DBUrl = DBUrl; } public void invoke(Tuple3<String, Integer, String> value) throws Exception { if (connection == null) { Class.forName(driverName); connection = DriverManager.getConnection(DBUrl, userName, password); } // 插入结果表的SQL String sql = "insert into word_count_realtime(word,count, update_time) values(?,?,?)"; // String sql = "insert into word_count_realtime(word,count) values(?,?)"; preparedStatement = connection.prepareStatement(sql); preparedStatement.setString(1, value.f0); preparedStatement.setInt(2, value.f1); preparedStatement.setString(3, value.f2); preparedStatement.executeUpdate();//返回成功的话就是一个,否则就是0 } @Override public void open(Configuration parameters) throws Exception { Class.forName(driverName); connection = DriverManager.getConnection(DBUrl, userName, password); } @Override public void close() throws Exception { if (preparedStatement != null) { preparedStatement.close(); } if (connection != null) { connection.close(); } } } ``` ##### 下一步 [实时计算](https://libins.cn/detail/moduleName=flink&articleId=94d0b666ca8938902fc0e15428abe199 "实时计算")