Flink接收RabbitMQ数据写入到Oracle

项目案例

https://github.com/TaoPanfeng/case/tree/master/03-flink/flink-rabbitmq-oracle

文件内容

FlinkMain.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

public class FlinkMain
{
public static void main(String[] args) throws Exception
{
// 1,执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 2,RabbitMQ配置
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("192.168.1.3")
.setPort(5673)
.setUserName("panfeng")
.setPassword("panfeng")
.setVirtualHost("/panfeng")
.build();

// 3,添加资源
DataStreamSource<String> dataStreamSource = env.addSource(new RMQSource<String>(
connectionConfig,
"flink",
true,
new SimpleStringSchema()));

// 4,添加到流,去执行接收到的数据进行入库
dataStreamSource.addSink(new SinkOracle());

// 5,执行工作,定义一个工作名称
env.execute("rabbitmq flink oracle");
}
}

SinkOracle.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class SinkOracle extends RichSinkFunction<String>
{
private Connection connection;
private PreparedStatement statement;

// 1,初始化
@Override
public void open(Configuration parameters) throws Exception
{
super.open(parameters);
Class.forName("oracle.jdbc.OracleDriver");
connection = DriverManager.getConnection("jdbc:oracle:thin:@localhost:1521:orcl", "scott", "123456");
statement = connection.prepareStatement("INSERT INTO FLINK VALUES (SEQ_FLINK.NEXTVAL,?)");
}

// 2,执行
@Override
public void invoke(String value, Context context) throws Exception
{
System.out.println("value.toString()-------" + value.toString());
statement.setString(1, value);
statement.execute();
}

// 3,关闭
@Override
public void close() throws Exception
{
super.close();
if (statement != null)
statement.close();
if (connection != null)
connection.close();
}
}

pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<dependencies>
<!--flink-java-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.9.0</version>
</dependency>

<!--flink-streaming-java_2.11-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.9.0</version>
</dependency>

<!--flink-connector-rabbitmq_2.11-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq_2.11</artifactId>
<version>1.9.0</version>
</dependency>

<!--Oracle-->
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc8</artifactId>
<version>12.2.0.1.0</version>
</dependency>
</dependencies>

测试步骤

执行 Flink.java中的主方法,往对应队列中传入数据,可以输入到控制台

读取配置文件

如果想把配置信息写文件application.properties的话

配置文件内容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
db.driver=oracle.jdbc.OracleDriver
db.url=jdbc:oracle:thin:@10.18.20.180:1521:MUDATA
db.username=MD_REF
db.password=MD_REF_2018

rmq.host=10.18.20.13
rmq.port=5672
rmq.username=camel
rmq.password=camel123
rmq.vhost=reference
rmq.exchanges=ref.muservice.input
rmq.queue.airport=two.airport.muservice.input
rmq.queue.city=two.city.muservice.input
rmq.queue.country=two.country.muservice.input
读取RabbitMQ
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

public class CountryFlinkMain
{
public static void main(String[] args) throws Exception
{
// 1,执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 2,读取 country.properties 配置
ParameterTool pt=ParameterTool.fromPropertiesFile("flink-crew-assign/src/main/resources/country.properties");

// 3,RabbitMQ配置
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost(pt.get("rmq.host"))
.setPort(Integer.parseInt(pt.get("rmq.port")))
.setUserName(pt.get("rmq.username"))
.setPassword(pt.get("rmq.password"))
.setVirtualHost(pt.get("rmq.vhost"))
.build();

// 4,添加资源,RMQSource(OUT)
DataStreamSource<String> dataStreamSource = env.addSource(new RMQSource<String>(
connectionConfig,
pt.get("rmq.queue.country"),// 国家
true,
new SimpleStringSchema()));

// 5,添加到流,去执行接收到的数据进行入库,addSink(IN)
dataStreamSource.addSink(new CountrySinkOracle());
// 6,执行工作,定义一个工作名称
env.execute("rabbitmq flink oracle");
}
}
读取数据库
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class CountrySinkOracle extends RichSinkFunction<String>
{
private Connection conn;
private PreparedStatement statement;

// 1,初始化
@Override
public void open(Configuration parameters) throws Exception
{
super.open(parameters);
ParameterTool pt = ParameterTool.fromPropertiesFile("flink-crew-assign/src/main/resources/country.properties");
Class.forName(pt.get("db.driver"));
conn = DriverManager.getConnection(pt.get("db.url"), pt.get("db.username"), pt.get("db.password"));
}