Spring Boot and Flink integration: a quick-start demo

HBLOG
3 min read · Mar 4, 2024


1. Flink introduction

Flink is a unified computing framework for both batch and stream processing. At its core is a streaming dataflow engine that handles data distribution and parallel computation. Stream processing is its main strength, and it is one of the most widely used open source stream processing engines. Flink fits best in low-latency scenarios: highly concurrent data pipelines that need millisecond-level latency and reliability.

2. Environment setup

Install Flink

Install Netcat

Netcat (also known as nc) is a networking tool that opens TCP or UDP connections between two machines. It is commonly used to test ports, transfer files, and debug or probe networks. Some variants, such as Ncat, also support encrypted connections. Because it is simple and flexible, it is also widely used in computer security work. In this demo it provides the text stream that the Flink job reads.

Install the nc command

yum install -y nc

Start listening on a socket port

[root@node01 bin]# nc -lk 8888
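
If nc is not available on the machine (for example on a Windows development box), a small Java program can stand in for the listener. The following is a minimal sketch, not part of the original project: the class name LineServer is hypothetical, and it assumes the Flink job connects as a TCP client and reads newline-terminated lines. Unlike nc -lk, it serves a single connection and then exits.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for "nc -l <port>": forwards typed lines to one TCP client.
public class LineServer {

    // Waits for one client, then sends it every line read from `input`,
    // each terminated by '\n'. Returns when `input` reaches end of stream.
    public static void serve(ServerSocket server, BufferedReader input) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = input.readLine()) != null) {
                out.print(line);
                out.print('\n');
                out.flush(); // push each line immediately so the reader sees it right away
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Port 8888 matches the nc command above; pass another port as the first argument.
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 8888;
        try (ServerSocket server = new ServerSocket(port);
             BufferedReader stdin = new BufferedReader(
                     new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            serve(server, stdin);
        }
    }
}
```

Start it before the Flink job, then type lines into its console just as you would into nc.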

3. Code project

Goal: read an unbounded text stream from a socket and keep a running count of each word.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>flink</artifactId>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- add Flink dependency-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>1.17.0</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.handlers</resource>
                                </transformer>
                                <transformer
                                        implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                    <resource>META-INF/spring.factories</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.schemas</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.et.flink.job.SocketJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

SocketJob.java

package com.et.flink.job;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author liuhaihua
 * @version 1.0
 * @ClassName SocketJob
 * @Description todo
 * @date 2024-02-29 17:06
 */
public class SocketJob {
    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set the default parallelism for all operators to 3
        env.setParallelism(3);
        // read lines from the socket opened with nc (replace the host with your own)
        DataStreamSource<String> socketDS = env.socketTextStream("172.24.4.193", 8888);
        // split each line into (word, 1) pairs, then keep a running sum per word
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS
                .flatMap(
                        (String value, Collector<Tuple2<String, Integer>> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words) {
                                out.collect(Tuple2.of(word, 1));
                            }
                        }
                )
                // override the default parallelism for the flatMap operator only
                .setParallelism(2)
                // lambdas lose their generic types to erasure, so declare the output type explicitly
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                })
                // .returns(Types.TUPLE(Types.STRING,Types.INT))
                // group by the word (field f0) and sum the count (field 1)
                .keyBy(value -> value.f0)
                .sum(1);

        // print each updated count to the console
        sum.print();
        // submit the job; nothing runs until execute() is called
        env.execute();
    }
}
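
One detail of the flatMap step is worth a quick check: value.split(" ") returns empty strings when a line contains consecutive spaces or starts with a space, and the job would count those as a word. Below is a minimal plain-Java sketch of a stricter tokenizer, assuming words are separated by any run of whitespace. Tokenizer and tokenize are hypothetical names, not part of the original job.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a line on runs of whitespace and drops empty tokens.
public class Tokenizer {

    public static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        // trim() removes leading/trailing whitespace; "\\s+" treats repeated spaces as one separator
        for (String token : line.trim().split("\\s+")) {
            if (!token.isEmpty()) { // an empty or blank line still yields one empty token
                words.add(token);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // The plain split keeps an empty string between the two spaces.
        System.out.println("abc  bcd".split(" ").length); // prints 3
        System.out.println(tokenize("abc  bcd "));        // prints [abc, bcd]
    }
}
```

Inside the job, the flatMap lambda could loop over Tokenizer.tokenize(value) instead of value.split(" ").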

Code repository

4. Test

Start the socket stream

[root@cmn-zentao-002 ~]# nc -l 8888

Run locally

First, start the main function.

Second, type some text into the socket:

abc bcd cde
bcd cde fgh
cde fgh hij

Console output

3> (abc,1)
1> (fgh,1)
3> (bcd,1)
3> (cde,1)
3> (bcd,2)
3> (cde,2)
3> (cde,3)
1> (fgh,2)
2> (hij,1)
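
To see why each word is printed once per occurrence with an increasing count, the flatMap, keyBy, sum(1) pipeline can be mimicked in plain Java. This is a minimal single-threaded sketch that uses no Flink classes; RunningWordCount and process are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-threaded imitation of flatMap -> keyBy -> sum(1).
public class RunningWordCount {

    // Emits one "(word,total)" record per input word, where total is the
    // running count of that word so far (the state sum(1) keeps per key).
    public static List<String> process(List<String> lines) {
        Map<String, Integer> totals = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                int total = totals.merge(word, 1, Integer::sum);
                emitted.add("(" + word + "," + total + ")");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("abc bcd cde", "bcd cde fgh", "cde fgh hij");
        for (String record : process(lines)) {
            System.out.println(record);
        }
    }
}
```

The sketch emits the same nine records as the console output above. In Flink, the number before ">" is the index of the parallel subtask that printed the record, and records from different subtasks can interleave, so only the order of counts for a given word is expected to match.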

Run on a cluster

Run mvn package, then upload the JAR to the cluster.

Type some text into the socket; the result should match the local run.

5. References
