【问题标题】:Google Pub/Sub Java examplesGoogle Pub/Sub Java 示例
【发布时间】:2017-11-13 02:32:00
【问题描述】:

我找不到使用 java 从 pub/sub 读取消息的方法。

我在我的 pom 中使用这个 maven 依赖项

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-pubsub</artifactId>
  <version>0.17.2-alpha</version>
</dependency>

我实现了这个 main 方法来创建一个新主题:

public static void main(String... args) throws Exception {

        // Your Google Cloud Platform project ID
        String projectId = ServiceOptions.getDefaultProjectId();

        // Your topic ID
        String topicId = "my-new-topic-1";
        // Create a new topic
        TopicName topic = TopicName.create(projectId, topicId);
        try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
            topicAdminClient.createTopic(topic); 
        }
}

上面的代码运行良好,确实,我可以看到我使用谷歌云控制台创建的新主题。

我实现了以下主要方法来向我的主题写入消息:

public static void main(String a[]) throws InterruptedException, ExecutionException{
        String projectId = ServiceOptions.getDefaultProjectId(); 
        String topicId = "my-new-topic-1";

        String payload = "Hellooooo!!!";
        PubsubMessage pubsubMessage =
                  PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(payload)).build();

        TopicName topic = TopicName.create(projectId, topicId);

        Publisher publisher;
        try {
            publisher = Publisher.defaultBuilder(
                    topic)
                    .build();
            publisher.publish(pubsubMessage);

            System.out.println("Sent!");
        } catch (IOException e) {
            System.out.println("Not Sended!");
            e.printStackTrace();
        }
}

现在我无法验证这条消息是否真的发送了。 我想使用订阅我的主题来实现消息阅读器。 有人可以向我展示一个关于从主题中读取消息的正确且有效的 java 示例吗?

有人可以帮助我吗? 提前致谢!

【问题讨论】:

  • 欢迎来到 Stack Overflow!要求我们推荐或查找书籍、工具、软件库、教程或其他场外资源的问题对于 Stack Overflow 来说是无关紧要的,因为它们往往会吸引固执己见的答案和垃圾邮件。相反,describe the problem 以及迄今为止为解决它所做的工作。
  • 也许我不清楚。我不是在寻找教程/书籍或外部资源。我正在寻找一些代表如何通过 java 从 pubsub 读取消息的示例的 java 代码行。我会更新我的问题。
  • 这是一个很好的链接:cloud.google.com/pubsub/docs/… 显示 Receiver 部分。
  • 我卡在 topicAdminClient.createTopic(topic); 上,我在控制台中看到:com.google.auth.oauth2.DefaultCredentialsProvider warnAboutProblematicCredentials 警告:您的应用程序已使用来自 Google Cloud 的最终用户凭据进行身份验证开发工具包。我们建议大多数服务器应用程序改用服务帐户。如果您的应用程序继续使用来自 Cloud SDK 的最终用户凭据,您可能会收到“超出配额”或“API 未启用”错误。有关服务帐户的详细信息,请参阅cloud.google.com/docs/authentication。有什么想法吗??

标签: java google-cloud-pubsub


【解决方案1】:

这是使用谷歌云客户端库的版本。


package com.techm.data.client;

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;

/**
 * A snippet for Google Cloud Pub/Sub showing how to create a Pub/Sub pull
 * subscription and asynchronously pull messages from it.
 */
public class CreateSubscriptionAndConsumeMessages {

    private static String projectId = "projectId";
    private static String topicId = "topicName";
    private static String subscriptionId = "subscriptionName";

    public static void createSubscription() throws Exception {
        ProjectTopicName topic = ProjectTopicName.of(projectId, topicId);
        ProjectSubscriptionName subscription = ProjectSubscriptionName.of(projectId, subscriptionId);

        try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
            subscriptionAdminClient.createSubscription(subscription, topic, PushConfig.getDefaultInstance(), 0);
        }
    }

    public static void main(String... args) throws Exception {
        ProjectSubscriptionName subscription = ProjectSubscriptionName.of(projectId, subscriptionId);       

        createSubscription();


        MessageReceiver receiver = new MessageReceiver() {
            @Override
            public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
                System.out.println("Received message: " + message.getData().toStringUtf8());
                consumer.ack();
            }
        };
        Subscriber subscriber = null;
        try {
            subscriber = Subscriber.newBuilder(subscription, receiver).build();
            subscriber.addListener(new Subscriber.Listener() {
                @Override
                public void failed(Subscriber.State from, Throwable failure) {
                    // Handle failure. This is called when the Subscriber encountered a fatal error
                    // and is
                    // shutting down.
                    System.err.println(failure);
                }
            }, MoreExecutors.directExecutor());
            subscriber.startAsync().awaitRunning();         

            // In this example, we will pull messages for one minute (60,000ms) then stop.
            // In a real application, this sleep-then-stop is not necessary.
            // Simply call stopAsync().awaitTerminated() when the server is shutting down,
            // etc.
            Thread.sleep(60000);
        } finally {
            if (subscriber != null) {
                subscriber.stopAsync().awaitTerminated();
            }
        }
    }
}

这对我来说很好。

【讨论】:

    【解决方案2】:

    Cloud Pub/Sub Pull Subscriber Guide 具有从主题读取消息的示例代码。

    【讨论】:

    • 不幸的是,示例代码没有很好地解释如何使用客户端 API。
    • 如果您只是将创建订阅者的代码放在您的 main 方法中,那么它应该会退出。 startAsync 方法立即返回。您需要放置一些东西来代替 // ... 以保持主线程运行。
    • 如果您能详细说明一下,这将对我有很大帮助。那会是什么?等待读取消息的 Thread.sleep() 会引发错误。
    【解决方案3】:

    我没有使用谷歌云客户端库,但使用了 api 客户端库。以下是我创建订阅的方式。

    package com.techm.datapipeline.client;
    
    import java.io.IOException;
    import java.security.GeneralSecurityException;
    
    import com.google.api.client.googleapis.json.GoogleJsonResponseException;
    import com.google.api.client.http.HttpStatusCodes;
    import com.google.api.services.pubsub.Pubsub;
    import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions.Create;
    import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions.Get;
    import com.google.api.services.pubsub.Pubsub.Projects.Topics;
    import com.google.api.services.pubsub.model.ExpirationPolicy;
    import com.google.api.services.pubsub.model.Subscription;
    import com.google.api.services.pubsub.model.Topic;
    import com.techm.datapipeline.factory.PubsubFactory;
    
    public class CreatePullSubscriberClient {
    
        private final static String PROJECT_NAME = "yourProjectId";
        private final static String TOPIC_NAME = "yourTopicName";
        private final static String SUBSCRIPTION_NAME = "yourSubscriptionName";
    
        public static void main(String[] args) throws IOException, GeneralSecurityException {
            Pubsub pubSub = PubsubFactory.getService();
    
            String topicName = String.format("projects/%s/topics/%s", PROJECT_NAME, TOPIC_NAME);
            String subscriptionName = String.format("projects/%s/subscriptions/%s", PROJECT_NAME, SUBSCRIPTION_NAME);
    
            Topics.Get listReq = pubSub.projects().topics().get(topicName);
            Topic topic = listReq.execute();
    
            if (topic == null) {
                System.err.println("Topic doesn't exist...run CreateTopicClient...to create the topic");
                System.exit(0);
            }
    
            Subscription subscription = null;
            try {
                Get getReq = pubSub.projects().subscriptions().get(subscriptionName);
                subscription = getReq.execute();
            } catch (GoogleJsonResponseException e) {
                if (e.getStatusCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) {
                    System.out.println("Subscription " + subscriptionName + " does not exist...will create it");
                }
            }
    
            if (subscription != null) {
                System.out.println("Subscription already exists ==> " + subscription.toPrettyString());
                System.exit(0);
            }
    
            subscription = new Subscription();
    
            subscription.setTopic(topicName);
            subscription.setPushConfig(null); // indicating a pull
    
            ExpirationPolicy expirationPolicy = new ExpirationPolicy();
            expirationPolicy.setTtl(null); // never expires;
            subscription.setExpirationPolicy(expirationPolicy);
    
            subscription.setAckDeadlineSeconds(null); // so defaults to 10 sec
    
            subscription.setRetainAckedMessages(true);
    
            Long _week = 7L * 24 * 60 * 60;
            subscription.setMessageRetentionDuration(String.valueOf(_week)+"s");
    
            subscription.setName(subscriptionName);
    
            Create createReq = pubSub.projects().subscriptions().create(subscriptionName, subscription);
            Subscription createdSubscription = createReq.execute();
    
            System.out.println("Subscription created ==> " + createdSubscription.toPrettyString());
        }
    
    }
    

    一旦您创建了订阅(拉取类型)...这就是您从主题中拉取消息的方式。

    package com.techm.datapipeline.client;
    
    import java.io.IOException;
    import java.security.GeneralSecurityException;
    import java.util.ArrayList;
    import java.util.List;
    
    import com.google.api.client.googleapis.json.GoogleJsonResponseException;
    import com.google.api.client.http.HttpStatusCodes;
    import com.google.api.client.util.Base64;
    import com.google.api.services.pubsub.Pubsub;
    import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions.Acknowledge;
    import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions.Get;
    import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions.Pull;
    import com.google.api.services.pubsub.model.AcknowledgeRequest;
    import com.google.api.services.pubsub.model.Empty;
    import com.google.api.services.pubsub.model.PullRequest;
    import com.google.api.services.pubsub.model.PullResponse;
    import com.google.api.services.pubsub.model.ReceivedMessage;
    import com.techm.datapipeline.factory.PubsubFactory;
    
    public class PullSubscriptionsClient {
    
        private final static String PROJECT_NAME = "yourProjectId";
        private final static String SUBSCRIPTION_NAME = "yourSubscriptionName";
    
        private final static String SUBSCRIPTION_NYC_NAME = "test";
    
    
        public static void main(String[] args) throws IOException, GeneralSecurityException {
            Pubsub pubSub = PubsubFactory.getService();
    
            String subscriptionName = String.format("projects/%s/subscriptions/%s", PROJECT_NAME, SUBSCRIPTION_NAME);
            //String subscriptionName = String.format("projects/%s/subscriptions/%s", PROJECT_NAME, SUBSCRIPTION_NYC_NAME);
    
            try {
                Get getReq = pubSub.projects().subscriptions().get(subscriptionName);
                getReq.execute();
            } catch (GoogleJsonResponseException e) {
                if (e.getStatusCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) {
                    System.out.println("Subscription " + subscriptionName
                            + " does not exist...run CreatePullSubscriberClient to create");
                }
            }
    
            PullRequest pullRequest = new PullRequest();
            pullRequest.setReturnImmediately(false); // wait until you get a message
            pullRequest.setMaxMessages(1000);
    
            Pull pullReq = pubSub.projects().subscriptions().pull(subscriptionName, pullRequest);
            PullResponse pullResponse = pullReq.execute();
    
            List<ReceivedMessage> msgs = pullResponse.getReceivedMessages();
            List<String> ackIds = new ArrayList<String>();
            int i = 0;
            if (msgs != null) {
                for (ReceivedMessage msg : msgs) {
                    ackIds.add(msg.getAckId());
                    //System.out.println(i++ + ":===:" + msg.getAckId());
                    String object = new String(Base64.decodeBase64(msg.getMessage().getData()));
                    System.out.println("Decoded object String ==> " + object );
                }
    
                //acknowledge all the received messages
                AcknowledgeRequest content = new AcknowledgeRequest();
                content.setAckIds(ackIds);
                Acknowledge ackReq = pubSub.projects().subscriptions().acknowledge(subscriptionName, content);
                Empty empty = ackReq.execute();
            }
    
        }
    
    }
    

    注意:此客户端只会等待直到它收到至少一条消息,并在它一次收到一条消息(达到最大值 - 在 MaxMessages 中设置)时终止.

    如果这有帮助,请告诉我。我将很快尝试云客户端库,并在我掌握它们后发布更新。

    这是缺少的工厂类...如果您打算运行它...

    package com.techm.datapipeline.factory;
    
    
    import java.io.IOException;
    import java.security.GeneralSecurityException;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
    import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
    import com.google.api.client.http.HttpTransport;
    import com.google.api.client.json.JsonFactory;
    import com.google.api.client.json.jackson2.JacksonFactory;
    import com.google.api.services.pubsub.Pubsub;
    import com.google.api.services.pubsub.PubsubScopes;
    
    public class PubsubFactory {
    
        private static Pubsub instance = null;
        private static final Logger logger = Logger.getLogger(PubsubFactory.class.getName());
    
        public static synchronized Pubsub getService() throws IOException, GeneralSecurityException {
            if (instance == null) {
                instance = buildService();
            }
            return instance;
        }
    
        private static Pubsub buildService() throws IOException, GeneralSecurityException {
            logger.log(Level.FINER, "Start of buildService");
            HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
            JsonFactory jsonFactory = new JacksonFactory();
            GoogleCredential credential = GoogleCredential.getApplicationDefault(transport, jsonFactory);
    
            // Depending on the environment that provides the default credentials (for
            // example: Compute Engine, App Engine), the credentials may require us to
            // specify the scopes we need explicitly. 
            if (credential.createScopedRequired()) {
                Collection<String> scopes = new ArrayList<>();
                scopes.add(PubsubScopes.PUBSUB);
                credential = credential.createScoped(scopes);
            }
    
            logger.log(Level.FINER, "End of buildService");
    
            // TODO - Get the application name from outside.
            return new Pubsub.Builder(transport, jsonFactory, credential).setApplicationName("Your Application Name/Version")
                    .build();
        }
    
    }
    

    【讨论】:

      【解决方案4】:

      消息阅读器被注入订阅者。 code 的这一部分将处理消息:

      MessageReceiver receiver =
          new MessageReceiver() {
            @Override
            public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
              // handle incoming message, then ack/nack the received message
              System.out.println("Id : " + message.getMessageId());
              System.out.println("Data : " + message.getData().toStringUtf8());
              consumer.ack();
            }
          };
      

      【讨论】:

        猜你喜欢
        • 2019-10-09
        • 2022-01-13
        • 1970-01-01
        • 2017-12-07
        • 1970-01-01
        • 2021-06-29
        • 2017-09-07
        • 2021-12-12
        • 1970-01-01
        相关资源
        最近更新 更多