Kafka、Flink安装,简单使用
内容来自尚硅谷、黑马、DeepSeek1. 安装Kafka1.1 下载安装包安装包名kafka_2.12-3.8.0.tgz下载地址apache-kafka-3.8.0安装包下载_开源镜像站-阿里云用xftp传到任意一个节点上。1.2 解压、配置Kafka1.2.1 解压改名cd /component/packages tar -zxvf kafka_2.12-3.8.0.tgz -C /component/ mv kafka_2.12-3.8.0/ kafka # 改名1.2.2 修改配置文件cd config/ vim server.properties #broker的全局唯一编号不能重复只能是数字。 broker.id0 #kafka运行日志(数据)存放的路径路径不需要提前创建kafka自动帮你创建可以配置多个磁盘路径路径与路径之间可以用分隔 log.dirs/component/kafka/datas #配置连接Zookeeper集群地址在zk根目录下创建/kafka方便管理 zookeeper.connectzx101:2181,zx102:2181,zx103:2181/kafka 1.2.3 分发Kafka包第一节点broker.id0第二节点broker.id1第三节点broker.id2cd /component xsync kafka # 修改第二台、第三台节点的brokerid #broker的全局唯一编号不能重复只能是数字。 broker.id21.2.4 修改/etc/profile.d/my_env.shsudo vim /etc/profile.d/my_env.sh #KAFKA_HOME export KAFKA_HOME/component/kafka export PATH$PATH:$KAFKA_HOME/bin # 分发 sudo ~/bin/xsync /etc/profile.d/my_env.sh # 所有机器刷新 source /etc/profile1.3 启动Kafka# 1. 启动zookeeper zookeeper.sh start # 2. 所有机器启动Kafka cd /component/kafka/bin kafka-server-start.sh -daemon config/server.properties # 每一个节点上运行一次 # 3. 关闭Kafka kafka-server-stop.sh # 每一个节点上运行一次1.4 群起Kafkavim ~/bin/kafka.sh【加执行权限】#! /bin/bash case $1 in start){ for i in zx101 zx102 zx103 do echo --------启动 $i Kafka------- ssh $i /component/kafka/bin/kafka-server-start.sh -daemon /component/kafka/config/server.properties done };; stop){ for i in zx101 zx102 zx103 do echo --------停止 $i Kafka------- ssh $i /component/kafka/bin/kafka-server-stop.sh done };; esaccd /home/zx101/bin/ chmod x kafka.shzookeeper报错处理使用zookeeper.sh群起zookeeper集群时报错解决办法。[kyiredicketnewdata1 ~]$ zookeeper.sh start[ERROR] Missing peer id argument. Usage: zookeeper.sh ((start|start-foreground) peer-id)|stop|stop-all.把zookeeper.sh改个名字改成zeek.sh我的就好了。kafka 安装异常进入Zookeeper的客户端] zkCli.sh) ls /# 按顺序删除以下内容deleteall /admindeleteall /brokersdeleteall /clusterdeleteall /configdeleteall /consumersdeleteall /controllerdeleteall /controller_epochdeleteall /isr_change_notificationdeleteall /latest_producer_id_blockdeleteall /log_dir_event_notification# 把3台Kafka的logs下面的文件全部删掉logs] rm -rf ./*2. Kafka基础命令# 查看当前节点的topic bin/kafka-topics.sh --bootstrap-server newdata1:9092 --list # 创建topic名为first bin/kafka-topics.sh --bootstrap-server newdata1:9092 --create --partitions 1 --replication-factor 3 --topic first # 查看topic的主题详情 bin/kafka-topics.sh --bootstrap-server newdata1:9092 --describe --topic first # 删除topic bin/kafka-topics.sh --bootstrap-server newdata1:9092 --delete --topic first # 生产发送消息 bin/kafka-console-producer.sh --bootstrap-server newdata1:9092 --topic first # 消费者查看消息每次运行这个命令消费者组就会增加一个即使退出了也会存在 bin/kafka-console-consumer.sh --bootstrap-server newdata1:9092 --topic first # 可以手动增加group的消费者组 bin/kafka-console-consumer.sh --bootstrap-server newdata1:9092 --topic first --group group_consumer_1234 # 消费者查看历史所有消息 bin/kafka-console-consumer.sh --bootstrap-server newdata1:9092 --from-beginning --topic first3. 安装Kafka可视化监控进入EFAK官网下载Eagle (http://download.kafka-eagle.org/)3.1 解压、安装MySQL如果已安装过就不需再安装安装过程见【Hive安装】。3.2 解压、安装Kafka-Eagle只需安装在mysql同一台上3.2.1 修改kafka配置、分发# 修改kafka中的配置改完之后要分发到其他节点 vim bin/kafka-server-start.sh # 以前的内容 if [ x$KAFKA_HEAP_OPTS x ]; then export KAFKA_HEAP_OPTS-Xmx1G -Xms1G fi # 改成如下内容 if [ x$KAFKA_HEAP_OPTS x ]; then export KAFKA_HEAP_OPTS-server -Xms2G -Xmx2G -XX:PermSize128m -XX:UseG1GC -XX:MaxGCPauseMillis200 -XX:ParallelGCThreads8 -XX:ConcGCThreads5 -XX:InitiatingHeapOccupancyPercent70 export JMX_PORT9999 #export KAFKA_HEAP_OPTS-Xmx1G -Xms1G fi 3.2.2 解压、安装Kafka-Eagle有两步解压比其他组件多一步骤。# 第一步解压 tar -zxvf kafka-eagle-bin-2.0.8.tar.gz -C /component/packages # 第二步解压 tar -zxvf efak-web-2.0.8-bin.tar.gz -C /component # 修改名字看个人习惯找到该文件对应的地方修改。其他的不用改。# 修改配置文件 cd /component/efak/conf vim system-config.properties ###################################### # multi zookeeper kafka cluster list # Settings prefixed with kafka.eagle. will be deprecated, use efak. instead ###################################### efak.zk.cluster.aliascluster1 cluster1.zk.listnewdata1:2181,newdata2:2181,newdata3:2181/kafka # 这个按各自情况更改 ###################################### # kafka offset storage ###################################### # offset保存在kafka cluster1.efak.offset.storagekafka ###################################### # kafka sqlite jdbc driver address ###################################### # 配置mysql连接 efak.drivercom.mysql.jdbc.Driver efak.urljdbc:mysql://newdata1:3306/ke?useUnicodetruecharacterEncodingUTF-8zeroDateTimeBehaviorconvertToNull # 这个按各自情况更改还要在mysql库中建一个叫做ke的database efak.usernameroot # 这个按各自情况更改 efak.password123456 # 这个按各自情况更改 3.2.3 添加环境变量这个不用分发sudo vim /etc/profile.d/my_env.sh # kafkaEFAK export KE_HOME/component/efak export PATH$PATH:$KE_HOME/bin # 刷新 source /etc/profile3.2.4 启动、关闭efakkf.sh start kf.sh stop3.2.5 登录网页Windows上登录网页http://192.168.84.138:8048/ 【网页、账号密码会在启动efak时提供】4. Flink安装Flink on yarn4.1 Flink解压、安装。【选一台安装Flink我选了newdata1】tar -zxvf flink-1.13.0-bin-scala_2.12.tgz -C /component/ # 修改配置 vim flink-conf.yaml jobmanager.rpc.address: newdata1 vim workers newdata2 newdata3 1.2 修改环境变量sudo vim /etc/profile.d/my_env.sh # FLINK_HOME export HADOOP_CONF_DIR${HADOOP_HOME}/etc/hadoop export HADOOP_CLASSPATHhadoop classpath source /etc/profile1.3 创建Windows的flink-maven工程maven的配置和简单使用参考【https://blog.csdn.net/ERROR101/article/details/147761152?spm1011.2415.3001.5331】导入依赖properties flink.version1.13.0/flink.version java.version1.8/java.version scala.binary.version2.12/scala.binary.version slf4j.version1.7.30/slf4j.version /properties dependencies !-- 引入Flink相关依赖-- dependency groupIdorg.apache.flink/groupId artifactIdflink-java/artifactId version${flink.version}/version /dependency dependency groupIdorg.apache.flink/groupId artifactIdflink-streaming-java_${scala.binary.version}/artifactId version${flink.version}/version /dependency dependency groupIdorg.apache.flink/groupId artifactIdflink-clients_${scala.binary.version}/artifactId version${flink.version}/version /dependency !-- 引入日志管理相关依赖-- dependency groupIdorg.slf4j/groupId artifactIdslf4j-api/artifactId version${slf4j.version}/version /dependency dependency groupIdorg.slf4j/groupId artifactIdslf4j-log4j12/artifactId version${slf4j.version}/version /dependency dependency groupIdorg.apache.logging.log4j/groupId artifactIdlog4j-to-slf4j/artifactId version2.14.0/version /dependency /dependencies拉取依赖所需要的内容新增日志管理文件创建基础示例package com.zx; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import java.util.Arrays; public class SocketStreamWordCount { public static void main(String[] args) throws Exception { // 1. 创建流式执行环境 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 读取文本流newdata12表示发送端主机名、7777表示端口号 DataStreamSourceString lineStream env.socketTextStream(newdata1, 7777); // 3. 转换、分组、求和得到统计结果 SingleOutputStreamOperatorTuple2String, Long sum lineStream.flatMap((String line, CollectorTuple2String, Long out) - { String[] words line.split( ); for (String word : words) { out.collect(Tuple2.of(word, 1L)); } }).returns(Types.TUPLE(Types.STRING, Types.LONG)) .keyBy(data - data.f0) .sum(1); // 4. 打印 sum.print(); // 5. 执行 env.execute(); } }打包成jarflink_learn-1.0-SNAPSHOT.jar1.3 使用-会话模式1. 启动hdfsyarn集群如果是Hadoop高可用模式提前启动zookeeper服务cd /component/hadoop-3.1.3 sbin/start-dfs.sh sbin/start-yarn.sd2. 启动Flink会话模式bin/yarn-session.sh -nm test -d # -nm(--name)YARN UI显示的任务名 # -d 分离式如果不指定会一直占用这个窗口。试一试指定和不指定的效果 # 显示的INFO日志中最后面会显示web ui地址3. 提交作业在web ui页面上点击【Submit New Job】-【Add New】添加Windows系统下已经打好的Jar包。然后在虚拟机中启动一个netcat服务nc -lk 7777 # 启动后输入一些单词4. 显示、关闭application# test替换成启动的时候给作业取的名字。 yarn application -list | grep test ./bin/yarn-session.sh -id application_123456789_0001 -stop # 或者 yarn application -kill application_123456789_0001完。

相关新闻

软考 系统架构设计师系列知识点之杂项集萃(75)

软考 系统架构设计师系列知识点之杂项集萃(75)

接前一篇文章:软考 系统架构设计师系列知识点之杂项集萃(74) 第131题 在无噪声的情兄下,若某通信链路的带宽为3KHz,所采用的调制方法支持32种信号状态,则该通信链路的最大数据传输速率为()Kbps。 A. 100 B. 50 C. 30 D. 6 正确答案:C。 试题解析: 最大数据传输…

2026/7/3 19:58:13 阅读更多 →
软考 系统架构设计师系列知识点之杂项集萃(74)

软考 系统架构设计师系列知识点之杂项集萃(74)

接前一篇文章:软考 系统架构设计师系列知识点之杂项集萃(73) 第128题 监理活动的主要内容可以概括为“四控、三管、一协调”,其中四控包含()①质量控制②风险控制③投资控制④进度控制⑤范围控制⑥变更控制。 A. ①②③④ B. ①②④⑤ C. ①③④⑤ D. ①③④⑥ 正确答…

2026/5/17 12:40:00 阅读更多 →
计算机网络-2

计算机网络-2

1. OSI 7层模型1.1. 传输层传输层用于管理或控制端到端连接传输层控制了,连接或数据传输是否可靠,指定端口号可靠的连接(面向连接)tcp协议不可靠的连接(无连接)udp协议通过端口与对方连接(访问百…

2026/7/3 1:11:18 阅读更多 →

最新新闻

Umi-OCR深度配置与优化终极指南:从入门到精通的离线OCR解决方案

Umi-OCR深度配置与优化终极指南:从入门到精通的离线OCR解决方案

Umi-OCR深度配置与优化终极指南:从入门到精通的离线OCR解决方案 【免费下载链接】Umi-OCR OCR software, free and offline. 开源、免费的离线OCR软件。支持截屏/批量导入图片,PDF文档识别,排除水印/页眉页脚,扫描/生成二维码。内…

2026/7/3 20:49:24 阅读更多 →
STM32F373VC与KMR221的嵌入式电压管理系统设计

STM32F373VC与KMR221的嵌入式电压管理系统设计

1. KMR221与STM32F373VC的硬件协同设计在嵌入式电压管理系统中,KMR221作为一款高精度电压监测芯片,与STM32F373VC微控制器的配合使用构成了硬件设计的核心。KMR221具有16位ADC分辨率,支持0.1%的电压测量精度,其I2C接口与STM32F373…

2026/7/3 20:47:24 阅读更多 →
企业级AI编排:MuleSoft集成LLM的工程化实践

企业级AI编排:MuleSoft集成LLM的工程化实践

1. 项目概述:当企业级集成平台遇上大语言模型“AI Orchestration in Action: How MuleSoft and LLMs Fuel the Future of Enterprise AI”——这个标题不是一句空泛的营销口号,而是我在过去18个月里亲手搭建、上线并持续迭代的三个核心生产系统的真实写照…

2026/7/3 20:45:23 阅读更多 →
MuleSoft企业级AI编排:安全、可审计的大模型集成实践

MuleSoft企业级AI编排:安全、可审计的大模型集成实践

1. 项目概述:当企业级集成平台遇上大语言模型“AI Orchestration in Action: How MuleSoft and LLMs Fuel the Future of Enterprise AI”——这个标题不是一句空泛的行业口号,而是我在过去18个月里亲手落地的三个核心生产系统的真实写照。它讲的不是“用…

2026/7/3 20:45:23 阅读更多 →
如何彻底解决Windows 10/11中PL2303老芯片的驱动兼容性问题

如何彻底解决Windows 10/11中PL2303老芯片的驱动兼容性问题

如何彻底解决Windows 10/11中PL2303老芯片的驱动兼容性问题 【免费下载链接】pl2303-win10 Windows 10 driver for end-of-life PL-2303 chipsets. 项目地址: https://gitcode.com/gh_mirrors/pl/pl2303-win10 如果你在Windows 10或Windows 11系统中使用PL-2303 USB转串…

2026/7/3 20:43:22 阅读更多 →
Spring Boot集成Cassandra:高性能数据存储实战指南

Spring Boot集成Cassandra:高性能数据存储实战指南

1. 为什么选择 Cassandra 作为 Spring Boot 的数据存储方案在分布式系统架构设计中,数据库选型往往直接决定了系统的扩展上限。三年前我在处理一个物联网平台项目时,曾面临日均千万级设备状态写入的挑战。当时测试了多种数据库方案,最终 Cass…

2026/7/3 20:43:22 阅读更多 →

日新闻

Nginx防御TLS重协商攻击实战:从原理到配置与监控

Nginx防御TLS重协商攻击实战:从原理到配置与监控

1. 项目概述:为什么TLS重协商攻击至今仍需警惕十多年前的CVE-2011-1473,一个关于TLS/SSL协议重协商机制的漏洞,现在提起来还有必要吗?很多运维和开发朋友可能会觉得,这都老掉牙了,现代服务器和客户端不都默…

2026/7/3 0:03:59 阅读更多 →
华为防火墙双通道远程管理实战:Web与SSH配置详解

华为防火墙双通道远程管理实战:Web与SSH配置详解

1. 项目概述:为什么需要双通道远程管理防火墙?在任何一个稍具规模的企业网络里,防火墙都是那个默默守护在边界的关键角色。作为网络工程师,我们不可能每次都跑到机房,插上console线去配置它。远程管理能力,…

2026/7/3 0:03:59 阅读更多 →
AD74413R与PIC18F65K40的高精度工业数据采集方案

AD74413R与PIC18F65K40的高精度工业数据采集方案

1. 项目概述:AD74413R与PIC18F65K40的协同工作在工业自动化和精密测量领域,同时实现高精度模数转换(ADC)和数模转换(DAC)功能是许多复杂系统的核心需求。AD74413R作为一款四通道可配置模拟输入/输出器件,与PIC18F65K40微控制器的组合&#xf…

2026/7/3 0:05:59 阅读更多 →

周新闻

月新闻