Ignite 集成 springboot

面向内存的数据网格计算平台

why Ignite

1.Ignite是一个分布式的基于内存的数据库和缓存平台,可以用于事务分析和作为流式处理的计算平台,可提供PB级的内存速度
2.对数据的处理包括但不限于数据网格、计算网格、流计算,当然也包括数据结构。
3.提供了完整的SQL、DDL和DML的支持,可以使用纯SQL而不用写代码与Ignite进行交互,这意味着只使用SQL就可以创建表和索引,以及插入、更新和查询数据。有这个完整的SQL支持,Ignite就可以作为一种分布式SQL数据库。

目前的业务需要实时的对照人脸库进行人物移动轨迹的分析,需要在毫秒内处理上亿条的数据CRUD,目前的数仓,以及RDBMS是无法完成这项工作的

本文主要涉及使用Ignite来作为数据源的存储读取,以及在和springboot集成当中碰到的一些问题,使用的Ignite版本为2.7.6 。

与计算平台的集成(例如spark)参照官网

Ignite与spark的集成

开始

从我的部署安装过程来看Ignite是采用无主模式的,虽然这一点在官方文档中我并未找到明显的提示.

​ 从配置文件属性看来,Ignite的应用模式分为客户端与服务器两种,如果未显示注明默认是服务器模式,进 行自动探测在同一个网络中的所有Ignite服务。这是部署在测试集群的配置(server模式)

  • server和clinet模式最大区别就在于资源利用,client在未向server进行数据调用时是不占用资源的,而server对资源的占用是启动完以后就一直存在的(包括堆外内存的使用)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">

<bean id="igniteCfg" class="org.apache.ignite.configuration.IgniteConfiguration">

<property name="consistentId" value="A"/>

<!--也可以不用设置默认开启的是服务端模式-->
<property name="clientMode" value="false"/>


<property name="cacheConfiguration">
<list>
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<property name="dataRegionName" value="500MB_Region"/>
<property name="name" value="probd-cache"/>
</bean>
</list>
</property>


<property name="dataStorageConfiguration" ref="dataStorageConfiguration"/>
</bean>


<bean id="dataStorageConfiguration" class="org.apache.ignite.configuration.DataStorageConfiguration">
<!-- Threads that generate dirty pages too fast during ongoing checkpoint will be throttled -->
<property name="writeThrottlingEnabled" value="true"/>

<!--Checkpointing frequency which is a minimal interval when the dirty pages will be written to the Persistent Store.-->
<!-- 检查点频率 -->
<property name="checkpointFrequency" value="180000"/>

<!-- Number of threads for checkpointing.-->
<!-- 检查点线程数 -->
<property name="checkpointThreads" value="4"/>

<!-- 在检查点同步完成后预写日志历史保留数量-->
<!-- Number of checkpoints to be kept in WAL after checkpoint is finished.-->
<property name="maxWalArchiveSize" value="#{64 * 1024 * 1024}"/>



<!-- Redefining the default region's settings -->
<property name="defaultDataRegionConfiguration">
<bean class="org.apache.ignite.configuration.DataRegionConfiguration">
<property name="name" value="Default_Region"/>
<!-- 设置默认内存区最大内存为 1GB. -->
<property name="maxSize" value="#{1L * 1024 * 1024 * 1024}"/>
<!-- 默认内存区开启持久化. -->
<property name="persistenceEnabled" value="true"/>
</bean>
</property>
<property name="dataRegionConfigurations">
<list>
<!-- 自定义内存区并开启持久化-->
<bean class="org.apache.ignite.configuration.DataRegionConfiguration">
<!-- 内存区名. -->
<property name="name" value="500MB_Region"/>
<!-- 100 MB initial size. -->
<property name="initialSize" value="#{100L * 1024 * 1024}"/>
<!-- 500 MB maximum size. -->
<property name="maxSize" value="#{500L * 1024 * 1024}"/>
<!-- 开启持久化. -->
<property name="persistenceEnabled" value="true"/>
</bean>
</list>
</property>

<!-- 设置持久化预写日志模式. -->
<property name="walMode">
<util:constant static-field="org.apache.ignite.configuration.WALMode.FSYNC"/>
</property>

<!-- 持久化文件存储路径. -->
<property name="storagePath" value="/data/probd/ignite/db"/>

<!-- 预写日志存储路径. -->
<property name="walPath" value="/data/probd/ignite/db/wal"/>

<!-- 预写日志解压路径. -->
<property name="walArchivePath" value="/data/probd/ignite/db/wal/archive"/>

</bean>
</beans>
  • 替换配置$IGNITE_HOME/config/default-config.xml

    image-20200120111056554

    • 要注意的是这里有个配置属性 consistentId 这是以后做分布式扩展时候要用到的,包括后面的持久化配置,先配上。
  • 切换JDK1.8以上,强制的

    image-20200120112221306

    启动

1
ignite.sh

image-20200120112445415

可以看到启动成功了,使用脚本看一下集群情况

1
./ignitevisorcmd.bat

1
2
open
选择配置文件 0

image-20200120132013784

1
top 查看集群情况

image-20200120132510515

  • 我同时部署了3个节点的服务器,作为和单节点部署区别是consistentId 不同。

  • 所以可以基于这种嗅探模式很轻易的做到横向扩展。

  • 可以看到已经自动探测到所有的节点信息,下面使用springboot集成ignite作为客户端进行cache的CRUD,这部分也是翻车频率最高的地方。

    springboot版本 2.0.7.RELEASE

image-20200120133243211.png

可以看到和服务端配置不同的地方是注明了clientMode 为false,这是在真正产线部署环境当中应该开启的模式。
现在换成true开启客户端模式做本地测试。

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Ignite配置
*
* @author Tak
*/
@Configuration
@ImportResource(locations = {"classpath:configs/ignite-conf.xml"}) //ignite配置文件路径
public class IgniteConf {

private final IgniteConfiguration igniteCfg;

public IgniteConf(IgniteConfiguration igniteCfg) {
this.igniteCfg = igniteCfg;
}

/**
* Creating Apache Ignite instance bean. A bean will be passed
* to IgniteRepositoryFactoryBean to initialize all Ignite based Spring Data
* repositories and connect to a cluster.
*/
@Bean
public Ignite igniteInstance() {
igniteCfg.setIgniteInstanceName(UUID.randomUUID().toString());
igniteCfg.setConsistentId(UUID.randomUUID().toString());
Ignite ignite = Ignition.start(igniteCfg);

// Defining and creating a new cache to be used by Ignite Spring Data
// repository.
CacheConfiguration cacheCfg = new CacheConfiguration("IgniteFaceCache");
// Setting SQL schema for the cache.
cacheCfg.setIndexedTypes(Long.class, IgniteFace.class);
if (!ignite.cluster().active()) {
ignite.cluster().active(true);
}
ignite.getOrCreateCache(cacheCfg);

CH.info("-----------ignite service is started.----------");
return ignite;
}
}

​ 这份配置做了这几件事

  • 读取xml配置文件中的属性,初始化IgniteConfiguration
  • 新建了一个名为IgniteFaceCache的缓存
  • 当集群模式刚启动之后,这个版本的ignite并不会自动切换至active,所以client连接到集群后将状态设置为active

main函数

image-20200120140119943

​ 开启了@EnableIgniteRepositories 注解,使用JPA操纵cache

IgniteFaceDao

image-20200120140300402

​ 这里要注意引用ignite-spring-data是无法正常使用的,需要使用

1
2
3
4
5
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-spring-data_2.0</artifactId>
<version>${ignite.version}</version>
</dependency>

否则会出现编译不通过的情况

image-20200120140725386

编写测试程序,插入1亿条数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@SpringBootTest(classes = {DaoTests.class})
@RunWith(SpringRunner.class)
@ComponentScan(
value = {
"com.pronetway.bigdata.videosecurity.dao",
"com.pronetway.bigdata.videosecurity.entity",
"tk.mybatis.mapper.common"
},
basePackageClasses = {AppExpandedEnvironment.class, PsqlConf.class
, IgniteConf.class
}
)
@EnableAutoConfiguration
@tk.mybatis.spring.annotation.MapperScan(basePackages = "com.pronetway.bigdata.videosecurity.entity.mapper")
@MapperScan(basePackages = "com.pronetway.bigdata.videosecurity.entity.mapper.ignite")
@EnableIgniteRepositories(basePackageClasses = {IgniteFaceDao.class})
public class DaoTests implements CommandLineRunner {

@Inject
private IgniteFaceDao igniteFaceDao;


@Before
public void setUp() throws Exception{
if (!ignite.cluster().active()) {
ignite.cluster().active(true);
}
Collection<ClusterNode> nodes = ignite.cluster().forServers().nodes();
ignite.cluster().setBaselineTopology(nodes);
}
@Test
public void testInsertBigData() throws Exception {

long cnt = 100000000;

long upd = 123123123L;

Map<Long, IgniteFace> faceMap = new HashMap<>(1000000);
for (long i = 1; i < cnt; i++) {

faceMap.put( i, new IgniteFace( i, 0, 0, upd, 33));
if (i % 1000000 == 0) {
igniteFaceDao.save(faceMap);
faceMap.clear();
}
}
System.out.println("all task done");

}

@Override
public void run(String... args) throws Exception {
// 如果集群未启动则启动集群
if (!ignite.cluster().active()) {
ignite.cluster().active(true);
}
}
}

趁着导数据过程切换到visor窗口 输入node

image-20200120145951637

可以看到刚才本地启动的节点也已加入至集群(Client)

切换到visor窗口,输入cache

image-20200120151454656

可以看到一亿条数据已经灌入完毕

编写controller查询数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* @author Tak
*/
@RestController
public class TestController {

private final IgniteFaceDao igniteFaceDao;

public TestController(IgniteFaceDao igniteFaceDao) {
this.igniteFaceDao = igniteFaceDao;
}


@RequestMapping("test/{id}")
public String testDoor(@PathVariable long id) {
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
IgniteFace face = igniteFaceDao.findById(id).orElseThrow(() -> new RuntimeException("no data"));
stopwatch.stop();
return face.toString().concat("time:" + stopwatch);
}
}

测试请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Resource
private TestController testController;
private MockMvc mockMvc;

@Before
public void setup() {
mockMvc = MockMvcBuilders.standaloneSetup(testController).build();
}


@Test
public void queryController() throws Exception {
ThreadLocalRandom longs = ThreadLocalRandom.current();
// 测试5组请求数据
for (int i = 0; i < 5; i++) {
// 每组请求数据测试三次
long id = longs.nextLong(1, 100000000);
for (int j = 0; j < 3; j++) {
MockHttpServletRequestBuilder params1 = MockMvcRequestBuilders.get("/test/".concat(String.valueOf(id)));
MvcResult mvcResult = mockMvc.perform(params1)
.andExpect(MockMvcResultMatchers.status().isOk())
.andDo(MockMvcResultHandlers.print())
.andReturn();
String contentAsString = mvcResult.getResponse().getContentAsString();
System.out.println("req id:" + id);
System.out.println(contentAsString);
}
System.out.println("=================");
System.out.println("=================");
System.out.println("=================");
}
}

测试结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
req id:17461275
IgniteFace[id=17461275, sex=0, withGlass=0, updateTime=123123123, age=33]time:10.05 ms

req id:17461275
IgniteFace[id=17461275, sex=0, withGlass=0, updateTime=123123123, age=33]time:1.835 ms

req id:17461275
IgniteFace[id=17461275, sex=0, withGlass=0, updateTime=123123123, age=33]time:1.852 ms
=================
=================
=================

req id:98412594
IgniteFace[id=98412594, sex=0, withGlass=0, updateTime=123123123, age=33]time:2.373 ms

req id:98412594
IgniteFace[id=98412594, sex=0, withGlass=0, updateTime=123123123, age=33]time:1.768 ms

req id:98412594
IgniteFace[id=98412594, sex=0, withGlass=0, updateTime=123123123, age=33]time:1.973 ms
=================
=================
=================

req id:26372191
IgniteFace[id=26372191, sex=0, withGlass=0, updateTime=123123123, age=33]time:90.82 ms

req id:26372191
IgniteFace[id=26372191, sex=0, withGlass=0, updateTime=123123123, age=33]time:1.736 ms

req id:26372191
IgniteFace[id=26372191, sex=0, withGlass=0, updateTime=123123123, age=33]time:1.761 ms
=================
=================
=================

req id:44665991
IgniteFace[id=44665991, sex=0, withGlass=0, updateTime=123123123, age=33]time:21.98 ms


req id:44665991
IgniteFace[id=44665991, sex=0, withGlass=0, updateTime=123123123, age=33]time:1.863 ms


req id:44665991
IgniteFace[id=44665991, sex=0, withGlass=0, updateTime=123123123, age=33]time:1.648 ms

=================
=================
=================

req id:95049787
IgniteFace[id=95049787, sex=0, withGlass=0, updateTime=123123123, age=33]time:1.905 ms


req id:95049787
IgniteFace[id=95049787, sex=0, withGlass=0, updateTime=123123123, age=33]time:1.867 ms


req id:95049787
IgniteFace[id=95049787, sex=0, withGlass=0, updateTime=123123123, age=33]time:1.773 ms
  • 第一次的查询偏慢,看看有没有后期调优的入口,其实从启动客户端的日志上就能看到,官方提供了参数调优指南

image-20200120163334601

补上DTO的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import org.apache.ignite.cache.affinity.AffinityKey;
import org.apache.ignite.cache.query.annotations.QuerySqlField;

import java.io.Serializable;
import java.util.StringJoiner;


/**
* @author Tak
*/
public class IgniteFace implements Serializable {
private static final long serialVersionUID = 1L;



@QuerySqlField(index = true)
private Long id;

@QuerySqlField
private Integer sex;

@QuerySqlField
private Integer withGlass;

@QuerySqlField
private Long updateTime;

@QuerySqlField
private Integer age;
private transient AffinityKey<Long> key;

public IgniteFace() {
}

public IgniteFace(Long id, Integer sex, Integer withGlass, Long updateTime, Integer age) {
this.id = id;
this.sex = sex;
this.withGlass = withGlass;
this.updateTime = updateTime;
this.age = age;
}

public long getId() {
return id;
}

public void setId(long id) {
this.id = id;
}

public int getSex() {
return sex;
}

public void setSex(int sex) {
this.sex = sex;
}

public int getWithGlass() {
return withGlass;
}

public void setWithGlass(int withGlass) {
this.withGlass = withGlass;
}

public long getUpdateTime() {
return updateTime;
}

public void setUpdateTime(long updateTime) {
this.updateTime = updateTime;
}

public int getAge() {
return age;
}

public void setAge(int age) {
this.age = age;
}

@Override
public String toString() {
return new StringJoiner(", ", IgniteFace.class.getSimpleName() + "[", "]")
.add("id=" + id)
.add("sex=" + sex)
.add("withGlass=" + withGlass)
.add("updateTime=" + updateTime)
.add("age=" + age)
.toString();
}
}

参考过的网站

spring集成Ignite

Ignite官方文档