【问题标题】:Serialization in the hazelcast jethazelcast jet 中的序列化
【发布时间】:2021-03-06 05:07:06
【问题描述】:

我在尝试处理来自 hazelcast jet 的数据时遇到以下错误。

Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
    at com.example.LearnJet.joins.LeftJoins.$deserializeLambda$(LeftJoins.java:1)
    ... 59 more

这里是代码:- AddToCart1 实例

public class AddToCart1 implements Serializable {
    private int number;
    private String cart;

    public AddToCart1() {
        super();
        // TODO Auto-generated constructor stub
    }

    public int getNumber() {
        return number;
    }

    public AddToCart1(int number, String cart) {
        super();
        this.number = number;
        this.cart = cart;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((cart == null) ? 0 : cart.hashCode());
        result = prime * result + number;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        AddToCart1 other = (AddToCart1) obj;
        if (cart == null) {
            if (other.cart != null)
                return false;
        } else if (!cart.equals(other.cart))
            return false;
        if (number != other.number)
            return false;
        return true;
    }

    public void setNumber(int number) {
        this.number = number;
    }

    public String getCart() {
        return cart;
    }

    public void setCart(String cart) {
        this.cart = cart;
    }

}

PageVisit1 实例

public class PageVisit1 implements Serializable {

    /**
     * 
     */
    private int number;
    private String pageName;

    public PageVisit1() {
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + number;
        result = prime * result + ((pageName == null) ? 0 : pageName.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        PageVisit1 other = (PageVisit1) obj;
        if (number != other.number)
            return false;
        if (pageName == null) {
            if (other.pageName != null)
                return false;
        } else if (!pageName.equals(other.pageName))
            return false;
        return true;
    }

    public PageVisit1(int number, String pageName) {
        super();
        this.number = number;
        this.pageName = pageName;
    }

    /**
     * @return the number
     */
    public int getNumber() {
        return number;
    }

    /**
     * @param number the number to set
     */
    public void setNumber(int number) {
        this.number = number;
    }

    /**
     * @return the pageName
     */
    public String getPageName() {
        return pageName;
    }

    /**
     * @param pageName the pageName to set
     */
    public void setPageName(String pageName) {
        this.pageName = pageName;
    }

}

这里是主类


public class LeftJoins {

    public static void main(String[] args) throws InvocationTargetException {

        JetInstance jet = Jet.bootstrappedInstance();

        IList<AddToCart1> addToCartList = jet.getList("cart");
        IList<PageVisit1> paymentList = jet.getList("page");

        // AddToCartData
        AddToCart1 ad1 = new AddToCart1();
        ad1.setNumber(1);
        ad1.setCart("lulu bazar");

        AddToCart1 ad2 = new AddToCart1();
        ad2.setNumber(2);
        ad2.setCart("krishna bazar");

        AddToCart1 ad3 = new AddToCart1();
        ad3.setNumber(3);
        ad3.setCart("ram bazar");

        addToCartList.add(ad1);
        addToCartList.add(ad2);
        addToCartList.add(ad3);

        // Page Data
        PageVisit1 pg1 = new PageVisit1();
        pg1.setNumber(1);
        pg1.setPageName("k login");

        PageVisit1 pg2 = new PageVisit1();
        pg2.setNumber(2);
        pg2.setPageName("plogin");

        paymentList.add(pg1);
        paymentList.add(pg2);

        // creating a piple-line here
        Pipeline p = Pipeline.create();
        BatchStageWithKey<AddToCart1, Object> cart = p.readFrom(Sources.<AddToCart1>list("cart"))
                .groupingKey(cart1 -> cart1.getNumber());
        BatchStageWithKey<PageVisit1, Object> page = p.readFrom(Sources.<PageVisit1>list("page"))
                .groupingKey(page1 -> page1.getNumber());

        BatchStage<Tuple2<List<PageVisit1>, List<AddToCart1>>> joinedLists1 = page.aggregate2(toList(), cart, toList())
                .map(Entry::getValue);

        BatchStage<Tuple2<List<PageVisit1>, List<AddToCart1>>> m = joinedLists1.filter(pair -> !pair.f0().isEmpty());

        m.writeTo(Sinks.logger());

        jet.newJob(p).join();
        // joinedLists.filter(pair -> !pair.isEmpty());
    }

【问题讨论】:

    标签: java hazelcast-jet


    【解决方案1】:

    代码显然不完整,因为没有page 变量,所以page.aggregate2(...) 不应该编译。

    因此,我无法向您指出问题发生的确切位置。但是,错误消息告诉您正在使用来自 JDK 的“标准”lambda,它不是 Serializable,而您应该使用来自 Jet 的那些。

    请查看this package

    编辑:

    我用上面的代码创建了一个专用的GitHub project

    一切都按预期进行。 2个元组在日志中正确显示:

    09:33:59.974 [ INFO] [c.h.j.i.c.WriteLoggerP] [05d8-7d0a-e3c0-0001/loggerSink#0] ([ch.frankel.so.PageVisit1@c544b8f8], [ch.frankel.so.AddToCart1@41c722ed])
    09:33:59.974 [ INFO] [c.h.j.i.c.WriteLoggerP] [05d8-7d0a-e3c0-0001/loggerSink#0] ([ch.frankel.so.PageVisit1@58fbcb54], [ch.frankel.so.AddToCart1@c666a2c4])
    

    【讨论】:

    • 我已经更新了这个问题,所以请您检查一下问题出在哪里,或者任何建议都会有所帮助。
    • 我用 your 代码创建了一个最小项目:github.com/nfrankel/so-jet-serializable.Everything 按预期工作。日志显示 2 个元组。
    • 嗨尼古拉斯。我运行了您通过 github 共享的代码,但仍然遇到相同的错误。Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization at ch.frankel.so.LeftJoins.$deserializeLambda$(LeftJoins.java:1) ... 59 more 我正在使用 java 11
    • 你能分享整个堆栈跟踪吗?另外,如何启动代码?我已经毫无问题地使用了 JDK 15,它也适用于 Jet 4.1,因此我们可以排除这种情况。
    • 我无法上传完整的异常 Exception in thread "main" com.hazelcast.nio.serialization.HazelcastSerializationException: Error deserializing vertex 'fused(map, filter)': com.hazelcast.nio.serialization.HazelcastSerializationException: java.io.InvalidObjectException: ReflectiveOperationException during deserialization at com.hazelcast.jet.core.Vertex.readData(Vertex.java:219) 在 leftJoins 类中将右键单击并选择作为 java 应用程序运行。
    猜你喜欢
    • 1970-01-01
    • 2021-11-30
    • 1970-01-01
    • 2023-03-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多