【Question title】: Akka event messages are not dispatched: only the first event is ever triggered, the other events are never handled
【Posted】: 2017-03-18 06:51:15
【Question】:

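I am new to Akka and I am trying to send event messages to an Akka actor. I send three event messages, but only the first one is ever handled. Why?

Maybe it is because receive(receiveEvent); is called in my EventProcessActor constructor.

But the other events are sent after that call as well, so I must be missing something: why are they not dispatched to the other matching handlers?

I always get the following console output: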

[INFO] [03/18/2017 13:35:53.446]... We received the Events need to process it

My expected output is:

[INFO] [03/18/2017 13:35:53.446] ... We received the Events need to process it

[INFO] [03/18/2017 13:35:53.447]...  We are processing Events

[INFO] [03/18/2017 13:35:53.446]... Completed Events processing

In the console output above, I replaced [default-akka.actor.default-dispatcher-4] [akka://default/user/EventProcessing] with "...".

This happens when I send the events as follows:

procsssEvents.tell(new EventProcessActor.EventActivity(Events.STSRT, Paths.get("/")), procsssEvents);
procsssEvents.tell(new EventProcessActor.EventActivity(Events.READING_LINE, Paths.get("/")), procsssEvents);
procsssEvents.tell(new EventProcessActor.EventActivity(Events.END_OR, Paths.get("/")), procsssEvents);

Below are my actor class, my message class, and the pom.xml file.

AkkaActor:

package com.ebc.biz.akka.event.trigger;

import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;

import scala.PartialFunction;
import scala.runtime.BoxedUnit;

import akka.actor.AbstractLoggingActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.ActorSystemImpl;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;

import static com.ebc.biz.akka.event.trigger.EventMessage.Events;

public class EventProcessActor extends AbstractLoggingActor {

    public static class EventActivity {
        final EventMessage startOfEventMessage;

        public EventMessage getStartOfEventMessage() {
            return startOfEventMessage;
        }

        public EventActivity(Events events, Path eventPath) {
            startOfEventMessage = new EventMessage(events, eventPath);
        }

    }

    public static class EventReadingActivity {

        final EventMessage startOfReadingMessage;

        public EventMessage getStartOfReadingMessage() {
            return startOfReadingMessage;
        }

        public EventReadingActivity(Events events, Path eventPath) {
            startOfReadingMessage = new EventMessage(events, eventPath);

        }

    }

    public static class EndOfEventActivity {

        final EventMessage endOfEventMessage;

        public EventMessage getEndOfEventMessage() {
            return endOfEventMessage;
        }

        public EndOfEventActivity(Events events, Path eventPath) {
            endOfEventMessage = new EventMessage(Events.END_OR, eventPath);

        }
    }

    private final PartialFunction<Object, BoxedUnit> receiveEvent;

    private final PartialFunction<Object, BoxedUnit> startEventsProcessing;

    private final PartialFunction<Object, BoxedUnit> completeEventProcessing;

    public EventProcessActor() {

        receiveEvent = ReceiveBuilder
                .match(EventActivity.class, this::onStartEventReceive)
                .match(EventReadingActivity.class, this::readEventLine).build();

        startEventsProcessing = ReceiveBuilder
                .match(EventReadingActivity.class, this::readEventLine)
                .match(EndOfEventActivity.class, this::onEndOfEventProcessing)
                .build();

        completeEventProcessing = ReceiveBuilder.match(
                EndOfEventActivity.class, this::onEndOfEventProcessing).build();

        receive(receiveEvent);
    }

    public static Props props() {

        return Props.create(EventProcessActor.class);
    }

    public void onStartEventReceive(EventActivity fileActivity) {
        log().info("We received the Events need to process it");
        getContext().become(startEventsProcessing);
    }

    public void readEventLine(EventReadingActivity fileActivity) {
        log().info("We are processing Events");
        getContext().become(completeEventProcessing);

    }

    public void onEndOfEventProcessing(EndOfEventActivity fileActivity) {
        log().info("Completed Events processing");

    }

    public static void main(String args[]) throws IOException {

        ActorSystem syste = ActorSystemImpl.create();
        final ActorRef procsssEvents = syste.actorOf(EventProcessActor.props(),
                "Event" + "Processing");

        procsssEvents.tell(new EventProcessActor.EventActivity(Events.STSRT,
                Paths.get("/")), procsssEvents);
        procsssEvents.tell(new EventProcessActor.EventActivity(
                Events.READING_LINE, Paths.get("/")), procsssEvents);
        procsssEvents.tell(new EventProcessActor.EventActivity(Events.END_OR,
                Paths.get("/")), procsssEvents);

        System.out.println("Enter to terminate");
        System.in.read();

    }

}

EventMessage:

package com.ebc.biz.akka.event.trigger;

import java.nio.file.Path;

public class EventMessage {

    public static enum Events {

        STSRT, READING_LINE, END_OR;

    }

    private final Events readEvents;
    private final Path pathOfEvents;

    public Path getPathOfEvents() {
        return pathOfEvents;
    }

    public Events getReadEvents() {
        return readEvents;
    }

    public EventMessage(Events readEvents, Path pathOfFile) {
        this.readEvents = readEvents;
        this.pathOfEvents = pathOfFile;
    }

}

pom.xml:

 <groupId>com.ebc.biz</groupId>
  <artifactId>akka.event.trigger</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <properties>
        <akka.version>2.4.9</akka.version>
        <maven-dependency-plugin.version>3.0.0</maven-dependency-plugin.version>
        <maven.compiler.plugin>3.6.1</maven.compiler.plugin>
        <java.compiler.target>1.8</java.compiler.target>
        <java.compiler.source>1.8</java.compiler.source>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.11</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-http-core_2.11</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-http-experimental_2.11</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-http-jackson-experimental_2.11</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-dependency-plugin</artifactId>
            <version>${maven-dependency-plugin.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <!-- This will download source so easy to see API and java doc. -->
                <artifactId>maven-source-plugin</artifactId>
                <executions>
                    <execution>
                        <id>attach-sources</id>
                        <phase>verify</phase>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Java 8 compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven.compiler.plugin}</version>
                <configuration>
                    <source>${java.compiler.source}</source>
                    <target>${java.compiler.target}</target>
                </configuration>
            </plugin>

        </plugins>
    </build>
</project>

Why are my messages not passed on to the next handler? I think I am missing something.

Thanks in advance for any information and help.

【Discussion】:

    Tags: java scala events akka messages


    【Solution 1】:

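    Edit: the problem is that you send an EventActivity every time, while the actor, by design, expects a different message type at each stage. After the first EventActivity the actor switches to startEventsProcessing, which only matches EventReadingActivity and EndOfEventActivity, so the next two EventActivity messages match nothing. You should update your main to send the matching types: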

        procsssEvents.tell(new EventProcessActor.EventActivity(Events.STSRT, Paths.get("/")), procsssEvents);
        procsssEvents.tell(new EventProcessActor.EventReadingActivity(Events.READING_LINE, Paths.get("/")), procsssEvents);
        procsssEvents.tell(new EventProcessActor.EndOfEventActivity(Events.END_OR, Paths.get("/")), procsssEvents);
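
To make the reasoning above concrete, here is a minimal, Akka-free sketch of the same behavior switching. It is only a model under stated assumptions: the class BecomeSketch, the marker types Start, ReadLine and End, and the short log strings are hypothetical stand-ins for the classes in the question, and a plain map lookup replaces Akka's ReceiveBuilder matching. Messages that match nothing in the current behavior are simply collected here. In Akka they would go to the actor's unhandled path instead of being logged at INFO, which is why they seem to vanish.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Akka-free model of an actor that swaps its behavior, in the spirit of
// getContext().become(...). All names are hypothetical; this is not Akka's API.
class BecomeSketch {
    // Marker message types standing in for EventActivity,
    // EventReadingActivity and EndOfEventActivity.
    static final class Start {}
    static final class ReadLine {}
    static final class End {}

    final List<String> log = new ArrayList<>();
    final List<Object> unhandled = new ArrayList<>();

    // One map per behavior: message class -> handler.
    private final Map<Class<?>, Consumer<Object>> waiting = new LinkedHashMap<>();
    private final Map<Class<?>, Consumer<Object>> processing = new LinkedHashMap<>();
    private final Map<Class<?>, Consumer<Object>> completing = new LinkedHashMap<>();

    // The behavior currently in effect.
    private Map<Class<?>, Consumer<Object>> behavior;

    BecomeSketch() {
        Consumer<Object> onStart = m -> { log.add("received"); behavior = processing; };
        Consumer<Object> onRead = m -> { log.add("processing"); behavior = completing; };
        Consumer<Object> onEnd = m -> log.add("completed");

        // Same match lists as the three receive blocks in the question.
        waiting.put(Start.class, onStart);
        waiting.put(ReadLine.class, onRead);
        processing.put(ReadLine.class, onRead);
        processing.put(End.class, onEnd);
        completing.put(End.class, onEnd);

        behavior = waiting; // plays the role of receive(receiveEvent)
    }

    // Messages are handled one at a time, in the order they were sent.
    void tell(Object message) {
        Consumer<Object> handler = behavior.get(message.getClass());
        if (handler == null) {
            unhandled.add(message); // no match in the current behavior
        } else {
            handler.accept(message);
        }
    }

    // Sends the messages to a fresh instance and returns what it logged.
    static List<String> run(Object... messages) {
        BecomeSketch actor = new BecomeSketch();
        for (Object m : messages) {
            actor.tell(m);
        }
        return actor.log;
    }

    public static void main(String[] args) {
        // Same type three times, as in the question's main.
        System.out.println(run(new Start(), new Start(), new Start()));
        // One type per stage, as in the corrected main.
        System.out.println(run(new Start(), new ReadLine(), new End()));
    }
}
```

In this model the first call logs only "received", because the second and third Start messages arrive after the switch to the processing behavior, which has no entry for Start. The second call logs all three steps. Since the messages are handled strictly in order, adding delays between the sends changes nothing in this model.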
    

    【Comments】:

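    • If I do not call receive(receiveEvent); in the EventProcessActor constructor, I get an exception, and every message I send from the main method ends up as "dead letters encountered". Only the first message is handled, because I call "receive(receiveEvent);", which is the first behavior. I also tried putting Thread.sleep(2000) before each tell call, but the result is the same: only the first event is logged.
    • Right, the problem was elsewhere. I have updated the answer.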