示例实现beam用java编程,监听kafka的testmsg主题,然后将收取到的单词,按5秒做一次统计。结果输出到outputmessage 的kafka主题,同时同步到elasticSearch。

kafka需要运行

启动:
cd /root/kafuka/kafka_2.12-0.11.0.0 nohup bin/zookeeper-server-start.sh config/zookeeper.properties & nohup bin/kafka-server-start.sh config/server.properties & 创建topic: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testmsg bin/kafka-topics.sh --list --zookeeper localhost:2181 生产者producer bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 消费者consumer bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

elasticSearch

创建索引Put http://192.168.11.100:9200/myindex?pretty
查看所有索引: http://192.168.11.100:9200/_cat/indices?v

获取内容Get http://192.168.11.100:9200/myindex/_search?q=*&pretty
http://192.168.11.100:9200/myindex/_search?q=*&sort=_id:desc&pretty

用mvn自动生成项目代码:

windows在powershell中运行:
 mvn archetype:generate `
 -D archetypeGroupId=org.apache.beam `
 -D archetypeArtifactId=beam-sdks-java-maven-archetypes-examples `
 -D archetypeVersion=2.8.0 `
 -D groupId=org.example `
 -D artifactId=word-count-beam `
 -D version="0.1" `
 -D package=org.apache.beam.examples `
 -D interactiveMode=false

其他参考beam官方文档: <https://beam.apache.org/get-started/quickstart-java/> 

替换pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>word-count-beam</artifactId>
  <version>0.1</version>

  <packaging>jar</packaging>

  <properties>
    <beam.version>2.8.0</beam.version>

    <bigquery.version>v2-rev402-1.24.1</bigquery.version>
    <google-clients.version>1.24.1</google-clients.version>
    <guava.version>20.0</guava.version>
    <hamcrest.version>1.3</hamcrest.version>
    <jackson.version>2.9.5</jackson.version>
    <joda.version>2.4</joda.version>
    <junit.version>4.12</junit.version>
    <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
    <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
    <maven-jar-plugin.version>3.0.2</maven-jar-plugin.version>
    <maven-shade-plugin.version>3.1.0</maven-shade-plugin.version>
    <mockito.version>1.10.19</mockito.version>
    <pubsub.version>v1-rev399-1.24.1</pubsub.version>
    <slf4j.version>1.7.25</slf4j.version>
    <spark.version>2.3.2</spark.version>
    <hadoop.version>2.7.3</hadoop.version>
    <maven-surefire-plugin.version>2.21.0</maven-surefire-plugin.version>
    
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases>
        <enabled>false</enabled>
      </releases>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>${maven-compiler-plugin.version}</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>${maven-surefire-plugin.version}</version>
        <configuration>
          <parallel>all</parallel>
          <threadCount>4</threadCount>
          <redirectTestOutputToFile>true</redirectTestOutputToFile>
        </configuration>
        <dependencies>
          <dependency>
            <groupId>org.apache.maven.surefire</groupId>
            <artifactId>surefire-junit47</artifactId>
            <version>${maven-surefire-plugin.version}</version>
          </dependency>
        </dependencies>
      </plugin>

      <!-- Ensure that the Maven jar plugin runs before the Maven
        shade plugin by listing the plugin higher within the file. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>${maven-jar-plugin.version}</version>
      </plugin>

      <!--
        Configures `mvn package` to produce a bundled jar ("fat jar") for runners
        that require this for job submission to a cluster.
      -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>${maven-shade-plugin.version}</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <finalName>${project.artifactId}-bundled-${project.version}</finalName>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/LICENSE</exclude>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>

    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.codehaus.mojo</groupId>
          <artifactId>exec-maven-plugin</artifactId>
          <version>${maven-exec-plugin.version}</version>
          <configuration>
            <cleanupDaemonThreads>false</cleanupDaemonThreads>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

  <profiles>
    <profile>
      <id>direct-runner</id>
      <activation>
        <activeByDefault>true</activeByDefault>
      </activation>
      <!-- Makes the DirectRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-direct-java</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>apex-runner</id>
      <!-- Makes the ApexRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-apex</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
        <!--
          Apex depends on httpclient version 4.3.6, project has a transitive dependency to httpclient 4.0.1 from
          google-http-client. Apex dependency version being specified explicitly so that it gets picked up. This
          can be removed when the project no longer has a dependency on a different httpclient version.
        -->
        <dependency>
          <groupId>org.apache.httpcomponents</groupId>
          <artifactId>httpclient</artifactId>
          <version>4.3.6</version>
          <scope>runtime</scope>
          <exclusions>
            <exclusion>
              <groupId>commons-codec</groupId>
              <artifactId>commons-codec</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
        <!--
          Apex 3.6 is built against YARN 2.6. Version in the fat jar has to match
          what's on the cluster, hence we need to repeat the Apex Hadoop dependencies here.
        -->
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-yarn-client</artifactId>
          <version>${hadoop.version}</version>
          <scope>runtime</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>${hadoop.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>dataflow-runner</id>
      <!-- Makes the DataflowRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>flink-runner</id>
      <!-- Makes the FlinkRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-flink_2.11</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>spark-runner</id>
      <!-- Makes the SparkRunner available when running a pipeline. Additionally,
           overrides some Spark dependencies to Beam-compatible versions. -->
      <properties>
        <netty.version>4.1.17.Final</netty.version>
      </properties>
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-spark</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>${spark.version}</version>
          <scope>runtime</scope>
          <exclusions>
            <exclusion>
              <groupId>org.slf4j</groupId>
              <artifactId>jul-to-slf4j</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
        <dependency>
          <groupId>com.fasterxml.jackson.module</groupId>
          <artifactId>jackson-module-scala_2.11</artifactId>
          <version>${jackson.version}</version>
          <scope>runtime</scope>
        </dependency>
        <!-- [BEAM-3519] GCP IO exposes netty on its API surface, causing conflicts with runners -->
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
          <version>${beam.version}</version>
          <exclusions>
            <exclusion>
              <groupId>io.grpc</groupId>
              <artifactId>grpc-netty</artifactId>
            </exclusion>
            <exclusion>
              <groupId>io.netty</groupId>
              <artifactId>netty-handler</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
      </dependencies>
    </profile>
    <profile>
      <id>gearpump-runner</id>
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-gearpump</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>
  </profiles>

  <dependencies>
    <!-- Adds a dependency on the Beam SDK. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <!-- Dependencies below this line are specific dependencies needed by the examples code. -->
    <dependency>
      <groupId>com.google.api-client</groupId>
      <artifactId>google-api-client</artifactId>
      <version>${google-clients.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-bigquery</artifactId>
      <version>${bigquery.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>com.google.http-client</groupId>
      <artifactId>google-http-client</artifactId>
      <version>${google-clients.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-pubsub</artifactId>
      <version>${pubsub.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>joda-time</groupId>
      <artifactId>joda-time</artifactId>
      <version>${joda.version}</version>
    </dependency>

    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>${guava.version}</version>
    </dependency>

    <!-- Add slf4j API frontend binding with JUL backend -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-jdk14</artifactId>
      <version>${slf4j.version}</version>
      <!-- When loaded at runtime this will wire up slf4j to the JUL backend -->
      <scope>runtime</scope>
    </dependency>

    <!-- Hamcrest and JUnit are required dependencies of PAssert,
         which is used in the main code of DebuggingWordCount example. -->
    <dependency>
      <groupId>org.hamcrest</groupId>
      <artifactId>hamcrest-core</artifactId>
      <version>${hamcrest.version}</version>
    </dependency>
    
    <dependency>
      <groupId>org.hamcrest</groupId>
      <artifactId>hamcrest-library</artifactId>
      <version>${hamcrest.version}</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>${junit.version}</version>
    </dependency>

    <!-- The DirectRunner is needed for unit tests. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-direct-java</artifactId>
      <version>${beam.version}</version>
      <scope>test</scope>
    </dependency>
    
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-core</artifactId>
      <version>${mockito.version}</version>
      <scope>test</scope>
    </dependency>

    <!-- kafka -->
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-kafka</artifactId>
        <version>${beam.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>

    <!-- kafka -->
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-elasticsearch</artifactId>
        <version>${beam.version}</version>
    </dependency>

    
    
  </dependencies>
</project>
View Code

相关文章:

  • 2021-03-31
  • 2021-09-17
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2022-01-19
  • 2021-07-11
猜你喜欢
  • 2023-03-21
  • 2022-12-23
  • 2022-12-23
  • 2022-02-16
相关资源
相似解决方案