简称producer,负责消息产生并发送到meta服务器
简称consumer,负责消息的消费,meta采用pull模型,由消费者主动从meta服务器拉取数据并解析成消息消费。
消息的主题,由用户定义并在服务端配置。producer发送消息到某个topic下,consumer从某个topic下消费消息
为了集群和负载均衡,同一个topic下面还分为多个分区,如meta-test这个topic我们可以分为10个分区,分别有两台服务器提供,那么可能每台服务器提供5个分区,假设服务器id分别为0和1,则所有分区为0-0、0-1、0-2、0-3、0-4、1-0、1-1、1-2、1-3、1-4
。
消息,负载用户数据,并在生产者、服务端和消费者之间传输
meta的服务端或者说是服务器,在消息中间件中通常称为broker
消费者可以是多个消费者共同消费一个topic下的消息,每个消费者消费部分消息。这些消费者就组成一个分组,拥有同一个分组名称,通常也称为消费者集群
消息在broker上的每个分区都是组织成一个文件列表,消费者拉取数据需要知道数据在文件中的偏移量,这个偏移量就是所谓offset。Offset是绝对偏移量,服务器会将offset转化为具体文件的相对偏移量。详细内容参见#消息的存储结构
服务的发现,维护了Broker的地址列表和Topic及Topic对应队列的地址列表,与每一个broker保持心跳连接,检车broker是否存活,在producer和consumer需要发布或者消费消息的时候,想nameserver发出请求获取连接。
broker节点是MetaQ消息存储的位置,所有的消息都由节点broker负责存储。broker会与nameserver建立长连接,将自身的相关信息(比如ip、topic、分区数等)发送到nameserver中,由nameserver集群对节点集群的相应信息进行维护。
生产者发消息前需要与nameserver建立长连接,生产者将从nameserver上获取到的相关broker节点的信息(比如broker上的topic类型,分区大小等)保存在本地。生产者根据自身需要发送的消息topic类型,从本地broker缓存文件中获取到节点列表,并从中选择某个broker节点,得到节点的相应address信息,然后生产者与这个节点建立连接。如果需要发送的topic在本地缓存中找不到对应的broker,生产者则会根据自身启动时初始化的remotingClient变量(记录了服务器的相关ip和端口),对nameserver进行请求。将自身所提供的topic信息注册到对应的broker节点上。如果注册成功,则最终会返回一个topicPublishInfo,其中记录了存有topic的对应broker列表信息。
生产者产生的消息体的主要由四部分构成:topic、tag、key、msgBody。
topic:消息的主题。发布和订阅消息是,都以topic为区分标准
tag:可看做topic的二级分类。一般可以用来对消息进行过滤处理,以获取到更精确的消息。
key:消息的唯一标识符。用于标识一个消息。当消息的传输和存储过程中出现故障问题时,可通过key来对失败的消息进行快速定位与查找,对于后期维护以及排查错误是一个关键的变量。
msgBody:消息体。消息的主体部分一般是byte类型数组,在java开发过程中,往往需要通过序列化和压缩等方式对其进行组装。
同生产者一样,消费者需要与nameserver建立长连接,消费者将从nameserver上获取到的相关broker节点的信息(比如broker上的topic类型、节点数量、分区大小等)保存在本地。
消费者在进行消息获取时有两种方式:第一种是pull模型,第二种是push模型。
**pull模型:**即消费者主动与broker节点进行连接通信,然后根据自己所需要的topic消息类型,从broker上拉取下来指定数量的消息。主动权在消费者手中,pull方式的循环间隔不好设定,间隔设定太短,处在“忙等”状态,浪费资源,间隔太长,消息不能及时处理
**push模型:**broker节点在收到生产者消息后,主动将消息推送到consumer上。Push方式的实时性比较高,但是会加大server端的工作压力,而且由于client的处理能力不能,client不能受server控制。
metaQ采用长轮询pull方式,既解决了pull实时性不够的问题,又不至于大量浪费资源。
MetaQ的存储方式采用物理队列+逻辑队列的形式。
一台机器只有一个,也就是本地的文件系统(图中的commit log),存储着实际的数据文件。MetaQ将消息存储在本地文件中,每个文件最大大小为1G,如果写入新的消息时,超过当前文件大小,则会自动新建一个文件。文件名称为起始字节大小。以起始字节大小命名并排序这些文件是有诸多好处的,当消费者要抓取某个起始偏移量开始位置的数据,会变的很简单,只要根据传上来的offset二分查找文件列表,定位到具体文件,然后将绝对offset减去文件的起始节点转化为相对offset,即可找到对应的数据。
commit log是以append的方式编写的,保证了顺序写磁盘,顺序写磁盘效率比随机写内存还高,高吞吐量的保证
metaQ的所有消息都是持久化的,先写入系统的PageCache(页高速缓存),然后刷盘,可以保证内存和磁盘都有一份数据,访问时,直接从内存中获取。
[外链图片转存失败(img-fuGJtODu-1562040940329)(C:\Users\xiaozhan.fc\AppData\Roaming\Typora\typora-user-images\1561026231679.png)]
一台机器可以有多个(topicA_3 topicA-4 等等),逻辑队列中的存储的是索引文件。服务器将消息存储到文件后,会将该消息在文件的物理位置,消息大小,消息类型封装成一个固定大小的数据结构,暂且称这个数据结构为索引单元吧,大小固定为 16 byte,消息在物理文件的位置称为offset,8个字节,消息size占4个字节,MessageType占4个字节。多个索引单元组成了一个索引文件,索引文件默认固定大小为 20M,和消息文件一样,文件名是起始字节位置,写满后,产生一个新的文件。metaq对于客户端展现的是逻辑队列就是消费队列,consumer从消费队列里顺序取消息进行消费。
这种设计是把物理和逻辑分离,消费队列更加轻量化。所以metaq可以支撑更多的消费队列数,提升消息的吞吐量,并且有一定的消息堆积能力。但是也有缺点:虽然是顺序写入,但是读却是随机读的。
消息生产者发送消息后返回SendResult,如果isSuccess返回true,则表示消息已经确认发送到服务器,并且被服务器接受存储。整个发送过程是一个同步的过程,保证消息送到服务器并且返回结果。
收到消息之后,写入磁盘,写入成功之后,返回应答给生产者
os对系统有缓冲:
每1000条(可配置),即强制调用一次force来写入磁盘设备。
每隔10秒(可配置),强制调用一次force来写入磁盘设备。
因此,Meta通过配置可保证在异常情况下(如磁盘掉电)10秒内最多丢失1000条消息。当然通过参数调整你甚至可以在掉电情况下不丢失任何消息。
消费者是一条一条消费信息。
如果消费某条信息失败(如异常),则会尝试重试消费这条消息(默认最大次数5次)
超过最大次数无法消费,则将消息存储在本地磁盘,由后台线程继续重试。主线程往后走,继续消费。只有在MessageListener确认成功消费一条消息后,meta的消费者才会继续消费另一条消息。由此来保证消息的可靠消费。
存储方案,offset存储
默认处理原则:谁先到达服务器并写入磁盘,则先处理谁。因为消费者针对每个分区都是按照从前到后递增offset的顺序拉取消息。
Meta可以保证,在单线程内使用该producer发送的消息按照发送的顺序达到服务器并存储,并按照相同顺序被消费者消费,前提是这些消息发往同一台服务器的同一个分区。
public interface PartitionSelector {
public Partition getPartition(String topic, List<Partition> partitions, Message message) throws MetaClientException;
}
消息重复发生的例子:
生产者:生产者发送消息,等待服务器应答,这个时候发生网络故障,服务器实际上已经将消息写入成功,但是由于网络故障没有返回应答,那么生产者会重发,这个时候服务器收到了两条相同的信息。
这种由故障引起的重复,meta是无法避免的,不判断消息的data是否一致,meta仅仅作为载荷来传输
针对消费者来说也会有这个问题,消费的时候,机器突然断电,没有及时将前进后的offset存储起来,则下次启动的时候或者其他同个分组的消费者owner到这个分区的时候,会重复消费这条消息。
首先会用到消息回话工厂类——MessageSessionFactory,这个工厂作用是创建生产者和消费者。创建生产者代码:
final MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(initMetaConfig());
// create producer,强烈建议使用单例
final MessageProducer producer = sessionFactory.createProducer();
点进去 MetaMessageSessionFactory之后,看看这个工厂帮助我们做了哪些事情。
public MetaMessageSessionFactory(final MetaClientConfig metaClientConfig) throws MetaClientException {
super();
try {
this.checkConfig(metaClientConfig);
this.metaClientConfig = metaClientConfig;
final ClientConfig clientConfig = new ClientConfig();
clientConfig.setTcpNoDelay(TCP_NO_DELAY);
clientConfig.setMaxReconnectTimes(MAX_RECONNECT_TIMES);
clientConfig.setWireFormatType(new MetamorphosisWireFormatType());
clientConfig.setMaxScheduleWrittenBytes(MAX_SCHEDULE_WRITTEN_BYTES);
try {
this.remotingClient = new RemotingClientWrapper(RemotingFactory.connect(clientConfig));
}
catch (final NotifyRemotingException e) {
throw new NetworkException("Create remoting client failed", e);
}
// 如果有设置,则使用设置的url并连接,否则使用zk发现服务器
if (this.metaClientConfig.getServerUrl() != null) {
this.connectServer(this.metaClientConfig);
}
else {
this.initZooKeeper();
}
this.producerZooKeeper =
new ProducerZooKeeper(this.metaZookeeper, this.remotingClient, this.zkClient, metaClientConfig);
this.sessionIdGenerator = new IdGenerator();
// modify by wuhua
this.consumerZooKeeper = this.initConsumerZooKeeper(this.remotingClient, this.zkClient, this.zkConfig);
this.zkClientChangedListeners.add(this.producerZooKeeper);
this.zkClientChangedListeners.add(this.consumerZooKeeper);
this.subscribeInfoManager = new SubscribeInfoManager();
this.recoverManager = new RecoverStorageManager(this.metaClientConfig, this.subscribeInfoManager);
this.shutdownHook = new Thread() {
@Override
public void run() {
try {
MetaMessageSessionFactory.this.isHutdownHookCalled = true;
MetaMessageSessionFactory.this.shutdown();
}
catch (final MetaClientException e) {
log.error("关闭session factory失败", e);
}
}
};
Runtime.getRuntime().addShutdownHook(this.shutdownHook);
}
catch (MetaClientException e) {
this.shutdown();
throw e;
}
catch (Exception e) {
this.shutdown();
throw new MetaClientException("Construct message session factory failed.", e);
}
}
可以看到,这个工厂大致帮我们做了下面几个事情:
服务的查找和发现,通过diamond和zookeeper帮你查找日常的meta服务器地址列表
连接的创建和销毁,自动创建和销毁到meta服务器的连接,并做连接复用,也就是到同一台meta的服务器在一个工厂内只维持一个连接。
消息消费者的消息存储和恢复。
协调和管理各种资源,包括创建的生产者和消费者的。
消息是邮件,MetaQ是邮局,消息填好目的地之后,放在邮局,邮局就可以帮助你送到目的地点
MetaQ的消息在Java客户端里是com.taobao.metamorphosis.Message类,它主要包括这么几个属性:
public class Message implements Serializable {
文章浏览阅读5k次,点赞5次,收藏53次。前言在做可视化大屏的时候,我们首先要保证UI图的比例不变,例如16:9的UI图,但大屏的比例可能是2:1,很多时候大屏的比例往往很少能与UI图的比例一模一样的,这个时候我们就要利用公式换算来适配大屏。例如16:9的UI图:适配大屏当页面首次加载时,判断视口的宽高,如果视口的宽/高 > 16/9 则说明视口宽度比较设计图宽,实际的显示宽度应该等于视口的高度*16/9。如果视口的宽/高 < 16/9 则说明视口高度比设计图高,实际的显示宽度应该等于视口的宽度,显示高度应等_大屏还原ui适配宽度
文章浏览阅读3.3k次。1. 查看计划任务ls /var/spool/cron删除异常任务其配置项。如果当前系统之前并未配置过计划任务,可以直接删除计划脚本目录即可:rm -rf /var/spool/cron/*2. 查看密钥认证文件删除木马创建的密钥认证文件,如果当前系统之前并未配置过密钥认证,可以直接清空认证存放目录:rm -rf /root/.ssh/*如果有配置过密钥认证,需要删除指定的黑客创建的认证文件即可。3. 修复 SSH 配置项一般默认脚本中进行修改的 PermitRootLogin、R._xmrig miner怎么彻底清除
文章浏览阅读358次。原作者:Julie Zhuo,前Facebook产品设计副总裁原文地址:https://medium.com/the-year-of-the-looking-glass/how-to-work-with-engineers-a3163ff1eced很久以前,我当过项目经理。之后,我成为了工程师。最近7年,我担任产品设计师。每天我都与这三个岗位的人一起工作。每天,我都能发现新的方式来理解这产品研发背后的三大支柱的职责、挑战和艺术。研发工程师是魔术师,他们只需要轻轻动几下手指调整像素,瞧,一个能运作的产_研发是把想法变成现实的桥梁
文章浏览阅读3k次。vMotion可在主机之间迁移正在运行的虚拟机,因此进行计划内服务器维护时无需中断应用的使用。概览vSphere vMotion 能在实现零停机和服务连续可用的情况下将正在运行的虚拟机从一台物理服务器实时迁移到另一台物理服务器上,并且能够完全保证事务的完整性。 vMotion 是创建动态、自动化并自我优化的数据中心所需的关键促成技术。即时迁移正在运行的虚拟机轻松管理和安排实时迁移即时迁移正在运行的..._vmware 虚机 漫游 实时性
文章浏览阅读184次。文章:From Word Embeddings To Document Distances研究背景在此之前,文档的两种最常见的表述方式是通过袋词(BOW)或词频-逆文档频率(TF-IDF)。但是无法解决近义词之间的距离问题,即是BOW和TF-IDF无法表达词语的含义。由于word2vec的出现,近义词的问题解决了,通过word2vec将词语表示成向量,用两向量的距离表示两词语的距离可以很好的区分“..._文档词频矩阵
文章浏览阅读1.6k次,点赞9次,收藏13次。Oracle中单引号与双引号的使用及动态拼接。_oracle拼接单引号
文章浏览阅读65次。一、工具我们需要现在两个工具:MySQL 服务器(mysql-5.7.18)、MySQL GUI(mysql-workbench)MySQL 服务器包含了MySQL 的整个运行环境,安装了它就能通过命令行让 MySQL 运行的很好了。当然如果你不喜欢命令行,可以下载一个 GUI工具来管理 MySQL。GUI的功能很丰富,包括权限设置,创建数据库、创建表格等等。二、安装1.MySQL 服务器下载地址..._mysql workbench 6.3安装
文章浏览阅读1.6k次。第一种方法:高通AR(Vuforia) Vuforia插件下载地址(官网):https://developer.vuforia.com/downloads/sdkVuforia实现图片识别1、新建项目,导入Vuforia插件2、将默认摄像机删除,将Vuforia/Prefabs中的ARCamera和ImageTarget预置体拖到场景中。并进行调整3、将要识别的图片..._关于unity3dar
文章浏览阅读462次。25 2018 CVPREnhanced Optic Disk and Cup Segmentation with Glaucoma Screening from Fundus Images using Position encoded CNNsMethod : 分割 + 分类Dataset :Architecture : Unet(DenseNet)+ DenseNet201/ResN..._cvpr fundus
文章浏览阅读772次。作者|陆首群出品 |COPU开源联盟(ID:COPU2004)谈到华为自研鸿蒙内核和操作系统,从华为透漏出来的信息来看,有点自相矛盾、扑朔迷离!我曾说过:真真假假,虚虚实实!这里有技术原因,也有外部原因。一开始(大概是 2016 年左右),华为的说法:其自研的首款操作系统是搭载物联网(IoT)的(这时要求操作系统小型化、实时性、响应快,采用微内核), 连任正非都坚持这个说法。当时我就看出来,华为..._鸿蒙内核是linux吗
文章浏览阅读439次,点赞18次,收藏3次。MyBatis 是一个可以自定义 SQL、存储过程和高级映射的持久层框架。
文章浏览阅读211次。《微机外设与维修》期末考试样题一、填空题1、显示器的分辨率由和组成。2、计算机常见的外围设备主要有:___________、__________、___________等。3、目前显卡与主板的接口有和。最流行的是接口。4、主板上的一个IDE接口可以接___________个IDE设备,一个称为___________设备,另一个称为___________设备。5、CPU的主频与外频的关系:。6、主板..._当计算机等新兴技术设备新安装的显卡打印机等硬件或外围设备后需要安装什么才