Libevent Java版实现
What is libevent? Read more
Libevent Java版实现
纸上得来终觉浅,demo一下
Event 结构
/**
 * Base configuration: the bit-flag constants shared by the event classes.
 *
 * <p>The {@code EV_*} values describe kinds of events and the {@code EVLIST_*}
 * values describe which internal structure an event currently sits in. The two
 * groups reuse the same bit positions, so they must never be mixed in one field.
 */
public interface EventConfig {
    // ---- event kinds -------------------------------------------------------
    int EV_READ = 1;
    int EV_WRITE = 1 << 1;
    /** Timer event. */
    int EV_TIMEOUT = 1 << 2;
    int EV_ACCEPT = 1 << 3;
    int EV_CONNECT = 1 << 4;
    /** Auxiliary option. */
    int EV_PERSIST = 1 << 5;

    // ---- queue-membership flags --------------------------------------------
    /** The event is in the time heap. */
    int EVLIST_TIMEOUT = 1;
    /** The event is in the list of registered events. */
    int EVLIST_INSERTED = 1 << 1;
    /** The event is in the active list. */
    int EVLIST_ACTIVE = 1 << 2;
    /** Marker for internal use. */
    int EVLIST_INTERNAL = 1 << 3;
    /** The event has been initialised. */
    int EVLIST_INIT = 1 << 4;
}
/**
* Event 主体
*/
public class Event implements Comparable<Event>, EventConfig {
EventBase eventBase;
SelectableChannel selectableChannel;
int ev_events;//event status,such as read write accept
int ev_ncalls;//callback counts
AtomicInteger ev_pncalls;//Allows deletes in callback
int ev_res;//result passed to event callback
int ev_flags=EVLIST_INIT; //libevent status
long ev_timeout;//
public int compareTo(Event o) {
if (this.ev_timeout > o.ev_timeout)
return 1;
else if (this.ev_timeout == o.ev_timeout)
return 0;
else
return -1;
}
int ev_pri; // smaller numbers are higher priority
Object[] args;
EventCallBackHandler eventCallBackHandler;
」
EventBase 表示一个 libevent 实例,与 event 相关的各个 list 均保存在此对象中。例如,当向写 socket 写入数据时,读 socket 就会得到通知,触发读事件,从而 event_base 就能相应地得到通知。
// List of registered events.
public LinkedList<Event> events_queue;
// Lists of active events, one list per array slot.
public LinkedList<Event>[] active_queues;
// Min-heap of timer events.
public PriorityQueue<Event> timeheap;

// Counters; both start at zero (the Java default for int fields).
public int event_count;
public int event_count_active;

public EventOp eventOp;
public boolean event_break;
// Cached clock value; starts at zero.
public long timecache;
EventOp 为libevent提供I/O demultiplex机制
先看下EventOp的全局属性
// Selector used to multiplex the registered channels.
private Selector selector;
// Owner that is handed the events this object finds.
private LibEvent libEvent;

// One lookup table per kind of interest: channel -> event registered for it.
private Map<SelectableChannel, Event> acceptEventByChannel = new HashMap<>();
private Map<SelectableChannel, Event> readEventByChannel = new HashMap<>();
private Map<SelectableChannel, Event> writeEventByChannel = new HashMap<>();

/**
 * Creates the demultiplexer for the given libevent instance.
 *
 * @param libEvent the instance this object reports to
 */
public EventOp(LibEvent libEvent) {
    this.libEvent = libEvent;
}
/**
 * Abstraction of libevent; it can be thought of as the single, unified
 * interface for event handling.
 */
public interface LibEvent {

    /**
     * Initialises the instance.
     *
     * @throws IOException if initialisation fails
     */
    void init() throws IOException;

    /**
     * Adds an event.
     *
     * @param event   the event to add
     * @param timeout timeout for the event
     *                (NOTE(review): unit is presumably milliseconds - confirm against the implementation)
     * @throws IOException if the event cannot be added
     */
    void event_add(Event event, long timeout) throws IOException;

    /**
     * Deletes an event.
     *
     * @param event the event to delete
     */
    void event_del(Event event);

    /** Runs the event loop. */
    void event_loop();

    /**
     * Builds an event for a channel.
     *
     * @param channel              the channel the event is about
     * @param ev_events            the event kinds of interest ({@code EventConfig.EV_*} bits)
     * @param eventCallBackHandler the callback to invoke for the event
     * @param args                 extra arguments for the callback
     * @return the configured event
     */
    Event event_set(SelectableChannel channel, int ev_events,
                    EventCallBackHandler eventCallBackHandler, Object... args);

    /**
     * Marks an event as active.
     *
     * @param event  the event to activate
     * @param ev_res the result to pass to the event callback
     * @param ncalls the callback count
     */
    void event_active(Event event, int ev_res, int ncalls);
}
下面以 Add Event 为例
/**
 * Adds the event to the reactor channel; in Java that is the
 * selector / channel pair.
 *
 * <p>An accept event is registered for {@code OP_ACCEPT} only. Otherwise
 * {@code EV_READ} and {@code EV_WRITE} are handled independently, because they
 * are bit flags: an event that asks for both is registered for both operations
 * and recorded in both lookup tables.
 *
 * @param event the event to register; {@code null} is ignored
 * @throws IOException if the channel cannot be registered with the selector
 */
public void add(Event event) throws IOException {
    if (event == null) {
        return;
    }
    if ((event.ev_events & EventConfig.EV_ACCEPT) > 0) {
        registerInterest(event, SelectionKey.OP_ACCEPT, acceptEventByChannel);
        return;
    }
    if ((event.ev_events & EventConfig.EV_READ) > 0) {
        registerInterest(event, SelectionKey.OP_READ, readEventByChannel);
    }
    if ((event.ev_events & EventConfig.EV_WRITE) > 0) {
        registerInterest(event, SelectionKey.OP_WRITE, writeEventByChannel);
    }
}

/**
 * Makes the selector watch the event's channel for one operation and records
 * the event in the matching lookup table.
 *
 * @param event           the event being registered
 * @param op              the {@code SelectionKey.OP_*} operation to watch
 * @param eventsByChannel the lookup table that belongs to {@code op}
 * @throws IOException if the channel cannot be registered with the selector
 */
private void registerInterest(Event event, int op,
                              Map<SelectableChannel, Event> eventsByChannel) throws IOException {
    SelectableChannel channel = event.selectableChannel;
    SelectionKey key = channel.keyFor(selector);
    if (key == null) {
        // First registration of this channel with the selector.
        channel.register(selector, op);
    } else {
        // Already registered: widen the interest set, keeping existing bits.
        key.interestOps(key.interestOps() | op);
    }
    eventsByChannel.put(channel, event);
}
Dispatch 处理 I/O 事件就绪的过程:遍历 selector.selectedKeys,根据事件的类型,交给 LibEvent 接口的实现来处理。
/**
 * Waits for I/O readiness and hands every ready event to the libevent instance.
 *
 * <p>Readiness is taken from {@link SelectionKey#readyOps()}, so an event is
 * only activated for an operation the selector actually reported, not for
 * every operation the key is interested in.
 *
 * @param timeout how long to block in {@code select}, in milliseconds; when it
 *                is not positive the method returns immediately without selecting
 * @return the value returned by {@code select}, or 0 when {@code timeout} is
 *         not positive
 * @throws IOException if the selector fails
 */
public int dispatch(long timeout) throws IOException {
    if (timeout <= 0) {
        // NOTE(review): no poll happens at all on this path; a non-blocking
        // selectNow() may be what is wanted here - confirm with the caller.
        return 0;
    }
    int n = selector.select(timeout);
    if (n <= 0) {
        return n;
    }
    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
    while (it.hasNext()) {
        SelectionKey key = it.next();
        // Always take the key out of the selected set, including invalid keys,
        // so it is not seen again on the next pass.
        it.remove();
        if (!key.isValid()) {
            continue;
        }
        int ready = key.readyOps();
        Event accept = null;
        Event read = null;
        Event write = null;
        int ev_res = 0;
        if ((ready & SelectionKey.OP_ACCEPT) != 0) {
            ev_res |= EventConfig.EV_ACCEPT;
            accept = acceptEventByChannel.get(key.channel());
        }
        if ((ready & SelectionKey.OP_READ) != 0) {
            ev_res |= EventConfig.EV_READ;
            read = readEventByChannel.get(key.channel());
        }
        if ((ready & SelectionKey.OP_WRITE) != 0) {
            ev_res |= EventConfig.EV_WRITE;
            write = writeEventByChannel.get(key.channel());
        }
        // Each activated event receives the combined result bits for this key.
        if (accept != null && (accept.ev_events & EventConfig.EV_ACCEPT) > 0) {
            libEvent.event_active(accept, ev_res, 1);
        }
        if (read != null && (read.ev_events & EventConfig.EV_READ) > 0) {
            libEvent.event_active(read, ev_res, 1);
        }
        if (write != null && (write.ev_events & EventConfig.EV_WRITE) > 0) {
            libEvent.event_active(write, ev_res, 1);
        }
    }
    return n;
}
event active处理
/**
 * Marks an event as active so that its callback will be run.
 *
 * <p>If the event is already active, the new result bits are merged into its
 * pending result and nothing else changes. Otherwise the call count and result
 * are stored and the event is placed on the active queue.
 *
 * @param event  the event to activate
 * @param ev_res the result bits ({@code EventConfig.EV_*}) to pass to the callback
 * @param ncalls the callback count
 */
public void event_active(Event event, int ev_res, int ncalls) {
    if ((event.ev_flags & EventConfig.EVLIST_ACTIVE) > 0) {
        // Different kinds of results can arrive for one event: add them together.
        // They belong in ev_res; ev_flags holds EVLIST_* bits, which share the
        // same bit positions as EV_* and would be corrupted by this OR.
        event.ev_res |= ev_res;
        return;
    }
    event.ev_ncalls = ncalls;
    event.ev_pncalls = null;
    event.ev_res = ev_res;
    eventQueueInsert(event, EventConfig.EVLIST_ACTIVE);
}
/**
 * Registers the event in the data structure that matches {@code queue}.
 *
 * <p>Nothing happens when the event already carries the {@code queue} flag and
 * is also flagged active. In every other case the flag is set first and the
 * event is then appended to the registered list, the time heap or the active
 * queue selected by its priority.
 *
 * @param event the event to insert
 * @param queue one of {@code EVLIST_INSERTED}, {@code EVLIST_TIMEOUT} or
 *              {@code EVLIST_ACTIVE}
 * @throws IllegalArgumentException if {@code queue} is none of those values
 */
public void eventQueueInsert(Event event,int queue) {
    boolean flaggedForQueue = (event.ev_flags & queue) > 0;
    boolean flaggedActive = (event.ev_flags & EventConfig.EVLIST_ACTIVE) > 0;
    if (flaggedForQueue && flaggedActive) {
        return;
    }

    // The flag is recorded before the target is resolved, so it is set even
    // when the queue value turns out to be unknown.
    event.ev_flags |= queue;

    if (queue == EventConfig.EVLIST_INSERTED) {
        eventBase.events_queue.add(event);
    } else if (queue == EventConfig.EVLIST_TIMEOUT) {
        eventBase.timeheap.add(event);
    } else if (queue == EventConfig.EVLIST_ACTIVE) {
        eventBase.active_queues[event.ev_pri].add(event);
        eventBase.event_count_active++;
    } else {
        throw new IllegalArgumentException("unknown queue" + queue);
    }
}
下面使用libevent创建server,并启动
// Create and initialise the libevent instance.
final LibEvent reactor = new LibEventHandler();
reactor.init();

// Open a non-blocking listening channel on port 8080.
ServerSocketChannel listener = ServerSocketChannel.open();
listener.configureBlocking(false);
listener.bind(new InetSocketAddress(8080));

// Register the event: a persistent accept event on the listening channel.
int acceptFlags = EventConfig.EV_ACCEPT | EventConfig.EV_PERSIST;
Event acceptEvent = reactor.event_set(listener, acceptFlags, new AcceptHandler(reactor));
reactor.event_add(acceptEvent, 2000);

System.out.println("server starting...");
reactor.event_loop();
event_loop 处理过程:
/**
 * Runs the event loop until {@code done} is set or a callback requests a break
 * through {@code eventBase.event_break}.
 *
 * <p>Each pass does four things:
 * <ol>
 *   <li>clears the cached clock and computes how long to wait;</li>
 *   <li>waits for I/O readiness, which puts ready events on the active queues;</li>
 *   <li>runs the callbacks of the highest-priority non-empty active queue;</li>
 *   <li>activates every timer event whose timeout has passed.</li>
 * </ol>
 *
 * <p>An {@code IOException} from the I/O wait is printed and the loop continues.
 */
public void event_loop() {
    while (!done) {
        // Clear the time cache on every pass. getTime() returns the cached
        // value whenever it is positive, so without this reset the clock would
        // stay at its first reading and no timer could ever expire.
        this.eventBase.timecache = 0;
        // Work out the wait from the event with the smallest timeout in the
        // heap and the current time.
        long timeout = getTimeoutNext();
        try {
            // Wait for I/O events to become ready; dispatch() inserts the ready
            // I/O events into the active lists.
            this.eventOp.dispatch(timeout);
            this.eventBase.timecache = getTime(eventBase);

            LinkedList<Event> activeList = highestPriorityActiveQueue();
            if (activeList != null) {
                Event event;
                // Event processing.
                while ((event = activeList.peek()) != null) {
                    if ((event.ev_events & EventConfig.EV_PERSIST) > 0) {
                        // Persistent events only leave the active list.
                        eventQueueRemove(event, EventConfig.EVLIST_ACTIVE);
                    } else {
                        event_del(event);
                    }
                    AtomicInteger ncalls = new AtomicInteger(event.ev_ncalls);
                    event.ev_pncalls = ncalls;
                    while (ncalls.get() > 0) {
                        event.ev_ncalls = ncalls.decrementAndGet();
                        event.eventCallBackHandler.callback(
                                event.selectableChannel, event.ev_res, event.args);
                        if (eventBase.event_break) {
                            return;
                        }
                    }
                }
            }

            // Activate every timer event that is due.
            long now = getTime(eventBase);
            Event timer;
            while ((timer = eventBase.timeheap.peek()) != null) {
                if (timer.ev_timeout > now) {
                    break;
                }
                this.event_del(timer);
                event_active(timer, EventConfig.EV_TIMEOUT, 1);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

/**
 * Finds the non-empty active queue with the highest priority. Queues are
 * scanned from index 0 upwards because smaller priority numbers are higher
 * priority.
 *
 * @return the first non-empty active queue, or {@code null} when all are empty
 */
private LinkedList<Event> highestPriorityActiveQueue() {
    for (int i = 0; i < eventBase.active_queues.length; i++) {
        if (eventBase.active_queues[i].peek() != null) {
            return eventBase.active_queues[i];
        }
    }
    return null;
}
/**
 * Returns the current time for the given event base.
 *
 * @param eventBase the event base whose cached time is consulted
 * @return the cached time when it is positive, otherwise
 *         {@link System#currentTimeMillis()}
 */
public long getTime(EventBase eventBase) {
    long cached = eventBase.timecache;
    return cached > 0 ? cached : System.currentTimeMillis();
}
/**
 * Computes how long the next I/O wait may last.
 *
 * @return -1 when events are already active or the earliest timer is overdue;
 *         the time left until the earliest timer when one exists; otherwise
 *         the default of 1000
 */
private long getTimeoutNext() {
    if (this.eventBase.event_count_active > 0) {
        return -1;
    }
    Event earliest = eventBase.timeheap.peek();
    if (earliest == null) {
        // No timer pending: fall back to the default wait.
        return 1000L;
    }
    long now = getTime(this.eventBase);
    return earliest.ev_timeout < now ? -1L : earliest.ev_timeout - now;
}