Be Better

厚积薄发


  • 首页

  • 标签

  • 分类

  • 归档

  • Sitemap

  • 搜索

redis排行榜

发表于 2018-02-03 | 分类于 数据库 | 阅读次数:

设计思路

因为看了一段时间的redis,准备动手做一个小demo,做一个排行榜,正好加在之前的未完成的新闻门户里面。关于排行榜他有一些跟排行榜本身相关的要求比如:

排行精确性

如果一个排行榜的结果关系到用户的权益问题,这个时候一个排行榜的精确性就需要非常高,比如一个运营同学进行了根据微博转发数量的营销活动,这个时候微博转发数量的排行榜就需要非常精确,否则会影响用户权益的分发。

排行榜实时性

游戏和社交互动的结合是目前的趋势,对于热门游戏的排行是用户的关注重点,在这部分用户中对于排行的实时性有很高的要求,如果一个用户升级了自己的装备和能力,而自己的排名一直没有更新,那这个用户一定要非常伤心抛弃这个游戏了。所以通过离线计算等平台来构建一个非实时的排行榜系统就不太适合这样的模型。

海量数据排行

海量数据是目前的一个趋势,比如对于淘宝全网商品的一个排行,这个榜单将会是一个亿级别的,所以我们设计的榜单也需要具备弹性伸缩能力,同时在对海量数据进行排行的时候拥有一定的实时性。

实现方法

目的是要实现一个热点新闻排行榜的话,毫无疑问,使用的是redis内置的zset这种数据结构,他可以根据score自动产生rank比较方便。我们将评论或者点赞数超过200的认为是热门文章

由于文章是从别的地方爬过来的,所以只有评论数没有点赞数,设置初始化分数为:

score = 发布时间毫秒数 + 432 * 评论数

而问题就在排行榜更新的频率,更新过快,缓存效果不好,会产生类似重建热key的问题(下一篇文章要讲一下),但是频率过慢又不能达到实时性,所以正如之前所说的,要根据排行榜自身的要求制定一个适合的更新策略:

针对自身的这个项目需求,我想实现的是一个热点新闻排行榜,他的时效性要求并不是很高,所以通过分析网易新闻的爬取量,对爬到的每个新闻建立一个news:id的hash进行初始化,类似关系型数据库中的一条字段,并设置一周后过期自动删除,排行榜肯定是用zset的,但是为了不刷新过快,再建立一个time:的zset缓存最近一个星期的文章,设置一周过期,每周一次定时维护time:,从time: 删除时间超过一个星期的文章,并重置score:,由于爬虫每隔6小时更新一次,且新闻量相对较小,所以对time:的频繁读写是可以容忍的,再维护一个score:的zset简历news和score的映射,所以总的来说就是

  1. 爬取新闻,建立新的hash(news:id),设置过期时间为一周,并加入zset(time:)
  2. 每周执行一次更新,删除zset:中过期的任务,对未过期的任务分数进行更新。
score: zset
news:id 分数
time: zset
news:id 时间
news:id hash
voted 投票数
title xxx
url xxx

这里介绍一个在网上看到的实时排行榜的设计策略,其思路类似于维护一个小顶堆:

  1. 第一次访问的时候,查数据库,查整个表查出topN(使用sql排序),丢给redis(使用sorted set数据类型)。

  2. 排序在redis,redis自动排序。以后的用户访问:均访问redis。

  3. 只要每次积分变化判断的时候拿topN的最后一个判别,大于最后一名,则整个user丢进redis排序。
    效率性能再优化:用户积分变动的时候,(守护线程)服务器预存一下变化的数量。。到一定量再通知。

  4. 再往下去设定一个小距离为阈值。比如现在第50名的积分是100,那80分一下的应该就没必要扔给redis了吧?

注意:这个排行榜的用户是会不断增加的,比如1亿用户,如果刚开始只有前50,后5千万人的积分大于第50名,那么就会往redis加入这个用户的信息。(虽然看起来要存很多,其实一亿用户怎么存也就1G左右的内存,简单暴力优雅方案了)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Autowired
private RedisDao redisDao;
private final int ONE_WEEK_IN_SECONDS = 7 * 86400;
@Scheduled(cron = "0 0 0 1/7 * ?")
public void updataRank() {
redisDao.zRemRangeByRank("score:", 0, -1);
long cutOff = System.currentTimeMillis() / 1000 - ONE_WEEK_IN_SECONDS;
Set<TypedTuple<Object>> set = redisDao.zRangeWithScores("time:", 0, -1);
for (TypedTuple<Object> o: set) {
//如果过期直接删除,否则计算结果
BigDecimal db = new BigDecimal(o.getScore().toString());db.toPlainString();
if (Long.valueOf(db.toPlainString()) < cutOff) {
redisDao.zrem("time:", o.getValue());
} else {
redisDao.zadd("score:", o.getValue(),
Double.valueOf(o.getScore().toString()) +
432 * Double.valueOf(redisDao.hget(o.getValue().toString(), "voted").toString()));
}
}
}

成品效果如图(忽略我这个丑陋的前端):

新闻爬虫2.0

由于这次修改也涉及到了之前爬取数据的爬虫,索性就把爬虫也一并进行了修改,对整个爬虫进行了重构,使用多线程对爬虫进行优化,具体步骤如下:

将爬虫分为两个部分,使用生产者和消费者模式,将redis作为任务队列,生产者爬虫爬取新闻url,消费者爬虫根据新闻url爬取具体信息。使用2个redis集合存储已爬新闻和未爬新闻,作为简单去重。

完整代码请参考:

新闻门户代码:https://github.com/MoriatyC/OmegaNews

爬虫代码:https://github.com/MoriatyC/nethard

redis数据安全与性能保障

发表于 2018-01-29 | 分类于 数据库 | 阅读次数:

一.持久化

1. 快照: 将存在于某一时刻的所有数据都写入硬盘里面

方法:
  1. 客户端通过向redis发送bgsave命令(创建子进程)
  2. 客户端通过向redis发送save命令,但是会阻塞其他命令,所以只有内存不够,或者不怕阻塞的时候才可以用。但是不要创建子进程,不会导致redis停顿,并且由于没有子进程抢资源所以比bgsave快。
  3. 设置了save选项:比如 save 60 10000,表示从最近一次创建快照之后开始算起,当有60s内有10000次写入的时候就会触发bgsave命令,可以有多个save配置,任意一个满足即可。
  4. 通过shutdown接收到关闭请求时,或者接收到标准的term信号,执行save命令
  5. 当一个redis服务器连接另一个redis服务器,想对方发送sync时,若主服务器没执行bgsave,或者并非刚刚执行完,那么主服务器就会执行bgsave。
缺点:当redis、系统或者硬件中的一个发生崩溃,将丢失最近一次创建快照后的数据。

TIPS: 将开发环境尽可能的模拟生产环境以得到正确的快照生成速率配置。

2. AOF:在执行写命令时,将被执行的写命令复制到硬盘里面

使用appendonlyyes配置选项打开,下图是appendfsync配置选项。

选项目 同步频率
always 每个写操作都要同步写入,严重降低redis速度损耗硬盘寿命
everysec 每秒执行一次,将多个写入同步,墙裂推荐
no 让os决定,不稳定,不知道会丢失多少数据

自动配置aof重写:

  • auto-aof-rewrite-percentage 100
  • auto-aof-rrewrite-min-size 64
    当启用aof持久化之后,当aof文件体积大于64mb并且体积比上一次大了100%,就会执行bgrewriteaof命令。

缺点:1.aof文件过大,2. 文件过大导致还原事件过长。
但是可以对其进行重写压缩。

二. 复制

就像之前所说当一个从服务器连接一个主服务器的时候,主服务器会创建一个快照文件并将其发送到从服务器。

在配置中包含slaveof host port选项指定主服务器,启动时候会先执行aof或者快照文件。

也可以通过发送flaveof no one命令来终止复制操作,通过slaveof host port命令来开始复制一个主服务器,会直接执行下面的连接操作。

步骤 主服务器操作 从服务器操作
1 (等待命令) 连接主服务器,发送sync命令
2 开始执行bgsave,并使用缓冲区记录bgsave之后执行的所有写命令 根据配置选项决定使用现有数据处理客户端请求还是返回错误
3 Bgsave执行完毕,向从服务器发快照,并在发送期间继续用缓冲区记录写命令 丢弃所有旧数据,载入快照文件
4 快照发送完毕,向从服务器发送缓冲区里的写命令 完成快照解释,开始接受命令
5 缓冲区存储的写命令发送完毕:从现在起每执行一个写命令都发给从服务器 执行主服务器发来的所有存储在缓冲区里的写;并接受执行主服务器发来的写命令

三. 处理故障系统

验证快照和aof文件

  • redis-check-aof
  • redis-check-dump

检查aof和快照文件的状态,在有需要的情况下对aof文件进行修复。

更换新的故障主服务器

假设A为主服务器,B为从服务器,当机器A发生故障的时候,更换服务器的步骤如下:
首先向机器B发送一个save命令,将这个快照文件发送给机器C,在C上启动Redis,让B成为C的从服务器。

将从服务器升级为主服务器

将从服务器升级为主服务器,为升级后的主服务器创建从服务器。

redis事务

四. 事务

multi: 标记一个事务块的开始。

事务块内的多条命令会按照先后顺序被放进一个队列当中,最后由 EXEC 命令原子性(atomic)地执行。

exec: 执行所有事务块内的命令。

假如某个(或某些) key 正处于 WATCH 命令的监视之下,且事务块中有和这个(或这些) key 相关的命令,那么 EXEC 命令只在这个(或这些) key 没有被其他命令所改动的情况下执行并生效,否则该事务被打断(abort)。

redis的事务包裹在multi命令和exec命令之中,在jedis中通过如下实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
        public class RedisJava extends Thread{
static Response<String> ret;
Jedis conn = new Jedis("localhost");
@Override
public void run() {

Transaction t = conn.multi();
t.incr("notrans:");
Response<String> result1 = t.get("notrans:");
try {
Thread.sleep(1L);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
t.incrBy("notrans:", -1);
t.exec();
String foolbar = result1.get();
System.out.println(foolbar);
}


public static void main(String[] args) {
Jedis conn = new Jedis("localhost");
Thread t1 = new RedisJava();
Thread t2 = new RedisJava();
Thread t3 = new RedisJava();
t1.start();
t2.start();
t3.start();


}
}

  • wathc:
    监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。
  • unwatch:
    取消 WATCH 命令对所有 key 的监视。
    如果在执行 WATCH 命令之后, EXEC 命令或 DISCARD 命令先被执行了的话,那么就不需要再执行 UNWATCH 了。
    的监视,因此这两个命令执行之后,就没有必要执行 UNWATCH 了。
  • discard :取消事务,放弃执行事务块内的所有命令。取消watch,清空任务队列。
    如果正在使用 WATCH 命令监视某个(或某些) key,那么取消所有监视,等同于执行命令 UNWATCH 。

一个简单的商品买卖demo如下:

key type
inventory:id set
market zset
user:id hash
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public boolean listItem(
Jedis conn, String itemId, String sellerId, double price) {

String inventory = "inventory:" + sellerId;
String item = itemId + '.' + sellerId;
long end = System.currentTimeMillis() + 5000;

while (System.currentTimeMillis() < end) {
conn.watch(inventory);
if (!conn.sismember(inventory, itemId)){
conn.unwatch();
return false;
}

Transaction trans = conn.multi();
trans.zadd("market:", price, item);
trans.srem(inventory, itemId);
List<Object> results = trans.exec();
// null response indicates that the transaction was aborted due to
// the watched key changing.
if (results == null){
continue;
}
return true;
}
return false;
}
public boolean purchaseItem(
Jedis conn, String buyerId, String itemId, String sellerId, double lprice) {

String buyer = "users:" + buyerId;
String seller = "users:" + sellerId;
String item = itemId + '.' + sellerId;
String inventory = "inventory:" + buyerId;
long end = System.currentTimeMillis() + 10000;

while (System.currentTimeMillis() < end){
conn.watch("market:", buyer);

double price = conn.zscore("market:", item);
double funds = Double.parseDouble(conn.hget(buyer, "funds"));
if (price != lprice || price > funds){
conn.unwatch();
return false;
}

Transaction trans = conn.multi();
trans.hincrBy(seller, "funds", (int)price);
trans.hincrBy(buyer, "funds", (int)-price);
trans.sadd(inventory, itemId);
trans.zrem("market:", item);
List<Object> results = trans.exec();
// null response indicates that the transaction was aborted due to
// the watched key changing.
if (results == null){
continue;
}
return true;
}

总结:相比于一般关系型数据库的悲观锁,redis的事务是典型的乐观锁,没有对事务进行封锁,以避免客户端运行过慢造成长时间的阻塞

非事务型流水线

使用流水线,减少通信次数提高性能,以jedis为例,对比使用和没使用流水线的函数方法调用次数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public void updateTokenPipeline(Jedis conn, String token, String user, String item) {
long timestamp = System.currentTimeMillis() / 1000;
Pipeline pipe = conn.pipelined();
pipe.multi();
pipe.hset("login:", token, user);
pipe.zadd("recent:", timestamp, token);
if (item != null){
pipe.zadd("viewed:" + token, timestamp, item);
pipe.zremrangeByRank("viewed:" + token, 0, -26);
pipe.zincrby("viewed:", -1, item);
}
pipe.exec();
}

//对比没有使用流水线的方法
public void updateToken(Jedis conn, String token, String user, String item) {
long timestamp = System.currentTimeMillis() / 1000;
conn.hset("login:", token, user);
conn.zadd("recent:", timestamp, token);
if (item != null) {
conn.zadd("viewed:" + token, timestamp, item);
conn.zremrangeByRank("viewed:" + token, 0, -26);
conn.zincrby("viewed:", -1, item);
}
}
//测试函数如下
public void benchmarkUpdateToken(Jedis conn, int duration) {
try{
@SuppressWarnings("rawtypes")
Class[] args = new Class[]{
Jedis.class, String.class, String.class, String.class};
Method[] methods = new Method[]{
this.getClass().getDeclaredMethod("updateToken", args),
this.getClass().getDeclaredMethod("updateTokenPipeline", args),
};
for (Method method : methods){
int count = 0;
long start = System.currentTimeMillis();
long end = start + (duration * 1000);
while (System.currentTimeMillis() < end){
count++;
method.invoke(this, conn, "token", "user", "item");
}
long delta = System.currentTimeMillis() - start;
System.out.println(
method.getName() + ' ' +
count + ' ' +
(delta / 1000) + ' ' +
(count / (delta / 1000)));
}
}catch(Exception e){
throw new RuntimeException(e);
}
}

运行结果如图所示,在本地运行性能提升大概17.8倍。

tips:可以使用redis-benchmark工具进行性能测试。

五. References

《Redis实战》

线程池(二)

发表于 2018-01-18 | 分类于 并发 | 阅读次数:

性能问题

饥饿死锁

如果线程池中的任务依赖于之后提交的子任务,当线程池不够大的时候,很容易造成饥饿死锁。所以最好在线程池中加入的是同类型的独立任务。

运行时间较长的任务

如果线程运行时间较长也会影响任务的相应性,同样造成不好的体验,所以api有很多方法都带有一个限时版本。

线程池大小

线程池的大小需要分析计算环境,资源预算和任务特性。

一般来说在知道了系统中有多少个cpu和内存的基础下,任务类型是最为重要的。

  • 对于计算密集型的任务线程池大小为cpu数+1,以实现尽可能的满载利用率
  • 对于i/o密集型,由于线程不会一直执行,所以规模更大。这里给出一个《Java并发变成实战》这本书提出的一个公式

最佳线程数目 = (线程等待时间/线程CPU时间之比 + 1) CPU数目cpu利用率

即等待时间越长,需要更多的线程。
当我们不需要一个那么精准的线程数目时,也可以用这个公式

最佳线程数目 = 2N+1(N为CPU数目)

是否使用线程池就一定比使用单线程高效呢?

答案是否定的,比如Redis就是单线程的,但它却非常高效,基本操作都能达到十万量级/s。从线程这个角度来看,部分原因在于:

  • 多线程带来线程上下文切换开销,单线程就没有这种开销
  • 锁
  • 当然“Redis很快”更本质的原因在于:Redis基本都是内存操作,这种情况下单线程可以很高效地利用CPU。而多线程适用场景一般是:存在相当比例的IO和网络操作。

扩展线程池

ThreadPoolExecutor提供了几个可以在子类化中该学的方法:

  • beforeExecute
  • afterExecute
  • terminated

如果beforeExecute抛出一个RuntimeException,那么任务将不被执行,并且afterExecute也不会被调用。

但是无论人物从run中正常返回还是抛出一个异常返回,afterExecute都会被调用,如果任务在完成后带有一个Error,那么久不会调用。

所有任务都已经完成并且所有工作者线程也已经关闭后,terminated会被调用。

给出一个demo,他通过这一些列方法来统计任务执行并添加日志。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class TimingThreadPool extends ThreadPoolExecutor {

public TimingThreadPool() {
super(1, 1, 0L, TimeUnit.SECONDS, null);
}

private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
private final Logger log = Logger.getLogger("TimingThreadPool");
private final AtomicLong numTasks = new AtomicLong();
private final AtomicLong totalTime = new AtomicLong();

protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
log.fine(String.format("Thread %s: start %s", t, r));
startTime.set(System.nanoTime());
}

protected void afterExecute(Runnable r, Throwable t) {
try {
long endTime = System.nanoTime();
long taskTime = endTime - startTime.get();
numTasks.incrementAndGet();
totalTime.addAndGet(taskTime);
log.fine(String.format("Thread %s: end %s, time=%dns",
t, r, taskTime));
} finally {
super.afterExecute(r, t);
}
}

protected void terminated() {
try {
log.info(String.format("Terminated: avg time=%dns",
totalTime.get() / numTasks.get()));
} finally {
super.terminated();
}
}
}

引用

http://ifeve.com/how-to-calculate-threadpool-size/
《Java并发编程实战》

线程池(一)

发表于 2018-01-17 | 分类于 并发 | 阅读次数:

概述

这几天准备深入学习有关并发的知识,所以先简单复习了一下JDK自带的并发包,其中首先比较重要的一个就是线程池了。

为什么不无限的创造线程?主要基于以下几个原因:

  1. 线程生命周期的开销非常高
  2. 资源消耗
  3. 稳定性

所谓物极必反,线程的创建和销毁和需要一定的时间,如果所创建的线程工作时间还不如创建销毁的时间长那是得不偿失的,并且当线程创建过多也会对内存造成一定的负担甚至溢出,并且对GC也是极大的消耗,由于存在一定数额的活跃线程也提高了响应性。

线程池

根据《阿里巴巴Java开发手册》中对线程创建的要求

  1. 【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程

由此可见,在正式生产环境中,线程池是唯一的创建线程的方法。而JDK对线程池也有强大的支持。

根据《手册》中的另一点要求

  1. 【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样
    的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
    说明: Executors 返回的线程池对象的弊端如下:
    1) FixedThreadPool 和 SingleThreadPool:
    允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
    2) CachedThreadPool 和 ScheduledThreadPool:
    允许的创建线程数量为 Integer.MAX_VALUE, 可能会创建大量的线程,从而导致 OOM

虽然Executor为我们提供了很多方便的工厂方法,比如newSingleThreadExecutor(),也有Executors为我们很好的实现了这些工厂方法,但是手动实现ThreadPoolExecutor能让我们对线程池有更深的了解和控制。所以接下来让我们来介绍一下ThreadPoolExecutor这个类。

原理

一个最常见的ThreadPoolExecutor构造函数如下

1
2
3
4
5
6
7
ThreadPoolExecutor(int corePoolSize, 
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
  • corePoolSize: 活动线程数
  • maximumPoolSize: 线程池上限
  • keepAliveTime: 当线程池中线程数超过corePoolSize后,完成工作后的线程存活时间
  • unit: 单位
    其余的几个参数我们会在后面着重介绍。

这里介绍一下ThreadPoolExecutor的核心工作原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);

workderCountOf()获得当前线程池线程总数,若小于corePoolSize,则直接将任务通过addWorker
()方法执行,否则在workQueue.offer()进入等待队列,若进入失败,则任务直接交给线程池,若线程池达到了maximumPoolSize则提交失败执行拒绝策略

任务队列

BlockingQueue:接口,阻塞队列,数据共享通道

任务队列的作用在于,当线程池中线程数达到corePoolSize的时候,接下来的任务将进入这个队列进行等待,等待执行。

简单原理

服务线程(获取队列信息并处理的线程)在队列为空时进行读等待,有新的消息进入队列后自动唤醒,反之,当队列满时进行写等待直到有消息出队。

不同于常用的offer()和poll()方法,这里我们使用take()和put()方法进行读写。我们以ArrayBlockingQueue的为例子,其中包括了这几个控制对象

1
2
3
final ReentrantLock lick;
private final Condition notEmpty;
private final Condition notFull;

就拿take()来说

1
2
3
4
5
6
7
8
9
10
11
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

当队列为空时,他会在notEmpty上进行等待,
在线程等待时,若有新的元素插入,线程就会被唤醒

1
2
3
4
5
6
7
8
9
10
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}

常用实现

  • SynchronousQueue(直接提交队列): 一个零容量队列,每个插入操作要对应一个删除操作。提交的任务不会被真实保存,其实就是将新任务交给了线程执行。
  • ArrayBlockingQueue(有界任务队列): 这里就会用到线程池中另一个参数maximumPoolSize, 若当前线程池中线程小于corePoolSize则直接在线程池中增加线程,若大于,则加入该任务队列,若队列满则继续加入线程池,若线程池中数目多余maximumPoolSize则执行拒绝策略。
  • LinkedBlockingQueue(无界任务队列):如果未指定容量,那么容量将等于 Integer.MAX_VALUE。只要插入元素不会使双端队列超出容量,每次插入后都将动态地创建链接节点。
  • PriorityBlockingQueue(优先任务队列): 一个特殊的无界任务队列,前面两者都是按FIFO的顺序执行,而这个是可以按照优先级执行。

    拒绝策略

    JDK内置拒绝策略如下
  • AboerPolicy(默认):直接抛出异常,阻止系统正常工作。
  • CallerRunsPolicy: 只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。(伪丢弃,但是任务提交线程性能大幅度下降)
  • DiscardOledestPolicy:和名字一样,丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
  • DiscardPolicy: 丢弃无法处理的任务,不给任何处理。

异常堆栈

首先给出一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Main implements Runnable{
int a, b;
public Main(int a, int b) {
this.a = a;
this.b = b;
}
@Override
public void run() {
int ret = a / b;
System.out.println(ret);
}

public static void main(String[] args) {
ThreadPoolExecutor pools = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
for (int i = 0; i < 5; i++) {
pools.submit(new Main(100, i));
}
}
}
/* 结果如下
100
33
50
25

*/

可以发现,其中一个显然的异常除数为0不见了,我们可以通过将submit方法改为execute方法来打印部分异常信息,但是我们仍然不能发现他的调用线程在哪儿。
这里我们通过扩展线程池给出一种解决办法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package first_maven;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main extends ThreadPoolExecutor{

public Main(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

@Override
public void execute(Runnable task) {
super.execute(wrap(task, clientTrace(), Thread.currentThread().getName()));
}

@Override
public Future<?> submit(Runnable task) {
return super.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
}

private Exception clientTrace() {
return new Exception("Client stack trace");
}

private Runnable wrap(final Runnable task, final Exception clientStack, String clientThreadName) {
return new Runnable() {
@Override
public void run() {
try {
task.run();
} catch(Exception e) {
clientStack.printStackTrace();
System.out.println(" 1212");
throw e;
}
}
};
}
}

我们通过扩展ThreadPoolExecutor,将要执行的Runnable进行包装,通过手动创建异常,获取当前主线程的调用堆栈,从而得到线程池的调用信息,并打印相应的运行异常,这样我们就可以追踪到完整的异常信息。

总结

在使用多线程的时候,要通过ThreadPoolExecutor来手动创建,根据当前任务的需求分配相应的线程池大小和阻塞队列以及拒绝策略,这样才能知根知底。

五. References

《实战Java高并发程序设计》

《阿里巴巴Java开发手册》

工厂模式

发表于 2018-01-13 | 分类于 设计模式 | 阅读次数:

这两天正在看关于多线程的一些内容,看到线程池的时候发现它的实现使用了工厂模式,之前对工厂模式的了解不深,只是知道他是根据需要创建对象的,索性就开个支线,找了本书看了看关于工厂模式的一些知识,书中讲的也比较有意思,以下是一些心得。

概述

对于设计模式来说,模式本身固然重要,但是模式设计的思想也同样很有味道,其中带来的一些OO的原则更是我们平时写代码需要注意的地方。而对于OO的设计原则其中有一个重要的思想就是将固定与变化分开,也就是简单的策略模式,将变化抽象,针对同一个接口,有各自的实现。

但是对于创建对象来说,java中只有new这一种方法,这就不可避免的要将代码写死,这又是我们不想看到的事情,由于硬编码带来的一系列拓展上的不便,使我们无法针对接口编程。就好像当我们使用集合的使用都会这么写:

1
List<T> list = new XXXList<>();

因为这种针对接口的编程给了我们更多的自由。那么有没有一种灵活的方式创建对象,那就是工厂模式,所有的工厂模式都是针对对象的创建。

一.简单工厂

首先声明一下,简单工厂不是一种设计模式,只是一种习惯而已。他将动态的创建对象这一过程与固定的使用对象的代码分隔开。我们结合一个简单的例子来说:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

class PizzaStore {
SimplePizzaFactory factory;

public PizzaStore(SimplePizzaFactory factory) {
this.factory = factory;
}

public Pizza orderPizza(String type) {
Pizza pizza;

pizza = factory.createPizza(type);

pizza.prepare();
pizza.bake();
pizza.cut();
pizza.box();

return pizza;
}

}


public class SimplePizzaFactory {

public Pizza createPizza(String type) {
Pizza pizza = null;

if (type.equals("cheese")) {
pizza = new CheesePizza();
} else if (type.equals("pepperoni")) {
pizza = new PepperoniPizza();
} else if (type.equals("clam")) {
pizza = new ClamPizza();
} else if (type.equals("veggie")) {
pizza = new VeggiePizza();
}
return pizza;
}
}

在这个例子中,我们所需要创建的对象是Pizza,但在这里我们通过一个factory代替了以往的new关键字来创建对象,而这样的好处也是显而易见的,在这个服务中,变化的是Pizza的种类,而处理Pizza 的流程是固定的。我们只需根据需要传入所需的factory,就能实现创建对象与使用对象的解耦。

我们通过定义一个工厂类,将创建对象的操作通过这个类来进行,当对象种类增加时,我们只需要修改工厂类,就是所谓类对修改关闭,对扩展开放。

二. 工厂方法

在上一个例子中,我们在PizzaStore中创建简单工厂对象,通过简单工厂创建对象,这不免让代码失去了一点弹性,让我们进一步抽象,将创建对象的方法进一步封装,形成一个抽象基类,让每个子类去各自实现自己所需的创建对象的方法。提高代码的可扩展性。下面给出例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
abstract class PizzaStore {

abstract Pizza createPizza(String item);

public Pizza orderPizza(String type) {
Pizza pizza = createPizza(type);
System.out.println("--- Making a " + pizza.getName() + " ---");
pizza.prepare();
pizza.bake();
pizza.cut();
pizza.box();
return pizza;
}
}
public class ChicagoPizzaStore extends PizzaStore {

Pizza createPizza(String item) {
if (item.equals("cheese")) {
return new ChicagoStyleCheesePizza();
} else if (item.equals("veggie")) {
return new ChicagoStyleVeggiePizza();
} else if (item.equals("clam")) {
return new ChicagoStyleClamPizza();
} else if (item.equals("pepperoni")) {
return new ChicagoStylePepperoniPizza();
} else return null;
}
}

在上面的代码中,对象的创建只给出了一个抽象方法,而具体的实现,则有子类自由选择决定,这样极大的丰富了代码的选择性和扩展性。基类实际上并不知道他持有的是什么对象,他主要负责持有对象后的一系列固定流程操作。

定义了一个创建对象的接口,但由子类决定要实例化的类是哪一个。工厂方法让类把实例化推迟到子类。

对比工厂方法和简单工厂

原本有一个对象负责所有具体类的实例化,而在工厂方法中则由一些子类来负责实例化。工厂方法用来处理对象的创建,并将行为封装在子类,这样基类的代码就和子类的对象创建完全解耦。

三. 抽象工厂

当你需要创建的对象也依赖了一系列可变对象,那么就需要工厂模式中的最后一种方式–抽象工厂。我们首先给出抽象工厂的定义:

提供一个借口,用于创建相关或依赖对象的家族,而不需要明确指定具体类。
让我们再用Pizza来举例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class ChicagoPizzaStore extends PizzaStore {

protected Pizza createPizza(String item) {
Pizza pizza = null;
PizzaIngredientFactory ingredientFactory =
new ChicagoPizzaIngredientFactory();

if (item.equals("cheese")) {

pizza = new CheesePizza(ingredientFactory);
pizza.setName("Chicago Style Cheese Pizza");

} else if (item.equals("veggie")) {

pizza = new VeggiePizza(ingredientFactory);
pizza.setName("Chicago Style Veggie Pizza");

} else if (item.equals("clam")) {

pizza = new ClamPizza(ingredientFactory);
pizza.setName("Chicago Style Clam Pizza");

} else if (item.equals("pepperoni")) {

pizza = new PepperoniPizza(ingredientFactory);
pizza.setName("Chicago Style Pepperoni Pizza");

}
return pizza;
}
}

class ChicagoPizzaIngredientFactory
implements PizzaIngredientFactory
{

public Dough createDough() {
return new ThickCrustDough();
}

public Sauce createSauce() {
return new PlumTomatoSauce();
}

public Cheese createCheese() {
return new MozzarellaCheese();
}

public Veggies[] createVeggies() {
Veggies veggies[] = { new BlackOlives(),
new Spinach(),
new Eggplant() };
return veggies;
}

public Pepperoni createPepperoni() {
return new SlicedPepperoni();
}

public Clams createClam() {
return new FrozenClams();
}
}

PizzaStore和之前一样,这里就不重复了,和之前不一样的是在子类的createPizza方法中,我们不是简单的返回对象,而是根据创建对象所依赖的成员的不同,也进行了“个性化定制”。

其本质上其实也是用工厂方法对依赖对象进行创建。

四. 抽象工厂与抽象方法的比较

  1. 工厂方法使用继承,把对象的创建委托给子类,子类实现工厂方法来创建对象,并将实例化延迟到子类。
  2. 抽象工厂使用组合,对象的创建被实现在工厂接口所暴露出来的方法中。
  3. 抽象工厂创建相关的对象家族,并让他们集合起来,而不需要依赖他们的具体类

五. References

《Head First 设计模式》

数据库索引

发表于 2017-11-27 | 分类于 数据库 | 阅读次数:

索引在存储引擎层实现,所以我们先介绍一下不同的数据库引擎。

一.数据库引擎的类型

1. InnoDB

MySQL的默认事务型引擎,使用mvcc来支持高并发,实现了四个标准的隔离级别,默认级别是rr,通过间隙锁策略防止幻读。

InnoDB表基于聚簇索引建立的,数据存储在表空间中,可以将每个表的数据和索引存放在单独的文件中,支持热备份,其他的引擎都不支持。他的索引结构和其他存储引擎有很大不同,他的二级索引中必须包含主键

2. MyISAM

Mysql5.1及之前版本的默认引擎,有大量特性,包括全文索引、压缩、空间函数,不支持事务和行级锁,崩溃后无法安全恢复。

MyISAM会将表存储在两个文件中:数据文件和索引文件。

InnoDB和MyISAM的区别

  1. InnoDB支持事务,MyISAM不支持
  2. InnoDB是聚集索引,数据文件是和索引绑在一起的,必须要有主键,通过主键索引效率很高。但是辅助索引需要两次查询,先查询到主键,然后再通过主键查询到数据。因此,主键不应该过大,因为主键太大,其他索引也都会很大。而MyISAM是非聚集索引,数据文件是分离的,索引保存的是数据文件的物理地址。主键索引和辅助索引是独立的。

3.Memory

如果需要快速的访问数据,并且这些数据不会被修改,重启后丢失也没有关系,那么使用Memory表很有用,他比MyISAM快一个数量级,因为索欧数据都保存在内存中。重启后结构保留,数据丢失。

二.索引的类型

1.B-Tree索引

实际上在很多引擎中使用的是B+树进行优化比如innoDB,之后再写一篇详细介绍B-树索引的文章,这里先简单介绍一下,他的工作原理,存储引擎不需要进行权标扫描来获取数据,而是从根节点开始搜索,根节点存了指向子节点的指针,根据这些指针向下层搜索,通过比较节点页的值和要查找的值,可以找到合适的指针进入下一层。

索引对多个值进行排序依据的是CREATE TABLE语句中定义索引时列的顺序

可以使用B-Tree索引的查询类型

适用于全键值、键值范围、或者键前缀查找,假设索引建立在姓、名、出生日期上,实际使用如下所示

  • 全值匹配: 匹配Cuba Allen、生日是1970-01-01的人
  • 匹配最左前缀:可以用于查找姓为Allen的人,即只用第一列
  • 匹配列前缀:匹配姓开头为J的
  • 匹配范围值:查找姓在Allen和Jack之间的人
  • 精确匹配某一列并范围匹配另一列:查找姓为Allen名字是K开头的

优点

  1. 大大减少了服务器需要扫描的数据量
  2. 可以帮助服务器避免排序和临时表
  3. 可以将随机io编程顺序io

    限制

  • 若果不按引的最左列开始查找,则无法使用:比如无法找到特定生日的人活着名字叫Bill的人,要从索引的最左列开始,所以也无法找找姓氏以某个字母结尾的人
  • 不能跳过索引:即使无法找到姓为Smith并且在某个特定日期出生的人
  • 如果查询中有某个范围查询,那么他右边的所有列都无法使用索引优化查找:例如where last_name = “Smith” and first_name like “J%” and dob = ‘1990-01-01’,由于第二列是范围查询,所以第三列作废

Tips

  • 对于BLOB、TEXT或者很长的VARCHAR类型的列,必须使用前缀索引,因为MySQL不允许索引这些列的完整长度。
  • 选择合适的索引列顺序

2.哈希索引

只有精确匹配索引所有列的查询才有效,对于每一行数据,都会对所有的索引列计算一个hashcode,哈希索引将所有的哈希码存储在索引中,同时在哈希表中保存指向每个数据行的指针

Innodb有一个功能叫做自适应哈希索引,当innodb注意到某些索引值使用的很频繁的时候,会在内存中基于B-Tree索引之上再创建一个哈希索引。

缺陷

  • 索引只包含哈希值和行指针不存储字段值,无法通过索引值来避免读行
  • 不是按索引值的顺序存储的,无法排序
  • 不支持部分索引列匹配查找
  • 只支持等值比较
  • 访问哈希索引的数据很快,除非有很多冲突,冲突多的时候维护代价很大

3.空间数据索引(R_Tree)

4. 全文索引

InnoDB和MyISAM中B-Tree实现区别

聚簇索引和非聚簇索引

聚簇索引的顺序就是数据的物理存储顺序,而对非聚簇索引的索引顺序与数据物理排列顺序无关其实就是一个存储的是具体数据,一个存储了物理地址。正是聚簇索引的顺序就是数据的物理存储顺序,所以一个表最多只能有一个聚簇索引,因为物理存储只能有一个顺序。

InnoDB和MyISAM的数据分布对比

下面分别是聚簇索引和非聚簇索引的存储结构图

聚簇索引


总结:

  1. InnoDB是聚簇索引,通过主键引用被索引的值,数据文件和索引文件在一起,MyISAM是非聚簇索引,通过物理地址索引被索引的值,数据文件和索引文件在两个文件中
  2. InnoDB的二级索引需要包含主键,MyISAM不需要,仍然只需要存储地址,他的主键索引有唯一性要求,二级索引没有
  3. InnoDB索引保存了原格式文件,MyISAM使用了前缀压缩

后记

对于聚簇索引和非聚簇索引的区别,应该从他们存储的区别以及相对应的主键索引和二级索引来说。用查字典来举例的话,聚簇索引类似用拼音检索,物理顺序和逻辑顺序一致,非聚簇索引类似于偏旁部首查找,通过偏旁部首找到页码,也就是相应的物理地址指针。正由于一个字典只有一个排列顺序,所以一个表只有一个聚簇索引。对于聚簇索引对应的主索引和二级索引,他的二级索引包含主键列,在查询的时候需要查询2次,先要查询到主键列,再根据这个值去聚簇索引中查找。对于非聚簇索引的主键索引和二级索引相差不大,存储的都是相应的物理地址。

数据库隔离级别

发表于 2017-11-26 | 分类于 数据库 | 阅读次数:

READ UNCOMMITTED(未提交读)

事务中的修改,即使没有提交,对其他事务也都是可见的,即读取了一个未提交的修改(脏读)

READ COMMITTED(提交读)

大多数数据库的默认隔离级别(mysql不是),满足ACID中的隔离性定义:一个事务开始时,只能看见已经提交的事务所做的修改。有点不能理解,在网上看到了一个例子:

singo拿着工资卡去消费,系统读取到卡里确实有2000元,而此时她的老婆也正好在网上转账,把singo工资卡的2000元转到另一账户,并在 singo之前提交了事务,当singo扣款时,系统检查到singo的工资卡已经没有钱,扣款失败,singo十分纳闷,明明卡里有钱,为 何……

出现上述情况,即我们所说的不可重复读 ,两个并发的事务,“事务A:singo消费”、“事务B:singo的老婆网上转账”,事务A事先读取了数据,事务B紧接了更新了数据,并提交了事务,而事务A再次读取该数据时,数据已经发生了改变。

这里事务A两次读取到的数据都是已经提交的,不管是首次读取的2000,还是在B事务提交之后读取到的0,所以这个级别有时候也叫不可重复读。

当隔离级别设置为Read committed 时,避免了脏读,但是可能会造成不可重复读。

REPEATABLE READ(可重复度, mysql默认级别)

该级别保证同一个事务多次读取的结果是一致的,在同一个事务里,SELECT的结果是事务开始时时间点的状态,因此,同样的SELECT操作读到的结果会是一致的。用上面的例子说就是当singo拿着工资卡去消费时,一旦系统开始读取工资卡信息(即事务开始),singo的老婆就不可能对该记录进行修改,也就是singo的老婆不能在此时转账。但是这个级别会产生幻读,所谓幻读,指的是当某个事务在读取某个范围内的记录时,另一个事务又在该范围内插入新纪录,当前事务再次读取该范围的记录时会产生幻行。

幻读和不可重复读的区别

幻读是指当事务不是独立执行时发生的一种现象,例如第一个事务对一个表中的数据进行了修改,这种修改涉及到表中的全部数据行。同时,第二个事务也修改这个表中的数据,这种修改是向表中插入一行新数据。那么,以后就会发生操作第一个事务的用户发现表中还有没有修改的数据行,就好象
发生了幻觉一样。

不可重复读强调的是对同一个数据的修改得到两个不同的结果,幻读强调的是对结果集的插入或删除操作,产生了新的结果集。

Serializable (序列化)

Serializable 是最高的事务隔离级别,强制事务串行执行,简单来说就是读取的每一行数据都加锁,同时代价也花费最高,性能很低,一般很少使用,在该级别下,事务顺序执行,不仅可以避免脏读、不可重复读,还避免了幻像读。

间隙锁(Next-Key锁)

当我们用范围条件而不是相等条件检索数据,并请求共享或排他锁时,InnoDB会给符合条件的已有数据记录的索引项加锁;对于键值在条件范围内但并不存在的记录,叫做“间隙(GAP)”,InnoDB也会对这个“间隙”加锁,这种锁机制就是所谓的间隙锁(Next-Key锁),另外一方面,是为了满足其恢复和复制的需要。

举例来说,假如emp表中只有101条记录,其empid的值分别是 1,2,…,100,101,下面的SQL:

Select * from emp where empid > 100 for update;
是一个范围条件的检索,InnoDB不仅会对符合条件的empid值为101的记录加锁,也会对empid大于101(这些记录并不存在)的“间隙”加锁。

InnoDB使用间隙锁的目的,一方面是为了防止幻读,以满足相关隔离级别的要求,

√: 可能出现 ×: 不会出现

隔离级别 脏读 不可重复读 幻读 加锁读
Read uncommitted √ √ √ ×
Read committed × √ √ ×
Repeatable read × × √ ×
Serializable × × × √

最后提一句

InnoDB采用两阶段锁协议,只有在commit和rollback的时候才会释放锁并且是同一时间释放,并且这里说的锁都是隐式加锁,InnoDB会根据隔离级别在需要的时候自动加锁。

为什么String是不可变对象(译)

发表于 2017-11-23 | 分类于 java | 阅读次数:

原文:Why String is immutable in Java ?

String是Java中的一个不可变类。所谓不可变,简单来说就是其对象不能被修改。实例中的所有信息在初始化的时候就已经具备,并且不能被修改。不可变类有很多优点。这篇文章简要说明了为什么String被设计为不可变类。关于其好的回答应该建立在对内存模型、同步和数据结构等的理解之上。

1. 字符串池的需求

字符串池是一个位于方法区的特殊区域。当一个字符串被创建的时候,如果该字符串已经存在于字符串池中,那么直接返回该字符串的引用,而不是创建一个新的字符串。
下边的代码将只会创建一个字符串对象:

1
2
String s1 = "abcd";
String s2 = "abcd";

s1和s2都指向同一个字符串对象。
如果String不是不可变的,那么修改s1的字符串对象同样也会导致s2的内容发生变化。

2. 缓存Hashcode

字符串的hashcode在Java中经常被用到。例如,在一个HashMap中。其不可变性保证了hashcode(哈希值)总是保持不变,从而不用担心因hashcode变化导致的缓存问题。那就意味着,不用每次在其使用的时候计算其hashcode,从而更加高效。
在String类中,有如下代码:

1
private int hash; //用来缓存hash code

3. 简化其他对象的使用

为了理解这一点,请看下边的代码:

1
2
3
4
5
6
HashSet<String> set = new HashSet<String>();
set.add(new String("a"));
set.add(new String("b"));
set.add(new String("c"));
for (String a : set)
a.value = "a";

这个例子中,如果String是可变的,也就是说set中的值是可变的,这会影响到set的设计(set包含不重复的元素)。当然这个例子是有问题的,在String类中是不存在value这个属性的。(ps:个人觉得应该是没有可以直接访问的value,毕竟String中value数组是可以通过反射访问的,不知道,不知道这个老外是怎么个意思)

4.安全性

字符串在许多的java类中都用作参数,例如网络连接,打开文件等等。如果字符串是可变的,一个连接或文件就会被修改从而导致严重的错误。可变的字符串也会导致在使用反射时导致严重的问题,因为参数是字符串形式的。
举例如下:

1
2
3
4
5
6
7
boolean connect(String s) {
if (!isSecure(s)) {
throw new SecurityException();
}
// 如果s内的值被修改,则会导致出现问题
doSomethind(s);
}

(虽然略牵强,但是也有一定道理)

5. 不可变的对象本身就是线程安全的

不可变的对象,可以在多个线程间自由共享。从而免除了进行同步的麻烦。

总之, String被设计为不可变的类,是出于性能和安全性的考虑,这也是其他所有不可变类应用的初衷。

6. 引用

http://blog.csdn.net/get_set/article/details/49926511

12

MoriatyC

MoriatyC的学习心得

18 日志
9 分类
27 标签
© 2018 MoriatyC
由 Hexo 强力驱动
|
主题 — NexT.Gemini v5.1.4