炳裘垦 发表于 7 天前

Kafka如何快速的入个门呢?

Kafka是什么?

Kafka是Apache基金所维护的一个中间件项目,它是一个开源的分布式事件流平台,广泛用于构建高性能的数据管道、流式分析、数据集成以及关键业务应用。
这里面有几个点需要说明一下,开源自然不用讲

[*]分布式简单的理解为通过网络通信实现多台服务器部署,联动提供服务,保证容错和提高效率。
[*]事件流,事件指的是数据在传输过程中保持着的形式,类似于网络的一个包结构,而流则指的是随时间顺序持续传输,如水流般。事件流可以用以监测物理硬件,患者病情,数据服务,金融交易等等。
[*]后一句话则是讲了他的用途,数据管道(比如数据从源头到目的的处理),流式分析(实时显示热销数据),数据集成(指的是多数据源汇入统一系统)
Kafka有什么核心优势?


[*]高吞吐量:官方说它在海量数据写入写出时可以保证2ms以内的延迟
[*]可拓展性:在机器压力过大的时候可以自由便捷的增加集群的数量,配置较为简单
[*]永久存储:指的是在保证实时性的同时数据也可以同步到物理存储上
[*]高可用性:内部有故障转移机制,在某台服务器挂掉之后,会有其他服务器自动顶替
这里给出Kafka社区项目的github,算一个超级火爆的项目了,感兴趣的可以看看源代码
Kafka有什么应用场景?


[*]作为后端消息传递的工具,类似于ActiveMQ和RabbitMQ,这里我们为了专注于题目就不详细对比其中区别。
[*]网站活动跟踪,用于实时监控和处理,比如双十一淘宝的下单量最高的十个品等排行榜,或者检测异常区域下单。
[*]指标监控服务,收集多端数据进行大屏展示,集中运营数据。
[*]日志聚合,将不同系统的日志抽离聚合,更加具有时效性,比如测试过程中的链路日志聚合。
[*]流式处理,构建多阶段的数据处理管道,还可以和Flink等配合增强两阶段提交的故障恢复能力。
[*]时间顺序下的状态保存,比如一个订单从创建,支付,运输等等的变化过程。
[*]作为提交日志的保存点,类似于mysql的redo日志,负责给其他分布式集群提供一致性和恢复性的功能实现。
Kafka提供什么关键功能?


[*]写入和读出事件流,具体来说呢,我们可以把Mysql的数据写入到Kafka,也可以从Kafka读出这批数据到文本,无须关注其传输细节,好用!
[*]持久化事件流,一般来说呢它可以配置事件的保留时间,比如7*24然后自动删除,但是我们可自定义他落盘的介质,比如文本日志,数据库等等来实现永久持久化。
[*]支持事件流发生过程中二次处理,比如每笔订单在落盘之前进行过滤,未付款的就不进入存储,可以在对事件流进行控制和处理。
Kafka中有哪些核心的概念?


[*]集群概括:

[*]服务器端Servers:kafka以集群方式运行,其中一部分作为数据存储,称为broker,另一些运行kafka connect,用于持续导入和导出数据流。
[*]客户端Clients:负责与集群交互,用以编写程序,从而实现对事件流的处理,写入和写出等。

[*]事件Events(也被成为record和message):

[*]事件键Event key:"Alice"
[*]事件值Event value:"支付2000元"
[*]事件发生时间Event timestamp:"Jun. 25, 2020 at 2:06 p.m."

类似于一个网络包的头部,内容等,在kafka的数据传输中是按照事件的形式。


[*]客户端角色

[*]生产者Producer:负责将数据写入Kafka
[*]消费者Consumer:负责从Kafka消费数据]

生产者和消费者是解耦的,互相并不影响,数量也没有限制,可以n生产者或者n消费者


[*]主题概念:用来存储数据的"文件夹",生产者将数据写入,消费者从其中读出数据;主题可以有多个,比如购买的日志放一个主题,收藏的日志放一个主题,一个主题可以有多个生产者和消费者来对接;主题的内容被消费后也不会立刻删除,可以配置删除时间甚至是永久不删除,对应上面的持久存储。
主题算是一个逻辑概念,不算是物理概念,从物理上来讲呢,数据具体是按照分区(文件夹)来存储的,当一个event来临的时候,按照event的key对分区数取模运算(假定如此简单的分区规则),运算的结果是多少,那么就存储在哪个分区,这就是物理划分的结果。相当于主题又划分为分区,这里的分区也是分布式的,比如备份数量为3,那么就在集群中有三台服务器上存储有这个分区的数据,容错还是很高的,也方便在不同的网络环境和机架上更加高效择优的进行数据读写(选择距离自己更近的选择数据读取,网络路由选择有利于提高读取效率)。

图 Fig.1 我们看到分了三个分区,而三个分区的区内数据都是按照事件流来的顺序有序的,但是分区之间可不是有序的,并不是说把012分区按顺序都读取数据就是有序的,具体我们从图 Fig.2 中可以看出。如果想要保证数据完全有序呢,可以采用一个分区,毕竟区间内是有序的。

如何对Kafka说一声Hello,Word呢?

Kafka的部署-以Centos版本为例,链接: https://pan.baidu.com/s/1-LgeDD_EM8fDTLdW5DcShg?pwd=qgsx 提取码: qgsx ,其中包含Kafka4.0.0的压缩包,JDK21的压缩包以及Centos7的压缩包,还有后续的配置文件,辅助工具等等
Centos部署

下载Centos7版本的ISO镜像(部署在Vmware虚拟机中,也可以自行替换本机系统),系统安装过程略说:

[*]新建虚拟机,自定义高级配置(典型也是可以的),等到需ISO镜像文件的时候选择上述下载的文件即可,其他步骤按照默认下一步。
[*]如果是单机测试呢,网络环境以及ssh免密登录也就省略了,如果是集群的话我们后面细说。
[*]关闭网络防火墙,避免Kafka的端口无法访问,systemctl start firewalld
JDK部署

# 解压缩jdk的压缩包到目录中,我将jdk的压缩包放在/opt/pkg/目录下,解压缩到/opt/soft下
tar -zxvf /opt/pkg/jdk-21_linux-x64_bin.tar.gz -C ../soft
# 修改压缩包的名称,方便后面的环境配置
mv 解压后的文件夹名称 jdk21
# 新建环境变量文件
sudo vi /etc/profile.d/qgsx_env.sh#新建环境变量文件
# 写入配置文件以下内容
export JAVA_HOME=/opt/jdk17
export PATH=$JAVA_HOME/bin:$PATH
# 重新加载配置文件
source /etc/profile.d/qgsx_env.sh
# 验证是否正确配置
java -versionKafka部署

# 解压缩kafka的压缩包到目录中,同上所述
tar -zxvf /opt/pkg/kafka_2.13-4.0.0.tgz -C ../soft
# 修改压缩包名称,如上
mv 解压后的文件夹名称 kafka400
# 在上述的配置文件上添加内容
export KAFKA_HOME=/opt/kafka400
export PATH=$KAFKA_HOME/bin:$PATH
# 后续的操作也如上所示
# 验证阶段如下,判断地址是否打印正确,这里不方便直接测试,因为kafka必须格式化之后才使用命令
echo $KAFKA_HOMEKafka启动


[*]已经部署了Kafka的环境,那么就得做其预备工作了,主要涉及生成UUID和格式化日志目录两个步骤

[*]格式化UUID,KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
[*]格式化目录日志,bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
[*]上述两步在kafka目录里面执行,第一步是uuid作为集群的唯一标志,不同的uuid代表不同的集群;第二步是执行一次就可以将生成的uuid写入到kafka的本地数据中,同时初始化元数据的日志,一些索引,集群信息等内容。

[*]启动Kafka集群

[*]修改kafka启动配置文件,如下(中文注释即可修改):
############################# Socket Server Settings #############################

# The address the socket server listens on.
# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
# with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#   listeners = listener_name://host_name:port
#   EXAMPLE:
#   listeners = PLAINTEXT://your.host.name:9092

# 内部通信所使用的ip和端口,9092用于生产者消费者等通信,而9093是控制器(集群领导)用来进行元数据管理
listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093

# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT

# Listener name, hostname and port the broker or the controller will advertise to clients.
# If not set, it uses the value for "listeners".

# 对外提供连接的ip和端口
advertised.listeners=PLAINTEXT://192.168.131.201:9092,CONTROLLER://192.168.131.201:9093

# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
[*]bin/kafka-server-start.sh config/server.properties,尾巴的配置文件就是kafka的启动配置文件
[*]启动之后呢这个控制台就不要动了,保持活跃状态,然后开启其他控制台来执行下面的程序,其中生产者和消费者也要分别占用一个控制台
[*]既然我们要进行数据生产和消费,首先需要一个主题(逻辑数据分类)
bin/kafka-topics.sh --create --topic qgsx --bootstrap-server localhost:9092 # qgsx是主题的名称,在kafka目录中执行
[*]生产者生产数据到主题中去
bin/kafka-console-producer.sh --topic qgsx --bootstrap-server localhost:9092 # 这里我们采用简单的控制台输入,然后就可以在控制台输入消息
[*]消费者消费对应主题的数据
bin/kafka-console-consumer.sh --topic qgsx --from-beginning --bootstrap-server localhost:9092 # 这里我们可以看到在生产者生产的数据

Kafka的连接器使用

这里我们解释一下kafka connect是做什么用的?kafka不仅仅提供了数据通道和处理的框架,为了方便用户使用,kafka还开发了很多接口,可以帮助我们直接对接常规的数据库,数据系统等,我们通过配置对应的连接方式或者详细地址,就可以自动从数据存储的位置将数据同步到Kafka的Topic中,反之亦然,也可以通过Kafka的Connect将Topic的数据同步到目的系统或者数据库中。
如何使用Kafka Connect来进行数据导入和导出到对应的系统呢?


[*]首先,我们要使用特定数据库或者系统连接器功能,就得有相应的库(一般封装为jar包,当然了,官方一般是提供好的,也提供自定义写法,相当于一个插件,也可以理解为游戏的MOD),这里我们采用文件形式的连接器,简而言之呢就是:file --->Kafka Topic ---> file。
[*]下面我们就得导入Kafka的Connect包,那么我们在哪里找到对应的Connect包呢?Confluent官方提供,也可以搜索。
[*]这里我们以官方自带的File连接器为例,毕竟官方提供好的Demo,免去了入门配置的痛苦苦恼,见见效果才有动力!
[*]在4.0的Kafka的libs目录中,我们过滤出来需要用的FileConnect,ls | grep file,可以看到connect-file-4.0.0.jar包,没有的我这边放在上述的网盘文件中,方便大家自取,但是4.0版本这里不在依赖于Zookeeper(过去的一个合作伙伴),以前的版本使用该版本库可能存在兼容问题。
[*]有了包之后呢,我们还需要启动的配置项,Kafka连接器是用配置来启动的,主要包括哪些配置文件呢?

[*]在这里为了方便使用,都是采用单机形式,所以首先要有单机启动配置文件,位于config/connect-standalone.properties,这里的启动文件较为简单我们来介绍一下,下面是去掉注释的部分        :
# kafka connect连接Kafka服务集群的地址,单机我们自然就写一个本机就可以
bootstrap.servers=localhost:9092
# Kafka Connect在数据导出的时候是结构化的,而Topic里面的数据是字节流的,所以需要转换器,将k和v进行转换
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 这里表明k和v在Json中包含Schema信息,比如字段类型,结构等等
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# 这里就是偏移量存储的位置,说白了就是source读取数据到什么位置了,别重复读取,每10s刷新一次到该文件
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
# 这里表明我们此次需要加载的插件,和上面对照我们加载的是file的连接器,这句条件是我们需要添加的!!!!!
plugin.path=/opt/soft/kafka213400/libs/connect-file-4.0.0.jar
[*]还得有Kafka Source Connect以及Kafka Sink Connect,其实呀就是对应的我们上面说的生产者和消费者,分别对应的配置文件位于config/connect-file-sink.properties 和 config/connect-file-source.properties,一般来说我们在config里面ls | grep file就可以看到这两个配置文件,如果没有的话,这里也是放在上面的网盘上了。

[*]首先来看connect-file-source.properties
# 表示该连接器的名称,可以自定义,标志着这个连接器实例
name=qgsx-local-file-source
# 这里表示采用我们上述的File类型连接器,主类可以标志其进入的入口函数
connector.class=FileStreamSource
# task的任务数量,也就是并行度,对于文件读取的话1也比较合适,毕竟要顺序读取
tasks.max=1
# 指定要读取的本地文件夹
file=test.txt
# 要将数据读取到哪个主题
topic=qgsx
[*]然后看connect-file-source.properties
# 实例名称同上
name=qgsx-local-file-sink
# 连接器类型同上
connector.class=FileStreamSink
# 并行度同上
tasks.max=1
# 从Topic中读取数据到哪里
file=test.sink.txt
# 想要读取哪个主题
topics=qgsx

[*]配置文件和连接器的Jar包已经准备就绪了,如果并不是从开头观看本文的话,这里需要启动一下Kafka的服务,同时创建主题qgsx,下面给出相关命令,在Kafka文件夹中执行即可,然后用jps查看Kafka的状态:
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --topic qgsx --bootstrap-server localhost:9092
## 如果操作有误的话,我们可以删除主题重新创建
# 删除主题
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic qgsx
# 重新创建主题
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic qgsx --partitions 1 --replication-factor 1
# 查看主题的状态
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
[*]然后就是在Kafka的文件夹中创建我们的原始数据了,也就是上面的test.txt文件,放入一些内容
echo -e "let us\nqgsx" > test.txt
[*]最后就是启动我们的连接器服务
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
如何在数据传递过程中进行实时处理呢?

我们在上面说过,Kafka支持流式处理,也就是在事件不断传递的同时还能对其中的数据进行加工处理,下面我们进行演示:

[*]首先是创建两个主题,分别表示数据存储的主题和数据处理后的主题,而在这个中间就是我们的处理过程,展示为:qgsx-src ---->   processing ----> qgsx-processed
bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input
bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact# 这里表示主题的数据仅保留相同key的最新版本的value(一条)
# 查看主题状态
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
[*]启动事件流处理程序,该程序主要是从streams-plaintext-input中实时读取数据,统计不同单词的数量,然后将不同单词作为key,value是数量存放进入streams-wordcount-output主题,考虑到该主题是compact策略,所以每次输出的都是最新的数量。
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
[*]最后是进行测试,我们启动一个控制台的生产者写入数据到streams-plaintext-input,一个控制台的消费者查看数据streams-wordcount-output,结果可以看到随着不断输入,输出也在实时更新,有变化的单词会新生成一条记录,展现新的数量,而没有数量变化的单词在上面不会生成新的记录,这里解释一下避免大家对结果展示不理解。
# 生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
# 消费者,配置项多了点,分别介绍
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \   # 从头开始读取数据,避免offset偏移量影响
    --property print.key=true \# 打印key,也就是单词
    --property print.value=true \# 打印value,也就是数量
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ # 存储的数据是字节流的,无法直接查看,采用String序列化器进行序列化可视化
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer# 同理,采用Long序列器
[*]那么具体这个实时控制的逻辑是怎么做的呢?我们详细拆解一下

[*]首先我们来做一个基于Java的最简单的功能来展示数据通路,该功能的目的仅仅是将A主题的数据传递给B主题,注释虽然写的很多,但是代码是很简单的,只是为了让入门的同学有个了解:
package myapps;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class Pipe {
    public static void main(String[] args) throws Exception {
      // 我们要连接kafka集群来执行任务,首先要配置kafka集群的连接参数,这里还配置了序列化器
      Properties props = new Properties();
      props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
      props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.131.201:9092");
      // 稍微解释一下序列化的作用,我们想要把Java这种结构性的对象比如String之类的放入主题,而主题传递需要的是字节流,就需要经过序列化器完成转换。
      props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
      props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                // 有了配置项,那么就该写我们的数据处理逻辑了,这里定义的结构是存放我们的处理逻辑的对象StreamsBuilder,比如先聚合,再分割,再。。。,一套处理逻辑可以放在该结构里面,当然了,这只是逻辑部分,并不是直接执行的,要分清楚哦,一般我们称之为该系列性的逻辑是拓扑图,和大家刚入行学的数据结构中的拓扑序列本质是一样的。
      final StreamsBuilder builder = new StreamsBuilder();
      // 下面就是具体的逻辑,从streams-plaintext-input主题构建了一个流,经过分割,统计得到k是单词,v是数量,进一步传递给streams-wordcount-output主题
      builder.<String, String>stream("streams-plaintext-input")
       .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
       .groupBy((key, value) -> value)
       .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
       .toStream()
       .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
                // 当我们定义好了处理逻辑之后,接下来就需要将它“执行”起来。这个过程就像是从一个逻辑视图转变成一个执行视图。简单来说,逻辑视图只是粗略地描述了我们“想做什么”,而执行视图则明确了“怎么做”。在构建执行视图的过程中,Kafka Streams 会自动完成以下细节:从哪里读取数据(即输入的 topic);如何处理数据(使用了哪些操作);拓扑中有哪些处理节点,它们之间如何连接;节点的执行顺序;是否需要状态存储等;这个过程其实就类似于数据库中执行 EXPLAIN 时生成的执行计划:你只需描述“查什么”,数据库会决定“怎么查”最优。换句话说,Kafka Streams 会帮你把抽象的流处理逻辑转化为一个可以执行的拓扑结构,而我们不需要关注底层的细节,这大大简化了开发的复杂度。
      final Topology topology = builder.build();
      // 执行计划依赖于逻辑计划,但是我们最终想要执行需要的是执行计划,有了执行计划和集群连接配置,就需要一个真正能执行计划的主体了,也就是我们的KafkaStreams对象,需要执行计划和配置项。
      final KafkaStreams streams = new KafkaStreams(topology, props);
      // CountDownLatch是一个同步辅助工具类,协调执行顺序的。具体是标记计数器的值,下面为1。然后采用await函数来阻塞进程,当执行countdown足够次数的时候才会使得计数器变为0,那么就会让其他等待的线程被释放。
      final CountDownLatch latch = new CountDownLatch(1);
      // 获取当前运行的程序,挂一个钩子,目的是在程序被异常关闭时候能优雅的关闭流处理,避免数据读取异常的问题,同时计数器变为0,结束程序。
      Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
      });
           // 开始执行数据传递程序,阻塞主程序
      try {
            streams.start();
            latch.await();
      } catch (Throwable e) {
            System.exit(1);
      }
      System.exit(0);
    }
}

想要复现这段代码,需要哪些前置手段呢,首先是按照我们上述的流程能正常走到单机部署和Kafka的启动,确保Kafka的运行是正常的,然后利用Maven搭建Kafka项目,下面我们给出搭建的命令(官方提供,会自带三个文件,其中包括简单的主题数据传递,上一个基础加上单词分割,再上一个基础加上统计数量):
# 为了帮助同学在初期避免踩坑呢,我们这里给出对应的各种版本:
# --Maven Linux版本的4.0.0
# --JDK 21
# --Kafka 4.0.0
# --Idea版本2025.1,这里我是采用虚拟机上部署Centos7系统,然后利用Idea的远程连接来操作的,为了避免直接远程下载比较慢,我是上传的压缩包(如果涉及到相关的步骤有不明白的可以在下面提问,也会对其补充,尽量保证大家能顺利的入个门先)
# --同时,本次文章涉及到的配置文件以及相关的所有压缩包都会在上述的百度网盘链接中。
mvn archetype:generate \
-DarchetypeGroupId=org.apache.kafka \
-DarchetypeArtifactId=streams-quickstart-java \
-DarchetypeVersion=4.0.0 \
-DgroupId=streams.examples \
-DartifactId=streams-quickstart \
-Dversion=0.1 \
-Dpackage=myapps上述的命令执行完之后会生成我们的项目文件,其中三个Src的源文件在修改localhost目标地址之后可以直接启动,来源主题都是streams-plaintext-input,可以自行创建和添加数据,为了方便查看转移的效果呢,下面我们提供一段Java的消费者代码,修改对应的主题和localhost可以持续监控打印主题的数据在控制台。
package myapps;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Pipe {

    public static void main(String[] args) {
      Properties props = new Properties();
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.131.201:9092");
      props.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-consumer");
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

      KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props);
      consumer.subscribe(Collections.singletonList("streams-wordcount-output"));

      System.out.println("==== 消费者启动,开始打印统计结果 ====");

      while (true) {
            ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, Long> record : records) {
                System.out.printf("单词: %-10s次数: %d%n", record.key(), record.value());
            }
      }
    }
}上述的操作其中感兴趣的可以自行修改尝试,比如为k增加一个前缀,更改统计的类别划分规则,过滤部分数据等等
在连接器使用过程中会有什么可能的问题呢?


[*]内容没有写入Kafka主题?

[*]对数据源文件是否有读写权限,当然没有权限是会报错的,修正的话可以使用chmod命令
[*]主题不存在,查看主题列表是否存在
[*]数据源也就是source配置文件是否有误,主题是否错误,文件地址是否正确(用绝对地址好一些)
[*]Kafka集群服务是否启动
[*]可以命令查看主题是否写入数据:bin/kafka-console-consumer.sh --topic qgsx --from-beginning --bootstrap-server localhost:9092

[*]内容没有输出到指定文件?

[*]Kafka集群服务是否启动?
[*]输出文件是否有读写权限
[*]数据输出的配置文件是否有误,同上
[*]offset也就是读取的偏移量,默认是读取最新的数据,可能偏移量已经到头了,可以追加一条数据到数据源头echo "hello again" >> test.txt,或者是直接删除偏移量的文件rm -rf /tmp/connect.offsets

[*]Maven项目无法部署
这里主要是涉及到包从中央仓库下载的问题,下面更改镜像源可解决部分问题,给出阿里云的镜像云:
# 如何找到配置文件,我是通过下载Maven的压缩包解压来获取的,所以其中的Conf配置里面的settings.xml就是镜像源的配置文件,我是将所有的配置和部署都放在了同一台机器,这样的好处是避免网络通信和系统不兼容的问题,如果大家是在win上部署的idea和maven,那么maven的配置文件一般存放在C:\Users\用户名\.m2,名字都是一样的
# 替换其中的镜像源部分,然后再idea里面的setting找到maven,然后在user_setting_files替换为自己的setting文件就可以,下面的库位置会自动识别的。
<mirrors>
    <mirror>
      <id>alimaven</id>
      <name>aliyun maven</name>   
      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
      <mirrorOf>central</mirrorOf>
   </mirror>
</mirrors># 如果大家的Idea也是部署在win上面的话,我这里再给出一个maven项目的pom文件,方便同学快速构建项目
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>streams.examples</groupId>
    streams-quickstart</artifactId>
    <version>0.1</version>
    <packaging>jar</packaging>

    <name>Kafka Streams Quickstart :: Java</name>

    <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <kafka.version>4.0.0</kafka.version>
      <slf4j.version>2.0.16</slf4j.version>
    </properties>

    <repositories>
      <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
      </repository>
    </repositories>

    <build>
      <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                maven-compiler-plugin</artifactId>
                <version>3.13.0</version>
                <configuration>
                  <release>21</release>
                </configuration>
            </plugin>
      </plugins>

      <pluginManagement>
            <plugins>
                <plugin>
                  <groupId>org.eclipse.m2e</groupId>
                  lifecycle-mapping</artifactId>
                  <version>1.0.0</version>
                  <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                              <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        maven-assembly-plugin</artifactId>
                                        <versionRange>[2.4,)</versionRange>
                                        <goals>
                                          <goal>single</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    
                                        <ignore/>
                                    </action>
                              </pluginExecution>
                              <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                          <goal>testCompile</goal>
                                          <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    
                                        <ignore/>
                                    </action>
                              </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                  </configuration>
                </plugin>
            </plugins>
      </pluginManagement>
    </build>

    <dependencies>
      
      <dependency>
            <groupId>org.slf4j</groupId>
            slf4j-reload4j</artifactId>
            <version>${slf4j.version}</version>
      </dependency>
      
      <dependency>
            <groupId>org.apache.kafka</groupId>
            kafka-streams</artifactId>
            <version>${kafka.version}</version>
      </dependency>

    </dependencies>
</project>
[*]如何查看主题的数据和消费情况呢?
为了方便可视化,同时有效的检查bug,这里我们提供两组客户端工具,一组是老Kafka工具,也就是Kafka Tool、Offset Exploer,地址为Kafka Tool,另一组是在github上发现的比较新的工具kafka king,地址为Kafka King。虽然各自支持的版本还不是完全最新的,但是也是可以使用的,另外说一嘴,目前好用的版本大多是基于网页的,部署到kafka的项目里面然后网页端访问,比如AKHQ、Kafka eagle等,有下载问题或者安装问题的在在百度网盘连接中我们也提供原始的windows安装包。
Kafka有哪些文章支撑呢?


[*]Akanbi A. Estemd: A distributed processing framework for environmental monitoring based on apache kafka streaming engine//Proceedings of the 4th International Conference on Big Data Research. 2020: 18-25.
[*]Sax M J, Wang G, Weidlich M, et al. Streams and tables: Two sides of the same coin//Proceedings of the International Workshop on Real-Time Business Intelligence and Analytics. 2018: 1-10.
可以看到近几年已经没有文章了,在学术界基本定型,目前主要是工业上的应用多一些,而随着大模型在学术界的不断活跃,未来大模型和流式处理的碰撞一定是很火热的,可以在流式处理中加入大模型的智能推断,为下游提供决策,也可能是实时Agent驱动的响应系统等等
大功告成!我们实现了Kafka的基础理解和基本使用(单机版本),对于:

[*]为什么要用Kafka而不用其他消息中间件?
[*]Kafka内部的框架是什么样子的?
[*]集群版本的同步是如何做的?
[*]故障出现的转移是怎样的过程?
[*]他是如何进行流式处理的?
[*]我们能不能自定义组件?
Kafka是一个充满智慧的设计,后面我们慢慢分解,此篇文章仅给想要了解Kafka的同学们!下次再见,我会不断分解由浅入深的中间件系列的设计,感兴趣的同学可以关注一下新开的公众号--起个数先。


来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: Kafka如何快速的入个门呢?