【问题标题】:Where is the jar for software.amazon.kinesis.*software.amazon.kinesis.* 的 jar 在哪里
【发布时间】:2019-01-21 00:02:10
【问题描述】:

Java 用户为 0:

我一直在关注这个页面:https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html 它要我导入这些:

    import software.amazon.kinesis.exceptions.InvalidStateException;
    import software.amazon.kinesis.exceptions.ShutdownException;
    import software.amazon.kinesis.lifecycle.events.InitializationInput;
    import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
    import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
    import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
    import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
    import software.amazon.kinesis.processor.ShardRecordProcessor;

我正在从 Java 迁移从 AWS SDK 2.0 下载的示例文件,看起来亚马逊还没有开始迁移? https://docs.aws.amazon.com/sdk-for-java/v2/developer-guide/advanced-topics.html

但对于我来说,我无法找到包含 software.amazon.kinesis.* 类的 JAR 文件,即使在谷歌搜索和搜索 Maven Central 等几个小时之后也是如此。

这是我迁移的目标/pom.xml:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.amazonaws</groupId>
      <artifactId>samples</artifactId>
      <version>1.0.0</version>
      <dependencies>
        <dependency>
          <groupId>com.amazonaws</groupId>
          <artifactId>aws-java-sdk</artifactId>
          <version>1.11.486</version>
          <scope>compile</scope>
        </dependency>
        <dependency>
          <groupId>com.amazonaws</groupId>
          <artifactId>amazon-kinesis-client</artifactId>
          <version>1.9.3</version>
          <scope>compile</scope>
        </dependency>
      </dependencies>
    </project>

罐子在哪里?谢谢你的建议。

【问题讨论】:

    标签: amazon-kinesis


    【解决方案1】:

    你可以在这里找到罐子:https://mvnrepository.com/artifact/software.amazon.kinesis/amazon-kinesis-client

    并且你需要在你的 pom 中添加以下依赖:

        <dependency>
            <groupId>software.amazon.kinesis</groupId>
            <artifactId>amazon-kinesis-client</artifactId>
            <version>2.1.0</version>
        </dependency>
    

    【讨论】:

      【解决方案2】:

      Jar 工作了,这是我为那些做同样事情的人迁移的结果:

      package com.amazonaws.samples;
      
      import java.util.UUID;
      
      import com.amazonaws.AmazonClientException;
      import com.amazonaws.auth.profile.ProfileCredentialsProvider;
      import com.amazonaws.regions.Regions;
      import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
      import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
      import com.amazonaws.services.kinesis.AmazonKinesis;
      import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
      import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
      
      import software.amazon.awssdk.regions.Region;
      import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
      import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
      import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
      import software.amazon.kinesis.common.ConfigsBuilder;
      import software.amazon.kinesis.common.InitialPositionInStream;
      import software.amazon.kinesis.coordinator.Scheduler;
      
      /**
       * Sample Amazon Kinesis Application.
       */
      public final class AmazonKinesisApplicationSample {
      
          /*
           * Before running the code:
           *      Fill in your AWS access credentials in the provided credentials
           *      file template, and be sure to move the file to the default location
           *      (/home/user1/.aws/credentials) where the sample code will load the
           *      credentials from.
           *      https://console.aws.amazon.com/iam/home?#security_credential
           *
           * WARNING:
           *      To avoid accidental leakage of your credentials, DO NOT keep
           *      the credentials file in your source directory.
           */
      
          public static final String SAMPLE_APPLICATION_STREAM_NAME = "myFirstStream";
      
          private static final String SAMPLE_APPLICATION_NAME = "SampleKinesisApplication";
      
          // Initial position in the stream when the application starts up for the first time.
          // Position can be one of LATEST (most recent data) or TRIM_HORIZON (oldest available data)
          private static final InitialPositionInStream SAMPLE_APPLICATION_INITIAL_POSITION_IN_STREAM =
                  InitialPositionInStream.LATEST;
      
          private static ProfileCredentialsProvider credentialsProvider;
      
          private static void init() {
              // Ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints).
              java.security.Security.setProperty("networkaddress.cache.ttl", "60");
      
              /*
               * The ProfileCredentialsProvider will return your [user1]
               * credential profile by reading from the credentials file located at
               * (/home/user1/.aws/credentials).
               */
              credentialsProvider = new ProfileCredentialsProvider("user1");
              try {
                  credentialsProvider.getCredentials();
              } catch (Exception e) {
                  throw new AmazonClientException("Cannot load the credentials from the credential profiles file. "
                          + "Please make sure that your credentials file is at the correct "
                          + "location (/home/user1/.aws/credentials), and is in valid format.", e);
              }
          }
      
          public static void deleteResources() {
              // Delete the stream
              AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard()
                  .withCredentials(credentialsProvider)
                  .withRegion(Regions.US_EAST_1)
                  .build();
      
              System.out.printf("Deleting the Amazon Kinesis stream used by the sample. Stream Name = %s.\n",
                      SAMPLE_APPLICATION_STREAM_NAME);
              try {
                  kinesis.deleteStream(SAMPLE_APPLICATION_STREAM_NAME);
              } catch (ResourceNotFoundException ex) {
                  // The stream doesn't exist.
              }
      
              // Delete the table
              AmazonDynamoDB dynamoDB = AmazonDynamoDBClientBuilder.standard()
                  .withCredentials(credentialsProvider)
                  .withRegion("us-west-2")
                  .build();
              System.out.printf("Deleting the Amazon DynamoDB table used by the Amazon Kinesis Client Library. Table Name = %s.\n",
                      SAMPLE_APPLICATION_NAME);
              try {
                  dynamoDB.deleteTable(SAMPLE_APPLICATION_NAME);
              } catch (com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException ex) {
                  // The table doesn't exist.
              }
          }
      
          public static void main(String[] args) throws Exception {
              init();
      
              if (args.length == 1 && "delete-resources".equals(args[0])) {
                  deleteResources();
                  return;
              }
      
              Region region = Region.US_EAST_1;
              KinesisAsyncClient kinesisClient = KinesisAsyncClient.builder().region(region).build();
              DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
              CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
      
              ConfigsBuilder configsBuilder = new ConfigsBuilder(SAMPLE_APPLICATION_STREAM_NAME, SAMPLE_APPLICATION_NAME, 
                      kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new AmazonKinesisApplicationRecordProcessorFactory());
      
              Scheduler scheduler = new Scheduler(
                      configsBuilder.checkpointConfig(),
                      configsBuilder.coordinatorConfig(),
                      configsBuilder.leaseManagementConfig(),
                      configsBuilder.lifecycleConfig(),
                      configsBuilder.metricsConfig(),
                      configsBuilder.processorConfig(),
                      configsBuilder.retrievalConfig()
                      );
      
      
              System.out.printf("Running %s to process stream %s as worker %s...\n",
                      SAMPLE_APPLICATION_NAME,
                      SAMPLE_APPLICATION_STREAM_NAME,
                      null);
      
              int exitCode = 0;
              try {
                  scheduler.run();
              } catch (Throwable t) {
                  System.err.println("Caught throwable while processing data.");
                  t.printStackTrace();
                  exitCode = 1;
              }
              System.exit(exitCode);
          }
      }
      
      package com.amazonaws.samples;
      /*
       * Copyright 2012-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
       *
       * Licensed under the Apache License, Version 2.0 (the "License").
       * You may not use this file except in compliance with the License.
       * A copy of the License is located at
       *
       *  http://aws.amazon.com/apache2.0
       *
       * or in the "license" file accompanying this file. This file is distributed
       * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
       * express or implied. See the License for the specific language governing
       * permissions and limitations under the License.
       */
      
      import java.nio.charset.CharacterCodingException;
      import java.nio.charset.Charset;
      import java.nio.charset.CharsetDecoder;
      import java.util.List;
      
      import org.apache.commons.logging.Log;
      import org.apache.commons.logging.LogFactory;
      
      import software.amazon.kinesis.exceptions.InvalidStateException;
      import software.amazon.kinesis.exceptions.ShutdownException;
      import software.amazon.kinesis.exceptions.ThrottlingException;
      import software.amazon.kinesis.lifecycle.events.InitializationInput;
      import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
      import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
      import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
      import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
      import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
      import software.amazon.kinesis.processor.ShardRecordProcessor;
      import software.amazon.kinesis.retrieval.KinesisClientRecord;
      
      /**
       * Processes records and checkpoints progress.
       */
      public class AmazonKinesisApplicationSampleRecordProcessor implements ShardRecordProcessor {
      
          private static final Log LOG = LogFactory.getLog(AmazonKinesisApplicationSampleRecordProcessor.class);
          private String kinesisShardId;
      
          // Backoff and retry settings
          private static final long BACKOFF_TIME_IN_MILLIS = 3000L;
          private static final int NUM_RETRIES = 10;
      
          // Checkpoint about once a minute
          private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;
          private long nextCheckpointTimeInMillis;
      
          private final CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
      
          /**
           * {@inheritDoc}
           */
          @Override
          public void initialize(InitializationInput initializationInput) {
              LOG.info("Initializing record processor for shard: " + initializationInput.shardId());
              this.kinesisShardId = initializationInput.shardId();
          }
      
          /**
           * {@inheritDoc}
           */
          @Override
          public void processRecords(ProcessRecordsInput processRecordsInput) {
              LOG.info("Processing " + processRecordsInput.records().size() + " records from " + kinesisShardId);
      
              // Process records and perform all exception handling.
              processRecordsWithRetries(processRecordsInput.records());
      
              // Checkpoint once every checkpoint interval.
              if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
                  checkpoint(processRecordsInput.checkpointer());
                  nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
              }
          }
      
          /**
           * Process records performing retries as needed. Skip "poison pill" records.
           * 
           * @param list Data records to be processed.
           */
          private void processRecordsWithRetries(List<KinesisClientRecord> list) {
              for (KinesisClientRecord record : list) {
                  boolean processedSuccessfully = false;
                  for (int i = 0; i < NUM_RETRIES; i++) {
                      try {
                          //
                          // Logic to process record goes here.
                          //
                          processSingleRecord(record);
      
                          processedSuccessfully = true;
                          break;
                      } catch (Throwable t) {
                          LOG.warn("Caught throwable while processing record " + record, t);
                      }
      
                      // backoff if we encounter an exception.
                      try {
                          Thread.sleep(BACKOFF_TIME_IN_MILLIS);
                      } catch (InterruptedException e) {
                          LOG.debug("Interrupted sleep", e);
                      }
                  }
      
                  if (!processedSuccessfully) {
                      LOG.error("Couldn't process record " + record + ". Skipping the record.");
                  }
              }
          }
      
          /**
           * Process a single record.
           * 
           * @param record The record to be processed.
           */
          private void processSingleRecord(KinesisClientRecord record) {
              // TODO Add your own record processing logic here
      
              String data = null;
              try {
                  // For this app, we interpret the payload as UTF-8 chars.
                  data = decoder.decode(record.data()).toString();
                  // Assume this record came from AmazonKinesisSample and log its age.
                  long recordCreateTime = new Long(data.substring("testData-".length()));
                  long ageOfRecordInMillis = System.currentTimeMillis() - recordCreateTime;
      
                  LOG.info(record.sequenceNumber() + ", " + record.partitionKey() + ", " + data + ", Created "
                          + ageOfRecordInMillis + " milliseconds ago.");
              } catch (NumberFormatException e) {
                  LOG.info("Record does not match sample record format. Ignoring record with data; " + data);
              } catch (CharacterCodingException e) {
                  LOG.error("Malformed data: " + data, e);
              }
          }
      
          /** Checkpoint with retries.
           * @param checkpointer
           */
          private void checkpoint(RecordProcessorCheckpointer checkpointer) {
              LOG.info("Checkpointing shard " + kinesisShardId);
              for (int i = 0; i < NUM_RETRIES; i++) {
                  try {
                      checkpointer.checkpoint();
                      break;
                  } catch (ShutdownException se) {
                      // Ignore checkpoint if the processor instance has been shutdown (fail over).
                      LOG.info("Caught shutdown exception, skipping checkpoint.", se);
                      break;
                  } catch (ThrottlingException e) {
                      // Backoff and re-attempt checkpoint upon transient failures
                      if (i >= (NUM_RETRIES - 1)) {
                          LOG.error("Checkpoint failed after " + (i + 1) + "attempts.", e);
                          break;
                      } else {
                          LOG.info("Transient issue when checkpointing - attempt " + (i + 1) + " of "
                                  + NUM_RETRIES, e);
                      }
                  } catch (InvalidStateException e) {
                      // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
                      LOG.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
                      break;
                  }
                  try {
                      Thread.sleep(BACKOFF_TIME_IN_MILLIS);
                  } catch (InterruptedException e) {
                      LOG.debug("Interrupted sleep", e);
                  }
              }
          }
      
      
          public void shardEnded(ShardEndedInput shardEndedInput) {
              try {
                  shardEndedInput.checkpointer().checkpoint();
              } catch (ShutdownException | InvalidStateException e) {
                  //
                  // Swallow the exception
                  //
                  e.printStackTrace();
              }       
          }
          @Override
          public void leaseLost(LeaseLostInput leaseLostInput) {
      
          }
      
      
          @Override
          public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
              try {
                  shutdownRequestedInput.checkpointer().checkpoint();
              } catch (ShutdownException | InvalidStateException e) {
                  //
                  // Swallow the exception
                  //
                  e.printStackTrace();
              }
          }
      }
      
      
      package com.amazonaws.samples;
      
      import software.amazon.kinesis.processor.ShardRecordProcessor;
      import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
      
      /**
       * Used to create new record processors.
       */
      public class AmazonKinesisApplicationRecordProcessorFactory implements ShardRecordProcessorFactory {
      
          @Override
          public ShardRecordProcessor shardRecordProcessor() {
              return new AmazonKinesisApplicationSampleRecordProcessor();
          }
      }
      
      
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-05-28
        • 2017-03-07
        • 1970-01-01
        • 2016-04-15
        • 1970-01-01
        相关资源
        最近更新 更多