Akka中Actor消息通信的实现原理(源码解析)_actor mailbox 实现原理-程序员宅基地

技术标签: actor  源码分析  scala  akka  

Akka中通过下面的方法向actor发送消息

  • ! tell 意味着 “fire-and-forget”,即异步的发送消息无需等待返回结果
  • ? ask 异步发送消息并返回代表可能回复的Future。

    消息在每个发件人的基础上是有序的。

MailBox

Akka邮箱包含发往Actor的消息。通常每个Actor都有自己的邮箱,但是也有例外,比如BalancingPool所有路由将共享一个邮箱实例。

其中MessageQueue(akka.dispatch.MessageQueue)是形成Akka邮箱的心组件之一。
发送给Actor的普通消息将被排入队列(并随后出队列)它至少需要支持N个生产者和1个消费者的线程安全。 它实现了入队列,出队列等方法

  def enqueue(receiver: ActorRef, handle: Envelope): Unit
  def dequeue(): Envelope
  def numberOfMessages: Int
  def hasMessages: Boolean
  def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit

其中Envelope封装了message:Any和sender:ActorRef两个成员

final case class Envelope private (val message: Any, val sender: ActorRef)

SystemMessageQueue提供了systemEnqueue(入队列)和systemDrain(全部出队列)方法。MailBox继承自系统消息队列SystemMessageQueue和ForkJoinTask,实现了Runnable接口,同时包含ActorCell成员和MessageQueue成员

private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
  extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable {
  var actor: ActorCell = _
  }

其中ForkJoinTask是用少数线程执行海量独立任务的极好架构(独立任务指的是任务和任务之间不要有共享数据,否则会有并发访问的问题)
MailBox代理了MessageQueue的所有方法。MessageQueue的具体类型,根据MailBoxType的不同而不同。

tell 操作

在创建ActorSystem时,初始化默认的dispatcher,默认ForkJoinPool(ExecutorService)
在使用actorRef ! Message发送消息时,调用了actorCell对应的sendMessage方法,其中调用了dispatcher.dispatch方法

可以在ActorRef中可以看到

    def ! (message: Any)(implicit sender: ActorRef = Actor.noSender): Unit

在ActorCell.scala中

    final def sendMessage(message: Any, sender: ActorRef): Unit =
        sendMessage(Envelope(message, sender, system))

之后可以追踪到dungeon的Dispatch.scala文件

  def sendMessage(msg: Envelope): Unit =
    try {
      val msgToDispatch =
        if (system.settings.SerializeAllMessages) serializeAndDeserialize(msg)
        else msg

      dispatcher.dispatch(this, msgToDispatch)
    } catch handleException

而代码里的dispatcher.dispatch可以在dispatch.Dispatcher中找到:

     /**
      * INTERNAL API
      */
     protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
       val mbox = receiver.mailbox
       mbox.enqueue(receiver.self, invocation)
       registerForExecution(mbox, true, false)
     }

     protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
       if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
         if (mbox.setAsScheduled()) {
           try {
             executorService execute mbox
             true
           } catch {
             case e: RejectedExecutionException ⇒
               try {
                 executorService execute mbox
                 true
               } catch { //Retry once
                 case e: RejectedExecutionException ⇒
                   mbox.setAsIdle()
                   eventStream.publish(Error(e, getClass.getName, getClass, "registerForExecution was rejected twice!"))
                   throw e
               }
           }
         } else false
       } else false
     }

dispatch方法做了两件事情:
一是将消息放到actorCell的消息队列中(maiBox 是 ActorCell 的成员变量)
二是调用dispather底层的线程池executor execute mbox执行mbox.run()(mailBox继承了
Runnable 接口所以能放入ExecutorService 中执行),

  override final def run(): Unit = {
    try {
      if (!isClosed) { //Volatile read, needed here
        processAllSystemMessages() //First, deal with any system messages
        processMailbox() //Then deal with messages
      }
    } finally {
      setAsIdle() //Volatile write, needed here
      dispatcher.registerForExecution(this, false, false)
    }
  }

  /**
     * Process the messages in the mailbox
     */
    @tailrec private final def processMailbox(
      left:       Int  = java.lang.Math.max(dispatcher.throughput, 1),
      deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit =
      if (shouldProcessMessage) {
        val next = dequeue()
        if (next ne null) {
          if (Mailbox.debug) println(actor.self + " processing message " + next)
          actor invoke next
          if (Thread.interrupted())
            throw new InterruptedException("Interrupted while processing actor messages")
          processAllSystemMessages()
          if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0))
            processMailbox(left - 1, deadlineNs)
        }
      }

执行mbox.run()中,先从SystemMessage链表中处理系统消息,
然后从MessageQueue成员中处理用户消息。
处理用户消息时,run 是一个递归函数,每次调用处理一个消息,
处理逻辑通过调用actorCell的invoke方法实现,根据dispatcher
的throughput决定处理多少条消息,
根据dispatcher的throughputDeadlineTime决定处理多长时间,
长度和时间在处理完一条消息后检查一次。

  final def invoke(messageHandle: Envelope): Unit = {
    val influenceReceiveTimeout = !messageHandle.message.isInstanceOf[NotInfluenceReceiveTimeout]
    try {
      currentMessage = messageHandle
      if (influenceReceiveTimeout)
        cancelReceiveTimeout()
      messageHandle.message match {
        case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle)
        case msg                      ⇒ receiveMessage(msg)
      }
      currentMessage = null // reset current message after successful invocation
    } catch handleNonFatalOrInterruptedException { e ⇒
      handleInvokeFailure(Nil, e)
    } finally {
      if (influenceReceiveTimeout)
        checkReceiveTimeout // Reschedule receive timeout
    }
  }

 final def receiveMessage(msg: Any): Unit = actor.aroundReceive(behaviorStack.head, msg)

对 PoisonKill, Terminate 系统消息的处理在 autoReceiveMessage 中,
对普通消息的处理在 receiveMessage 中,

private var behaviorStack: List[Actor.Receive] = emptyBehaviorStack

可以看到behaviorStack 是一个 List[Actor.Receive],

type Receive = PartialFunction[Any, Unit]

其中Receive (PartialFunction[Any, Unit])函数就是我们写的对 message 的处理逻辑。
因为 Actor 支持通过 become/unbecome 切换形态,
所以behaviorStack.head就是当前的Receive处理逻辑。

对于ForkJoinPool这种executor,每次执行execute(mbox)时,实
际上都是先创建一个继承自ForkJoinTask的MailboxExecutionTask,
其中的exec方法调用mbox.run方法,因此每次执行都会创建一个ForkJoinTask对象。

还有一点,消息队列都是放到actor对应的mailbox中(以Envelope的形式封装消息本身和sender),
而执行的task对象会放到Executor的每个线程对应的工作队列中,task和消息分别使用不同的队列。

参考
https://doc.akka.io/docs/akka/snapshot/mailboxes.html
https://doc.akka.io/docs/akka/snapshot/actors.html#send-messages
http://spartan1.iteye.com/blog/1641322

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/springlustre/article/details/79082770

智能推荐

linux中pstree命令的含义,Linux之pstree命令-程序员宅基地

文章浏览阅读564次。1、pstree命令功能简介[root@vms002opt]#whatispstreepstree(1)-displayatreeofprocesses#显示进程树2、pstree命令语法pstree(选项)3、pstree命令常用选项选项含义-a显示每个程序的完整指令,包含路径,参数或是常驻服务的表示-c不使用精简表示法-G使用VT100终端机的列..._linux pstree含义

计算机等级错别字,word教学错别字制作方法(一)-程序员宅基地

文章浏览阅读148次。小学语文教学中经常要用到一些错别字的大幅挂图,我们可以利用下面两种方法制作所需的错别字图片,而不必跑商店去购。这里以制作错别字为例进行介绍。1.在Flash中制作运行Flash,新建一空白Flash文档,单击选中“文本工具(T)”按钮,在“属性”中设定字体为隶体、字号为300(可根据具体情况设定,字号大小直接输入即可),在编辑区中输入“吃”字,用箭头工具选中该文本框,用快捷键“Ctrl+B”将它打..._计算机考试如何该错字

EL和JSTL基本使用_jstl表达提交数据获取数据-程序员宅基地

文章浏览阅读2.1k次,点赞7次,收藏21次。讲解了el和jstl的基本使用,学会使用el和jstl的基本用法,能看懂别人写的el和jstl,并且能够自己编写一些el和jstl。_jstl表达提交数据获取数据

npm ERR!cb.apply is not a function_npm err! cb.apply is not a function-程序员宅基地

文章浏览阅读3.7k次,点赞2次,收藏5次。npm ERR!cb.apply is not a function_npm err! cb.apply is not a function

BeanDefinitionParsingException:Unnamed bean definition specifies neither 'class' nor 'parent'_idea 单测 application.properties文件解析报错 beandefinitio-程序员宅基地

文章浏览阅读1.2k次。BeanDefinitionParsingException:Unnamed bean definition specifies neither 'class' nor 'parent'由于:bean 没有配置完整,少了class等属性的配置 或者配置了空的bean_idea 单测 application.properties文件解析报错 beandefinitionparsinge

三国杀_三国杀博客-程序员宅基地

文章浏览阅读938次。杀的数量是闪的两倍。装诸葛连弩前可以先出一次杀(杀司马懿时)。黑色花色多为进攻的牌;方片多为闪和桃;寒冰剑攻击大乔时,大乔可以选择流离,寒冰剑的技能将对流离的角色使用;寒冰剑的弃牌类似于两次过河拆桥(不能被无懈);寒冰剑打司马懿比较合适;无懈可击的对象是目标锦囊,与角色无关;借刀杀人可以杀该锦囊使用者;诸葛连弩的技能不能被发动(出牌阶段);借刀杀人中反馈的是攻击的源头;不能无懈掉桃源结义;南蛮入侵_三国杀博客

随便推点

Qt CMake构建错误:MSB1009_msbuild : error msb1009: 项目文件不存在-程序员宅基地

文章浏览阅读2.4k次,点赞3次,收藏6次。Qt使用CMake构建项目提示:MSB1009检查build生成的项目名称与CMakeLists中的项目名称是否一致。将Targets改为“Current executable”。_msbuild : error msb1009: 项目文件不存在

linux用串口登陆,linux串口登陆-程序员宅基地

文章浏览阅读1.9k次。在这个互连网高速发展的时代, 企业内网络设备的安全和运行稳定也成为人们的焦点话题之一,在网络安全方面有句行话"技术不是万能药!".在整个安全问题的全方面考虑中,人才是最重要的环节.为此,非常多企业的重要服务器都没有输出或输入设备,就算有,也只是安装系统时使用,一旦投入生产式管理,基本依靠网络进行,网络毕竟不是这么可靠,如果服务器的网络瘫痪了,你正好需要远程解决网络的问题,那该怎么做?其实Linux..._服务器串口登录

数字图像处理总结(冈萨雷斯版)_冈萨雷斯数字图像处理-程序员宅基地

文章浏览阅读1.9w次,点赞64次,收藏787次。数字图像处理(冈萨雷斯版本)课程复习_冈萨雷斯数字图像处理

win10系统如何配置web服务器,如何在windows10下搭建web服务器-程序员宅基地

文章浏览阅读9.8k次,点赞3次,收藏31次。我们都知道Web服务器的使用,让很多的用户们实现了局域网内资源共享的方法,因为利用Web服务器,我们就可以将自己的信息上传到自己指定的服务器端,不仅可以实现资源共享,而且还有效地达到信息的同步,是一个非常不错的信息平台。那么如何在windows10下,成功地实现web服务器的搭建呢?系统堂官网告诉你在如何在windows10下搭建web服务器步骤如下所示:1.首先打开电脑的控制面板,选择并进入“程..._win10搭建web服务器

android微信打开app接口被封,android app使用微信登录接口回调没有被执行的问题研究...-程序员宅基地

文章浏览阅读223次。本人开发的一个app使用了sharesdk集成微信登录功能,在测试的过程中微信授权登录界面有调用,但是授权后原应用的回调没有被执行应用的包名是com.kimi.searcher首先,确认微信点击授权后有没有执行回调,方法是通过日志过滤activitymanager,日志中有出现04-16 13:27:43.345 1805-3279/? I/ActivityManager: START u0 {f..._app里面的接口被封禁原因

docker启动的nginx配置访问静态资源,访问失败的解决方法_docker启动nginx不能访问-程序员宅基地

文章浏览阅读453次,点赞11次,收藏6次。注意查看 nginx 的 error 日志,显示是 404。docker 比较特殊,它的 nginx 目录都是分散的,所以对于 nginx 配置文件中的root 路径最好写绝对路径,而且得是 docker 容器内的路径,如下。做了挂载映射的,不要写成宿主机的挂载路径了,要改成docker 容器内的路径!_docker启动nginx不能访问