1. Flink Introduction
Apache Flink is a unified computing framework that supports both batch and stream processing. Its core is a streaming dataflow engine that provides data distribution and parallel computation. Stream processing is its standout strength, and it is one of the industry's leading open-source stream processing engines. Flink fits best in low-latency data processing scenarios: highly concurrent, pipelined processing of data with millisecond-level latency and strong reliability.
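As a rough sketch of what "unified" means in practice (a minimal, hypothetical example, not part of this project's code): the same DataStream API handles a bounded source, where the job processes the data and finishes like a batch job, and an unbounded source, where the job keeps running.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedVsUnbounded {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // bounded source: the job processes three elements and then finishes (batch-like)
        env.fromElements("a", "b", "c").print();
        // an unbounded source such as env.socketTextStream("localhost", 8888)
        // would keep the same pipeline running indefinitely (streaming)
        env.execute();
    }
}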
2. Environment Setup
Install Flink
Install Netcat
Netcat (also known as nc) is a computer networking utility that can establish a TCP or UDP connection between two machines. It is widely used for testing ports on a network, transferring files, and general network debugging and probing, and it also supports more advanced operations such as encrypted connections and remote management. Because it is both powerful and simple to use, it is also widely used in the computer security field.
Install the nc command:
yum install -y nc
Start a listener on socket port 8888:
[root@node01 bin]# nc -lk 8888
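To check that the listener is up, you can connect to it from a second terminal with nc acting as a client (assuming the listening host is reachable at 172.24.4.193, the address the job below connects to); every line you type is delivered to the listener:

nc 172.24.4.193 8888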
3. Code Project
Purpose of the experiment: read a socket text stream as an unbounded stream and count the words in it.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>flink</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- add Flink dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>1.17.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.handlers</resource>
                                </transformer>
                                <transformer implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                    <resource>META-INF/spring.factories</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.schemas</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.et.flink.job.SocketJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
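Because the shade plugin above is bound to the package phase with com.et.flink.job.SocketJob as the manifest main class, a regular Maven build is enough to produce a runnable fat JAR:

mvn clean package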
SocketJob.java
package com.et.flink.job;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author liuhaihua
 * @version 1.0
 * @ClassName SocketJob
 * @Description Word count over an unbounded socket text stream
 * @date 2024-02-29 17:06
 */
public class SocketJob {
    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set the default parallelism to 3
        env.setParallelism(3);
        // read an unbounded text stream from the socket opened by nc
        DataStreamSource<String> socketDS = env.socketTextStream("172.24.4.193", 8888);
        // ETL: split each line into words, emit (word, 1), then count per word
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS
                .flatMap(
                        (String value, Collector<Tuple2<String, Integer>> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words) {
                                out.collect(Tuple2.of(word, 1));
                            }
                        }
                )
                .setParallelism(2)
                // the lambda erases the Tuple2 type information, so declare it explicitly
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                })
                // .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1);
        // print
        sum.print();
        // execute
        env.execute();
    }
}
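Since the pom also includes flink-runtime-web, a local run can optionally expose Flink's web UI for inspecting the running job. A minimal sketch of that variant (the class name is hypothetical, and only the environment creation differs from SocketJob):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketJobWithWebUI {
    public static void main(String[] args) throws Exception {
        // local environment with an embedded web UI, by default at http://localhost:8081;
        // a real cluster provides its own UI, so this only matters for local runs
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(3);
        // same source as the job above; print the raw lines just to see data flowing
        env.socketTextStream("172.24.4.193", 8888).print();
        env.execute();
    }
}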
Code repository
4. Test
Start the socket stream:
[root@cmn-zentao-002 ~]# nc -l 8888
Local execution
First, start the main function.
Second, type some text into the socket:
abc bcd cde
bcd cde fgh
cde fgh hij
Console logs (the N> prefix is the index of the parallel subtask that produced each record):
3> (abc,1)
1> (fgh,1)
3> (bcd,1)
3> (cde,1)
3> (bcd,2)
3> (cde,2)
3> (cde,3)
1> (fgh,2)
2> (hij,1)
Cluster execution
Run mvn package, then upload the resulting JAR to the cluster and submit it.
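For example, on a standalone cluster the job can be submitted with the Flink CLI (the JAR name below assumes the default artifact name derived from the pom above; the -c flag is optional here because the shaded manifest already names the main class):

flink run -c com.et.flink.job.SocketJob flink-1.0-SNAPSHOT.jar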
Input some text, and you will see the same results as in the local run.