基于Java使用Flink读取CSV文件,针对批处理,多表联合两种方式Table类和Join方法的实现数据处理,再入CSV文件

说明

这里都是我个人的见解
现在还没解决怎么在写入CSV文件时,在第一行加入行头

  • Table
    1. 写的String字符串的SQL,其中表要使用你注入table_env时指定的name名称,
      SQL中的字段要和所对应的实体类的属性名一致,最好都使用小写
  • Join
    1. 这里指多个表联合,例如A.join(B).where(C).equalTo(D);,
      这里其中C是A中一列数据,如果是对象可以使用双引号””里面写对象的属性名,如果A中有属性name就写where("name"),
      如果A不是对象,而是Tuple类型数据,并且不是嵌套类型,可以直接使用数字进行指定列,例如where(0)表示第一列,5表示第六列,
      如果是嵌套的Tuple就可以使用new KeySelector,重写它的方法来直接return类型,
      这里的D是B中的一列数据,使用方法同上所述
    2. 如果要使用到.map(new MapFunction<IN,OUT>(){})时,就要指定两个参数,其中第一个是出,第二个是进,
      在大括号{}中要重写map方法,而对数据进行操作,也是起到了封装过滤的效果,
      (注意:这个map我理解是处理一条数据进行返回放一个地方,再处理一条放那个地方,最后把那个地方值汇总)
    3. 因为如果不使用Table就不能使用count,avg等SQL的函数,而要在.map中进行计算,
      而其中groupBy分组,sortPartition排序,sum求和,max最大值,min最小值是可以的

数据库表及.csv文件内容

在这里插入图片描述
在这里插入图片描述

导出.sql文件

Z_STUDENT.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/*
Navicat Oracle Data Transfer
Oracle Client Version : 19.3.0.0.0

Target Server Type : ORACLE
Target Server Version : 120200
File Encoding : 65001

Date: 2019-11-28 09:45:33
*/


-- ----------------------------
-- Table structure for Z_STUDENT
-- ----------------------------
DROP TABLE "MD_REF"."Z_STUDENT";
CREATE TABLE "MD_REF"."Z_STUDENT" (
"S_ID" NUMBER(10) NOT NULL ,
"S_NAME" VARCHAR2(255 BYTE) NULL ,
"AGE" VARCHAR2(255 BYTE) NULL
)
LOGGING
NOCOMPRESS
NOCACHE

;

-- ----------------------------
-- Records of Z_STUDENT
-- ----------------------------
INSERT INTO "MD_REF"."Z_STUDENT" VALUES ('1', '大牛', '11');
INSERT INTO "MD_REF"."Z_STUDENT" VALUES ('2', '二蛋', '22');
INSERT INTO "MD_REF"."Z_STUDENT" VALUES ('3', '三驴', '33');
INSERT INTO "MD_REF"."Z_STUDENT" VALUES ('4', '四毛', '44');
INSERT INTO "MD_REF"."Z_STUDENT" VALUES ('5', '五虎', '55');
INSERT INTO "MD_REF"."Z_STUDENT" VALUES ('6', '六豹', '66');

-- ----------------------------
-- Indexes structure for table Z_STUDENT
-- ----------------------------

-- ----------------------------
-- Checks structure for table Z_STUDENT
-- ----------------------------
ALTER TABLE "MD_REF"."Z_STUDENT" ADD CHECK ("S_ID" IS NOT NULL);

-- ----------------------------
-- Primary Key structure for table Z_STUDENT
-- ----------------------------
ALTER TABLE "MD_REF"."Z_STUDENT" ADD PRIMARY KEY ("S_ID");
Z_GRADE.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/*
Navicat Oracle Data Transfer
Oracle Client Version : 19.3.0.0.0

Target Server Type : ORACLE
Target Server Version : 120200
File Encoding : 65001

Date: 2019-11-28 09:44:37
*/


-- ----------------------------
-- Table structure for Z_GRADE
-- ----------------------------
DROP TABLE "MD_REF"."Z_GRADE";
CREATE TABLE "MD_REF"."Z_GRADE" (
"G_ID" NUMBER(10) NOT NULL ,
"G_NAME" VARCHAR2(255 BYTE) NULL
)
LOGGING
NOCOMPRESS
NOCACHE

;

-- ----------------------------
-- Records of Z_GRADE
-- ----------------------------
INSERT INTO "MD_REF"."Z_GRADE" VALUES ('1', '一年级');
INSERT INTO "MD_REF"."Z_GRADE" VALUES ('2', '二年级');
INSERT INTO "MD_REF"."Z_GRADE" VALUES ('3', '三年级');

-- ----------------------------
-- Indexes structure for table Z_GRADE
-- ----------------------------

-- ----------------------------
-- Checks structure for table Z_GRADE
-- ----------------------------
ALTER TABLE "MD_REF"."Z_GRADE" ADD CHECK ("G_ID" IS NOT NULL);

-- ----------------------------
-- Primary Key structure for table Z_GRADE
-- ----------------------------
ALTER TABLE "MD_REF"."Z_GRADE" ADD PRIMARY KEY ("G_ID");
Z_LINK.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/*
Navicat Oracle Data Transfer
Oracle Client Version : 19.3.0.0.0

Target Server Type : ORACLE
Target Server Version : 120200
File Encoding : 65001

Date: 2019-11-28 09:45:25
*/


-- ----------------------------
-- Table structure for Z_LINK
-- ----------------------------
DROP TABLE "MD_REF"."Z_LINK";
CREATE TABLE "MD_REF"."Z_LINK" (
"SG_ID" NUMBER(10) NOT NULL ,
"G_ID" NUMBER(10) NOT NULL ,
"S_ID" NUMBER(10) NOT NULL
)
LOGGING
NOCOMPRESS
NOCACHE

;

-- ----------------------------
-- Records of Z_LINK
-- ----------------------------
INSERT INTO "MD_REF"."Z_LINK" VALUES ('1', '1', '2');
INSERT INTO "MD_REF"."Z_LINK" VALUES ('2', '3', '5');
INSERT INTO "MD_REF"."Z_LINK" VALUES ('3', '2', '1');
INSERT INTO "MD_REF"."Z_LINK" VALUES ('4', '1', '6');
INSERT INTO "MD_REF"."Z_LINK" VALUES ('5', '2', '3');
INSERT INTO "MD_REF"."Z_LINK" VALUES ('6', '2', '4');

-- ----------------------------
-- Indexes structure for table Z_LINK
-- ----------------------------

-- ----------------------------
-- Checks structure for table Z_LINK
-- ----------------------------
ALTER TABLE "MD_REF"."Z_LINK" ADD CHECK ("SG_ID" IS NOT NULL);
ALTER TABLE "MD_REF"."Z_LINK" ADD CHECK ("G_ID" IS NOT NULL);
ALTER TABLE "MD_REF"."Z_LINK" ADD CHECK ("S_ID" IS NOT NULL);

-- ----------------------------
-- Primary Key structure for table Z_LINK
-- ----------------------------
ALTER TABLE "MD_REF"."Z_LINK" ADD PRIMARY KEY ("SG_ID");

导入Maven依赖

方式1,推荐

  • 这是我把没用的都删除调之后,改进之后的方式,我也是用这种写的
  • 这里的flink-table我用1.7.2的原因,是因为我的项目是1.7.2版本
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    <dependencies>
    <!--对实体类使用注解@Data 直接对属性进行setter,getter,toString方法,省略写过多的代码-->
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.8</version>
    </dependency>

    <!--使用批处理操作Table-->
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.7.2</version>
    </dependency>

    <!--内部基于scala,所以这里不能导入java,会出错,没有1.7.2版本,我就使用了1.8.0-->
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.8.0</version>
    </dependency>
    </dependencies>

方式2

这时我改进之前的方式,也是我从网上找的,具体什么意思,我也不太理解…
使用这种方式创建BatchTableEnvironment table_env = BatchTableEnvironment.create(env);
而不是BatchTableEnvironment table_env = BatchTableEnvironment.getTableEnvironment(env);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.8</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.8.0</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.8.0</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.8.0</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.8.0</version>
</dependency>
</dependencies>

实体类

  1. Student
    1
    2
    3
    4
    5
    6
    7
    8
    9
    import lombok.Data;

    @Data
    public class Student
    {
    private Long s_id;
    private String s_name;
    private Long age;
    }
  2. Grade
    1
    2
    3
    4
    5
    6
    7
    8
    import lombok.Data;

    @Data
    public class Grade
    {
    private Long g_id;
    private String g_name;
    }
  3. Link
    1
    2
    3
    4
    5
    6
    7
    8
    9
    import lombok.Data;

    @Data
    public class Link
    {
    private Long sg_id;
    private Long g_id;
    private Long s_id;
    }

FlinkCsv类

Table

  • 示例代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    package flink;

    import flink.a.Grade;
    import flink.a.Link;
    import flink.a.Student;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple7;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.BatchTableEnvironment;

    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class FlinkCsv
    {
    public static void main(String[] args) throws Exception
    {
    // 一,设置环境
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);// 设置并行数为1,不然后面writeAsCsv创建文件不是csv文件
    BatchTableEnvironment table_env = BatchTableEnvironment.getTableEnvironment(env);

    // 二,读取数据
    DataSet<Student> data_student = env
    // 1. 文件的读取,本地的绝对路径
    .readCsvFile("D:\\Z_STUDENT.csv")
    // 2. 指定分隔符,默认逗号","
    .fieldDelimiter(",")
    // 3. 忽略第一行
    .ignoreFirstLine()
    // 4. 传入三个参数,表示只取前三列,true表示获取数据,false表示不获取
    // 如果有5列数据,第4列不取,可以写`true,true,true,false,true`
    // 如果有5列数据,只取2,3两列,可以写`false,true,true`表示只取前三列,并且第一列不取
    .includeFields(true, true, true)
    // 5. 指定实体类,后面指定实体类的属性,属性名可以不和数据库已经CSV文件头对应,
    // 但需要确保实体类中有此属性名,并且顺序和`includeFields`的true顺序对应
    .pojoType(Student.class, "s_id", "s_name", "age");

    DataSet<Grade> data_grade = env.readCsvFile("D:\\Z_GRADE.csv").fieldDelimiter(",").ignoreFirstLine()
    .includeFields(true, true).pojoType(Grade.class, "g_id", "g_name");

    DataSet<Link> data_link = env.readCsvFile("D:\\Z_LINK.csv").fieldDelimiter(",").ignoreFirstLine()
    .includeFields(true, true, true).pojoType(Link.class, "sg_id", "g_id", "s_id");

    // 三,使用`BatchTableEnvironment`把获取的数据转为`Table`
    Table table_student = table_env.fromDataSet(data_student);
    Table table_grade = table_env.fromDataSet(data_grade);
    Table table_link = table_env.fromDataSet(data_link);

    // 四,往`BatchTableEnvironment`中注入表,放在内存中,量大会很耗内存
    table_env.registerTable("student", table_student);
    table_env.registerTable("grade", table_grade);
    table_env.registerTable("link", table_link);

    // 五,定义SQL语句,最好都用小写,容易出错
    // 这里面的表名就是第四步注入表设置的name,这里的字段就是对应实体类的属性
    String sql = "select g_name,grade.g_id,count(age),sum(age),avg(age),max(age),min(age) " +
    "from student,link,grade " +
    "where student.s_id=link.s_id and grade.g_id=link.g_id " +
    "group by grade.g_id,g_name " +
    "order by grade.g_id asc";

    // 六,执行,把执行结果再次转换为Table
    Table table_result = table_env.sqlQuery(sql);

    // 七,把执行结果的Table再使用`BatchTableEnvironment`转换为`Tuple`类型,这里的Tuple必须制定类型
    // 这里使用Tuple7是因为必须是Tuple才可以进行打印,如果不需要打印,可以使用实体类进行对应
    DataSet<Tuple7<String, Long, Long, Long, Long, Long, Long>> data_sql_result = table_env.toDataSet(table_result,
    TypeInformation.of(new TypeHint<Tuple7<String, Long, Long, Long, Long, Long, Long>>() {}));

    // 八,打印
    data_sql_result.print();//DataSet的Tuple7结果,DataSet字段的print()打印方法
    System.out.println("数量: "+data_sql_result.count());//也就是SQL查询到了多少条记录
    //data_sql_result.collect();//转换为List集合

    // 九,把结果写入CSV文件
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss SSS");
    data_sql_result
    // 1,指定文件名和存放位置,文件夹不存会自动创建
    .writeAsCsv("D:\\Flink_CSV\\" + sdf.format(new Date()) + ".csv",
    // 2,指定行的分隔符
    "\n",
    // 3,指定列的分隔符
    ",",
    // 4,是否覆盖,不覆盖使用`NO_OVERWRITE`
    FileSystem.WriteMode.OVERWRITE);

    // 十,执行一个Job,里面的字符串可以随笔写,也可以不指定字符串使用`env.execute();`
    // 如果要`writeAsCsv`就需要写`execute`,不然会报错
    env.execute("Hello!@ Fuck...");
    }
    }
  • 执行结果
    在这里插入图片描述

Join

因为Java代码实现起来很麻烦
所以我写成了两种
可以先看我的步骤分解的图示,看一下实现思路是怎样的

方式1:步骤分解,每一步都打印
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
package flink;

import flink.a.Grade;
import flink.a.Link;
import flink.a.Student;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.core.fs.FileSystem;

import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

public class FlinkCsv
{
public static void main(String[] args) throws Exception
{
// 一,设置环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);// 设置并行数为1,不然后面writeAsCsv创建文件不是csv文件

// 二,读取数据
DataSet<Student> data_student = env
// 1. 文件的读取,本地的绝对路径
.readCsvFile("D:\\Z_STUDENT.csv")
// 2. 指定分隔符,默认逗号","
.fieldDelimiter(",")
// 3. 忽略第一行
.ignoreFirstLine()
// 4. 传入三个参数,表示只取前三列,true表示获取数据,false表示不获取
// 如果有5列数据,第4列不取,可以写`true,true,true,false,true`
// 如果有5列数据,只取2,3两列,可以写`false,true,true`表示只取前三列,并且第一列不取
.includeFields(true, true, true)
// 5. 指定实体类,后面指定实体类的属性,属性名可以不和数据库已经CSV文件头对应,
// 但需要确保实体类中有此属性名,并且顺序和`includeFields`的true顺序对应
.pojoType(Student.class, "s_id", "s_name", "age");

DataSet<Grade> data_grade = env.readCsvFile("D:\\Z_GRADE.csv").fieldDelimiter(",").ignoreFirstLine()
.includeFields(true, true).pojoType(Grade.class, "g_id", "g_name");

DataSet<Link> data_link = env.readCsvFile("D:\\Z_LINK.csv").fieldDelimiter(",").ignoreFirstLine()
.includeFields(true, true, true).pojoType(Link.class, "sg_id", "g_id", "s_id");

// 三,使`data_student`与`data_link`结合
JoinOperator.DefaultJoin<Student, Link> student_link_join = data_student.join(data_link)
// 1data_student中的字段
.where("s_id")
// 2 data_link中的字段
.equalTo("s_id");
//student_link_join.print();//`student`与`link`联合结果,打印如下

//(Student(s_id=1, s_name=大牛, age=11),Link(sg_id=3, g_id=2, s_id=1))
//(Student(s_id=2, s_name=二蛋, age=22),Link(sg_id=1, g_id=1, s_id=2))
//(Student(s_id=3, s_name=三驴, age=33),Link(sg_id=5, g_id=2, s_id=3))
//(Student(s_id=4, s_name=四毛, age=44),Link(sg_id=6, g_id=2, s_id=4))
//(Student(s_id=5, s_name=五虎, age=55),Link(sg_id=2, g_id=3, s_id=5))
//(Student(s_id=6, s_name=六豹, age=66),Link(sg_id=4, g_id=1, s_id=6))

// 四,把第三步的结果再与`data_grade`结合
JoinOperator.DefaultJoin<Tuple2<Student, Link>, Grade> all_join = student_link_join.join(data_grade)
// 1. 这里的where不能再使用直接指定字段了,需要使用`new KeySelector`来指定
// 注意它有两个参数,第一个为`student_link_join`的类型,第二个为返回的字段类型
// 这里指定的字段类型为Long类型,与我们所定义的`Link`类中的`g_id`属性一致
.where(new KeySelector<Tuple2<Student, Link>, Long>()
{

@Override
public Long getKey(Tuple2<Student, Link> t) throws Exception
{
// 2. 返回`t.f1`也就是返回`Tuple2`中的第二个参数,也就是`Link`类
// `.getG_id()`也就是返回`Link`类中的`g_id`属性
return t.f1.getG_id();
}
// 3. 直接返回`data_grade`也就是`Grade`类中的`g_id`属性即可
}).equalTo("g_id");

//all_join.print();//三表联合结果,打印如下

//((Student(s_id=2, s_name=二蛋, age=22),Link(sg_id=1, g_id=1, s_id=2)),Grade(g_id=1, g_name=一年级))
//((Student(s_id=6, s_name=六豹, age=66),Link(sg_id=4, g_id=1, s_id=6)),Grade(g_id=1, g_name=一年级))
//((Student(s_id=1, s_name=大牛, age=11),Link(sg_id=3, g_id=2, s_id=1)),Grade(g_id=2, g_name=二年级))
//((Student(s_id=3, s_name=三驴, age=33),Link(sg_id=5, g_id=2, s_id=3)),Grade(g_id=2, g_name=二年级))
//((Student(s_id=4, s_name=四毛, age=44),Link(sg_id=6, g_id=2, s_id=4)),Grade(g_id=2, g_name=二年级))
//((Student(s_id=5, s_name=五虎, age=55),Link(sg_id=2, g_id=3, s_id=5)),Grade(g_id=3, g_name=三年级))

// 五,把第四步的结果,也就是三个表联合的结果进一步`.map`处理,只取里面的`g_name, age, g_id`三列数据
MapOperator<Tuple2<Tuple2<Student, Link>, Grade>, Tuple3<String, Long, Long>> map_init = all_join
// 1. 这里的`new MapFunction`有两个参数
// 第一个为入参,也就是`all_join`三表联合的类型
// 第二个为出参,也就是返回的类型
.map(new MapFunction<Tuple2<Tuple2<Student, Link>, Grade>, Tuple3<String, Long, Long>>()
{
@Override
public Tuple3<String, Long, Long> map(Tuple2<Tuple2<Student, Link>, Grade> t) throws Exception
{
String g_name = t.f1.getG_name();
// 2. `t.f0`表示`Tuple2<Student, Link>`
// `t.f0.f0`表示`Student`对象
// `.getG_id()`表示获取属性`g_id`的数据
long age = t.f0.f0.getAge();
long g_id = t.f1.getG_id();
return new Tuple3<>(g_name, age, g_id);
}
});

//map_init.print();//三表联合,取其中三列结果,打印如下

//(一年级,22,1)
//(一年级,66,1)
//(二年级,11,2)
//(二年级,33,2)
//(二年级,44,2)
//(三年级,55,3)

// 六,把第五步的数据处理,把第二列换成年级的出现次数,去除第三列
MapOperator<Tuple3<String, Long, Long>, Tuple2<String, Long>> map_count = map_init
.map(new MapFunction<Tuple3<String, Long, Long>, Tuple2<String, Long>>()
{
// 1. 在方法外,定义一个Map
Map<String, Long> map = new HashMap<>();

@Override
public Tuple2<String, Long> map(Tuple3<String, Long, Long> t) throws Exception
{
// 2. 如果map中有key为年级的值,则那个值+1
if (map.containsKey(t.f0))
{
Long x = map.get(t.f0) + 1;
map.put(t.f0, x);
} else
{
// 3. 如果没有,则添加,并设置值为1
map.put(t.f0, 1L);
}
// 4. 把第二列换成出现的次数,去除第三列,返回
return new Tuple2<>(t.f0, map.get(t.f0));
}
});

//map_count.print();

//(一年级,1)
//(一年级,2)
//(二年级,1)
//(二年级,2)
//(二年级,3)
//(三年级,1)

// 七,把第六步的数据进行第一列(年级名称)分组,取最大的出现的次数
AggregateOperator<Tuple2<String, Long>> result_count = map_count.groupBy(0).max(1);

//result_count.print();//

//(一年级,2)
//(三年级,1)
//(二年级,3)

// 八,把三表联合处理后的数据(map_init)进行按照第1,3(年级名称,年级id)列分组,计算第2列(年龄)总和
AggregateOperator<Tuple3<String, Long, Long>> result_sum = map_init.groupBy(0, 2).sum(1);

//result_sum.print();

//(一年级,88,1)
//(三年级,55,3)
//(二年级,88,2)

// 九,把`count`和`sum`联合,方便下面计算`avg`
JoinOperator.DefaultJoin<Tuple2<String, Long>, Tuple3<String, Long, Long>> count_sum_join =
// 这里`result_count`与`result_sum`都是Tuple类型,所以0表示第一列
// 也就是年级名称进行关联
result_count.join(result_sum).where(0).equalTo(0);

//count_sum_join.print();

//((一年级,2),(一年级,88,1))
//((三年级,1),(三年级,55,3))
//((二年级,3),(二年级,88,2))

// 十,计算`avg`,并把`g_name, count, sum, avg`合成一个`Tuple4<String, Long, Long, String>`
MapOperator<Tuple2<Tuple2<String, Long>, Tuple3<String, Long, Long>>, Tuple4<String, Long, Long, String>> map_name_count_sum_avg = count_sum_join
.map(new MapFunction<Tuple2<Tuple2<String, Long>, Tuple3<String, Long, Long>>, Tuple4<String, Long, Long, String>>()
{
@Override
public Tuple4<String, Long, Long, String> map
(Tuple2<Tuple2<String, Long>, Tuple3<String, Long, Long>> t) throws Exception
{
String g_name = t.f0.f0;
Long count = t.f0.f1;
Long sum = t.f1.f1;

// 1. 计算`sum/count`保留两位小数,四舍五入,转为字符串
String avg = new BigDecimal(Float.valueOf(sum) / count).setScale(2, BigDecimal.ROUND_HALF_UP).toString();
// 2. 字符串按照符号点"."进行分割,因为它是特殊符号,所以使用"\\."
String[] split = avg.split("\\.");
// 3. 如果分割的第二部分,也就是小数部分,可以匹配1到9任何一个数字出现至少1次或多次
// 对这个结果取反,也就是小数点后面都是0的话,平均值就等于整数部分(小数点以前的部分)
if (!split[1].matches("[1-9]+"))
avg = split[0];
// 4. 返回
return new Tuple4<>(g_name, count, sum, avg);
}
});

//map_name_count_sum_avg.print();

//(一年级,2,88,44)
//(三年级,1,55,55)
//(二年级,3,88,29.33)

// 十一,计算最大值,根据第1,3列分组
AggregateOperator<Tuple3<String, Long, Long>> result_max = map_init.groupBy(0, 2).max(1);
//max.print();

//(一年级,66,1)
//(三年级,55,3)
//(二年级,44,2)

// 十二,计算最小值,根据第1,3列分组
AggregateOperator<Tuple3<String, Long, Long>> result_min = map_init.groupBy(0, 2).min(1);
//min.print();

//(一年级,22,1)
//(三年级,55,3)
//(二年级,11,2)

// 十三,把所有的数据全部联合在一起
JoinOperator.DefaultJoin<Tuple2<Tuple4<String, Long, Long, String>, Tuple3<String, Long, Long>>,
Tuple3<String, Long, Long>> all_result_join = map_name_count_sum_avg
.join(result_max).where(0).equalTo(0)
.join(result_min).where(new KeySelector<Tuple2<Tuple4<String, Long, Long, String>, Tuple3<String, Long, Long>>, String>()
{
@Override
public String getKey(Tuple2<Tuple4<String, Long, Long, String>,
Tuple3<String, Long, Long>> t) throws Exception
{
return t.f0.f0;
}
}).equalTo(0);
//all_result_join.print();

//(((一年级,2,88,44),(一年级,66,1)),(一年级,22,1))
//(((三年级,1,55,55),(三年级,55,3)),(三年级,55,3))
//(((二年级,3,88,29.33),(二年级,44,2)),(二年级,11,2))

// 十四,把`所有的数据全部联合结果`提取成7列结果
MapOperator<Tuple2<Tuple2<Tuple4<String, Long, Long, String>, Tuple3<String, Long, Long>>, Tuple3<String, Long, Long>>,
Tuple7<String, Long, Long, String, Long, Long, Long>> map_result_all = all_result_join
.map(new MapFunction<Tuple2<Tuple2<Tuple4<String, Long, Long, String>, Tuple3<String, Long, Long>>, Tuple3<String, Long, Long>>,
Tuple7<String, Long, Long, String, Long, Long, Long>>()
{

@Override
public Tuple7<String, Long, Long, String, Long, Long, Long>
map(Tuple2<Tuple2<Tuple4<String, Long, Long, String>, Tuple3<String, Long, Long>>, Tuple3<String, Long, Long>> t) throws Exception
{
String g_name = t.f1.f0;
Long count = t.f0.f0.f1;
Long sum = t.f0.f0.f2;
String avg = t.f0.f0.f3;
Long max = t.f0.f1.f1;
Long min = t.f1.f1;
Long g_id = t.f1.f2;
return new Tuple7<>(g_name, count, sum, avg, max, min, g_id);
}
});

//map_result_all.print();

//(一年级,2,88,44,66,22,1)
//(三年级,1,55,55,55,55,3)
//(二年级,3,88,29.33,44,11,2)

// 十五,把7列提取成6列,因为最后一列没用,只为排序使用,使用之后,进行去除列,替换为`Tuple6`
MapOperator<Tuple7<String, Long, Long, String, Long, Long, Long>, Tuple6<String, Long, Long, String, Long, Long>> map = map_result_all
.sortPartition(6, Order.ASCENDING)
.map(new MapFunction<Tuple7<String, Long, Long, String, Long, Long, Long>,
Tuple6<String, Long, Long, String, Long, Long>>()
{
@Override
public Tuple6<String, Long, Long, String, Long, Long> map(Tuple7<String, Long, Long, String, Long, Long, Long> t) throws Exception
{
return new Tuple6<>(t.f0, t.f1, t.f2, t.f3, t.f4, t.f5);
}
});
//map.print();

//(一年级,2,88,44,66,22)
//(二年级,3,88,29.33,44,11)
//(三年级,1,55,55,55,55)


// 十六,把结果写入CSV文件
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss SSS");
map
// 1,指定文件名和存放位置,文件夹不存会自动创建
.writeAsCsv("D:\\Flink_CSV\\" + sdf.format(new Date()) + ".csv",
// 2,指定行的分隔符
"\n",
// 3,指定列的分隔符
",",
// 4,是否覆盖,不覆盖使用`NO_OVERWRITE`
FileSystem.WriteMode.OVERWRITE);

// 十七,执行一个Job,里面的字符串可以随笔写,也可以不指定字符串使用`env.execute();`
// 如果要`writeAsCsv`就需要写`execute`,不然会报错
env.execute("Hello!@ Fuck...");
}
}
步骤分解,每一步的结果,汇总一起,图示

在这里插入图片描述

方式2:步骤合并,打印结果,步骤看不懂,可以看步骤分解的解释
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package flink;

import flink.a.Grade;
import flink.a.Link;
import flink.a.Student;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.core.fs.FileSystem;

import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

public class FlinkCsv
{
public static void main(String[] args) throws Exception
{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

DataSet<Student> data_student = env.readCsvFile("D:\\Z_STUDENT.csv").fieldDelimiter(",").ignoreFirstLine()
.includeFields(true, true, true).pojoType(Student.class, "s_id", "s_name", "age");

DataSet<Grade> data_grade = env.readCsvFile("D:\\Z_GRADE.csv").fieldDelimiter(",").ignoreFirstLine()
.includeFields(true, true).pojoType(Grade.class, "g_id", "g_name");

DataSet<Link> data_link = env.readCsvFile("D:\\Z_LINK.csv").fieldDelimiter(",").ignoreFirstLine()
.includeFields(true, true, true).pojoType(Link.class, "sg_id", "g_id", "s_id");

// 三,把这三个数据联合,并取其中三列数据<年级名称,年龄,年级编号>
MapOperator<Tuple2<Tuple2<Student, Link>, Grade>, Tuple3<String, Long, Long>> map = data_student
.join(data_link).where("s_id").equalTo("s_id")
.join(data_grade).where(new KeySelector<Tuple2<Student, Link>, Long>()
{
@Override
public Long getKey(Tuple2<Student, Link> t) throws Exception
{
return t.f1.getG_id();
}
}).equalTo("g_id")
.map(new MapFunction<Tuple2<Tuple2<Student, Link>, Grade>, Tuple3<String, Long, Long>>()
{
@Override
public Tuple3<String, Long, Long> map(Tuple2<Tuple2<Student, Link>, Grade> t) throws Exception
{
String g_name = t.f1.getG_name();
long age = t.f0.f0.getAge();
long g_id = t.f1.getG_id();
return new Tuple3<>(g_name, age, g_id);
}
});

// 四,获取结果,得到6列数据,按照年级分组得到<年级名称,人数,总年龄,平均年龄,最大年龄,最小年龄>并按照年级的大小排序
MapOperator<Tuple7<String, Long, Long, String, Long, Long, Long>, Tuple6<String, Long, Long, String, Long, Long>> result = map
// 计算count开始
.map(new MapFunction<Tuple3<String, Long, Long>, Tuple2<String, Long>>()
{
// 1. 在方法外,定义一个Map
Map<String, Long> map = new HashMap<>();

@Override
public Tuple2<String, Long> map(Tuple3<String, Long, Long> t) throws Exception
{
// 2. 如果map中有key为年级的值,则那个值+1
if (map.containsKey(t.f0))
{
Long x = map.get(t.f0) + 1;
map.put(t.f0, x);
} else
{
// 3. 如果没有,则添加,并设置值为1
map.put(t.f0, 1L);
}
// 4. 把第二列换成出现的次数,去除第三列,返回
return new Tuple2<>(t.f0, map.get(t.f0));
}
}).groupBy(0).max(1)
// 计算count结束
.join(map.groupBy(0, 2).sum(1)).where(0).equalTo(0)//联合总年龄数
// 计算avg开始
.map(new MapFunction<Tuple2<Tuple2<String, Long>, Tuple3<String, Long, Long>>,
Tuple4<String, Long, Long, String>>()
{
@Override
public Tuple4<String, Long, Long, String> map
(Tuple2<Tuple2<String, Long>, Tuple3<String, Long, Long>> t) throws Exception
{
String g_name = t.f0.f0;
Long count = t.f0.f1;
Long sum = t.f1.f1;

// 1. 计算`sum/count`保留两位小数,四舍五入,转为字符串
String avg = new BigDecimal(Float.valueOf(sum) / count).setScale(2, BigDecimal.ROUND_HALF_UP).toString();
// 2. 字符串按照符号点"."进行分割,因为它是特殊符号,所以使用"\\."
String[] split = avg.split("\\.");
// 3. 如果分割的第二部分,也就是小数部分,可以匹配1到9任何一个数字出现至少1次或多次
// 对这个结果取反,也就是小数点后面都是0的话,平均值就等于整数部分(小数点以前的部分)
if (!split[1].matches("[1-9]+"))
avg = split[0];
// 4. 返回
return new Tuple4<>(g_name, count, sum, avg);
}
})
// 计算avg结束
.join(map.groupBy(0, 2).max(1)).where(0).equalTo(0)//联合最大值
.join(map.groupBy(0, 2).min(1)).where(new KeySelector<Tuple2<Tuple4<String, Long, Long, String>,//联合最小值
Tuple3<String, Long, Long>>, String>()
{
@Override
public String getKey(Tuple2<Tuple4<String, Long, Long, String>,
Tuple3<String, Long, Long>> t) throws Exception
{
return t.f0.f0;
}
}).equalTo(0)
// 转化为7列
.map(new MapFunction<Tuple2<Tuple2<Tuple4<String, Long, Long, String>, Tuple3<String, Long, Long>>,
Tuple3<String, Long, Long>>, Tuple7<String, Long, Long, String, Long, Long, Long>>()
{

@Override
public Tuple7<String, Long, Long, String, Long, Long, Long>
map(Tuple2<Tuple2<Tuple4<String, Long, Long, String>, Tuple3<String, Long, Long>>, Tuple3<String, Long, Long>> t) throws Exception
{
String g_name = t.f1.f0;
Long count = t.f0.f0.f1;
Long sum = t.f0.f0.f2;
String avg = t.f0.f0.f3;
Long max = t.f0.f1.f1;
Long min = t.f1.f1;
Long g_id = t.f1.f2;
return new Tuple7<>(g_name, count, sum, avg, max, min, g_id);
}
}).sortPartition(6, Order.ASCENDING)//按照第7列排序,升序
// 取其中6列
.map(new MapFunction<Tuple7<String, Long, Long, String, Long, Long, Long>,
Tuple6<String, Long, Long, String, Long, Long>>()
{
@Override
public Tuple6<String, Long, Long, String, Long, Long> map(Tuple7<String, Long, Long, String, Long, Long, Long> t) throws Exception
{
return new Tuple6<>(t.f0, t.f1, t.f2, t.f3, t.f4, t.f5);
}
});

result.print();
//(一年级,2,88,44,66,22)
//(二年级,3,88,29.33,44,11)
//(三年级,1,55,55,55,55)

map.writeAsCsv("D:\\Flink_CSV\\" + new SimpleDateFormat("yyyy-MM-dd HH-mm-ss SSS").format(new Date()) + ".csv",
"\n", ",", FileSystem.WriteMode.OVERWRITE);
env.execute("Hello!@ Fuck...");
}
}