brunch

You can make anything
by writing

C.S.Lewis

by myner Oct 18. 2020

네티 NioEventLoop

NioEventLoop


NioEventLoop는 SingleThreadEventLoop에서 상속되고 SingleThreadEventLoop는 SingleThreadEventExecutor에서 상속된다. 


SingleThreadEventExecutor는 Netty의 로컬 스레드를 추상화 한 것이다. 내부에 Thread 스레드 속성이 있고 로컬 Java 스레드를 저장한다. 따라서 NioEventLoop이 실제로 특정 스레드라고 생각할 수 있다. 바운드 및 라이프 사이클 동안 바운드 스레드는 변경되지 않는다.


NioEventLoop 클래스 계층 구조


NioEventLoop의 클래스 계층 다이어그램은 상당히 복잡하지만 몇 가지 중요한 점만주의하면된다. 먼저 NioEventLoop의 상속 체인은 다음과 같다.


NioEventLoop -> SingleThreadEventLoop -> SingleThreadEventExecutor -> AbstractScheduledEventExecutor


AbstractScheduledEventExecutor에서 Netty는 NioEventLoop의 schedule 함수를 구현한다. 

인스턴스의 실행 메서드는 NioEventLoop에 의해 실행되도록 예약 된 작업 대기열에 작업을 추가한다.


일반적으로 NioEventLoop은 두 가지 작업을 담당한다. 첫 번째는 준비된 IO 이벤트를 기다리기 위해 select 호출, 데이터 읽기 및 쓰기 및 데이터 처리 등을 포함하여 채널 관련 IO 작업을 수행하는 IO 스레드로, 두 번째 작업은 taskQueue에서 작업을 실행하기위한 작업 대기열로 사용된다. 


NioEventLoop의 인스턴스화 프로세스

SingleThreadEventExecutor 생성자에서 threadFactory.newThread를 통해 새로운 Java 스레드가 생성 된다. 이 스레드에서 수행되는 작업은 주로 SingleThreadEventExecutor.this.run() 메서드 를 호출하는 것이며 NioEventLoop이 다형성에 따라이 메서드를 구현하기 때문이다. 

protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {     

    this.parent = parent;     

    this.addTaskWakesUp = addTaskWakesUp;      

    thread = threadFactory.newThread(new Runnable() {         

        @Override         

        public void run() {             

            boolean success = false;             

            updateLastExecutionTime();             

            try {                 

                SingleThreadEventExecutor.this.run();                 

                success = true;             

            } catch (Throwable t) {                 

                logger.warn("Unexpected exception from an event executor: ", t);             

            } finally {                

                 ...            

             }         

        }    

     });     

    threadProperties = new DefaultThreadProperties(thread);    

    taskQueue = newTaskQueue(); 

}


EventLoop 및 채널 연결

Netty에서 각 채널에는 연결된 EventLoop 하나만 있으며 연결 프로세스는 다음과 같다.

Bootstrap에서 bind -> doconnect가호출되면 AbstractBootStrap의 initAndRegister()가 호출된다. 이후에 channelFactory().newChannel() -> init(channel)-> group().register(channel)이 호출되고 이는 MultithreadEventLoopGroup의 register를호출한다. MultithreadEventExecutorGroup에서 NioEventLoop 객체를 리턴받고 SingleThreadEventLoop에 register를 호출하여 AbstractChannel-AbstractUnsafe.register까지 호출하여 Java의 하단 Socket의 동작을 캡슐화하는 실제로는 Netty의 상위 계층과 Java의 하단 계층을 연결하는 중요한 다리 역할하는 Unsafe 객체까지 등록을 마치게된다.


AbstractChannel - AbstractUnsafe.register가 호출 되면 Channel과 EventLoop 간의 연관이 완료되며 레지스터 구현은 다음과 같다.


@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { 

    AbstractChannel.this.eventLoop = eventLoop;      

    if (eventLoop.inEventLoop()) {         

        register0(promise);     

    } else {         

        try {             

            eventLoop.execute(new OneTimeTask() {                 

                @Override                 

                public void run() {                     

                    register0(promise);                

                 }             

            });         

        } 

        catch (Throwable t) {             ...         }    

     } 

}


AbstractChannel - AbstractUnsafe.register , EventLoop는 AbstractChannel 내부 eventLoop 필드에 할당되고, EventLoop 및 채널 간의 연관성의 방법은 여기에 완료된다.


EventLoop 시작

이미 알고 있듯이 NioEventLoop 자체는 SingleThreadEventExecutor이므로 NioEventLoop의 시작은 실제로 NioEventLoop에 바인딩 된 로컬 Java 스레드의 시작이다.


이 아이디어에 따르면 SingleThreadEventExecutor의 스레드 필드에서 start() 가 호출되는 위치만 찾으면 된다. 코드에서 검색하면 thread.start()가 SingleThreadEventExecutor.startThread() 메서드에 캡슐화 되어있다.


private void startThread() {     

    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {         

        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {             thread.start();         

        }     

    } 

}


STATE_UPDATER 은 SingleThreadEventExecutor에 의해 내부적으로 유지되는 속성으로 현재 스레드 상태를 식별하는 기능이다. 처음에 STATE_UPDATER == ST_NOT_STARTED 이며 startThread() 메서드를 처음 호출하면 if 문에 들어가서 스레드를 호출한다.


@Override public void execute(Runnable task) {     

    if (task == null) {         

        throw new NullPointerException("task");     

    }      


    boolean inEventLoop = inEventLoop();     

    if (inEventLoop) {         

        addTask(task);     

    } else {         

        startThread(); 

        addTask(task);         

        if (isShutdown() && removeTask(task)) {             

            reject();         

        }     

    }      


    if (!addTaskWakesUp && wakesUpForTask(task)) {         

        wakeup(inEventLoop);     

    } 

}


AbstractChannel - AbstractUnsafe.register 

if (eventLoop.inEventLoop()) {     

    register0(promise); 

} else {     

    try {         

        eventLoop.execute(new OneTimeTask() {             

            @Override             

            public void run() {                 

                register0(promise);             

            }         

        });     

    } catch (Throwable t) {         ...     } 

}


Bootstrap.bind 메서드에서 AbstractChannel - AbstractUnsafe.register 메서드까지 전체 코드가 메인 스레드에서 실행되고 있으므로 위의 eventLoop.inEventLoop() 이 false 이므로이 분기에서 else 분기로 들어간다. eventLoop.execute eventLoop를 호출하는 것은 NioEventLoop의 예이지만 NioEventLoop이 실행 메소드를 달성하지 못했기 때문에 SingleThreadEventExecutor.execute 호출은 다음과 같다. 


@Override public void execute(Runnable task) {     

    ...     

    boolean inEventLoop = inEventLoop();     


    if (inEventLoop) {         

        addTask(task);     

    } else {         

        startThread();         

        addTask(task);         

        if (isShutdown() && removeTask(task)) {             

            reject();         

        }     

    }      


    if (!addTaskWakesUp && wakesUpForTask(task)) {         

        wakeup(inEventLoop);     

    } 

}


inEventLoop == false를 분석 하여 실행이 else 브랜치에 도달하여 SingleThreadEventExecutor와 관련된 Java 네이티브 스레드를 시작하기 위해 startThread() 메서드 가 호출 된다.


Netty의 IO 처리 루프

Netty는 Reactor 모델의 구현이며 Java NIO를 기반으로 한다. Java NIO의 NIO Selector에 대한 지식이 있어야한다. Netty에는 Java NIO의 Selector.select를 지속적으로 호출하는데 사용되는 Selector 스레드가 있다. 그리고 현재 준비된 IO 이벤트가 있는지 쿼리한다. 이후에 Java NIO에 설명 된 Selector의 사용 프로세스를 검토한다.


1. Selector.open()을 통해 Selector를 연다.  

2. 채널을 선택기에 등록하고 모니터링해야하는 이벤트를 설정한다(관심 설정).

3. 반복: 

- select() 메서드 호출 

- selector.selectedKeys()를 호출하여 선택한 키를 가져온다. 

- 선택한 각 키를 반복한다. 

    1) 선택한 키에서 해당 채널 및 추가 정보(있는 경우)를 얻는다

    2) 준비된 IO 이벤트를 확인한 다음 처리한다. 

    OP_ACCEPT 이벤트 인 경우 "SocketChannel clientChannel = ((ServerSocketChannel) key.channel()). accept()"를 호출하여 SocketChannel을 가져오고이를 non으로 설정한다. 블로킹된 다음 채널을 Selector에 등록한다. 

    3) 필요에 따라 선택한 키의 리스닝 이벤트를 변경한다. 

    4) 선택한 키 컬렉션에서 처리 된 키를 삭제한다.


Netty는 SelectorProvider.openSocketChannel()을 호출하여 새로운 Java NIO SocketChannel을 연다


private static SocketChannel newSocket(SelectorProvider provider) {     

    ...     

    return provider.openSocketChannel(); 

}


두 번째 단계 는 채널 선택기에 등록되며, 우리의 등록 프로세스 채널 의 첫 번째 장에서 작동하는 이벤트(관심 세트) 모니터링 필요성을 설정한다. 또한 중간에 분석, 우리는 되돌아보고, 채널 등록 프로세스에서 다음과 같은 콜체인이 있다.

Bootstrap.initAndRegister ->      

    AbstractBootstrap.initAndRegister ->          

        MultithreadEventLoopGroup.register ->              

            SingleThreadEventLoop.register ->                  

                AbstractUnsafe.register ->                     

                    AbstractUnsafe.register0 ->                         

                        AbstractNioChannel.doRegister


register0 메서드는 AbstractUnsafe.register 메서드에서 호출된다.


@Override 

public final void register(EventLoop eventLoop, final ChannelPromise promise) {     

    AbstractChannel.this.eventLoop = eventLoop;     

    register0(promise); 

}


private void register0(ChannelPromise promise) {     

    boolean firstRegistration = neverRegistered;     

    doRegister();     

    neverRegistered = false;     

    registered = true;     

    safeSetSuccess(promise);     

    pipeline.fireChannelRegistered();     

    // Only fire a channelActive if the channel has never been registered. This prevents firing     

    // multiple channel actives if the channel is deregistered and re-registered.     

    if (firstRegistration && isActive()) {         

        pipeline.fireChannelActive();     

    } 

}


register0은 AbstractNioChannel.doRegister를 다시 호출한다.


@Override protected void doRegister() throws Exception {         

    selectionKey = javaChannel().register(eventLoop().selector, 0, this); 

}


여기서 javaChannel()은 Java NIO SocketChannel 객체를 반환하고이 SocketChannel을 이전 단계에서 얻은 Selector에 등록한다.


루프의 다음 세 번째 단계는 어디에서 실행될까? 세 번째 단계는 이번 분석의 핵심이다.


스레드 실행 루프

SingleThreadEventExecutor.this.run() 메소드 를 호출하는 것이고 SingleThreadEventExecutor.run() 은 추상 메소드이며 그 구현은 NioEventLoop에 있다.


thread = threadFactory.newThread(new Runnable() {         

    @Override         

    public void run() {             

        boolean success = false;             

        updateLastExecutionTime();             

        try {                 

            SingleThreadEventExecutor.this.run();                 

            success = true;             

        } catch (Throwable t) {                 

            logger.warn("Unexpected exception from an event executor: ", t);             

        } finally {                 

            ...             

        }         

    }     

});


NioEventLoop.run() 메서드를 계속 추적


@Override 

protected void run() {     

    for (;;) {         

        boolean oldWakenUp = wakenUp.getAndSet(false);         

        try {             

            if (hasTasks()) {                 

                selectNow();             

            } else {                 

                select(oldWakenUp);                 

                if (wakenUp.get()) {                     

                    selector.wakeup();                 

                }             

            }              


            cancelledKeys = 0;             

            needsToSelectAgain = false;             

            final int ioRatio = this.ioRatio;             


            if (ioRatio == 100) {                 

                processSelectedKeys();                 

                runAllTasks();             

            } else {                 

                final long ioStartTime = System.nanoTime();                  

                processSelectedKeys();                  

                final long ioTime = System.nanoTime() - ioStartTime;                 

                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);             

            }              


            if (isShuttingDown()) {                 

                closeAll();                 

                if (confirmShutdown()) {                     

                    break;                 

                }             

            }         

        } catch (Throwable t) {           

              ...         

        }     

    } 

}


위 코드 에서 for(;;) 에 의해 형성된 무한 루프가 바로  NioEventLoop 이벤트 루프의 핵심이다.


IO 이벤트 폴링

우선, run 메서드에서 첫 번째 단계는 hasTasks() 메서드를 호출 하여 현재 작업 대기열에 작업이 있는지 확인하는 것이다.


protected boolean hasTasks() {     

    assert inEventLoop();     

    return !taskQueue.isEmpty(); 

}


이 메서드는 매우 간단하다. taskQueue 가 비어 있는지 확인하기 만하면 된다. taskQueue가 무엇인지는 실제로이 EventLoop에서 실행해야하는 작업 목록을 저장한다. taskQueue에 대해서는 당분간 여기에 나열하지 않고 나중에 기다려야 한다. 


작업 대기열이 비어 있지 않은 경우  selectNow() 메소드를 실행하며 작업 대기열이 비어있을 때다르게 처리한다. 


void selectNow() throws IOException {     

    try {         

        selector.selectNow();     

    } finally {         

        // restore wakup state if needed         

        if (wakenUp.get()) {             

            selector.wakeup();         

        }     

    } 

}


먼저 selector.selectNow() 메서드를 호출하게되는데 여기서 Selector는 무엇인가? 이 필드는 Java NIO의 멀티플렉서 Selector 이다. 여기서 selector.selectNow() 는 현재 준비된 IO 이벤트가 있는지 확인하고, 그렇다면 준비된 IO 이벤트를 반환한다. 그렇지 않으면 0을 반환한다. 


selectNow()가 호출되면 finally 블록은 wakeUp 변수가 true인지 확인하고 true이면 selector.wakeup()를 호출하여 select()의 블로킹 호출을 깨우게된다.


if 브랜치의 selectNow 메소드를 살펴본 후 else 브랜치 의 select(oldWakenUp) 메소드를 살펴 보겠다. 


private void select(boolean oldWakenUp) throws IOException {     

    Selector selector = this.selector;     

    try {         

        ...         

        int selectedKeys = selector.select(timeoutMillis);        

         ...     

    } catch (CancelledKeyException e) {         

        ...     

    } 

}


이 select 메서드에서 selector.select (timeoutMillis)가 호출 되고이 호출은 현재 스레드를 차단하고 timeoutMillis는 차단 타임 아웃이 된다. hasTasks() 가 true 일 때 호출이 selectNow() 메소드는 현재 스레드를 블로킹 않는다. hasTasks()가 false이면 oldWakenUp 호출시에 현재 스레드를 블로킹한다.


작업 대기열에는 작업이없는 경우, Netty는 IO 준비 이벤트를 블로킹하고 기다릴 수 있다. taskQueue에 작업이있을 때 당연히 제출 된 작업이 가능한 한 빨리 실행되기를 바라므로 Netty는 non-blocking selectNow() 메서드를 호출하여 taskQueue의 작업을 처리할 수 있다.


IO 이벤트 처리

NioEventLoop.run() 메소드에서 첫 번째 단계는 Seleoctr의해 selectNow 호출을 통해 현재 준비 IO 이벤트가 있는지 여부를 묻는 것이다. IO 이벤트가 준비되었을때 두 번째 단계는이 IO 이벤트를 처리하기 이다.


final int ioRatio = this.ioRatio; 

if (ioRatio == 100) {     

    processSelectedKeys();     

    runAllTasks(); 

} else {     

    final long ioStartTime = System.nanoTime();      

    processSelectedKeys();      

    final long ioTime = System.nanoTime() - ioStartTime;     runAllTasks(ioTime * (100 - ioRatio) / ioRatio); 

}


여기 즉 코드의 또 다른 매우 흥미로운 것은 ioRatio 이다. ioRatio ? 이것은 스레드 할당을 나타낸다. IO 작업의 시간 비율(즉, 전체 루프에서 processSelectedKeys를 실행하는 데 걸린 시간)이다. 예를 들어, 기본 ioRatio는 50이다. 즉, IO 작업과 작업 실행이 차지하는 스레드 실행 시간의 비율이 1 : 1이다. IO 작업에 소요되는 시간과 소요 시간의 비율을 알면 작업 실행 시간을 쉽게 계산할 수 있다.


ioTime / ioRatio = taskTime / taskRatio     

taskRatio = 100 - ioRatio    => taskTime = ioTime * (100 - ioRatio) / ioRatio


위의 공식에 따르면 ioRate = 70으로 설정하면 IO 실행 시간이 70 %임을 의미한다. 즉, 사이클이 총 100ms가 걸린다고 가정하면 다음 공식에 따라 processSelectedKeys() 메서드를 호출하는 데 약 70ms(즉, IO 시간 소모) 시간이 걸린다는 것을 알 수 있다.  runAllTasks() 는 약 30ms(즉, 작업 실행 시간 소모)가 걸린다.


ioRatio가 100이면 Netty는 IO 시간 소모 비율을 고려하지 않고 processSelectedKeys 를 호출출한다.  RunAllTasks()  ioRatio가 100이 아닌 경우 else 분기로 실행하여 분기에서 먼저 processSelectedKeys() 의 실행 시간(즉, IO 작업 시간)을 기록한 다음 공식에 따라 계산한다. 태스크를 실행하고이를 매개 변수로 사용하여 runAllTasks() 를 호출하는 데 걸린 시간 이다.


private void processSelectedKeys() {     

    if (selectedKeys != null) {         

        processSelectedKeysOptimized(selectedKeys.flip());     

    } else {         

        processSelectedKeysPlain(selector.selectedKeys());     

    } 

}


selectedKeys의 유무에 따라  processSelectedKeysOptimized 또는 processSelectedKeysPlain가 될 각각 호출 된다. selectedKeys의 JVM에서 플랫폼에 따라, 필드는 openSelector() 메소드가 호출 될 때 설정되고, 서로 다른 값이 설정된다. 실제로 processSelectedKeysOptimized 메서드나 processSelectedKeysPlain 은 큰 차이가 없다. 단순화를 위해 processSelectedKeysOptimized 를 예로 들어 소스 코드의 워크 플로를 분석해 보겠다.


private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {     

    for (int i = 0;; i ++) {         

        final SelectionKey k = selectedKeys[i];         

        if (k == null) {             

            break;         

        }         


        selectedKeys[i] = null;          

        final Object a = k.attachment();          


        if (a instanceof AbstractNioChannel) {             

            processSelectedKey(k, (AbstractNioChannel) a);         

        } else {             

            @SuppressWarnings("unchecked")             

            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;             

            processSelectedKey(k, task);         

        }         

        ...     

    } 

}


두 가지 핵심 사항이 있다. 첫번째는 selectedKeys를 반복하여 준비된 IO 이벤트를 가져온 다음 각 이벤트에 대해 processSelectedKey를 호출 하여 처리한다. 두번째는 selectionKey.attach(object) 를 호출 하여 selectionKey에 추가 필드를 설정 한 다음 Object attachedObj = selectionKey.attachment()를 통해 가져올 수 있다는 것 이다. 위의 코드는 k.attachment() 를 사용 하여 selectionKey에 연결된 객체를 가져온다. 그러면이 객체는 무엇인가? 어디에 설정되어 있는가? SocketChannel이 Selector에 어떻게 등록되어 있는지 생각해 보자. 


채널 등록 과정에서 다음과 같은 콜 체인이 있는것 기억하는가?


Bootstrap.initAndRegister ->      

    AbstractBootstrap.initAndRegister ->          

        MultithreadEventLoopGroup.register ->              

            SingleThreadEventLoop.register ->                  

                AbstractUnsafe.register ->                     

                    AbstractUnsafe.register0 ->                         

                        AbstractNioChannel.doRegister


마지막 AbstractNioChannel.doRegister 메소드는 SocketChannel.register 메소드를 호출 하여 지정된 Selector에 SocketChannel을 등록한다.


@Override protected void doRegister() throws Exception {      

     selectionKey = javaChannel().register(eventLoop().selector, 0, this); 

}


세번째 파라미터가 바로 selectionKey.attach(객체) 이다.


예를들어 NioSocketChannel의 Selector에 SocketChannel을 등록 할 때 SocketChannel에 해당하는 NioSocketChannel을 추가 필드의 형태로 selectionKey에 추가한다. 추가 객체를 획득하고processSelectedKeysOptimized 메서드를 보면 IO 이벤트를 처리하기 위해 processSelectedKey 를 호출한다.


final Object a = k.attachment();  if (a instanceof AbstractNioChannel) {     

    processSelectedKey(k, (AbstractNioChannel) a); 

} else {     

    @SuppressWarnings("unchecked")     

    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;  

    processSelectedKey(k, task); 

}


processSelectedKey 메서드 의 소스 코드는 다음과 같다.


private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {     

    final NioUnsafe unsafe = ch.unsafe();     

    ...     

    try {         

        int readyOps = k.readyOps();                 


        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {             

            unsafe.read();             

            if (!ch.isOpen()) {                 

                // Connection already closed - no need to handle write.                 

                return;             

            }         

        }                         


        if ((readyOps & SelectionKey.OP_WRITE) != 0) {             

            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write             

            ch.unsafe().forceFlush();         

        }                       

    

         if ((readyOps & SelectionKey.OP_CONNECT) != 0) {            

             // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking           

             // See https://github.com/netty/netty/issues/924             

            int ops = k.interestOps();             

            ops &= ~SelectionKey.OP_CONNECT;             

            k.interestOps(ops);              

            unsafe.finishConnect();         

        }     

    } catch (CancelledKeyException ignored) {         

        unsafe.close(unsafe.voidPromise());     

    }

}


이 코드는 낯이 익은데 정확히 Java NIO의 Selector 프로세스이다.
processSelectedKey 에서는 다음과 같은 세 가지 이벤트가 처리된다.  


- OP_READ는 읽을 수있는 이벤트, 즉 상위 계층이 읽을 수 있도록 새 데이터를 채널에 수신

- OP_WRITE, 쓰기 가능한 이벤트, 즉 상위 계층이 채널에 데이터를 쓸 수 있음

- OP_CONNECT, 연결 설정 이벤트, 즉 TCP 연결이 설정되었으며 채널이 활성 상태

Netty가이 세 가지 이벤트를 처리하는 방법을 살펴 보자


OP_READ 처리

ready IO 이벤트가 OP_READ 이면 코드는 unsafe.read() 메서드를 호출한다 .


if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {     

    unsafe.read();     

    if (!ch.isOpen()) {         

        // Connection already closed - no need to handle write.        

         return;     

    } 

}


unsafe 클래스, 이제는 익숙한 클래스이다. 우리는 채널의 기본 IO 작업을 담당하는 NioSocketChannelUnsafe 의 인스턴스인 것으로 확인하였다. 이 메서드는 NioSocketChannelUnsafe 에서 구현 되지 않았지만 부모 클래스 AbstractNioByteChannel에 의해 구현되며 구현 소스 코드는 다음과 같다.


@Override public final void read() {     

    ...     

    ByteBuf byteBuf = null;     

    int messages = 0;     

    boolean close = false;     

    try {         

        int totalReadAmount = 0;         

        boolean readPendingReset = false;         

        do {             

            byteBuf = allocHandle.allocate(allocator);             

            int writable = byteBuf.writableBytes();             

            int localReadAmount = doReadBytes(byteBuf);              

            ...              

            pipeline.fireChannelRead(byteBuf);             

            byteBuf = null;             

             ...              

            totalReadAmount += localReadAmount;                      

             ...         

        } while (++ messages < maxMessagesPerRead);          

        

        pipeline.fireChannelReadComplete();         

        allocHandle.record(totalReadAmount);          


        if (close) {             

            closeOnRead(pipeline);             

            close = false;         

        }     

    } catch (Throwable t) {        

        handleReadException(pipeline, byteBuf, t, close);     

    } finally {     

    } 

}


위 실제로 read 메서드는 다음과 같이 요약 할 수 있다.  

- ByteBuf 할당  

- SocketChannel에서 데이터 읽기  

- pipeline.fireChannelRead를 호출 하여 인바운드 이벤트 보냄


Pipeline.fireChannelRead 는 인바운드 이벤트의 시작점 인 Netty의 주요 파이프라인을 통과한다. pipeline.fireIN_EVT()가 호출 되면 인바운드 이벤트 가 생성 되며 이는 head-> customContext-> tail 방향이다. ChannelPipeline의 다양한 핸들러를 차례로 통과한다. pipeline.fireChannelRead를 호출 한 후 ChannelPipeline에서 수행해야하는 작업이다. 


OP_WRITE 처리

OP_WRITE 쓰기 가능 이벤트 코드는 다음과 같으며 여기에있는 코드는 비교적 간단하고 자세한 분석이 필요하지 않다.


if ((readyOps & SelectionKey.OP_WRITE) != 0) {     

    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write     

    ch.unsafe().forceFlush(); 

}


OP_CONNECT 처리

마지막 이벤트는 OP_CONNECT 이며 TCP 연결이 설정되었음을 의미한다.


if ((readyOps & SelectionKey.OP_CONNECT) != 0) {     

    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking     

    // See https://github.com/netty/netty/issues/924     

    int ops = k.interestOps();     

    ops &= ~SelectionKey.OP_CONNECT;     

    k.interestOps(ops);      

    unsafe.finishConnect(); 

}


OP_CONNECT 이벤트를 처리 할 때 두 가지 작업 만 수행되었다.  


1.  OP_CONNECT의 이벤트.

2. unsafe.finishConnect()를 호출하여 연결이 설정되었음을 상위 계층에 알림


unsafe.finishConnect() 호출은 마지막으로 pipeline(). fireChannelActive()를 호출하고 파이프 라인의 각 핸들러에 TCP 채널이 설정되었음을 알리는 인바운드 이벤트가 생성된다(즉, ChannelInboundHandler.channelActive 메소드가 호출 됨).


Netty의 작업 대기열 메커니즘

Netty에서 NioEventLoop은 일반적으로 두 가지 작업을 수행해야한다고 이미 언급했다. 첫 번째는 IO 작업을 처리하는 IO 스레드로 작동하고 두 번째는 taskQueue에서 작업을 처리하는 작업 스레드로 작동하는 것이다. 이번에는 NioEventLoop의 작업 대기열 메커니즘을 분석해보자


작업 추가


일반 실행 가능 작업

NioEventLoop가 SingleThreadEventExecutor 상속하지만 SingleThreadEventExecutor

가 큐 <Runnable를> 작업 대기열 태스크를 저장하기위한 필드가 있다. 


예를 들어, 우리의 Runnable를 추가 할 필요가있을 때 taskQueue에서 다음을 수행 할 수 있다.


EventLoop eventLoop = channel.eventLoop(); 

eventLoop.execute(new Runnable() {     

    @Override     

    public void run() {         

        System.out.println("Hello, Netty!");     

    } 

});


execute가 호출되면 실제로 SingleThreadEventExecutor.execute() 메서드가 호출되며 다음과 같이 구현된다.


@Override public void execute(Runnable task) {     

    if (task == null) {         

        throw new NullPointerException("task");     

    }      


    boolean inEventLoop = inEventLoop();     

    if (inEventLoop) {         

        addTask(task);     

    } else {         

        startThread();         

        addTask(task);         

        if (isShutdown() && removeTask(task)) {             

            reject();         

        }     

    }      


    if (!addTaskWakesUp && wakesUpForTask(task)) {         

        wakeup(inEventLoop);     

    } 

}


태스크 추가를 위한 addTask 메소드의 소스 코드 는 다음과 같다.


protected void addTask(Runnable task) {     

    if (task == null) {        

        throw new NullPointerException("task");     

    }    


    if (isShutdown()) {         

        reject();    

     }     


    taskQueue.add(task); 

}


결국 taskQueue 는 실행할 작업을 저장하는 큐이다.


작업 예약

일반적인 Runnable 작업 실행을 추가하는 것 외에도 호출과 같은 메소드로 시간 제한 작업 eventLoop.scheduleXXX를 추가 할 수 있다. 


EventLoop 작업 대기열 기능은 수퍼 클래스 SingleThreadEventExecutor 구현에서 구현되고 스케줄 기능은 SingleThreadEventExecutor 부모에서 구현된다. 즉, AbstractScheduledEventExecutor에서 구현 된 AbstractScheduledEventExecutor 경우에는 scheduleTaskQueue에 대한 필드가 있다.


Queue<ScheduledFutureTask<?>> scheduledTaskQueue;


AbstractScheduledEventExecutor 구현 스케줄 방법은 :

@Override 

public  ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {     

    ObjectUtil.checkNotNull(command, "command");     

    ObjectUtil.checkNotNull(unit, "unit");     


    if (delay < 0) {        

         throw new IllegalArgumentException(                 

            String.format("delay: %d (expected: >= 0)", delay));    

     }     


    return schedule(new ScheduledFutureTask<Void>(             

        this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); 

}


Runnable이 전달되면 ScheduledFutureTask 객체로 캡슐화된다.

이 객체는 Runnable이 실행되는시기와 빈도를 기록한다. ScheduledFutureTask가 생성 되면, 다른 오버로드 된 일정 메서드를 계속 호출한다.


<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {     

    if (inEventLoop()) {         

        scheduledTaskQueue().add(task);    

    } else {         

        execute(new OneTimeTask() {             

            @Override             

            public void run() {                

                 scheduledTaskQueue().add(task);             

            }         

        });     

    }      

    

    return task; 

}


이 메서드에서 ScheduledFutureTask 객체는 scheduleTaskQueue에 추가된다.


작업 실행

taskQueue에 task가 추가되면 EventLoop에 의해 어떻게 실행되는가?
NioEventLoop.run() 메서드로 돌아가서이 메서드에서는 processSelectedKeys() 및 runAllTasks() 메서드 를 각각 호출 하여 IO를 수행한다. 

runAllTasks 메서드에는 두 개의 오버로드 된 메서드가 있다. 하나는 매개 변수가없고 다른 하나는 하나의 매개 변수가있다. 


먼저 매개 변수가없는 runAllTasks를 살펴보자


protected boolean runAllTasks() {     

    fetchFromScheduledTaskQueue();     

    Runnable task = pollTask();     


    if (task == null) {         

        return false;     

    }      


    for (;;) {         

        try {             

            task.run();         

        } catch (Throwable t) {             

            logger.warn("A task raised an exception.", t);         

        }          


        task = pollTask();         

        if (task == null) {             

            lastExecutionTime = ScheduledFutureTask.nanoTime();             

            return true;         

        }     

    } 

}


private void fetchFromScheduledTaskQueue() {     

    if (hasScheduledTasks()) {         

        long nanoTime = AbstractScheduledEventExecutor.nanoTime();         

        for (;;) {             

            Runnable scheduledTask = pollScheduledTask(nanoTime);             

            if (scheduledTask == null) {                 

                break;             

            }             

        taskQueue.add(scheduledTask);       

      }     

    } 

}


runAllTasks() 메서드는 task = pollTask() 를 계속 호출 하여 taskQueue 에서 실행 가능한 작업 을 가져온 다음 해당 run() 메서드를 호출 하여 작업을 실행한다.


EventLoop은 IO 작업과 일반 프로세싱 작업을 모두 수행해야하므로 EventLoop.execute 메서드를 호출하여 Task를 제출할 때 시간이 많이 걸리는 Task를 퍼블리싱하지 말고 블로킹을 유발하는 일부 작업을 퍼블리싱하지 말아야한다. 그렇지 않으면 IO 스레드가 전체 프로그램의 동시성에 영향을 미친다.


결론

전반적으로 NioEventLoop의 구현은 복잡하지 않으며 주로 IO를 선택하고 작업을 소비하는 두 가지 작업을 수행한다. Select 작업이 블로킹되기 때문에(제한 시간이 설정되어 있지만) Select가 실행될 때마다 새 작업이 있는지 확인하고 있는 경우 작업이 먼저 실행된다. 이렇게하면 EventLoop의 처리량을 최대화하고 차단 시간을 줄일 수 있다. 이 두 가지 외에도 NioEventLoop는 JDK에 언급 된 EPoll 버그를 해결하였다.



https://segmentfault.com/a/1190000007282789 

https://www.slideshare.net/kslisenko/networking-in-java-with-nio-and-netty-76583794 

https://www.slideshare.net/JangHoon1/netty-92835335?from_action=save

https://blog.csdn.net/zxhoo/article/details/17419229 

https://slowdev.tistory.com/16 

https://sina-bro.tistory.com/15 

https://github.com/YonghoChoi/develop-note/blob/master/md/Netty/3장_부트스트랩.md 

https://blog.csdn.net/zxhoo/article/details/17532857 

https://clairdelunes.tistory.com/26 

https://runningup.tistory.com/entry/부트스트랩-1 

https://juyoung-1008.tistory.com/23

https://toutiao.io/posts/awvc9w/preview 

https://blog.csdn.net/zxhoo/article/details/17709765 

https://segmentfault.com/a/1190000007403873 

https://segmentfault.com/a/1190000007403937 




작가의 이전글 네티 NioEventLoopGroup

작품 선택

키워드 선택 0 / 3 0

댓글여부

afliean
브런치는 최신 브라우저에 최적화 되어있습니다. IE chrome safari