【Question title】: Write-behind with Apache Ignite in a Spring Boot application
【Posted】: 2020-07-09 01:28:46
【Question】:

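I want to use Apache Ignite's write-behind strategy in my Spring Boot application, which is backed by a MySQL database. The goal is to have cached data written to MySQL asynchronously. I am new to Apache Ignite and am following the documentation at https://apacheignite.readme.io/docs/.

Below is my Spring Boot configuration for Apache Ignite's write-behind strategy.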

    @Bean
    public Ignite igniteInstance() {
        IgniteConfiguration cfg = new IgniteConfiguration();
        cfg.setIgniteInstanceName("ignite-1");
        cfg.setPeerClassLoadingEnabled(true);

        CacheConfiguration<Long, Contact> ccfg2 = new CacheConfiguration<>("ContactCache");
        ccfg2.setIndexedTypes(Long.class, Contact.class);
        ccfg2.setWriteBehindEnabled(true);
        ccfg2.setWriteBehindFlushFrequency(1000);
        ccfg2.setExpiryPolicyFactory(TouchedExpiryPolicy.factoryOf(Duration.ONE_MINUTE));
        ccfg2.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC).setBackups(1);
        ccfg2.setWriteBehindFlushSize(0);
        ccfg2.setWriteBehindFlushThreadCount(1);
        ccfg2.setWriteBehindBatchSize(1);
        //ccfg2.setReadThrough(true);
        //ccfg2.setWriteThrough(true);

        // Memory Configuration
        DataStorageConfiguration storageCfg = new DataStorageConfiguration();
        DataRegionConfiguration defaultDataRegionCfg = storageCfg.getDefaultDataRegionConfiguration();
        defaultDataRegionCfg.setPersistenceEnabled(false);   // Only Memory
        defaultDataRegionCfg.setMaxSize(1024 * 1024 * 256);  // 256MB
        defaultDataRegionCfg.setMetricsEnabled(true);
        cfg.setDataStorageConfiguration(storageCfg);


        CacheJdbcPojoStoreFactory<Long, Contact> f2 = new CacheJdbcPojoStoreFactory<>();
        f2.setDataSource(datasource);
        f2.setDialect(new MySQLDialect());
        JdbcType jdbcContactType = new JdbcType();
        jdbcContactType.setCacheName("ContactCache");
        jdbcContactType.setKeyType(Long.class);
        jdbcContactType.setValueType(Contact.class);
        jdbcContactType.setDatabaseTable("contact");
        jdbcContactType.setDatabaseSchema("demo");
        jdbcContactType.setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Long.class, "id"));
        jdbcContactType.setValueFields(new JdbcTypeField(Types.VARCHAR, "contact_type", ContactType.class, "type"),
                new JdbcTypeField(Types.VARCHAR, "location", String.class, "location"),
                new JdbcTypeField(Types.INTEGER, "person_id", Long.class, "personId"));
        f2.setTypes(jdbcContactType);
        ccfg2.setCacheStoreFactory(f2);

         TcpCommunicationSpi spi = new  TcpCommunicationSpi();
         spi.setMessageQueueLimit(1000);

        CacheConfiguration<Long, Person> ccfg = new CacheConfiguration<>("PersonCache");
        ccfg.setIndexedTypes(Long.class, Person.class);
        ccfg.setWriteBehindEnabled(true);
        ccfg.setWriteBehindFlushFrequency(2000);
        ccfg.setExpiryPolicyFactory(TouchedExpiryPolicy.factoryOf(Duration.ONE_MINUTE));
        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_ASYNC);
        ccfg.setWriteBehindFlushSize(0);
        ccfg.setWriteBehindFlushThreadCount(10);
        ccfg.setWriteBehindBatchSize(10);
        ccfg.setBackups(1);
        ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
//      ccfg.setDataRegionName(storageCfg);
//      ccfg.setReadThrough(true);
//      ccfg.setWriteThrough(true);
        CacheJdbcPojoStoreFactory<Long, Person> f = new CacheJdbcPojoStoreFactory<>();
        f.setDataSource(datasource);
        f.setDialect(new MySQLDialect());
        JdbcType jdbcType = new JdbcType();
        jdbcType.setCacheName("PersonCache");
        jdbcType.setKeyType(Long.class);
        jdbcType.setValueType(Person.class);
        jdbcType.setDatabaseTable("person");
        jdbcType.setDatabaseSchema("demo");
        jdbcType.setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Long.class, "id"));
        jdbcType.setValueFields(new JdbcTypeField(Types.VARCHAR, "first_name", String.class, "firstName"),
                new JdbcTypeField(Types.VARCHAR, "last_name", String.class, "lastName"),
                new JdbcTypeField(Types.VARCHAR, "gender", Gender.class, "gender"),
                new JdbcTypeField(Types.VARCHAR, "country", String.class, "country"),
                new JdbcTypeField(Types.VARCHAR, "city", String.class, "city"),
                new JdbcTypeField(Types.VARCHAR, "address", String.class, "address"),
                new JdbcTypeField(Types.VARCHAR, "birth_date", String.class, "birthDate"));
        f.setTypes(jdbcType);
        ccfg.setCacheStoreFactory(f);

        Ignite ignite = Ignition.start(cfg);
        ignite.cluster().active(true);
        return ignite;
    }

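With this configuration, if I set write-through to true, data is inserted into the database successfully. Write-behind, however, does not work with this configuration. I followed https://apacheignite.readme.io/v2.7/docs/3rd-party-store for the write-behind settings, but I don't think it is working. I am new to Ignite, so please let me know if I have missed something.

My repository code: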

@RepositoryConfig(cacheName = "PersonCache")
public interface PersonRepository extends IgniteRepository<Person, Long> {

    List<Person> findByFirstNameAndLastName(String firstName, String lastName);

    @Query("SELECT c.* FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?")
    List<Contact> selectContacts(String firstName, String lastName);

    @Query("SELECT p.id, p.firstName, p.lastName, c.id, c.type, c.location FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?")
    List<List<?>> selectContacts2(String firstName, String lastName);
}

Model class:

@QueryGroupIndex.List(
        @QueryGroupIndex(name="idx1")
) 
public class Person implements Serializable {

    private static final long serialVersionUID = -1271194616130404625L;
    private static final AtomicLong ID_GEN = new AtomicLong();

    @QuerySqlField(index = true)
    private Long id;
    @QuerySqlField(index = true)
    @QuerySqlField.Group(name = "idx1", order = 0) 
    private String firstName;
    @QuerySqlField(index = true)
    @QuerySqlField.Group(name = "idx1", order = 1) 
    private String lastName;
    private Gender gender;
    private Date birthDate;
    private String country;
    private String city;
    private String address;
    @Transient
    private List<Contact> contacts = new ArrayList<>();

    public void init() {
        this.id = ID_GEN.incrementAndGet();
    }

Controller class:

@RestController
@RequestMapping("/person")
public class PersonController {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonController.class);

    @Autowired
    PersonRepository repository;

    @PostMapping
    public Person add(@RequestBody Person person) {
        person.init();
        return repository.save(person.getId(), person);
    }

    @PutMapping
    public Person update(@RequestBody Person person) {
        return repository.save(person.getId(), person);
    }

    @DeleteMapping("/{id}")
    public void delete(@PathVariable("id") Long id) {
        repository.delete(id);
    }

    @GetMapping("/{id}")
    public Person findById(@PathVariable("id") Long id) {
        return repository.findOne(id);
    }

The complete code base is available at https://github.com/gotidhavalh/ignite-rest-service

【Comments】:

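  • What do you mean by "not working"? If you enable write-behind, are entries never inserted? Note that you must also enable write-through for it to work.
  • Thanks for the reply, @alamar. By "not working" I mean that entries are not inserted or updated in the database after a certain period of time. I set that period to 1 minute. I have tried enabling both write-through and write-behind, but with this configuration entries are still not updated in the database.
  • The reproducer fails to start with the following cryptic exception: org.springframework.data.mapping.PropertyReferenceException: No property deleteAll found for type Flight! Either way, you definitely need to enable writeThrough.
  • OK. Thank you.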

Tags: mysql spring-boot spring-data-jpa ignite


【Solution 1】:

You need to enable both

    ccfg.setWriteThrough(true);

    ccfg.setWriteBehindEnabled(true);

for write-behind to work. Also be sure to check its flush frequency (5 seconds by default).
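
As a minimal sketch of the two settings above in context, assuming Ignite 2.x on the classpath: the class name `WriteBehindConfigSketch`, the helper `writeBehindCache`, and the 1000 ms flush frequency are illustrative choices and do not come from the original post. The `storeFactory` argument stands in for the `CacheJdbcPojoStoreFactory` built in the question.

```java
import javax.cache.configuration.Factory;

import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.configuration.CacheConfiguration;

// Hypothetical helper class, named here only for illustration.
public final class WriteBehindConfigSketch {

    private WriteBehindConfigSketch() {
    }

    // Builds a cache configuration whose updates are queued and flushed
    // to the underlying store asynchronously (write-behind).
    static <K, V> CacheConfiguration<K, V> writeBehindCache(
            String cacheName,
            Factory<? extends CacheStore<? super K, ? super V>> storeFactory) {

        CacheConfiguration<K, V> ccfg = new CacheConfiguration<>(cacheName);

        // The store that receives the writes, e.g. the JDBC POJO store factory.
        ccfg.setCacheStoreFactory(storeFactory);

        // Write-through must be on; write-behind only changes how the
        // store writes are delivered (batched and asynchronous).
        ccfg.setWriteThrough(true);
        ccfg.setWriteBehindEnabled(true);

        // Assumed value for the example: flush queued updates every second.
        // If this is not set, Ignite uses its default of 5000 ms.
        ccfg.setWriteBehindFlushFrequency(1000);

        return ccfg;
    }
}
```

The returned configuration still has to be registered on the node, for example with `cfg.setCacheConfiguration(...)` before `Ignition.start(cfg)`. The snippet in the question does not show that step, so whether it happens elsewhere in the project is an assumption.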

【Comments】:
