Akka中通过下面的方法向actor发送消息
? ask 异步发送消息并返回代表可能回复的Future。
消息在每个发件人的基础上是有序的。
Akka邮箱包含发往Actor的消息。通常每个Actor都有自己的邮箱,但是也有例外,比如BalancingPool所有路由将共享一个邮箱实例。
其中MessageQueue(akka.dispatch.MessageQueue)是形成Akka邮箱的心组件之一。
发送给Actor的普通消息将被排入队列(并随后出队列)它至少需要支持N个生产者和1个消费者的线程安全。 它实现了入队列,出队列等方法
def enqueue(receiver: ActorRef, handle: Envelope): Unit
def dequeue(): Envelope
def numberOfMessages: Int
def hasMessages: Boolean
def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit
其中Envelope封装了message:Any和sender:ActorRef两个成员
final case class Envelope private (val message: Any, val sender: ActorRef)
SystemMessageQueue提供了systemEnqueue(入队列)和systemDrain(全部出队列)方法。MailBox继承自系统消息队列SystemMessageQueue和ForkJoinTask,实现了Runnable接口,同时包含ActorCell成员和MessageQueue成员
private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable {
var actor: ActorCell = _
}
其中ForkJoinTask是用少数线程执行海量独立任务的极好架构(独立任务指的是任务和任务之间不要有共享数据,否则会有并发访问的问题)
MailBox代理了MessageQueue的所有方法。MessageQueue的具体类型,根据MailBoxType的不同而不同。
在创建ActorSystem时,初始化默认的dispatcher,默认ForkJoinPool(ExecutorService)
在使用actorRef ! Message发送消息时,调用了actorCell对应的sendMessage方法,其中调用了dispatcher.dispatch方法
可以在ActorRef中可以看到
def ! (message: Any)(implicit sender: ActorRef = Actor.noSender): Unit
在ActorCell.scala中
final def sendMessage(message: Any, sender: ActorRef): Unit =
sendMessage(Envelope(message, sender, system))
之后可以追踪到dungeon的Dispatch.scala文件
def sendMessage(msg: Envelope): Unit =
try {
val msgToDispatch =
if (system.settings.SerializeAllMessages) serializeAndDeserialize(msg)
else msg
dispatcher.dispatch(this, msgToDispatch)
} catch handleException
而代码里的dispatcher.dispatch可以在dispatch.Dispatcher中找到:
/**
* INTERNAL API
*/
protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
val mbox = receiver.mailbox
mbox.enqueue(receiver.self, invocation)
registerForExecution(mbox, true, false)
}
protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
if (mbox.setAsScheduled()) {
try {
executorService execute mbox
true
} catch {
case e: RejectedExecutionException ⇒
try {
executorService execute mbox
true
} catch { //Retry once
case e: RejectedExecutionException ⇒
mbox.setAsIdle()
eventStream.publish(Error(e, getClass.getName, getClass, "registerForExecution was rejected twice!"))
throw e
}
}
} else false
} else false
}
dispatch方法做了两件事情:
一是将消息放到actorCell的消息队列中(maiBox 是 ActorCell 的成员变量)
二是调用dispather底层的线程池executor execute mbox执行mbox.run()(mailBox继承了
Runnable 接口所以能放入ExecutorService 中执行),
override final def run(): Unit = {
try {
if (!isClosed) { //Volatile read, needed here
processAllSystemMessages() //First, deal with any system messages
processMailbox() //Then deal with messages
}
} finally {
setAsIdle() //Volatile write, needed here
dispatcher.registerForExecution(this, false, false)
}
}
/**
* Process the messages in the mailbox
*/
@tailrec private final def processMailbox(
left: Int = java.lang.Math.max(dispatcher.throughput, 1),
deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit =
if (shouldProcessMessage) {
val next = dequeue()
if (next ne null) {
if (Mailbox.debug) println(actor.self + " processing message " + next)
actor invoke next
if (Thread.interrupted())
throw new InterruptedException("Interrupted while processing actor messages")
processAllSystemMessages()
if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0))
processMailbox(left - 1, deadlineNs)
}
}
执行mbox.run()中,先从SystemMessage链表中处理系统消息,
然后从MessageQueue成员中处理用户消息。
处理用户消息时,run 是一个递归函数,每次调用处理一个消息,
处理逻辑通过调用actorCell的invoke方法实现,根据dispatcher
的throughput决定处理多少条消息,
根据dispatcher的throughputDeadlineTime决定处理多长时间,
长度和时间在处理完一条消息后检查一次。
final def invoke(messageHandle: Envelope): Unit = {
val influenceReceiveTimeout = !messageHandle.message.isInstanceOf[NotInfluenceReceiveTimeout]
try {
currentMessage = messageHandle
if (influenceReceiveTimeout)
cancelReceiveTimeout()
messageHandle.message match {
case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle)
case msg ⇒ receiveMessage(msg)
}
currentMessage = null // reset current message after successful invocation
} catch handleNonFatalOrInterruptedException { e ⇒
handleInvokeFailure(Nil, e)
} finally {
if (influenceReceiveTimeout)
checkReceiveTimeout // Reschedule receive timeout
}
}
final def receiveMessage(msg: Any): Unit = actor.aroundReceive(behaviorStack.head, msg)
对 PoisonKill, Terminate 系统消息的处理在 autoReceiveMessage 中,
对普通消息的处理在 receiveMessage 中,
private var behaviorStack: List[Actor.Receive] = emptyBehaviorStack
可以看到behaviorStack 是一个 List[Actor.Receive],
type Receive = PartialFunction[Any, Unit]
其中Receive (PartialFunction[Any, Unit])函数就是我们写的对 message 的处理逻辑。
因为 Actor 支持通过 become/unbecome 切换形态,
所以behaviorStack.head就是当前的Receive处理逻辑。
对于ForkJoinPool这种executor,每次执行execute(mbox)时,实
际上都是先创建一个继承自ForkJoinTask的MailboxExecutionTask,
其中的exec方法调用mbox.run方法,因此每次执行都会创建一个ForkJoinTask对象。
还有一点,消息队列都是放到actor对应的mailbox中(以Envelope的形式封装消息本身和sender),
而执行的task对象会放到Executor的每个线程对应的工作队列中,task和消息分别使用不同的队列。
参考
https://doc.akka.io/docs/akka/snapshot/mailboxes.html
https://doc.akka.io/docs/akka/snapshot/actors.html#send-messages
http://spartan1.iteye.com/blog/1641322
文章浏览阅读564次。1、pstree命令功能简介[root@vms002opt]#whatispstreepstree(1)-displayatreeofprocesses#显示进程树2、pstree命令语法pstree(选项)3、pstree命令常用选项选项含义-a显示每个程序的完整指令,包含路径,参数或是常驻服务的表示-c不使用精简表示法-G使用VT100终端机的列..._linux pstree含义
文章浏览阅读148次。小学语文教学中经常要用到一些错别字的大幅挂图,我们可以利用下面两种方法制作所需的错别字图片,而不必跑商店去购。这里以制作错别字为例进行介绍。1.在Flash中制作运行Flash,新建一空白Flash文档,单击选中“文本工具(T)”按钮,在“属性”中设定字体为隶体、字号为300(可根据具体情况设定,字号大小直接输入即可),在编辑区中输入“吃”字,用箭头工具选中该文本框,用快捷键“Ctrl+B”将它打..._计算机考试如何该错字
文章浏览阅读2.1k次,点赞7次,收藏21次。讲解了el和jstl的基本使用,学会使用el和jstl的基本用法,能看懂别人写的el和jstl,并且能够自己编写一些el和jstl。_jstl表达提交数据获取数据
文章浏览阅读3.7k次,点赞2次,收藏5次。npm ERR!cb.apply is not a function_npm err! cb.apply is not a function
文章浏览阅读1.2k次。BeanDefinitionParsingException:Unnamed bean definition specifies neither 'class' nor 'parent'由于:bean 没有配置完整,少了class等属性的配置 或者配置了空的bean_idea 单测 application.properties文件解析报错 beandefinitionparsinge
文章浏览阅读938次。杀的数量是闪的两倍。装诸葛连弩前可以先出一次杀(杀司马懿时)。黑色花色多为进攻的牌;方片多为闪和桃;寒冰剑攻击大乔时,大乔可以选择流离,寒冰剑的技能将对流离的角色使用;寒冰剑的弃牌类似于两次过河拆桥(不能被无懈);寒冰剑打司马懿比较合适;无懈可击的对象是目标锦囊,与角色无关;借刀杀人可以杀该锦囊使用者;诸葛连弩的技能不能被发动(出牌阶段);借刀杀人中反馈的是攻击的源头;不能无懈掉桃源结义;南蛮入侵_三国杀博客
文章浏览阅读2.4k次,点赞3次,收藏6次。Qt使用CMake构建项目提示:MSB1009检查build生成的项目名称与CMakeLists中的项目名称是否一致。将Targets改为“Current executable”。_msbuild : error msb1009: 项目文件不存在
文章浏览阅读1.9k次。在这个互连网高速发展的时代, 企业内网络设备的安全和运行稳定也成为人们的焦点话题之一,在网络安全方面有句行话"技术不是万能药!".在整个安全问题的全方面考虑中,人才是最重要的环节.为此,非常多企业的重要服务器都没有输出或输入设备,就算有,也只是安装系统时使用,一旦投入生产式管理,基本依靠网络进行,网络毕竟不是这么可靠,如果服务器的网络瘫痪了,你正好需要远程解决网络的问题,那该怎么做?其实Linux..._服务器串口登录
文章浏览阅读1.9w次,点赞64次,收藏787次。数字图像处理(冈萨雷斯版本)课程复习_冈萨雷斯数字图像处理
文章浏览阅读9.8k次,点赞3次,收藏31次。我们都知道Web服务器的使用,让很多的用户们实现了局域网内资源共享的方法,因为利用Web服务器,我们就可以将自己的信息上传到自己指定的服务器端,不仅可以实现资源共享,而且还有效地达到信息的同步,是一个非常不错的信息平台。那么如何在windows10下,成功地实现web服务器的搭建呢?系统堂官网告诉你在如何在windows10下搭建web服务器步骤如下所示:1.首先打开电脑的控制面板,选择并进入“程..._win10搭建web服务器
文章浏览阅读223次。本人开发的一个app使用了sharesdk集成微信登录功能,在测试的过程中微信授权登录界面有调用,但是授权后原应用的回调没有被执行应用的包名是com.kimi.searcher首先,确认微信点击授权后有没有执行回调,方法是通过日志过滤activitymanager,日志中有出现04-16 13:27:43.345 1805-3279/? I/ActivityManager: START u0 {f..._app里面的接口被封禁原因
文章浏览阅读453次,点赞11次,收藏6次。注意查看 nginx 的 error 日志,显示是 404。docker 比较特殊,它的 nginx 目录都是分散的,所以对于 nginx 配置文件中的root 路径最好写绝对路径,而且得是 docker 容器内的路径,如下。做了挂载映射的,不要写成宿主机的挂载路径了,要改成docker 容器内的路径!_docker启动nginx不能访问