登 录
注 册
< 大 数 据
Flink
Hadoop
Spark
Hive
HBase
Kafka
其他框架
Flink本地程序开发
Flink实战-数据准备
Flink实战-自定义Sink
Flink实战-实时计算
Flink实战-部署上线
文件映射为Table
FlinkTable自定义函数
Flink API JOIN总结
Flink SQL JOIN总结
3分钟搭建Flink SQL测试环境
热门推荐>>>
中台架构
中台建设与架构
Hadoop
源码分析-NN启动(三)
HBase
HBased对接Hive
Linux
Nginx高可用
Python
数据导出工具
Kafka
Kafka对接Flume
深度学习
卷积神经网络
数据结构与算法
选择合适的算法
MySQL
数据备份恢复
计算机系统
信号量同步线程
Hive
Hive调优参数大全
其他框架
Azkaban Flow1.0与2.0
ClickHouse
表引擎-其他类型
技术成长
最好的职业建议
精选书单
技术成长书单—机器学习
技术资讯
数据在线:计算将成为公共服务
开发工具
IntelliJ IDEA 20年发展回顾(二)
系统工具
Mac命令行工具
虚拟化
内存虚拟化概述
云原生
云原生构建现代化应用
云服务
一文搞懂公有云、私有云...
Java
Spring Boot依赖注入与Runners
Go
Go函数与方法
SQL
SQL模板
安全常识
一文读懂SSO
当前位置:
首页
>>
Flink
>>
文件映射为Table
文件映射为Table
2020-10-25 21:39:03 星期日 阅读:3894
以下简单的demo展示如何将本地静态文件映射为Flink Table,并实现使用SQL查询。 Flink版本:1.10.0 使用语言:Java #####本地文件的内容预览 ``` 100,北京,99.2 200,天津,80.32 300,上海,89.91 400,广州,79.78 500,深圳,89.23 600,杭州,78.09 ``` #####配置maven依赖 (注:完整的依赖请看官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/) pom文件增加如下内容: ``` <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_2.11</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.11</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> ``` ##### 编写Java程序 ``` package cn.libins; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.FileSystem; import org.apache.flink.table.descriptors.OldCsv; import org.apache.flink.table.descriptors.Schema; public class FlinkTableTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String filepath = "/Users/alibins/Desktop/flink-table-api.csv"; // 将本地文件连接到Flink内部,并指定相关schema和文件格式 tableEnv.connect(new FileSystem().path(filepath)).withFormat(new OldCsv()) .withSchema(new Schema() .field("id", DataTypes.BIGINT()) .field("name", DataTypes.STRING()) .field("score", DataTypes.DOUBLE())) .createTemporaryTable("fileToTable"); // 注册为临时表,并指定表名称 // 将临时表转换为Flink Table Table fileToTable = tableEnv.from("fileToTable"); // 如果需要控制台输出,则需要将table转为流,并指定表数据的泛型 DataStream ds1 = tableEnv.toAppendStream(fileToTable, TypeInformation.of(new TypeHint<Tuple3<Long, String, Double>>() { })); ds1.print(); // 将表的结果打印到控制台 // 使用SQL查询刚刚创建的临时表(返回结果是Table类型) Table selectResult = tableEnv.sqlQuery("" + "SELECT id, score " + "FROM fileToTable " + "WHERE id >= 300"); DataStream ds2 = tableEnv.toAppendStream(selectResult, TypeInformation.of(new TypeHint<Tuple2<Long, Double>>() { })); ds2.print(); // 打印FlinkSQL的查询结果 env.execute("Flink table test job"); } } ``` 输出结果如下: ![](/static/images/article_images/1603633105.4561827.png)