Disruptor

设计方案

利用填充使缓存友好

环形数组也有能充分利用局部性原理, 可以一齐加载通过缓存行加载到CPU高速缓存,数据的遍历访问可以更有效地利用了 CPU 里面的多级流水线

@Contended注解:可以用于类级别的修饰,同时也可以用于字段级别的修饰,当应用于字段级别时,被注释的字段将和其他字段隔离开来,会被加载在独立的缓存行上,字段级别上,@Contended还支持一个“contention group”属性,同一group的字段们在内存上将是连续(64字节范围内)

如果在类上应用该注解,将使整个字段块的两端都被填充,就跟Disruptor自己定义7个变量一样

使用7个变量的原因在于:前后7个就是因为8个long正好64byte,这样cache line无论在哪个位置被加载,这64个byte在第一次加载到cache line

填充cache line的手法是为了防止False Sharing:多线程修改互相独立的变量时,如果这些变量共享同一个缓存行,就会无意中影响彼此的性能

开发

disruptor初始化的时候,会调用Event工厂,对ringBuffer进行内存的提前分配 GC产生频率会降低

public class StringEventFactory implements EventFactory<String> {
    @Override
    public String newInstance() {
        return UUID.randomUUID().toString();
    }
}
public class StringEventHandler implements EventHandler<String> {
    @Override
    public void onEvent(String s, long l, boolean b) throws Exception {
        System.out.println(Thread.currentThread().getName() + "handle " + s);
    }
}
StringEventFactory eventFactory = new StringEventFactory();
int bufferSize = 1024;

Disruptor<String> disruptor =
        new Disruptor<>(eventFactory, bufferSize, Executors.defaultThreadFactory());
disruptor.handleEventsWith(new StringEventHandler());
disruptor.start();

RingBuffer<String> ringBuffer = disruptor.getRingBuffer();
for (int i = 0; i < 10; i++) {
    ringBuffer.publishEvent((s, l) -> {});
}

生产者线程模式

ProducerType有两种模式 Producer.MULTI和Producer.SINGLE

默认是MULTI,表示在多线程模式下产生sequence

如果确认是单线程生产者,那么可以指定SINGLE,效率会提升

等待策略

1,(常用)BlockingWaitStrategy:通过线程阻塞的方式,等待生产者唤醒,被唤醒后,再循环检查依赖的sequence是否已经消费。

2,BusySpinWaitStrategy:线程一直自旋等待,可能比较耗cpu

3,LiteBlockingWaitStrategy:线程阻塞等待生产者唤醒,与BlockingWaitStrategy相比,区别在signalNeeded.getAndSet,如果两个线程同时访问一个访问waitfor,一个访问signalAll时,可以减少lock加锁次数.

4,LiteTimeoutBlockingWaitStrategy:与LiteBlockingWaitStrategy相比,设置了阻塞时间,超过时间后抛异常。

5,PhasedBackoffWaitStrategy:根据时间参数和传入的等待策略来决定使用哪种等待策略

6,TimeoutBlockingWaitStrategy:相对于BlockingWaitStrategy来说,设置了等待时间,超过后抛异常

7,(常用)YieldingWaitStrategy:尝试100次,然后Thread.yield()让出cpu

8,(常用)SleepingWaitStrategy : sleep

消费者异常处理

默认:disruptor.setDefaultExceptionHandler()

覆盖:disruptor.handleExceptionFor().with()