Spring Cloud Data Flow--大数据操作工具,作为Spring XD的替代产品,它是一个混合计算模型,结合了流数据与批量数据的处理方式。为数据微服务提供了业务流程,包括长期的 Spring Cloud Stream 应用程序和短期的 Spring Cloud Task 应用程序。

之前很少写博客,主要是国内相关资料少之又少,踩了很多坑,谷歌也没多大帮助,深知那种无助的感觉,所以记录一下,与喜欢研究技术的朋友分享一下,希望此文会有小小的帮助,微服务的明天会更好Spring Cloud Data Flow Server for Apache Mesos 适用于mesos平台的springcloud数据流服务器(DCOS构建)

1.通过vagrant安装DCOS,我们会得到一个Marathon端点url:http://m1.dcos/service/marathon,留作配置用,如果是其它比如CLI、GUI的安装方式,也可配置其服务的ip地址。service安装这里不做过多介绍,可以通过dcos的universe安装,也可以通过json文件安装

Spring Cloud Data Flow Server for Apache Mesos 适用于mesos平台的springcloud数据流服务器(DCOS构建)

2.安装mysql可以通过DCOS进行安装,如果你有一个mysql数据库亦可不必安装

curl -X POST http://m1.dcos/service/marathon/v2/apps -d @mysql.json -H "Content-type: application/json"

3.安装rabbitmq,如果你有一个rabbitmq服务器亦可不必安装,此处注意,对于springboot应用,应当在应用配置文件中配置mq的用户名密码,如:

Spring Cloud Data Flow Server for Apache Mesos 适用于mesos平台的springcloud数据流服务器(DCOS构建)

curl -X POST http://m1.dcos/service/marathon/v2/apps -d @rabbitmq.json -H "Content-type: application/json"

4.安装redis,如果你有一个redis服务器亦可不必安装

curl -X POST http://m1.dcos/service/marathon/v2/apps -d @redis.json -H "Content-type: application/json"

5.安装chronos,如果你有一个chronos服务器亦可不必安装,速度慢说明拉取镜像慢,此处共享下百度云链接:http://pan.baidu.com/s/1dFaQvSX  密码:3j9c,下载到agent节点直接docker load < chronos.tar,docker images查看即可,重新执行,速度杠杠滴。

dcos package install chronos

6.获取springclouddataflow的json配置文件

$ wget https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-server-mesos/v1.0.0.RELEASE/src/etc/marathon/scdf-server.json

7.直接贴上我的配置文件

{

  "id": "/spring-cloud-data-flow",

  "instances": 1,

  "cpus": 2,

  "mem": 4000,

  "disk": 3000,

  "gpus": 0,

  "backoffSeconds": 1,

  "backoffFactor": 1.15,

  "maxLaunchDelaySeconds": 3600,

  "container": {

    "type": "DOCKER",

    "docker": {

      "image": "springcloud/spring-cloud-dataflow-server-mesos:latest",

      "network": "BRIDGE",

      "portMappings": [

        {

          "containerPort": 9393,

          "hostPort": 0,

          "servicePort": 10000,

          "protocol": "tcp",

          "name": "default"

        }

      ],

      "privileged": false,

      "forcePullImage": false

    }

  },

  "healthChecks": [

    {

      "gracePeriodSeconds": 120,

      "intervalSeconds": 60,

      "timeoutSeconds": 20,

      "maxConsecutiveFailures": 0,

      "portIndex": 0,

      "path": "/management/health",

      "protocol": "HTTP",

      "ignoreHttp1xx": false

    }

  ],

  "upgradeStrategy": {

    "minimumHealthCapacity": 1,

    "maximumOverCapacity": 1

  },

  "unreachableStrategy": {

    "inactiveAfterSeconds": 300,

    "expungeAfterSeconds": 600

  },

  "killSelection": "YOUNGEST_FIRST",

  "requirePorts": true,

  "env": {

    "JDBC_DRIVER": "org.mariadb.jdbc.Driver",

    "MESOS_CHRONOS_URI": "http://172.16.1.77:10105",

    "REDIS_HOST": "172.16.1.61",

    "RABBITMQ_PORT": "6392",

    "MESOS_MARATHON_URI": "http://m1.dcos/service/marathon",

    "REDIS_PORT": "6379",

    "JDBC_PASSWORD": "1234321",

    "JDBC_URL": "jdbc:mysql://172.16.1.145:3306/test",

    "SPRING_APPLICATION_JSON": "{\"spring.cloud.deployer.mesos.marathon.apiEndpoint\":\"${MESOS_MARATHON_URI}\",\"spring.cloud.deployer.mesos.chronos.apiEndpoint\":\"${MESOS_CHRONOS_URI}\",\"spring.datasource.url\":\"${JDBC_URL}\",\"spring.datasource.driverClassName\":\"${JDBC_DRIVER}\",\"spring.datasource.username\":\"${JDBC_USERNAME}\",\"spring.datasource.password\":\"${JDBC_PASSWORD}\",\"spring.datasource.testOnBorrow\":true,\"spring.datasource.validationQuery\":\"SELECT 1\",\"spring.redis.host\":\"${REDIS_HOST}\",\"spring.redis.port\":\"${REDIS_PORT}\",\"spring.cloud.deployer.mesos.marathon.environmentVariables\":\"SPRING_RABBITMQ_HOST=${RABBITMQ_HOST},SPRING_RABBITMQ_PORT=${RABBITMQ_PORT}\",\"spring.cloud.deployer.mesos.dcos.authorizationToken\":\"${DCOS_TOKEN}\",\"spring.cloud.config.enabled\":false,\"spring.freemarker.checkTemplateLocation\":false,\"spring.cloud.deployer.mesos.marathon.memory\":\"3000\",\"spring.dataflow.embedded.database.enabled\":false}",

    "RABBITMQ_HOST": "172.16.1.77",

    "JDBC_USERNAME": "root"

  }

}


这里对几点做说明:(1)"image": "springcloud/spring-cloud-dataflow-server-mesos:latest",对应的agent节点docker中的镜像,直接运行的话,首先它会下载镜像,由于你懂得,速度慢成翔,你可

以docker去pull喝着大茶慢慢等,如果你别处有此镜像也可导入到你的agent节点的docker中。此处共享下导出的镜像百度云地址链接:http://pan.baidu.com/s/1hstbj5E  

密码:56nk,下载后直接docker load < springdataflow.tar,如果你有搭建docker的本地远程仓库,也可将镜像打个tag推送到仓库,agent的docker直接从仓库中pull。

(2)配置marathon还有chronos地址,也可以配置ip地址,浏览器能正常访问即可

"MESOS_MARATHON_URI": "http://m1.dcos/service/marathon",
"MESOS_CHRONOS_URI": "http://m1.dcos/service/chronos",
ip可以通过service的detail中查看,logs可以看到其运行日志

Spring Cloud Data Flow Server for Apache Mesos 适用于mesos平台的springcloud数据流服务器(DCOS构建)

(3)配置mysql

"JDBC_URL": "jdbc:mysql://",
"JDBC_DRIVER": "org.mariadb.jdbc.Driver",
"JDBC_USERNAME": "",
"JDBC_PASSWORD": "",

(4)配置mq

"RABBITMQ_HOST": "",
"RABBITMQ_PORT": "",

(5)配置redis

"REDIS_HOST": "",
"REDIS_PORT": "",

(6)如果DCOS是开启权限认证,则需要配置

DCOS_TOKEN

获取token方法如下:

curl https://downloads.dcos.io/binaries/cli/linux/x86-64/dcos-1.9/dcos -o dcos && 
sudo mv dcos /usr/local/bin && 
sudo chmod +x /usr/local/bin/dcos && 
dcos config set core.dcos_url http://XXXXXXXXX && 
dcos

安装dcos的cli,配置你的master地址,然后dcos auth login 出现一个地址,http://XXXXXX/login?redirect_uri=urn:ietf:wg:oauth:2.0:oob贴到浏览器获取一个token

Spring Cloud Data Flow Server for Apache Mesos 适用于mesos平台的springcloud数据流服务器(DCOS构建)

然后复制到控制台,登录成功。然后 dcos config show core.dcos_acs_token 出现的token可以copy到

DCOS_TOKEN

还可以配置maven地址,可参阅文档:http://docs.spring.io/spring-cloud-dataflow/docs/1.2.2.BUILD-SNAPSHOT/reference/htmlsingle/#arch-data-flow-server,但是注册app的时候,直接配置jar的地址或者maven地址,都会出现异常,看了下源码,在deploy的时候jar会生成一个临时镜像,然后通过fegin去请求marathon的api,但是请求一直会报异常null或者:reason不啦不啦的,所以我是把每一个流的jar都通过docker打包镜像push到远程仓库,配置本地docker仓库地址去拉取镜像,还有需要注意的地方就是注册app的名称还有stream的名称的时候全部用小写,不然fegin请求的时候也会报异常。若有大神知道jar或者maven配置的正确方法,请分享与我,谢谢。

Spring Cloud Data Flow Server for Apache Mesos 适用于mesos平台的springcloud数据流服务器(DCOS构建)

8.然后就可以通过json文件运行了,页面也是可以操作的

Spring Cloud Data Flow Server for Apache Mesos 适用于mesos平台的springcloud数据流服务器(DCOS构建)

查看日志启动成功

Spring Cloud Data Flow Server for Apache Mesos 适用于mesos平台的springcloud数据流服务器(DCOS构建)

9.访问springdataflow的web端http://XXXX:AAA/dashboard,亦可通过

spring-cloud-dataflow-shell
来注册app或者stream,此处不再赘述。

10.注册app

Spring Cloud Data Flow Server for Apache Mesos 适用于mesos平台的springcloud数据流服务器(DCOS构建)

11.创建数据流

Spring Cloud Data Flow Server for Apache Mesos 适用于mesos平台的springcloud数据流服务器(DCOS构建)

注意:至少得有一个source一个sink,由于资源有限,processor暂时不加入(ps:跑应用真的很消耗资源)

Spring Cloud Data Flow Server for Apache Mesos 适用于mesos平台的springcloud数据流服务器(DCOS构建)

12.应用数据流

Spring Cloud Data Flow Server for Apache Mesos 适用于mesos平台的springcloud数据流服务器(DCOS构建)

然后就可以取services去查看,一个jar应该会启动一个实例

Spring Cloud Data Flow Server for Apache Mesos 适用于mesos平台的springcloud数据流服务器(DCOS构建)

Spring Cloud Data Flow Server for Apache Mesos 适用于mesos平台的springcloud数据流服务器(DCOS构建)

Spring Cloud Data Flow Server for Apache Mesos 适用于mesos平台的springcloud数据流服务器(DCOS构建)

大功告成哈哈哈哈哈哈哈哈哈

下面附上部分代码

source:

application

import java.util.Date;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.MessageBuilder;
@EnableBinding(Source.class)
@SpringBootApplication
public class LoggingSourceApplication {
   @Bean
   @InboundChannelAdapter(
     value = Source.OUTPUT, 
     poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1")
   )
   public MessageSource<Long> timeMessageSource() {
      System.out.println(new Date() +"======================logging-source========================== execued");
       return () -> { 
          System.out.println(new Date() + "*****logging-source****** send");
          return MessageBuilder.withPayload(new Date().getTime()).build();
       };
   }

   public static void main(String[] args) {
      SpringApplication.run(LoggingSourceApplication.class, args);
   }
}
properties

spring.rabbitmq.host=172.16.3.183
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

# 本机启动测试需要以下配置
spring.cloud.stream.default.contentType=application/json
spring.cloud.stream.bindings.output.destination=source-log
# 默认情况下,Spring Cloud Stream 会在 RabbitMQ 中创建一个临时的队列,程序关闭,
# 对应的连接关闭的时候,该队列也会消失。为此,我们需要一个持久化的队列,并且指定一个分组,用于保证应用服务的缩放。
# 只需要在消费者端的 binding 添加配置项 spring.cloud.stream.bindings.[channelName].group = logistic
# 对应的队列就是持久化
spring.cloud.stream.bindings.output.group=logTest
spring.cloud.stream.bindings.output.binder=rabbitMq1
spring.cloud.stream.binders.rabbitMq1.type=rabbit
spring.cloud.stream.default-binder=rabbitMq1
# rabbitMQ服务器地址
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.host=172.16.3.183
# rabbitMQ服务器端口
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.port=5672
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.username=admin
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.password=admin
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.virtual-host=/
pom

<parent>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-parent</artifactId>
   <version>1.4.4.RELEASE</version>
   <relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
   <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
   <java.version>1.8</java.version>
</properties>

<dependencies>
   <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
   </dependency>

   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
   </dependency>
</dependencies>

<dependencyManagement>
   <dependencies>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-dependencies</artifactId>
         <version>Camden.SR5</version>
         <type>pom</type>
         <scope>import</scope>
      </dependency>
   </dependencies>
</dependencyManagement>

<build>
   <plugins>
      <plugin>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
   </plugins>
</build>


sink:

application

import java.util.Date;
import java.util.Map;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;

@EnableBinding(Sink.class)
@SpringBootApplication
public class LoggingSinkApplication {

   @MessageEndpoint
   public static class LoggingMessageEndpoint {
      @ServiceActivator(inputChannel = Sink.INPUT)
      public void logIncomingMessages(@Payload String msg, @Headers Map<String, Object> headers) {
         System.out.println(new Date() + "***********logging-sink**************"+ msg);
         headers.entrySet().forEach(e -> System.out.println(e.getKey() + '=' + e.getValue()));
      }
   }
   @StreamListener(Sink.INPUT)
   public void loggerSink(String date) {
       System.out.println("logging-sink Received: " + date);
   }
   @Payload
   public static void main(String[] args) {
      SpringApplication.run(LoggingSinkApplication.class, args);
   }
}

properties

spring.rabbitmq.host=172.16.3.183
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

#本地测试需要配置
server.port=8090
spring.cloud.stream.default.contentType=application/json
spring.cloud.stream.bindings.input.destination=source-log
# 默认情况下,Spring Cloud Stream 会在 RabbitMQ 中创建一个临时的队列,程序关闭,
# 对应的连接关闭的时候,该队列也会消失。为此,我们需要一个持久化的队列,并且指定一个分组,用于保证应用服务的缩放。
# 只需要在消费者端的 binding 添加配置项 spring.cloud.stream.bindings.[channelName].group = logistic
# 对应的队列就是持久化
spring.cloud.stream.bindings.input.group=logTest
spring.cloud.stream.bindings.input.binder=rabbitMq1
spring.cloud.stream.binders.rabbitMq1.type=rabbit
spring.cloud.stream.default-binder=rabbitMq1
# rabbitMQ服务器地址
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.host=172.16.3.183
# rabbitMQ服务器端口
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.port=5672
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.username=admin
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.password=admin
spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.virtual-host=/

pom

<parent>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-parent</artifactId>
   <version>1.4.4.RELEASE</version>
   <relativePath /> <!-- lookup parent from repository -->
</parent>

<properties>
   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
   <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
   <java.version>1.8</java.version>
</properties>

<dependencies>
   <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
   </dependency>

   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
   </dependency>
</dependencies>

<dependencyManagement>
   <dependencies>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-dependencies</artifactId>
         <version>Camden.SR5</version>
         <type>pom</type>
         <scope>import</scope>
      </dependency>
   </dependencies>
</dependencyManagement>

<build>
   <plugins>
      <plugin>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
   </plugins>
</build>

相关文章:

  • 2022-12-23
  • 2022-12-23
  • 2021-07-19
  • 2021-04-23
  • 2022-12-23
  • 2021-06-05
  • 2022-12-23
  • 2021-10-23
猜你喜欢
  • 2021-11-26
  • 2021-09-03
  • 2022-01-15
  • 2021-12-08
  • 2021-03-30
  • 2021-06-14
相关资源
相似解决方案