内容来自尚硅谷、黑马、DeepSeek1. 安装Kafka1.1 下载安装包安装包名kafka_2.12-3.8.0.tgz下载地址apache-kafka-3.8.0安装包下载_开源镜像站-阿里云用xftp传到任意一个节点上。1.2 解压、配置Kafka1.2.1 解压改名cd /component/packages tar -zxvf kafka_2.12-3.8.0.tgz -C /component/ mv kafka_2.12-3.8.0/ kafka # 改名1.2.2 修改配置文件cd config/ vim server.properties #broker的全局唯一编号不能重复只能是数字。 broker.id0 #kafka运行日志(数据)存放的路径路径不需要提前创建kafka自动帮你创建可以配置多个磁盘路径路径与路径之间可以用分隔 log.dirs/component/kafka/datas #配置连接Zookeeper集群地址在zk根目录下创建/kafka方便管理 zookeeper.connectzx101:2181,zx102:2181,zx103:2181/kafka 1.2.3 分发Kafka包第一节点broker.id0第二节点broker.id1第三节点broker.id2cd /component xsync kafka # 修改第二台、第三台节点的brokerid #broker的全局唯一编号不能重复只能是数字。 broker.id21.2.4 修改/etc/profile.d/my_env.shsudo vim /etc/profile.d/my_env.sh #KAFKA_HOME export KAFKA_HOME/component/kafka export PATH$PATH:$KAFKA_HOME/bin # 分发 sudo ~/bin/xsync /etc/profile.d/my_env.sh # 所有机器刷新 source /etc/profile1.3 启动Kafka# 1. 启动zookeeper zookeeper.sh start # 2. 所有机器启动Kafka cd /component/kafka/bin kafka-server-start.sh -daemon config/server.properties # 每一个节点上运行一次 # 3. 关闭Kafka kafka-server-stop.sh # 每一个节点上运行一次1.4 群起Kafkavim ~/bin/kafka.sh【加执行权限】#! /bin/bash case $1 in start){ for i in zx101 zx102 zx103 do echo --------启动 $i Kafka------- ssh $i /component/kafka/bin/kafka-server-start.sh -daemon /component/kafka/config/server.properties done };; stop){ for i in zx101 zx102 zx103 do echo --------停止 $i Kafka------- ssh $i /component/kafka/bin/kafka-server-stop.sh done };; esaccd /home/zx101/bin/ chmod x kafka.shzookeeper报错处理使用zookeeper.sh群起zookeeper集群时报错解决办法。[kyiredicketnewdata1 ~]$ zookeeper.sh start[ERROR] Missing peer id argument. Usage: zookeeper.sh ((start|start-foreground) peer-id)|stop|stop-all.把zookeeper.sh改个名字改成zeek.sh我的就好了。kafka 安装异常进入Zookeeper的客户端] zkCli.sh) ls /# 按顺序删除以下内容deleteall /admindeleteall /brokersdeleteall /clusterdeleteall /configdeleteall /consumersdeleteall /controllerdeleteall /controller_epochdeleteall /isr_change_notificationdeleteall /latest_producer_id_blockdeleteall /log_dir_event_notification# 把3台Kafka的logs下面的文件全部删掉logs] rm -rf ./*2. Kafka基础命令# 查看当前节点的topic bin/kafka-topics.sh --bootstrap-server newdata1:9092 --list # 创建topic名为first bin/kafka-topics.sh --bootstrap-server newdata1:9092 --create --partitions 1 --replication-factor 3 --topic first # 查看topic的主题详情 bin/kafka-topics.sh --bootstrap-server newdata1:9092 --describe --topic first # 删除topic bin/kafka-topics.sh --bootstrap-server newdata1:9092 --delete --topic first # 生产发送消息 bin/kafka-console-producer.sh --bootstrap-server newdata1:9092 --topic first # 消费者查看消息每次运行这个命令消费者组就会增加一个即使退出了也会存在 bin/kafka-console-consumer.sh --bootstrap-server newdata1:9092 --topic first # 可以手动增加group的消费者组 bin/kafka-console-consumer.sh --bootstrap-server newdata1:9092 --topic first --group group_consumer_1234 # 消费者查看历史所有消息 bin/kafka-console-consumer.sh --bootstrap-server newdata1:9092 --from-beginning --topic first3. 安装Kafka可视化监控进入EFAK官网下载Eagle (http://download.kafka-eagle.org/)3.1 解压、安装MySQL如果已安装过就不需再安装安装过程见【Hive安装】。3.2 解压、安装Kafka-Eagle只需安装在mysql同一台上3.2.1 修改kafka配置、分发# 修改kafka中的配置改完之后要分发到其他节点 vim bin/kafka-server-start.sh # 以前的内容 if [ x$KAFKA_HEAP_OPTS x ]; then export KAFKA_HEAP_OPTS-Xmx1G -Xms1G fi # 改成如下内容 if [ x$KAFKA_HEAP_OPTS x ]; then export KAFKA_HEAP_OPTS-server -Xms2G -Xmx2G -XX:PermSize128m -XX:UseG1GC -XX:MaxGCPauseMillis200 -XX:ParallelGCThreads8 -XX:ConcGCThreads5 -XX:InitiatingHeapOccupancyPercent70 export JMX_PORT9999 #export KAFKA_HEAP_OPTS-Xmx1G -Xms1G fi 3.2.2 解压、安装Kafka-Eagle有两步解压比其他组件多一步骤。# 第一步解压 tar -zxvf kafka-eagle-bin-2.0.8.tar.gz -C /component/packages # 第二步解压 tar -zxvf efak-web-2.0.8-bin.tar.gz -C /component # 修改名字看个人习惯找到该文件对应的地方修改。其他的不用改。# 修改配置文件 cd /component/efak/conf vim system-config.properties ###################################### # multi zookeeper kafka cluster list # Settings prefixed with kafka.eagle. will be deprecated, use efak. instead ###################################### efak.zk.cluster.aliascluster1 cluster1.zk.listnewdata1:2181,newdata2:2181,newdata3:2181/kafka # 这个按各自情况更改 ###################################### # kafka offset storage ###################################### # offset保存在kafka cluster1.efak.offset.storagekafka ###################################### # kafka sqlite jdbc driver address ###################################### # 配置mysql连接 efak.drivercom.mysql.jdbc.Driver efak.urljdbc:mysql://newdata1:3306/ke?useUnicodetruecharacterEncodingUTF-8zeroDateTimeBehaviorconvertToNull # 这个按各自情况更改还要在mysql库中建一个叫做ke的database efak.usernameroot # 这个按各自情况更改 efak.password123456 # 这个按各自情况更改 3.2.3 添加环境变量这个不用分发sudo vim /etc/profile.d/my_env.sh # kafkaEFAK export KE_HOME/component/efak export PATH$PATH:$KE_HOME/bin # 刷新 source /etc/profile3.2.4 启动、关闭efakkf.sh start kf.sh stop3.2.5 登录网页Windows上登录网页http://192.168.84.138:8048/ 【网页、账号密码会在启动efak时提供】4. Flink安装Flink on yarn4.1 Flink解压、安装。【选一台安装Flink我选了newdata1】tar -zxvf flink-1.13.0-bin-scala_2.12.tgz -C /component/ # 修改配置 vim flink-conf.yaml jobmanager.rpc.address: newdata1 vim workers newdata2 newdata3 1.2 修改环境变量sudo vim /etc/profile.d/my_env.sh # FLINK_HOME export HADOOP_CONF_DIR${HADOOP_HOME}/etc/hadoop export HADOOP_CLASSPATHhadoop classpath source /etc/profile1.3 创建Windows的flink-maven工程maven的配置和简单使用参考【https://blog.csdn.net/ERROR101/article/details/147761152?spm1011.2415.3001.5331】导入依赖properties flink.version1.13.0/flink.version java.version1.8/java.version scala.binary.version2.12/scala.binary.version slf4j.version1.7.30/slf4j.version /properties dependencies !-- 引入Flink相关依赖-- dependency groupIdorg.apache.flink/groupId artifactIdflink-java/artifactId version${flink.version}/version /dependency dependency groupIdorg.apache.flink/groupId artifactIdflink-streaming-java_${scala.binary.version}/artifactId version${flink.version}/version /dependency dependency groupIdorg.apache.flink/groupId artifactIdflink-clients_${scala.binary.version}/artifactId version${flink.version}/version /dependency !-- 引入日志管理相关依赖-- dependency groupIdorg.slf4j/groupId artifactIdslf4j-api/artifactId version${slf4j.version}/version /dependency dependency groupIdorg.slf4j/groupId artifactIdslf4j-log4j12/artifactId version${slf4j.version}/version /dependency dependency groupIdorg.apache.logging.log4j/groupId artifactIdlog4j-to-slf4j/artifactId version2.14.0/version /dependency /dependencies拉取依赖所需要的内容新增日志管理文件创建基础示例package com.zx; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import java.util.Arrays; public class SocketStreamWordCount { public static void main(String[] args) throws Exception { // 1. 创建流式执行环境 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 读取文本流newdata12表示发送端主机名、7777表示端口号 DataStreamSourceString lineStream env.socketTextStream(newdata1, 7777); // 3. 转换、分组、求和得到统计结果 SingleOutputStreamOperatorTuple2String, Long sum lineStream.flatMap((String line, CollectorTuple2String, Long out) - { String[] words line.split( ); for (String word : words) { out.collect(Tuple2.of(word, 1L)); } }).returns(Types.TUPLE(Types.STRING, Types.LONG)) .keyBy(data - data.f0) .sum(1); // 4. 打印 sum.print(); // 5. 执行 env.execute(); } }打包成jarflink_learn-1.0-SNAPSHOT.jar1.3 使用-会话模式1. 启动hdfsyarn集群如果是Hadoop高可用模式提前启动zookeeper服务cd /component/hadoop-3.1.3 sbin/start-dfs.sh sbin/start-yarn.sd2. 启动Flink会话模式bin/yarn-session.sh -nm test -d # -nm(--name)YARN UI显示的任务名 # -d 分离式如果不指定会一直占用这个窗口。试一试指定和不指定的效果 # 显示的INFO日志中最后面会显示web ui地址3. 提交作业在web ui页面上点击【Submit New Job】-【Add New】添加Windows系统下已经打好的Jar包。然后在虚拟机中启动一个netcat服务nc -lk 7777 # 启动后输入一些单词4. 显示、关闭application# test替换成启动的时候给作业取的名字。 yarn application -list | grep test ./bin/yarn-session.sh -id application_123456789_0001 -stop # 或者 yarn application -kill application_123456789_0001完。