Kafka、Flink安装,简单使用
内容来自尚硅谷、黑马、DeepSeek1. 安装Kafka1.1 下载安装包安装包名kafka_2.12-3.8.0.tgz下载地址apache-kafka-3.8.0安装包下载_开源镜像站-阿里云用xftp传到任意一个节点上。1.2 解压、配置Kafka1.2.1 解压改名cd /component/packages tar -zxvf kafka_2.12-3.8.0.tgz -C /component/ mv kafka_2.12-3.8.0/ kafka # 改名1.2.2 修改配置文件cd config/ vim server.properties #broker的全局唯一编号不能重复只能是数字。 broker.id0 #kafka运行日志(数据)存放的路径路径不需要提前创建kafka自动帮你创建可以配置多个磁盘路径路径与路径之间可以用分隔 log.dirs/component/kafka/datas #配置连接Zookeeper集群地址在zk根目录下创建/kafka方便管理 zookeeper.connectzx101:2181,zx102:2181,zx103:2181/kafka 1.2.3 分发Kafka包第一节点broker.id0第二节点broker.id1第三节点broker.id2cd /component xsync kafka # 修改第二台、第三台节点的brokerid #broker的全局唯一编号不能重复只能是数字。 broker.id21.2.4 修改/etc/profile.d/my_env.shsudo vim /etc/profile.d/my_env.sh #KAFKA_HOME export KAFKA_HOME/component/kafka export PATH$PATH:$KAFKA_HOME/bin # 分发 sudo ~/bin/xsync /etc/profile.d/my_env.sh # 所有机器刷新 source /etc/profile1.3 启动Kafka# 1. 启动zookeeper zookeeper.sh start # 2. 所有机器启动Kafka cd /component/kafka/bin kafka-server-start.sh -daemon config/server.properties # 每一个节点上运行一次 # 3. 关闭Kafka kafka-server-stop.sh # 每一个节点上运行一次1.4 群起Kafkavim ~/bin/kafka.sh【加执行权限】#! /bin/bash case $1 in start){ for i in zx101 zx102 zx103 do echo --------启动 $i Kafka------- ssh $i /component/kafka/bin/kafka-server-start.sh -daemon /component/kafka/config/server.properties done };; stop){ for i in zx101 zx102 zx103 do echo --------停止 $i Kafka------- ssh $i /component/kafka/bin/kafka-server-stop.sh done };; esaccd /home/zx101/bin/ chmod x kafka.shzookeeper报错处理使用zookeeper.sh群起zookeeper集群时报错解决办法。[kyiredicketnewdata1 ~]$ zookeeper.sh start[ERROR] Missing peer id argument. Usage: zookeeper.sh ((start|start-foreground) peer-id)|stop|stop-all.把zookeeper.sh改个名字改成zeek.sh我的就好了。kafka 安装异常进入Zookeeper的客户端] zkCli.sh) ls /# 按顺序删除以下内容deleteall /admindeleteall /brokersdeleteall /clusterdeleteall /configdeleteall /consumersdeleteall /controllerdeleteall /controller_epochdeleteall /isr_change_notificationdeleteall /latest_producer_id_blockdeleteall /log_dir_event_notification# 把3台Kafka的logs下面的文件全部删掉logs] rm -rf ./*2. Kafka基础命令# 查看当前节点的topic bin/kafka-topics.sh --bootstrap-server newdata1:9092 --list # 创建topic名为first bin/kafka-topics.sh --bootstrap-server newdata1:9092 --create --partitions 1 --replication-factor 3 --topic first # 查看topic的主题详情 bin/kafka-topics.sh --bootstrap-server newdata1:9092 --describe --topic first # 删除topic bin/kafka-topics.sh --bootstrap-server newdata1:9092 --delete --topic first # 生产发送消息 bin/kafka-console-producer.sh --bootstrap-server newdata1:9092 --topic first # 消费者查看消息每次运行这个命令消费者组就会增加一个即使退出了也会存在 bin/kafka-console-consumer.sh --bootstrap-server newdata1:9092 --topic first # 可以手动增加group的消费者组 bin/kafka-console-consumer.sh --bootstrap-server newdata1:9092 --topic first --group group_consumer_1234 # 消费者查看历史所有消息 bin/kafka-console-consumer.sh --bootstrap-server newdata1:9092 --from-beginning --topic first3. 安装Kafka可视化监控进入EFAK官网下载Eagle (http://download.kafka-eagle.org/)3.1 解压、安装MySQL如果已安装过就不需再安装安装过程见【Hive安装】。3.2 解压、安装Kafka-Eagle只需安装在mysql同一台上3.2.1 修改kafka配置、分发# 修改kafka中的配置改完之后要分发到其他节点 vim bin/kafka-server-start.sh # 以前的内容 if [ x$KAFKA_HEAP_OPTS x ]; then export KAFKA_HEAP_OPTS-Xmx1G -Xms1G fi # 改成如下内容 if [ x$KAFKA_HEAP_OPTS x ]; then export KAFKA_HEAP_OPTS-server -Xms2G -Xmx2G -XX:PermSize128m -XX:UseG1GC -XX:MaxGCPauseMillis200 -XX:ParallelGCThreads8 -XX:ConcGCThreads5 -XX:InitiatingHeapOccupancyPercent70 export JMX_PORT9999 #export KAFKA_HEAP_OPTS-Xmx1G -Xms1G fi 3.2.2 解压、安装Kafka-Eagle有两步解压比其他组件多一步骤。# 第一步解压 tar -zxvf kafka-eagle-bin-2.0.8.tar.gz -C /component/packages # 第二步解压 tar -zxvf efak-web-2.0.8-bin.tar.gz -C /component # 修改名字看个人习惯找到该文件对应的地方修改。其他的不用改。# 修改配置文件 cd /component/efak/conf vim system-config.properties ###################################### # multi zookeeper kafka cluster list # Settings prefixed with kafka.eagle. will be deprecated, use efak. instead ###################################### efak.zk.cluster.aliascluster1 cluster1.zk.listnewdata1:2181,newdata2:2181,newdata3:2181/kafka # 这个按各自情况更改 ###################################### # kafka offset storage ###################################### # offset保存在kafka cluster1.efak.offset.storagekafka ###################################### # kafka sqlite jdbc driver address ###################################### # 配置mysql连接 efak.drivercom.mysql.jdbc.Driver efak.urljdbc:mysql://newdata1:3306/ke?useUnicodetruecharacterEncodingUTF-8zeroDateTimeBehaviorconvertToNull # 这个按各自情况更改还要在mysql库中建一个叫做ke的database efak.usernameroot # 这个按各自情况更改 efak.password123456 # 这个按各自情况更改 3.2.3 添加环境变量这个不用分发sudo vim /etc/profile.d/my_env.sh # kafkaEFAK export KE_HOME/component/efak export PATH$PATH:$KE_HOME/bin # 刷新 source /etc/profile3.2.4 启动、关闭efakkf.sh start kf.sh stop3.2.5 登录网页Windows上登录网页http://192.168.84.138:8048/ 【网页、账号密码会在启动efak时提供】4. Flink安装Flink on yarn4.1 Flink解压、安装。【选一台安装Flink我选了newdata1】tar -zxvf flink-1.13.0-bin-scala_2.12.tgz -C /component/ # 修改配置 vim flink-conf.yaml jobmanager.rpc.address: newdata1 vim workers newdata2 newdata3 1.2 修改环境变量sudo vim /etc/profile.d/my_env.sh # FLINK_HOME export HADOOP_CONF_DIR${HADOOP_HOME}/etc/hadoop export HADOOP_CLASSPATHhadoop classpath source /etc/profile1.3 创建Windows的flink-maven工程maven的配置和简单使用参考【https://blog.csdn.net/ERROR101/article/details/147761152?spm1011.2415.3001.5331】导入依赖properties flink.version1.13.0/flink.version java.version1.8/java.version scala.binary.version2.12/scala.binary.version slf4j.version1.7.30/slf4j.version /properties dependencies !-- 引入Flink相关依赖-- dependency groupIdorg.apache.flink/groupId artifactIdflink-java/artifactId version${flink.version}/version /dependency dependency groupIdorg.apache.flink/groupId artifactIdflink-streaming-java_${scala.binary.version}/artifactId version${flink.version}/version /dependency dependency groupIdorg.apache.flink/groupId artifactIdflink-clients_${scala.binary.version}/artifactId version${flink.version}/version /dependency !-- 引入日志管理相关依赖-- dependency groupIdorg.slf4j/groupId artifactIdslf4j-api/artifactId version${slf4j.version}/version /dependency dependency groupIdorg.slf4j/groupId artifactIdslf4j-log4j12/artifactId version${slf4j.version}/version /dependency dependency groupIdorg.apache.logging.log4j/groupId artifactIdlog4j-to-slf4j/artifactId version2.14.0/version /dependency /dependencies拉取依赖所需要的内容新增日志管理文件创建基础示例package com.zx; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import java.util.Arrays; public class SocketStreamWordCount { public static void main(String[] args) throws Exception { // 1. 创建流式执行环境 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 读取文本流newdata12表示发送端主机名、7777表示端口号 DataStreamSourceString lineStream env.socketTextStream(newdata1, 7777); // 3. 转换、分组、求和得到统计结果 SingleOutputStreamOperatorTuple2String, Long sum lineStream.flatMap((String line, CollectorTuple2String, Long out) - { String[] words line.split( ); for (String word : words) { out.collect(Tuple2.of(word, 1L)); } }).returns(Types.TUPLE(Types.STRING, Types.LONG)) .keyBy(data - data.f0) .sum(1); // 4. 打印 sum.print(); // 5. 执行 env.execute(); } }打包成jarflink_learn-1.0-SNAPSHOT.jar1.3 使用-会话模式1. 启动hdfsyarn集群如果是Hadoop高可用模式提前启动zookeeper服务cd /component/hadoop-3.1.3 sbin/start-dfs.sh sbin/start-yarn.sd2. 启动Flink会话模式bin/yarn-session.sh -nm test -d # -nm(--name)YARN UI显示的任务名 # -d 分离式如果不指定会一直占用这个窗口。试一试指定和不指定的效果 # 显示的INFO日志中最后面会显示web ui地址3. 提交作业在web ui页面上点击【Submit New Job】-【Add New】添加Windows系统下已经打好的Jar包。然后在虚拟机中启动一个netcat服务nc -lk 7777 # 启动后输入一些单词4. 显示、关闭application# test替换成启动的时候给作业取的名字。 yarn application -list | grep test ./bin/yarn-session.sh -id application_123456789_0001 -stop # 或者 yarn application -kill application_123456789_0001完。

相关新闻

软考 系统架构设计师系列知识点之杂项集萃(75)

软考 系统架构设计师系列知识点之杂项集萃(75)

接前一篇文章:软考 系统架构设计师系列知识点之杂项集萃(74) 第131题 在无噪声的情兄下,若某通信链路的带宽为3KHz,所采用的调制方法支持32种信号状态,则该通信链路的最大数据传输速率为()Kbps。 A. 100 B. 50 C. 30 D. 6 正确答案:C。 试题解析: 最大数据传输…

2026/5/17 12:40:01 阅读更多 →
软考 系统架构设计师系列知识点之杂项集萃(74)

软考 系统架构设计师系列知识点之杂项集萃(74)

接前一篇文章:软考 系统架构设计师系列知识点之杂项集萃(73) 第128题 监理活动的主要内容可以概括为“四控、三管、一协调”,其中四控包含()①质量控制②风险控制③投资控制④进度控制⑤范围控制⑥变更控制。 A. ①②③④ B. ①②④⑤ C. ①③④⑤ D. ①③④⑥ 正确答…

2026/5/17 12:40:00 阅读更多 →
计算机网络-2

计算机网络-2

1. OSI 7层模型1.1. 传输层传输层用于管理或控制端到端连接传输层控制了,连接或数据传输是否可靠,指定端口号可靠的连接(面向连接)tcp协议不可靠的连接(无连接)udp协议通过端口与对方连接(访问百…

2026/7/3 1:11:18 阅读更多 →

最新新闻

告别英文困扰!GitHub Desktop中文汉化工具让你3分钟搞定界面翻译

告别英文困扰!GitHub Desktop中文汉化工具让你3分钟搞定界面翻译

告别英文困扰!GitHub Desktop中文汉化工具让你3分钟搞定界面翻译 【免费下载链接】GitHubDesktop2Chinese GithubDesktop语言本地化(汉化)工具 【GitHub桌面客户端中文汉化】 项目地址: https://gitcode.com/gh_mirrors/gi/GitHubDesktop2Chinese 还在为GitH…

2026/7/3 19:55:08 阅读更多 →
IS31FL3731与MKV42F256VLH16的LED矩阵控制实战

IS31FL3731与MKV42F256VLH16的LED矩阵控制实战

1. IS31FL3731与MKV42F256VLH16的硬件协同设计1.1 IS31FL3731 LED驱动芯片深度解析IS31FL3731这颗芯片在LED控制领域堪称"瑞士军刀",它能同时驱动144个LED(169矩阵配置)。我实际测试过,在5V工作电压下,每个L…

2026/7/3 19:55:08 阅读更多 →
容器故障检测新纪元:openeuler/cpds-agent核心采集组件深度解析

容器故障检测新纪元:openeuler/cpds-agent核心采集组件深度解析

容器故障检测新纪元:openeuler/cpds-agent核心采集组件深度解析 【免费下载链接】cpds-agent Collect Container info for Container Problem Detect System. 项目地址: https://gitcode.com/openeuler/cpds-agent 前往项目官网免费下载:https://…

2026/7/3 19:53:07 阅读更多 →
戴森球计划蓝图库实战指南:如何用FactoryBluePrints构建高效星际工厂

戴森球计划蓝图库实战指南:如何用FactoryBluePrints构建高效星际工厂

戴森球计划蓝图库实战指南:如何用FactoryBluePrints构建高效星际工厂 【免费下载链接】FactoryBluePrints 游戏戴森球计划的**工厂**蓝图仓库 项目地址: https://gitcode.com/GitHub_Trending/fa/FactoryBluePrints 在戴森球计划这款太空工厂模拟游戏中&…

2026/7/3 19:53:07 阅读更多 →
LENA-R8与STM32F427ZI构建全球连接与高精度定位系统

LENA-R8与STM32F427ZI构建全球连接与高精度定位系统

1. LENA-R8与STM32F427ZI的硬件组合解析这个项目最吸引人的地方在于将LENA-R8蜂窝通信模块与STM32F427ZI高性能MCU相结合,构建了一个既能实现全球网络连接又能进行高精度位置跟踪的嵌入式系统。我们先拆解这两个核心硬件:LENA-R8是u-blox推出的多模LTE C…

2026/7/3 19:51:07 阅读更多 →
免费开源项目文档:基于BP神经网络的雾霾天气交通标志识别系统设计与实现

免费开源项目文档:基于BP神经网络的雾霾天气交通标志识别系统设计与实现

摘要:随着国民经济的持续发展和城市化进程的不断推进,机动车保有量呈现出快速增长的态势,随之而来的交通安全问题也日益突出。交通标志作为道路交通系统中传递管理信息、规范驾驶行为的重要载体,其能否被驾驶员及时、准确地识别&a…

2026/7/3 19:51:07 阅读更多 →

日新闻

Nginx防御TLS重协商攻击实战:从原理到配置与监控

Nginx防御TLS重协商攻击实战:从原理到配置与监控

1. 项目概述:为什么TLS重协商攻击至今仍需警惕十多年前的CVE-2011-1473,一个关于TLS/SSL协议重协商机制的漏洞,现在提起来还有必要吗?很多运维和开发朋友可能会觉得,这都老掉牙了,现代服务器和客户端不都默…

2026/7/3 0:03:59 阅读更多 →
华为防火墙双通道远程管理实战:Web与SSH配置详解

华为防火墙双通道远程管理实战:Web与SSH配置详解

1. 项目概述:为什么需要双通道远程管理防火墙?在任何一个稍具规模的企业网络里,防火墙都是那个默默守护在边界的关键角色。作为网络工程师,我们不可能每次都跑到机房,插上console线去配置它。远程管理能力,…

2026/7/3 0:03:59 阅读更多 →
AD74413R与PIC18F65K40的高精度工业数据采集方案

AD74413R与PIC18F65K40的高精度工业数据采集方案

1. 项目概述:AD74413R与PIC18F65K40的协同工作在工业自动化和精密测量领域,同时实现高精度模数转换(ADC)和数模转换(DAC)功能是许多复杂系统的核心需求。AD74413R作为一款四通道可配置模拟输入/输出器件,与PIC18F65K40微控制器的组合&#xf…

2026/7/3 0:05:59 阅读更多 →

周新闻

月新闻