brunch

You can make anything
by writing

C.S.Lewis

by myner Oct 11. 2020

리액터패턴 / 프로액터패턴

feat. Netty, java NIO

리액터 패턴


Reactor 패턴은 하나 이상의 클라이언트로부터의 요청을 동시 처리하기 위해서 사용하는 패턴이다. 이벤트 핸들 패턴의 전형적인 모습이다. 서버는 각 입력에 대해서 받을 이벤트를 동적으로 등록/해제하는 식으로 처리해야 할 입력과 이벤트를 관리할 수 있다.   


어플리케이션이 능동적으로 계속해서 처리하기위한 루프를 도는 것이 아니라, 이벤트에 반응하는 객체(reactor)를 만들고, 이벤트가 발생하면 어플리케이션 대신 reactor가 반응하여 처리하는 것이다. 


reactor는 이벤트가 발생하길 기다리고, 이벤트가 발생하면 event handler에게 이벤트를 보낸다. 


1. 이벤트에 반응하는 reactor를 만들고 reactor에 이벤트를 처리할 event handler들을 등록한다. 이벤트로는 입력, 출력, 종료 등이 있을수 있다. => initiate / multiplexing 과정(여러개의 입력을 처리할 수 있도록 다중화)

2. reactor는 이벤트가 발생하기를 기다린다. => receive

3. 이벤트가 발생하면 이벤트를 처리할 event handler단위로 분할한다. => demultiplexing 과정으로 이벤트가 발생한 "하나의" 입력을 찾아낸다

4. 분할된 이벤트를 해당 event handler에게 발송한다. => dispatch

5. event handler에 알맞은 method를 사용하여 이벤트를 처리한다. => process 


reactor 패턴의 I/O 통지모델은 직접 I/O 이벤트를 대기하는 동기형 multiplex 통지모델이다. select, poll, epoll 등이 이에 해당한다. 



Reactor 패턴의 아키텍처의 두 가지 중요한 참여자


Reactor

별도의 스레드에서 실행되며 발생한 I/O 이벤트는 dispatching되어 해당 이벤트 처리기로 보내 처리


Handlers

Handler는 Reactor로부터 I/O 이벤트를 받아 실제 작업을 수행


Reactor패턴


Selector는 계속해서 I/O 이벤트가 발생하기를 대기하며, Reactor가 Selector.select() 메소드를 호출하면 Selector는 등록된 채널에 대해서 발생한 이벤트 정보가 들어있는 SelectionKey Set을 반환한다.


리액터패턴 예제(자바로 작성됨)


Reactor는 Runnable을 구현하고 있으며, run() 메서드에서는 while 루프를 돌며 selector.select()를 호출하여 처리할 수 있는 이벤트 정보가 담긴 SelectionKey Set을 가져온다. SelectionKey에 바인드 되어있는 Handler를 가져와 dispatch한다.


public class Reactor implements Runnable {

    final Selector selector;
    final ServerSocketChannel serverSocketChannel;
    final boolean isWithThreadPool;

    Reactor(int port, boolean isWithThreadPool) throws IOException {
        this.isWithThreadPool = isWithThreadPool;
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
        SelectionKey selectionKey0 = 

                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        selectionKey0.attach(new Acceptor());
    }


    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext()) {
                    dispatch((SelectionKey) (it.next()));
                }
                selected.clear();
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    void dispatch(SelectionKey k) {
        Runnable r = (Runnable) (k.attachment());
        if (r != null) {
            r.run();
        }
    }

    class Acceptor implements Runnable {
        public void run() {
            try {
                SocketChannel socketChannel = serverSocketChannel.accept();
                if (socketChannel != null) {
                    if (isWithThreadPool)
                        new HandlerWithThreadPool(selector, socketChannel);
                    else
                        new Handler(selector, socketChannel);
                }
                System.out.println("Connection Accepted by Reactor");
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}


Handler에는 READING, SENDING 2가지 상태가 있다. 채널은 한 번에 하나의 작업만 지원하기 때문에 동시에 처리할 수 없다. 

public class Handler implements Runnable {

    final SocketChannel socketChannel;
    final SelectionKey selectionKey;
    ByteBuffer input = ByteBuffer.allocate(1024);
    static final int READING = 0, SENDING = 1;
    int state = READING;
    String clientName = "";

    Handler(Selector selector, SocketChannel c) throws IOException {
        socketChannel = c;
        c.configureBlocking(false);
        selectionKey = socketChannel.register(selector, 0);
        selectionKey.attach(this);
        selectionKey.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }


    public void run() {
        try {
            if (state == READING) {
                read();
            } else if (state == SENDING) {
                send();
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    void read() throws IOException {
        int readCount = socketChannel.read(input);
        if (readCount > 0) {
            readProcess(readCount);
        }
        state = SENDING;
        // Interested in writing
        selectionKey.interestOps(SelectionKey.OP_WRITE);
    }

    /**
     * Processing of the read message. This only prints the message to stdOut.
     *
     * @param readCount
     */
    synchronized void readProcess(int readCount) {
        StringBuilder sb = new StringBuilder();
        input.flip();
        byte[] subStringBytes = new byte[readCount];
        byte[] array = input.array();
        System.arraycopy(array, 0, subStringBytes, 0, readCount);
        // Assuming ASCII (bad assumption but simplifies the example)
        sb.append(new String(subStringBytes));
        input.clear();
        clientName = sb.toString().trim();
    }

    void send() throws IOException {
        System.out.println("Saying hello to " + clientName);
        ByteBuffer output = ByteBuffer.wrap(("Hello " + clientName + "\n").getBytes());
        socketChannel.write(output);
        selectionKey.interestOps(SelectionKey.OP_READ);
        state = READING;
    }
}


HandlerWithThreadPool은 Handler 클래스의 확장 버전

public class HandlerWithThreadPool extends Handler {

    static ExecutorService pool = Executors.newFixedThreadPool(2);
    static final int PROCESSING = 2;

    public HandlerWithThreadPool(Selector sel, SocketChannel c) throws IOException {
        super(sel, c);
    }

    void read() throws IOException {
        int readCount = socketChannel.read(input);
        if (readCount > 0) {
            state = PROCESSING;
            pool.execute(new Processer(readCount));
        }
        // We are interested in writing back to the client soon after read processing is done.
        selectionKey.interestOps(SelectionKey.OP_WRITE);
    }

    // Start processing in a new Processer Thread and Hand off to the reactor thread.
    synchronized void processAndHandOff(int readCount) {
        readProcess(readCount);
        // Read processing done. Now the server is ready to send a message to the client.
        state = SENDING;
    }

    class Processer implements Runnable {
        int readCount;
        Processer(int readCount) {
            this.readCount =  readCount;
        }
        public void run() {
            processAndHandOff(readCount);
        }
    }
}


프로액터 패턴


Proactor 패턴은 이벤트를 수동적으로 기다리지 않고, 능동적으로 비동기 작업을 수행한다. 일단 사고치고 본다는 것이다. 비동기 프로세스가 가능한 일거리들을 demultiplexing한 뒤, 작업까지 비동기로 처리한다. 그리고 작업이 완료되면 비동기 프로세스가 completion dispatch에게 이벤트를 넘기고 dispatcher는 적절한 completion handler(Queue)에 이벤트를 dispatch 한다.


proactor 패턴에서 필수적인 것은 비동기 명령을 처리하는 프로세스이다. 직접 구현하는 방법도 있겠지만, 운영체제의 최적화된 스케쥴링의 도움을 받는 것이 좋고 대표적인 것이 windows의 IOCP이다. 


Reactor에서 event가 작업이 가능함을 알리는 event였다면, Proactor에서 event는 작업의 완료를 알리는 event이다. 즉 일단 작업을 미리 처리해버리는것이다. completion handler에 event가 dispatch되면 completion handler는 미리 정해진 콜백을 호출하여 process event를 처리한다.


1. 완료 이벤트를 받을 completion handler를 등록 => initiate

2. 비동기 프로세스가 작업을 대기 혹은 작업이 발생하면 처리 => receive

3. 가능한 작업들이 생기면 비동기 프로세스가 작업을 분리하여 비동기적으로 처리 => demultiplex / work processing

4. 작업이 완료되면 비동기 프로세스는 completion dispatcher에게 완료 정보를 넘기고 dispatcher는 이벤트를 적절한 completion handler에게 발송 => dispatch

5. completion handler는 이벤트의 정보를 토대로 정해진 콜백을 호출하여 process 처리 => process





https://semtax.tistory.com/88

http://i5on9i.blogspot.com/2013/11/reactor-pattern.html 

https://elky.tistory.com/587 

https://rotiamo.tistory.com/entry/Proactor-pattern 

https://thebook.io/006884/ch03/07-09/

https://ozt88.tistory.com/25 

https://www.python2.net/questions-961691.htm

https://m.blog.naver.com/PostView.nhn?blogId=anbv3&logNo=130092591538&proxyReferer=https:%2F%2Fwww.google.com%2F 

http://jeewanthad.blogspot.com/2013/02/reactor-pattern-explained-part-1.html

http://jeewanthad.blogspot.com/2013/02/reactor-pattern-explained-part-2.html 

http://jeewanthad.blogspot.com/2013/03/reacter-pattern-explained-part-3.html 


작가의 이전글 멀티플렉싱
브런치는 최신 브라우저에 최적화 되어있습니다. IE chrome safari