说明
这里都是我个人的见解
现在还没解决怎么在写入CSV文件时,在第一行加入行头
Table
- 写的String字符串的SQL,其中表要使用你注入table_env时指定的name名称,
SQL中的字段要和所对应的实体类的属性名一致,最好都使用小写
- 写的String字符串的SQL,其中表要使用你注入table_env时指定的name名称,
Join
- 这里指多个表联合,例如
A.join(B).where(C).equalTo(D);
,
这里其中C是A中一列数据,如果是对象可以使用双引号””里面写对象的属性名,如果A中有属性name就写where("name")
,
如果A不是对象,而是Tuple类型数据,并且不是嵌套类型,可以直接使用数字进行指定列,例如where(0)
表示第一列,5表示第六列,
如果是嵌套的Tuple就可以使用new KeySelector
,重写它的方法来直接return类型,
这里的D是B中的一列数据,使用方法同上所述 - 如果要使用到
.map(new MapFunction<IN,OUT>(){})
时,就要指定两个参数,其中第一个是出,第二个是进,
在大括号{}中要重写map方法,而对数据进行操作,也是起到了封装过滤的效果,
(注意:这个map我理解是处理一条数据进行返回放一个地方,再处理一条放那个地方,最后把那个地方值汇总) - 因为如果不使用Table就不能使用
count,avg
等SQL的函数,而要在.map
中进行计算,
而其中groupBy
分组,sortPartition
排序,sum
求和,max
最大值,min
最小值是可以的
- 这里指多个表联合,例如
数据库表及.csv文件内容
导出.sql文件
Z_STUDENT.sql
1 | /* |
Z_GRADE.sql
1 | /* |
Z_LINK.sql
1 | /* |
导入Maven依赖
方式1,推荐
- 这是我把没用的都删除调之后,改进之后的方式,我也是用这种写的
- 这里的flink-table我用1.7.2的原因,是因为我的项目是1.7.2版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22<dependencies>
<!--对实体类使用注解@Data 直接对属性进行setter,getter,toString方法,省略写过多的代码-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.8</version>
</dependency>
<!--使用批处理操作Table-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<!--内部基于scala,所以这里不能导入java,会出错,没有1.7.2版本,我就使用了1.8.0-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.8.0</version>
</dependency>
</dependencies>
方式2
这时我改进之前的方式,也是我从网上找的,具体什么意思,我也不太理解…
使用这种方式创建BatchTableEnvironment table_env = BatchTableEnvironment.create(env);
而不是BatchTableEnvironment table_env = BatchTableEnvironment.getTableEnvironment(env);
1 | <dependencies> |
实体类
Student
1
2
3
4
5
6
7
8
9import lombok.Data;
public class Student
{
private Long s_id;
private String s_name;
private Long age;
}Grade
1
2
3
4
5
6
7
8import lombok.Data;
public class Grade
{
private Long g_id;
private String g_name;
}Link
1
2
3
4
5
6
7
8
9import lombok.Data;
public class Link
{
private Long sg_id;
private Long g_id;
private Long s_id;
}
FlinkCsv类
Table
示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96package flink;
import flink.a.Grade;
import flink.a.Link;
import flink.a.Student;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import java.text.SimpleDateFormat;
import java.util.Date;
public class FlinkCsv
{
public static void main(String[] args) throws Exception
{
// 一,设置环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);// 设置并行数为1,不然后面writeAsCsv创建文件不是csv文件
BatchTableEnvironment table_env = BatchTableEnvironment.getTableEnvironment(env);
// 二,读取数据
DataSet<Student> data_student = env
// 1. 文件的读取,本地的绝对路径
.readCsvFile("D:\\Z_STUDENT.csv")
// 2. 指定分隔符,默认逗号","
.fieldDelimiter(",")
// 3. 忽略第一行
.ignoreFirstLine()
// 4. 传入三个参数,表示只取前三列,true表示获取数据,false表示不获取
// 如果有5列数据,第4列不取,可以写`true,true,true,false,true`
// 如果有5列数据,只取2,3两列,可以写`false,true,true`表示只取前三列,并且第一列不取
.includeFields(true, true, true)
// 5. 指定实体类,后面指定实体类的属性,属性名可以不和数据库已经CSV文件头对应,
// 但需要确保实体类中有此属性名,并且顺序和`includeFields`的true顺序对应
.pojoType(Student.class, "s_id", "s_name", "age");
DataSet<Grade> data_grade = env.readCsvFile("D:\\Z_GRADE.csv").fieldDelimiter(",").ignoreFirstLine()
.includeFields(true, true).pojoType(Grade.class, "g_id", "g_name");
DataSet<Link> data_link = env.readCsvFile("D:\\Z_LINK.csv").fieldDelimiter(",").ignoreFirstLine()
.includeFields(true, true, true).pojoType(Link.class, "sg_id", "g_id", "s_id");
// 三,使用`BatchTableEnvironment`把获取的数据转为`Table`
Table table_student = table_env.fromDataSet(data_student);
Table table_grade = table_env.fromDataSet(data_grade);
Table table_link = table_env.fromDataSet(data_link);
// 四,往`BatchTableEnvironment`中注入表,放在内存中,量大会很耗内存
table_env.registerTable("student", table_student);
table_env.registerTable("grade", table_grade);
table_env.registerTable("link", table_link);
// 五,定义SQL语句,最好都用小写,容易出错
// 这里面的表名就是第四步注入表设置的name,这里的字段就是对应实体类的属性
String sql = "select g_name,grade.g_id,count(age),sum(age),avg(age),max(age),min(age) " +
"from student,link,grade " +
"where student.s_id=link.s_id and grade.g_id=link.g_id " +
"group by grade.g_id,g_name " +
"order by grade.g_id asc";
// 六,执行,把执行结果再次转换为Table
Table table_result = table_env.sqlQuery(sql);
// 七,把执行结果的Table再使用`BatchTableEnvironment`转换为`Tuple`类型,这里的Tuple必须制定类型
// 这里使用Tuple7是因为必须是Tuple才可以进行打印,如果不需要打印,可以使用实体类进行对应
DataSet<Tuple7<String, Long, Long, Long, Long, Long, Long>> data_sql_result = table_env.toDataSet(table_result,
TypeInformation.of(new TypeHint<Tuple7<String, Long, Long, Long, Long, Long, Long>>() {}));
// 八,打印
data_sql_result.print();//DataSet的Tuple7结果,DataSet字段的print()打印方法
System.out.println("数量: "+data_sql_result.count());//也就是SQL查询到了多少条记录
//data_sql_result.collect();//转换为List集合
// 九,把结果写入CSV文件
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss SSS");
data_sql_result
// 1,指定文件名和存放位置,文件夹不存会自动创建
.writeAsCsv("D:\\Flink_CSV\\" + sdf.format(new Date()) + ".csv",
// 2,指定行的分隔符
"\n",
// 3,指定列的分隔符
",",
// 4,是否覆盖,不覆盖使用`NO_OVERWRITE`
FileSystem.WriteMode.OVERWRITE);
// 十,执行一个Job,里面的字符串可以随笔写,也可以不指定字符串使用`env.execute();`
// 如果要`writeAsCsv`就需要写`execute`,不然会报错
env.execute("Hello!@ Fuck...");
}
}执行结果
Join
因为Java代码实现起来很麻烦
所以我写成了两种
可以先看我的步骤分解的图示,看一下实现思路是怎样的
方式1:步骤分解,每一步都打印
1 | package flink; |
步骤分解,每一步的结果,汇总一起,图示
方式2:步骤合并,打印结果,步骤看不懂,可以看步骤分解的解释
1 | package flink; |