2016년 3월 18일 금요일

Elixir 에서 PG2 사용하기

Elixir에서 PG2


PG2?


pg2는 Erlang의 stdlib로 분산되어있는 노드들에 특정 프로세스들를 그룹핑 할 수 있는 라이브러리 입니다. 이 라이브러리를 사용하면 그룹명을 가지고 Cluster로 연결된 노드들의 프로세스를 직접 호출하여 사용할 수 있습니다.


This module implements process groups. Each message may be sent to one, some, or all members of the group.

A group of processes can be accessed by a common name. For example, if there is a group named foobar, there can be a set of processes (which can be located on different nodes) which are all members of the group foobar. There are no special functions for sending a message to the group. Instead, client functions should be written with the functions get_members/1 and get_local_members/1 to find out which processes are members of the group. Then the message can be sent to one or more members of the group.

If a member terminates, it is automatically removed from the group. (erlang documents)


PG2 주요 API



create(Name :: name()) -> ok


빈 그룹 하나를 생성합니다. 이 그룹은 Cluster로 연결된 모든 Node에서 보여집니다.



delete(Name :: name()) -> ok


그룹을 삭제합니다.



join(Name :: name(), Pid :: pid()) -> ok


Process pid를 그룹에 추가합니다. 중복 Join이 가능하여 주의하여야 합니다.



leave(Name, Pid :: pid()) -> ok | {error, {no_such_group, Name}}


Process pid를 그룹에서 삭제합니다. 그룹에 Pid가 없을시에도 ok를 리턴합니다. 그룹명이 없을때만 error를 리턴합니다.



get_members(Name) -> [pid()] | {error, {no_such_group, Name}}


그룹내에 존재하는 모든 Pid들을 리턴합니다.



get_closest_pid(Name) -> pid() | {error, Reason}


local 에 있는 Process pid를 리턴합니다. 만약 로컬 Process가 죽었거나 호출 불가능 하다면 랜덤하게 하나의 Process pid를 리턴합니다.



PG2 간단 사용


pg2는 Custer로 연결된 노드들의 프로세스를 그룹핑 하기 위한 라이브러리 이므로 iex를 이용해서 두개의 노드를 실행하고 연결시키겠습니다.


첫번째 node1 을 실행시키고


$ iex --sname node1

iex(node1@syntaxfish) >

두번쨰로 node2 를 실행시키고 Node.connect() 함수를 이용하여 node1번과 연결 시킨후 Node.list() 함수를 이용하여 연결된 노드 리스트를 확인합니다.



$ iex --sname node2

iex(node2@syntaxfish) > Node.connect(:node1@syntaxfish)
:ok
iex(node2@syntaxfish) > Node.list
[:node1@syntaxfish]

다음으로 node1번에서 :test라는 그룹을 만들고 현재 shell을 그룹에 :pg2.join()함수로 추가하겠습니다. 그리고 :pg2.get_memebers() 함수로 :test 그룹을 출력해보면 shell pid가 그룹에 추가된걸 확인할 수 있습니다.



iex(node1@syntaxfish) > :pg2.create(:test)
:ok
iex(node1@syntaxfish) > :pg2.get_members(:test)
[]
iex(node1@syntaxfish) > :pg2.join(:test, self)
:ok
iex(node1@syntaxfish) > :pg2.get_members(:test)
[#PID<0.63.0>]

이렇게 PG2를 이용하여 node1의 shell을 :test그룹에 추가하는것에 성공하였습니다. 그럼 정말 Cluster로 연결된 다른 노드에서 호출이 가능한지 테스트 해보겠습니다.


테스트 방법은 node2 에서 node1의 shell_pid로 “hello!!” 라는 메시지를 보내고, 다시 node1로 돌아서와서 메시지를 확인하는것으로 해보겠습니다.


아래처럼 node1에서 메시지를 전송하고



iex(node2@syntaxfish) > :pg2.get_members(:test)
[#PID<8387.63.0>]
iex(node2@syntaxfish) > [node1_shell] = :pg2.get_members(:test)
[#PID<8387.63.0>]
iex(node2@syntaxfish) > send(node1_shell, "hello!!")
"hello!!"

node1 에서 메시지를 비우면 node2에서 보낸 메시지가 표시되는걸 확인하실 수 있습니다.



iex(node1@syntaxfish) > flush()
"hello!!"


PG2 적용 예제


PG2를 사용하여서 간단한 Echo 예제를 만들어보겠습니다.

구조는 node1 번에 EchoServer 와 통신할 EchoClinet 프로세스를 만들고, node2와 node3에는 EchoServer 프로세스를 만든후 PG2를 이용해서 :echo_server 그룹에 추가합니다. 완성된 구조는 아래 그림과 같습니다.




EchoServer


mix 를 이용해서 echo_server를 생성합니다.



$ mix new echo_server --sup

그리고 GenServer 를 이용해서 요청을 받을 EchoServer.Server 를 작성합니다.



defmodule EchoServer.Server do
use GenServer

def start_link do
GenServer.start_link(__MODULE__, :ok, name: EchoServer.Server)
end

def init(:ok) do
:pg2.create(:echo_server) # 그룹 생성
:pg2.join(:echo_server, self) # 그룹 추가

{:ok, %{}}
end

## Public API
def echo(pid, message) do
GenServer.call(pid, {:echo, message})
end

## Callback API
def handle_call({:echo, message}, _from, state) do
{:reply, build_echo_message(message), state}
end

def build_echo_message(message) do
"[#{Node.self |> to_string}] #{message}"
end
end

handle_call 로 요청을 받으면 응답을 주는 로직은 기본 GenServer와 동일하지만, 다른점은 init() 함수 내에서 PG2를 이용해서 그룹을 만들고 그룹에 추가하는 코드 입니다. 이렇게 두줄만 추가하면 Cluster로 연결된 모든 노드에서 :echo_server 라는 그룹명으로 프로세스에 메시지를 보낼 수 있습니다.



    ...
:pg2.create(:echo_server) # 그룹 생성
:pg2.join(:echo_server, self) # 그룹에 추가
...

완성된 EchoServer.Server 모듈을 Application에 추가 시킵니다.



defmodule EchoServer do
use Application

def start(_type, _args) do
import Supervisor.Spec, warn: false

children = [
worker(EchoServer.Server, [])
]

opts = [strategy: :one_for_one, name: EchoServer.Supervisor]
Supervisor.start_link(children, opts)
end
end

그리고 작동하는지 확인을 위해 테스트코드를 간단하게 작성했습니다. 첫번째 테스트는 일반적인 GenServer의 호출방식으로 호출했고, 두번째 테스트는 PG2를 이용하여 호출한것입니다.



defmodule EchoServerTest do
use ExUnit.Case

alias EchoServer.Server

@message "hi!!"

test "process call by local name" do
echo = Server.echo(Server, @message)

assert echo == Server.build_echo_message(@message)
end


test "proces call by pg2" do
echo = :pg2.get_closest_pid(:echo_server)
|> GenServer.call({:echo, @message})

assert echo == Server.build_echo_message(@message)
end
end

테스트를 진행하면 직접 호출, pg2를 이용하여 호출 둘다 정상작동하는걸 확인하실 수 있습니다.



Finished in 0.04 seconds (0.04s on load, 0.00s on tests)
2 tests, 0 failures

Randomized with seed 285286


EchoClient


이제 EchoClient를 만들어보겠습니다. EchoServer와 동일하게 mix 를 이용하여 새로운 프로젝트를 만들고.



$ mix new echo_client --sup

GenServer를 이용하여 EchoClient를 작성합니다.



defmodule EchoClient.Client do
use GenServer

@init_state %{}

def start_link do
GenServer.start_link(__MODULE__, :ok, name: EchoClient.Client)
end

def init(:ok) do
{:ok, %{}}
end

## Public API
def send_echo(pid, message) do
GenServer.call(pid, {:send_echo, message})
end

## Callback API
def handle_call({:send_echo, message}, _from, state) do
case :pg2.get_closest_pid(:echo_server) do
pid when is_pid(pid) ->
echo = GenServer.call(pid, {:echo, message})
{:reply, echo, state}
{:error, _r} = error ->
{:reply, error, state}
end
end
end

이 코드도 기본적인 GenServer의 모습과 동일합니다. 다른점은 handle_call 함수안에서 PG2를 이용하여 서버 PID를 구해오는 부분입니다.



...
case :pg2.get_closest_pid(:echo_server) do
pid when is_pid(pid) ->
echo = GenServer.call(pid, {:echo, message})
{:reply, echo, state}
{:error, _r} = error ->
{:reply, error, state}
end
...

그리고 실행을 위해 EchoClient를 Application에 추가합니다.



defmodule EchoClient do
use Application

def start(_type, _args) do
import Supervisor.Spec, warn: false

children = [
worker(EchoClient.Client, []),
]

opts = [strategy: :one_for_one, name: EchoClient.Supervisor]
Supervisor.start_link(children, opts)
end
end


실행


이제 코드는 다 완성했으니 작동하는지 직접 요청을 보내서 테스트 해보겠습니다.

일단 node1 에 EchoClient를 실행합니다.



$ iex --sname node1 -S mix

그리고 node2, node3으로 EchoServer를 실행시킵니다.



$ iex --sname node2 -S mix


$ iex --sname node3 -S mix

실행 시킨 node들을 Cluster로 연결해줍니다.



iex(node1@syntaxfish) > Node.connect :node2@syntaxfish
true
iex(node1@syntaxfish) > Node.connect :node3@syntaxfish
true
iex(node1@syntaxfish) > Node.list
[:node2@syntaxfish, :node3@syntaxfish]

모두 연결이 되었으니 node1(EchoClient) 에서 EchoServer로 메시지를 보내보겠습니다.



iex(node1@syntaxfish) > EchoClient.Client.send_echo(EchoClient.Client, "hello world!")
"[node3@syntaxfish] hello world!"
iex(node1@syntaxfish) > EchoClient.Client.send_echo(EchoClient.Client, "hello world!")
"[node3@syntaxfish] hello world!"
iex(node1@syntaxfish) > EchoClient.Client.send_echo(EchoClient.Client, "hello world!")
"[node2@syntaxfish] hello world!"

메시지가 정상적으로 node2와 node3에 요청되는걸 확인하실 수 있습니다.


전체 프로젝트는 Github에 올려놓았습니다. pg2_example


마치며…


첫 블로그 포스팅이라 코드를 치는것보다 한국말을 쓰는게 어렵네요.. 무튼 PG2는 Erlang와 Elixir에서 분산 시스템을 구축할때 자주 사용되는 모듈입니다. elixir의 대표적인 web framework인 phoenix에서도 PG2를 사용하고 있습니다.


그런데 조금 마음에 안드는 부분이 있습니다. PG2에서 원하는 그룹의 pid를 받아올때 get_closest_pid() 라는 함수를 사용하게 되는데 이 함수가 넘겨주는 방식이 그닥 마음에 들지 않습니다. 함수명은 가장 가까운 pid를 가져온다고 되어있지만 도큐먼트에서 내용은 조금 다릅니다.


This is a useful dispatch function which can be used from client functions. It returns a process on the local node, if such a process exist. Otherwise, it chooses one randomly.


로컬에 존재하지 않으면 랜덤으로 pid를 가져오는 방식… 설마 정말 랜덤으로 가져올까 싶어 otp 소스를 확인해 보았지만.. 정말 랜덤이였습니다.



%% otp/lib/kernel/src/pg2.erl
get_closest_pid(Name) ->
case get_local_members(Name) of
[Pid] ->
Pid;
[] ->
case get_members(Name) of
[] -> {error, {no_process, Name}};
Members ->
random_element(Members)
end;
Members when is_list(Members) ->
random_element(Members);
Else ->
Else
end.

random_element(List) ->
X = abs(erlang:monotonic_time()
bxor erlang:unique_integer()),
lists:nth((X rem length(List)) + 1, List).

그래서인지 Will Larson라는 분은 블로그에 각 pid들 마다 큐사이즈를 체크해서 가장 큐에 쌓이 메시지가 적은 pid를 가져오는 함수를 올려주셨는데 모든 멤버에 프로세스에 erlang:process_info() 를 호출하는게 조금 걱정되긴 하네요. (lethain.com)



get_best_pid(Group) ->
Members = pg2:get_members(Group),
Members1 = lists:map(fun(Pid) ->
[{message_queue_len, Messages}] = erlang:process_info(Pid, [message_queue_len]),
{Pid, Messages}
end, Members),
case lists:keysort(2, Members1) of
[{Pid, _} | _] -> Pid;
[] -> {error, empty_process_group}
end.

Round Robin 방식만 넣어줬어도 좋았을텐데.. 그부분이 아쉽지만 PG2는 충분히 편하고 좋은 라이브러리인건 분명합니다.