Co2y's Blog

Cassandra本地化

关键是获取token range

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
Cluster cluster = Cluster.builder().addContactPoint("192.168.100.1").build();
Metadata metadata = cluster.getMetadata();
Set<Host> allHosts = metadata.getAllHosts();
Session tableSession = cluster.connect(keyspace);
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
int id = 1;
for (Host host : allHosts) {
if (host.hashCode() == -1062697133) {
Set<TokenRange> tokenRanges = metadata.getTokenRanges(keyspace, host);
Set<TokenRange> unwrap = new HashSet<TokenRange>();
for (TokenRange tokenRange : tokenRanges) {
unwrap.addAll(tokenRange.unwrap());
}
for (TokenRange tokenRange : unwrap) {
executor.execute(new newthread(tableSession, tokenRange, id));
id++;
}
break;
} else {
continue;
}
}
executor.shutdown();
while (true) {
if (executor.isTerminated()) {
tableSession.close();
cluster.close();
break;
}
Thread.sleep(1000);
}
1
2
3
4
5
6
7
8
9
Statement stmt = QueryBuilder.select().all().from("xxxx").where(QueryBuilder.gt(QueryBuilder.token("num"), tokenRange.getStart().getValue())).and(QueryBuilder.lte(QueryBuilder.token("num"), tokenRange.getEnd().getValue()));
stmt.setFetchSize(1000);
ResultSet rs = tableSession.execute(stmt);
Iterator<Row> iterator = rs.iterator();
List<String> contentList = new ArrayList<String>();
while (iterator.hasNext()) {
Row row = iterator.next();
contentList.add(row.toString());
}