Disruptor, a High-Performance Asynchronous Processing Framework (5): Using Disruptor 2.0

Published: 2021-10-21 08:52:15

The previous articles covered the theory and internals. This one walks through working code to show several practical ways of using Disruptor 2.0.



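<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.0</version>
    <scope>test</scope>
</dependency>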


Event definition:


import java.io.Serializable;

import lombok.extern.slf4j.Slf4j;

/**
 * Event: the data carried in the RingBuffer.
 * @author wangxi
 * @date 2019-10-16 16:31
 */
@Slf4j
public class DisruptorEvent implements Serializable {

    private Integer param;

    public void setParam(Integer i) {
        // Assign the argument; the original assigned the field to itself.
        this.param = i;
    }

    public void say() {
        log.info("i===>{}, nowTime===>{}, nowThreadId===>{}",
                param, System.currentTimeMillis(), Thread.currentThread().getId());
    }

}


Event factory:


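import com.lmax.disruptor.EventFactory;

/**
 * Factory used by the RingBuffer to pre-allocate its events.
 * @author wangxi
 * @date 2019-10-16 16:42
 */
public class DisruptorEventFactory implements EventFactory<DisruptorEvent> {

    @Override
    public DisruptorEvent newInstance() {
        return new DisruptorEvent();
    }
}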

Consumer handler definition (implemented here as a WorkHandler):


import com.lmax.disruptor.WorkHandler;

/**
 * Consumer
 * @author wangxi
 * @date 2019-10-16 16:55
 */
public class DisruptorEventHandler implements WorkHandler<DisruptorEvent> {

    @Override
    public void onEvent(DisruptorEvent disruptorEvent) throws Exception {
        disruptorEvent.say();
    }
}
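
Handlers registered together as a worker pool share the work: each event is processed by exactly one handler in the pool, rather than by all of them. The following is a minimal sketch of that idea using only the JDK. It is not Disruptor's implementation, and the names WorkPoolSketch and run are hypothetical. Workers claim the next sequence number from a shared counter, so no two workers receive the same one.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; not part of the Disruptor library.
class WorkPoolSketch {

    /**
     * Lets `workers` threads compete for `events` sequence numbers and
     * returns how many times each sequence number was handled.
     */
    static int[] run(int events, int workers) {
        AtomicLong nextSequence = new AtomicLong(0);           // shared claim counter
        AtomicIntegerArray handled = new AtomicIntegerArray(events);

        Thread[] threads = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            threads[w] = new Thread(() -> {
                while (true) {
                    // Atomically claim one sequence; each value is handed out once.
                    long sequence = nextSequence.getAndIncrement();
                    if (sequence >= events) {
                        return;                                 // nothing left to claim
                    }
                    handled.incrementAndGet((int) sequence);    // "process" the event
                }
            });
            threads[w].start();
        }

        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }

        int[] counts = new int[events];
        for (int i = 0; i < events; i++) {
            counts[i] = handled.get(i);
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = run(100, 10);
        for (int i = 0; i < counts.length; i++) {
            if (counts[i] != 1) {
                throw new IllegalStateException("event " + i + " handled " + counts[i] + " times");
            }
        }
        System.out.println("every event was handled exactly once");
    }
}
```

Which worker ends up with which event is not deterministic; only the "exactly once" property is.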


Publisher (producer) and starting the Disruptor:


import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

public class DisruptorPublisher {

    private static final AtomicLong INDEX = new AtomicLong(1);
    private Disruptor<DisruptorEvent> disruptor;

    // Build and start the Disruptor
    public void start(final int bufferSize, final int threadSize) {

        ThreadFactory threadFactory = runnable ->
                new Thread(new ThreadGroup("disruptor"), runnable,
                        "disruptor-thread-" + INDEX.getAndIncrement());

        // ProducerType.MULTI: multiple producers
        disruptor = new Disruptor<>(new DisruptorEventFactory(), bufferSize, threadFactory,
                ProducerType.MULTI, new BlockingWaitStrategy());

        DisruptorEventHandler[] consumers = new DisruptorEventHandler[threadSize];
        for (int i = 0; i < threadSize; i++) {
            consumers[i] = new DisruptorEventHandler();
        }

        disruptor.handleEventsWithWorkerPool(consumers);
        /*
         * The call above covers a single pool of DisruptorEventHandler.
         * For the diamond structure described in the earlier article, with
         * DisruptorEventHandler[] a, b and c, where c depends on both a and b,
         * register a and b separately and join the two groups with and():
         *   disruptor.handleEventsWithWorkerPool(a)
         *           .and(disruptor.handleEventsWithWorkerPool(b))
         *           .thenHandleEventsWithWorkerPool(c);
         * If c depends on b and b depends on a (a chain):
         *   disruptor.handleEventsWithWorkerPool(a)
         *           .thenHandleEventsWithWorkerPool(b)
         *           .thenHandleEventsWithWorkerPool(c);
         */

        disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
        disruptor.start();
    }

    public static void main(String[] args) {

        DisruptorPublisher disruptorPublisher = new DisruptorPublisher();
        // Buffer size 2*2*2*2*2 = 32 (must be a power of two), 10 consumers
        disruptorPublisher.start(2 * 2 * 2 * 2 * 2, 10);

        RingBuffer<DisruptorEvent> ringBuffer = disruptorPublisher.disruptor.getRingBuffer();

        for (int i = 0; i < 100; i++) {

            ringBuffer.publishEvent(new EventTranslatorOneArg<DisruptorEvent, Integer>() {

                /**
                 * This callback runs on the publishing thread (the main thread here).
                 * @param o  the pre-allocated event created by DisruptorEventFactory
                 * @param l  the sequence number of the event being published
                 * @param o2 the second argument passed to publishEvent
                 */
                @Override
                public void translateTo(DisruptorEvent o, long l, Integer o2) {
                    // Set the event's parameter
                    o.setParam(o2);
                }
            }, i);

        }

    }
}
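
The buffer size passed to start in main is 2*2*2*2*2 = 32. Disruptor requires the ring buffer size to be a power of two, which lets an ever-increasing sequence number be mapped to a slot with a bit mask instead of a modulo. A minimal sketch of that mapping follows; the names RingIndexSketch, isPowerOfTwo and slotFor are hypothetical and not part of the Disruptor API.

```java
// Hypothetical illustration only; not part of the Disruptor library.
class RingIndexSketch {

    /** True if n is a positive power of two (exactly one bit set). */
    static boolean isPowerOfTwo(int n) {
        return n > 0 && Integer.bitCount(n) == 1;
    }

    /** Maps a non-negative, ever-increasing sequence number onto a ring slot. */
    static int slotFor(long sequence, int bufferSize) {
        if (!isPowerOfTwo(bufferSize)) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        // For powers of two, (sequence & (size - 1)) equals (sequence % size)
        // when sequence is non-negative.
        return (int) (sequence & (bufferSize - 1));
    }

    public static void main(String[] args) {
        int bufferSize = 2 * 2 * 2 * 2 * 2;            // 32
        System.out.println(slotFor(31, bufferSize));   // 31
        System.out.println(slotFor(32, bufferSize));   // 0 (wraps around)
        System.out.println(slotFor(100, bufferSize));  // 4
    }
}
```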



The diamond-shaped dependency structure can be implemented as follows:


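DisruptorEventHandlerOne[] consumerOne = new DisruptorEventHandlerOne[threadSize];
DisruptorEventHandlerTwo[] consumerTwo = new DisruptorEventHandlerTwo[threadSize];
DisruptorEventHandlerThree[] consumerThree = new DisruptorEventHandlerThree[threadSize];

for (int i = 0; i < threadSize; i++) {
    consumerOne[i] = new DisruptorEventHandlerOne();
    consumerTwo[i] = new DisruptorEventHandlerTwo();
    consumerThree[i] = new DisruptorEventHandlerThree();
}

// consumerOne and consumerTwo both read directly from the ring buffer;
// consumerThree handles an event only after both pools are done with it.
// Chaining handleEventsWithWorkerPool on the returned group would instead
// make consumerTwo wait for consumerOne, which is a chain, not a diamond.
disruptor.handleEventsWithWorkerPool(consumerOne)
        .and(disruptor.handleEventsWithWorkerPool(consumerTwo))
        .thenHandleEventsWithWorkerPool(consumerThree);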
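
In a diamond, the last stage may only handle an event once both upstream stages have finished with it. One way to picture this is that the last stage is gated on the slower of the two upstream stages. The sketch below is a simplified, single-threaded model of that rule and not Disruptor code; the names DiamondGateSketch, stageOne, stageTwo and highestAvailableForThird are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; not part of the Disruptor library.
class DiamondGateSketch {

    // Highest sequence each upstream stage has finished; -1 means "none yet".
    final AtomicLong stageOne = new AtomicLong(-1);
    final AtomicLong stageTwo = new AtomicLong(-1);

    /** The third stage may consume up to the slower upstream stage's progress. */
    long highestAvailableForThird() {
        return Math.min(stageOne.get(), stageTwo.get());
    }

    public static void main(String[] args) {
        DiamondGateSketch gate = new DiamondGateSketch();
        gate.stageOne.set(7);
        gate.stageTwo.set(3);
        System.out.println(gate.highestAvailableForThird()); // 3

        gate.stageTwo.set(9);
        System.out.println(gate.highestAvailableForThird()); // 7
    }
}
```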
