技术标签: zookeeper kafka big data
1,下载kafka
kafka_2.12-3.0 版本
2,下载zookeeper
apache-zookeeper-3.5.9 版本
3,解压zookeeper
tar -zxvf zpache-zookeeper
然后保存退出配置环境变量
vi /etc/profile
4,存放路径
export ZOOKEEPER_INSTALL=/root/zookeeper/
export PATH=$PATH:$ZOOKEEPER_INSTALL/bin
5,立即生效
source /etc/profile
6,修改zookeeper配置文件
cd /zookeeper/conf/zoo......cfg
这里要改为
mv zoo-....cfg zoo.cfg 改名
7,在修改配置文件 vi zoo.cfg
这两个必须加上,没有路径mkdir新建data log就行了
dataDir=/tmp/zookeeper/data
dataLogDir=/tmp/zookeeper/log
8,最后启动,来到bin/目录下的
./zkServer.sh start
查看是否安装成功
jps
9,yum安装jdk快速安装配置
yum install -y java-1.8.0-openjdk-devel.x86_64
10.关闭防火墙
systemctl stop firewalld
11,解压kafka
tar -zxvf kafka_2.12-3.0
12,进入到kafka的配置文件
cd kafka/conf/
# 超级重要允许外部访问的-----这是线上的配置-------------第一种配置----------------
# 阿里云私网ip
listeners=PLAINTEXT://172.22.239.137:9092
# 阿里云公网ip
advertised.host.name=8.135.54.87
# 阿里云私网ip
host.name=172.22.239.137
# 映射端口
prot=9092
# 底部的zookeeper 配置
zookeeper.connect=172.22.239.137:2181
# 超级重要允许外部访问的-----这是本地虚拟机的配置-------------第二种配置----------------
这里的ip地址均为虚拟机ip地址,不可变动
# 阿里云私网ip
listeners=PLAINTEXT://172.22.239.137:9092
# 底部的zookeeper 配置
zookeeper.connect=172.22.239.137:2181
13,全局启动kafka
./kafka-server-start.sh -daemon ../config/server.properties
14,启动成功后创建对应的主体/生产者/消费者
高版本kafka创建主体---------
./kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.1.158:9092 --replication-factor 1 --partitions 1 --topic stationTopic
./kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.1.158:9092 --replication-factor 1 --partitions 1 --topic etlTopic
创建生产者---------------------------------------------------------------------------
#模拟生产者
./kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic stationTopic
./kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic etlTopic
创建消费者…
#模拟消费者
./kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.158:9092 --topic stationTopic --from-beginning
./kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.158:9092 --topic etlTopic --from-beginning
1,这里3.0 3.2都可以对应配置hadoop的环境变量
spark-3.2.0-bin-hadoop3.2
spark下载地址
http://spark.apache.org/downloads.html
hadoop下载地址
https://hadoop.apache.org/releases.html
https://dlcdn.apache.org/hadoop/common/hadoop-3.2.2/hadoop-3.2.2-src.tar.gz
2,hadoop必须下载的文件,因为这是linux的解压文件没有windows启动命令,所以需要下载
下载地址
hadoop.dll文件下载地址:
链接:https://pan.baidu.com/s/1Rb5ROUQMSqp7SeQINlLZkA 提取码:n8t6
3,spark和hadoop文件配置在windos环境变量中
4,下载scalasdk并配置环境变量
下载地址
https://get-coursier.io/docs/cli-installation
打开后滑动到最底部下载对应的版本 windows linxu mac.........
下载完后配置环境变量,同上spark hadoop
5,打开idea编辑器,选择所用版本,
6,新建maven环境并导入依赖
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>apache</id>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<properties>
<encoding>UTF-8</encoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.12.11</scala.version>
<spark.version>3.0.1</spark.version>
<hadoop.version>2.7.5</hadoop.version>
</properties>
<dependencies>
<!--依赖Scala语言-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${
scala.version}</version>
</dependency>
<!--SparkCore依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${
spark.version}</version>
</dependency>
<!-- spark-streaming-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>${
spark.version}</version>
</dependency>
<!--spark-streaming+Kafka依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>${
spark.version}</version>
</dependency>
<!--SparkSQL依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${
spark.version}</version>
</dependency>
<!--SparkSQL+ Hive依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>${
spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_2.12</artifactId>
<version>${
spark.version}</version>
</dependency>
<!--StructuredStreaming+Kafka依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>${
spark.version}</version>
</dependency>
<!-- SparkMlLib机器学习模块,里面有ALS推荐算法-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.12</artifactId>
<version>${
spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>com.hankcs</groupId>
<artifactId>hanlp</artifactId>
<version>portable-1.7.7</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<plugins>
<!-- 指定编译java的插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
<!-- 指定编译scala的插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${
project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
7,修改maven 的地址,以及setting.xml 以及repo包存放路径
8,创建一个scala模拟数据文件
MockStationLog
stationTopic是在linux创建的主题,是往kafka里面发的
bootstrap.servers 对应是kafka的路径
package cn.itcast.structured
import java.util.Properties
import org.apache.kafka.clients.producer.{
KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.util.Random
/**
* 模拟产生基站日志数据,实时发送Kafka Topic中
* 数据字段信息:
* 基站标识符ID, 主叫号码, 被叫号码, 通话状态, 通话时间,通话时长
*/
object MockStationLog {
def main(args: Array[String]): Unit = {
// 发送Kafka Topic
val props = new Properties()
props.put("bootstrap.servers", "192.168.1.158:9092")
props.put("acks", "1")
props.put("retries", "3")
props.put("key.serializer", classOf[StringSerializer].getName)
props.put("value.serializer", classOf[StringSerializer].getName)
val producer = new KafkaProducer[String, String](props)
val random = new Random()
val allStatus = Array(
"fail", "busy", "barring", "success", "success", "success",
"success", "success", "success", "success", "success", "success"
)
while (true) {
val callOut: String = "1860000%04d".format(random.nextInt(10000))
val callIn: String = "1890000%04d".format(random.nextInt(10000))
val callStatus: String = allStatus(random.nextInt(allStatus.length))
val callDuration = if ("success".equals(callStatus)) (1 + random.nextInt(10)) * 1000L else 0L
// 随机产生一条基站日志数据
val stationLog: StationLog = StationLog(
"station_" + random.nextInt(10),
callOut,
callIn,
callStatus,
System.currentTimeMillis(),
callDuration
)
println(stationLog.toString)
Thread.sleep(100 + random.nextInt(100))
val record = new ProducerRecord[String, String]("stationTopic", stationLog.toString)
producer.send(record)
}
producer.close() // 关闭连接
}
/**
* 基站通话日志数据
*/
case class StationLog(
stationId: String, //基站标识符ID
callOut: String, //主叫号码
callIn: String, //被叫号码
callStatus: String, //通话状态
callTime: Long, //通话时间
duration: Long //通话时长
) {
override def toString: String = {
s"$stationId,$callOut,$callIn,$callStatus,$callTime,$duration"
}
}
}
9,创建过滤消费者…Demo09_Kafka_ETL
//TODO 1.加载数据-kafka-stationTopic 实在kafka拉取模拟发送来得大数据,从kafka里面拉取数据来过滤清洗
//TODO 2.处理数据-ETL-过滤出success的数据 需要过滤带有那些字符串的数据…
//TODO 3.输出结果-kafka-etlTopic 是往kafka另外创建的一个主题里面发送过滤完的集合信息,方便业务主体1业务主体2业务主体3不通项目组拉取数据做展示…相当于过滤完存放在另一个队列,等着其他业务部门拉取数据
package cn.itcast.structured
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{
DataFrame, Dataset, Row, SparkSession}
/**
* Author itcast
* Desc 演示StructuredStreaming整合Kafka,
* 从stationTopic消费数据 -->使用StructuredStreaming进行ETL-->将ETL的结果写入到etlTopic
*/
object Demo09_Kafka_ETL {
def main(args: Array[String]): Unit = {
//TODO 0.创建环境
//因为StructuredStreaming基于SparkSQL的且编程API/数据抽象是DataFrame/DataSet,所以这里创建SparkSession即可
val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[2]")
.config("spark.sql.shuffle.partitions", "4") //本次测试时将分区数设置小一点,实际开发中可以根据集群规模调整大小,默认200
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
import spark.implicits._
//TODO 1.加载数据-kafka-stationTopic
val kafkaDF: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "192.168.1.158:9092")
.option("subscribe", "stationTopic")
.load()
val valueDS: Dataset[String] = kafkaDF.selectExpr("CAST(value AS STRING)").as[String]
//TODO 2.处理数据-ETL-过滤出success的数据
val etlResult: Dataset[String] = valueDS.filter(_.contains("success"))
print(etlResult)
//TODO 3.输出结果-kafka-etlTopic
etlResult.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "192.168.1.158:9092")
.option("topic", "etlTopic")
.option("checkpointLocation", "L:\\sparkmllib\\checled")
//TODO 4.启动并等待结束
.start()
.awaitTermination()
print(etlResult)
//TODO 5.关闭资源
spark.stop()
}
}
//0.kafka准备好
//1.启动数据模拟程序
//2.启动控制台消费者方便观察
//3.启动Demo09_Kafka_ETL
**
**
文章浏览阅读645次。这个肯定是末尾的IDAT了,因为IDAT必须要满了才会开始一下个IDAT,这个明显就是末尾的IDAT了。,对应下面的create_head()代码。,对应下面的create_tail()代码。不要考虑爆破,我已经试了一下,太多情况了。题目来源:UNCTF。_攻防世界困难模式攻略图文
文章浏览阅读2.9k次,点赞3次,收藏10次。偶尔会用到,记录、分享。1. 数据库导出1.1 切换到dmdba用户su - dmdba1.2 进入达梦数据库安装路径的bin目录,执行导库操作 导出语句:./dexp cwy_init/[email protected]:5236 file=cwy_init.dmp log=cwy_init_exp.log 注释: cwy_init/init_123..._达梦数据库导入导出
文章浏览阅读1.9k次。1. 在官网上下载KindEditor文件,可以删掉不需要要到的jsp,asp,asp.net和php文件夹。接着把文件夹放到项目文件目录下。2. 修改html文件,在页面引入js文件:<script type="text/javascript" src="./kindeditor/kindeditor-all.js"></script><script type="text/javascript" src="./kindeditor/lang/zh-CN.js"_kindeditor.js
文章浏览阅读2.3k次,点赞6次,收藏14次。SPI的详情简介不必赘述。假设我们通过SPI发送0xAA,我们的数据线就会变为10101010,通过修改不同的内容,即可修改SPI中0和1的持续时间。比如0xF0即为前半周期为高电平,后半周期为低电平的状态。在SPI的通信模式中,CPHA配置会影响该实验,下图展示了不同采样位置的SPI时序图[1]。CPOL = 0,CPHA = 1:CLK空闲状态 = 低电平,数据在下降沿采样,并在上升沿移出CPOL = 0,CPHA = 0:CLK空闲状态 = 低电平,数据在上升沿采样,并在下降沿移出。_stm32g431cbu6
文章浏览阅读1.2k次,点赞2次,收藏8次。数据链路层习题自测问题1.数据链路(即逻辑链路)与链路(即物理链路)有何区别?“电路接通了”与”数据链路接通了”的区别何在?2.数据链路层中的链路控制包括哪些功能?试讨论数据链路层做成可靠的链路层有哪些优点和缺点。3.网络适配器的作用是什么?网络适配器工作在哪一层?4.数据链路层的三个基本问题(帧定界、透明传输和差错检测)为什么都必须加以解决?5.如果在数据链路层不进行帧定界,会发生什么问题?6.PPP协议的主要特点是什么?为什么PPP不使用帧的编号?PPP适用于什么情况?为什么PPP协议不_接收方收到链路层数据后,使用crc检验后,余数为0,说明链路层的传输时可靠传输
文章浏览阅读587次。软件测试工程师移民加拿大 无证移民,未受过软件工程师的教育(第1部分) (Undocumented Immigrant With No Education to Software Engineer(Part 1))Before I start, I want you to please bear with me on the way I write, I have very little gen...
文章浏览阅读304次。Thinkpad X250笔记本电脑,装的是FreeBSD,进入BIOS修改虚拟化配置(其后可能是误设置了安全开机),保存退出后系统无法启动,显示:secure boot failed ,把自己惊出一身冷汗,因为这台笔记本刚好还没开始做备份.....根据错误提示,到bios里面去找相关配置,在Security里面找到了Secure Boot选项,发现果然被设置为Enabled,将其修改为Disabled ,再开机,终于正常启动了。_安装完系统提示secureboot failure
文章浏览阅读10w+次,点赞93次,收藏352次。1、用strtok函数进行字符串分割原型: char *strtok(char *str, const char *delim);功能:分解字符串为一组字符串。参数说明:str为要分解的字符串,delim为分隔符字符串。返回值:从str开头开始的一个个被分割的串。当没有被分割的串时则返回NULL。其它:strtok函数线程不安全,可以使用strtok_r替代。示例://借助strtok实现split#include <string.h>#include <stdio.h&_c++ 字符串分割
文章浏览阅读2.3k次。1 .高斯日记 大数学家高斯有个好习惯:无论如何都要记日记。他的日记有个与众不同的地方,他从不注明年月日,而是用一个整数代替,比如:4210后来人们知道,那个整数就是日期,它表示那一天是高斯出生后的第几天。这或许也是个好习惯,它时时刻刻提醒着主人:日子又过去一天,还有多少时光可以用于浪费呢?高斯出生于:1777年4月30日。在高斯发现的一个重要定理的日记_2013年第四届c a组蓝桥杯省赛真题解答
文章浏览阅读851次,点赞17次,收藏22次。摘要:本文利用供需算法对核极限学习机(KELM)进行优化,并用于分类。
文章浏览阅读1.1k次。一、系统弱密码登录1、在kali上执行命令行telnet 192.168.26.1292、Login和password都输入msfadmin3、登录成功,进入系统4、测试如下:二、MySQL弱密码登录:1、在kali上执行mysql –h 192.168.26.129 –u root2、登录成功,进入MySQL系统3、测试效果:三、PostgreSQL弱密码登录1、在Kali上执行psql -h 192.168.26.129 –U post..._metasploitable2怎么进入
文章浏览阅读257次。本文将为初学者提供Python学习的详细指南,从Python的历史、基础语法和数据类型到面向对象编程、模块和库的使用。通过本文,您将能够掌握Python编程的核心概念,为今后的编程学习和实践打下坚实基础。_python人工智能开发从入门到精通pdf