为了熟悉Erlang的套接字编程,我开始编写一个“聊天室”程序。基本流程如下:
1.服务器启动监听指定端口
2.启动一个gen_server 作为聊天室进程,里面使用ets 保存当前所有客户端连接信息,并负责将某个客户端的消息广播到所有在线客户端
3.服务器接受客户端连接,并绑定到一个gen_server进程
4.客户端维护进程接收客户端发送的消息,调用聊天室进程函数进行广播
5.客户端维护进程接收聊天室发送的消息,转发给客户端
客户端信息包括id、pid、socket、nickname、sex、age、province 等。
目前包括以下几个模块:
echatServer.erl :服务器端程序启动模块。
chat_room.erl :聊天室模块,一个gen_server负责处理客户端请求,保存了所有客户端的连接信息。
id_generator.erl:负责为每一个连接的客户端生成唯一ID
client_session.erl:与客户端socket绑定的gen_server回调模块,接收和发送消息
chat_acceptor.erl:负责监听端口和处理连接的客户端socket
代码如下:
echatServer.erl:
-module(echatServer).
%%
%% Include files
%%
%%
%% Exported Functions
%%
-export([start/0]).
%%
%% API Functions
%%
%% @doc Entry point of the chat server.
%% Starts the chat room process first, then the acceptor on TCP port 3377.
%% The results of both calls are discarded; `ok' is returned once
%% chat_acceptor:start/1 has returned.
start() ->
    Port = 3377,
    chat_room:start_link(),
    chat_acceptor:start(Port),
    ok.
chat_room.erl:
%% Author: Administrator
%% Created: 2012-2-18
%% Description: chat room server.
%%   1. creates a client_session process for every client connection
%%   2. broadcasts a message to all client sessions
-module(chat_room).
-behaviour(gen_server).

%%
%% Include files
%%
-include("clientinfo.hrl").
-include("message.hrl").

-record(state, {}).

%%
%% Exported Functions
%%
-export([start_link/0, init/1, getPid/0, bindPid/2, broadCastMsg/1, logout/1]).
-export([handle_call/3, handle_info/2, handle_cast/2, code_change/3, terminate/2]).

%%
%% API Functions
%%

%% @doc Start the chat room gen_server, registered locally under the module name.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% @doc gen_server callback.
%%   1. starts id_generator
%%   2. creates the public, named ordered_set ETS table `clientinfo',
%%      keyed on #clientinfo.id, that stores one record per connected client
init([]) ->
    id_generator:start_link(),
    ets:new(clientinfo, [public, ordered_set, named_table,
                         {keypos, #clientinfo.id}]),
    {ok, #state{}}.

%% {getpid, Id}: start a client_session for Id and reply with its pid.
handle_call({getpid, Id}, _From, State) ->
    {ok, Pid} = client_session:start_link(Id),
    {reply, Pid, State};
%% {remove_clientinfo, Ref}: delete the table entry whose key is Ref's id.
%% A proper gen_server reply tuple is returned; returning the bare result of
%% ets:delete/2 is not a valid callback return and terminates the server.
handle_call({remove_clientinfo, Ref}, _From, State) ->
    Key = Ref#clientinfo.id,
    ets:delete(clientinfo, Key),
    {reply, ok, State};
%% {sendmsg, Msg}: walk the table from its first key, sending Msg to each session.
handle_call({sendmsg, Msg}, _From, State) ->
    Key = ets:first(clientinfo),
    io:format("feching talbe key is ~p~n", [Key]),
    sendMsg(Key, Msg),
    {reply, ok, State}.

%% Any other message is ignored.
handle_info(_Request, State) ->
    {noreply, State}.

%% Casts are ignored.
handle_cast(_Request, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVersion, State, _Extra) ->
    {ok, State}.

%%
%% Local Functions
%%

%% @doc Allocate a new client id, ask the chat room to start a client_session
%% for it, and return a #clientinfo{} carrying that id and the session pid.
getPid() ->
    Id = id_generator:getnewid(client),
    Pid = gen_server:call(?MODULE, {getpid, Id}),
    io:format("id generated ~w~n", [Id]),
    #clientinfo{id = Id, pid = Pid}.
%% @doc Bind a client socket to the session process named in Record.
%% Hands socket ownership to the session pid; on success stores a new
%% #clientinfo{} (id, socket, pid) in the `clientinfo' table and sends
%% {bind, Socket} to the session so it can record the socket in its state.
%% Both branches return `ok' (the result of io:format).
bindPid(Record, Socket) ->
    io:format("binding socket...~n"),
    Pid = Record#clientinfo.pid,
    case gen_tcp:controlling_process(Socket, Pid) of
        {error, _Reason} ->
            io:format("binding socket...error~n");
        ok ->
            NewRec = #clientinfo{id = Record#clientinfo.id,
                                 socket = Socket,
                                 pid = Pid},
            io:format("chat_room:insert record ~p~n", [NewRec]),
            %% store the client info in ETS
            ets:insert(clientinfo, NewRec),
            %% then tell the client session about its socket
            Pid ! {bind, Socket},
            io:format("clientBinded~n")
    end.

%% Placeholder: generate a random name and call setInfo with it.
generatename() ->
    ok.

%% @doc Broadcast Msg to all connected client sessions via the chat room server.
broadCastMsg(Msg) ->
    gen_server:call(?MODULE, {sendmsg, Msg}).

%% Send {dwmsg, Msg} to the session stored under Key, then continue with the
%% next key. ets:first/1 and ets:next/2 signal the end of the table with
%% '$end_of_table', so that marker (not []) terminates the traversal.
sendMsg('$end_of_table', _Msg) ->
    ok;
sendMsg(Key, Msg) ->
    case ets:lookup(clientinfo, Key) of
        [Record] ->
            io:format("Record found ~p~n", [Record]),
            Pid = Record#clientinfo.pid,
            %% on the way down the message is tagged dwmsg
            io:format("send smg to client_session ~p~n", [Pid]),
            Pid ! {dwmsg, Msg},
            sendMsg(ets:next(clientinfo, Key), Msg);
        [] ->
            io:format("no clientinfo found~n"),
            ok
    end.

%% Placeholder: return all connected clientinfo to the sender.
getMembers(_From) ->
    ok.

%% Placeholder: set clientinfo, returning ok or false; on ok the change is
%% meant to be broadcast. The user can change the name later.
setInfo(_ClientInfo, _From) ->
    ok.

%% @doc Ask the chat room server to remove Ref's entry from the client table.
logout(Ref) ->
    gen_server:call(?MODULE, {remove_clientinfo, Ref}),
    ok.
id_generator.erl:
%% Author: Administrator
%% Created: 2012-2-16
%% Description: TODO: Add description to id_generator
-module(id_generator).
-behavior(gen_server).
%%
%% Include files
%%
%%
%% Exported Functions
%%
-export([start_link/0,getnewid/1]).
-export([init/1,handle_call/3,handle_cast/2,handle_info/2,terminate/2,code_change/3]).
-record(ids,{idtype,ids}).
-record(state,{}).
%%
%% API Functions
%%
%% @doc Start the id generator gen_server, registered locally under the
%% module name, with an empty argument list for init/1.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
%% @doc gen_server callback. Starts Mnesia, attempts to create the schema and
%% the `ids' table, and returns an empty state.
%% mnesia:create_table/2 reports failure as {aborted, Reason}; an already
%% existing table (e.g. after a restart of this server) is treated as success.
init([]) ->
    mnesia:start(),
    io:format("Started"),
    %% NOTE(review): the result is ignored; mnesia:create_schema/1 is
    %% documented to require Mnesia to be stopped -- confirm the intended order.
    mnesia:create_schema([node()]),
    TableDef = [{type, ordered_set},
                {attributes, record_info(fields, ids)},
                {disc_copies, []}],
    case mnesia:create_table(ids, TableDef) of
        {atomic, ok} ->
            ok;
        {aborted, {already_exists, ids}} ->
            ok;
        {aborted, _Reason} ->
            io:format("create table error")
    end,
    {ok, #state{}}.
%% @doc Ask the id generator server for a new id of the given type.
%% The `ids' table is force-loaded before the call is issued.
getnewid(IdType) ->
    mnesia:force_load_table(ids),
    Request = {getid, IdType},
    gen_server:call(?MODULE, Request).
%% Generate a new id for the given type.
%% Inside one Mnesia transaction: if a row for IdType exists, its stored
%% counter is returned and the row is rewritten with counter + 1; otherwise a
%% row with counter 2 is written and 1 is returned.
%% The reply is the id, 0 when the transaction aborts, or 1000 for any other
%% transaction result.
handle_call({getid, IdType}, _From, State) ->
    NextId =
        fun() ->
            case mnesia:read(ids, IdType, write) of
                [Row] ->
                    Current = Row#ids.ids,
                    mnesia:write(ids, Row#ids{ids = Current + 1}, write),
                    Current;
                [] ->
                    mnesia:write(ids, #ids{idtype = IdType, ids = 2}, write),
                    1
            end
        end,
    Reply =
        case mnesia:transaction(NextId) of
            {atomic, Id} ->
                Id;
            {aborted, Reason} ->
                io:format("run transaction error ~1000.p ~n", [Reason]),
                0;
            _Other ->
                1000
        end,
    {reply, Reply, State}.
%% Casts are ignored. The current state is passed through unchanged;
%% returning {noreply, ok} would replace the server state with the atom `ok'.
handle_cast(_Request, State) ->
    {noreply, State}.
%% Out-of-band messages are ignored. The current state is passed through
%% unchanged; returning {noreply, ok} would replace the server state with `ok'.
handle_info(_Request, State) ->
    {noreply, State}.
%% gen_server callback: nothing to clean up on shutdown.
terminate(_Reason, _State) ->
    ok.
%% gen_server callback: the state is carried over unchanged on code upgrade.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
%%
%% Local Functions
%%
client_session.erl:
%% Author: Administrator
%% Created: 2012-2-16
%% Description: TODO: Add description to client_session
-module(client_session).
-behavior(gen_server).
%%
%% Include files
%%
-include("clientinfo.hrl").
-include("message.hrl").
%%
%% Exported Functions
%%
-export([init/1,start_link/1,handle_info/2,handle_call/3,terminate/2]).
-export([process_msg/1]).
%%
%% API Functions
%%
%% @doc Start an unregistered client_session gen_server.
%% The init argument handed to init/1 is the one-element list [Id].
start_link(Id) ->
    InitArgs = [Id],
    gen_server:start_link(?MODULE, InitArgs, []).
%% @doc gen_server callback. start_link/1 passes the argument list [Id], so
%% the id is unwrapped here; binding the whole argument would store the list
%% [Id] as the client id instead of Id itself.
%% The initial state is a #clientinfo{} with only the id set.
init([Id]) ->
    {ok, #clientinfo{id = Id}}.
%% Message sent down from the chat room: write its content to the client socket.
handle_info({dwmsg, Message}, State) ->
    io:format("client_session dwmsg recived ~p~n", [Message]),
    case gen_tcp:send(State#clientinfo.socket, Message#message.content) of
        ok ->
            io:format("client_session dwmsg sended ~n");
        {error, Reason} ->
            %% io:format/2 requires its arguments as a list
            io:format("client_session dwmsg sended error ~p ~n", [Reason])
    end,
    {noreply, State};
%% The chat room tells this session which socket it now owns.
handle_info({bind, Socket}, State) ->
    io:format("client_session bind socket ~n"),
    NewState = State#clientinfo{socket = Socket},
    io:format("NewState ~p~n", [NewState]),
    {noreply, NewState};
%% Data received from the client: wrap it in a #message{} and broadcast it.
handle_info({tcp, _Socket, Data}, State) ->
    io:format("client_session tcp data recived ~p~n", [Data]),
    NewMsg = #message{type = msg, from = State#clientinfo.id, content = Data},
    chat_room:broadCastMsg(NewMsg),
    {noreply, State};
%% The client disconnected: deregister from the chat room and stop normally.
%% A valid callback tuple must be returned; the bare result of
%% chat_room:logout/1 is not one.
handle_info({tcp_closed, _Socket}, State) ->
    chat_room:logout(State),
    {stop, normal, State};
%% Explicit stop request. handle_info/2 has no reply to give, so the
%% three-element stop tuple is used.
handle_info(stop, State) ->
    io:format("client stop"),
    {stop, normal, State};
%% Anything else is logged and ignored.
handle_info(Request, State) ->
    io:format("client_session handle else ~p~n", [Request]),
    {noreply, State}.
%% Every synchronous request is answered with `ok'; the state is unchanged.
handle_call(_Request, _From, State) ->
    {reply, ok, State}.
%% Casts are ignored; the state is unchanged.
%% NOTE(review): handle_cast/2 is not listed in this module's -export
%% attributes -- confirm whether it should be exported.
handle_cast(_Request, State) ->
    {noreply, State}.
%% gen_server callback: nothing to clean up on shutdown.
terminate(_Reason, _State) ->
    ok.
%%
%% Local Functions
%%
%% @doc Blocking receive loop for the socket stored in State.
%% Each received packet is wrapped in a #message{} and broadcast through the
%% chat room, then the loop continues. The loop ends and returns
%% {error, Reason} as soon as gen_tcp:recv/2 fails; retrying on a failed
%% socket would never terminate.
process_msg(State) ->
    Socket = State#clientinfo.socket,
    io:format("client_session:process_msg SOCKET:~p ~n", [Socket]),
    case gen_tcp:recv(Socket, 0) of
        {ok, Message} ->
            io:format("recived ~p ~n", [Message]),
            NewMsg = #message{type = msg,
                              from = State#clientinfo.id,
                              content = Message},
            chat_room:broadCastMsg(NewMsg),
            process_msg(State);
        {error, closed} ->
            io:format("client_session:recive error ~n"),
            {error, closed};
        {error, Reason} ->
            io:format("client_session:recive any ~n"),
            {error, Reason}
    end.
chat_acceptor.erl:
%% Author: Administrator
%% Created: 2012-2-18
%% Description: listens on a TCP port and hands every accepted client socket
%% to the chat room.
-module(chat_acceptor).

%%
%% Exported Functions
%%
-export([start/1, accept_loop/1]).

%%
%% API Functions
%%

%% @doc Start the listen server on Port and run the accept loop in the
%% calling process. Port may be an integer, a string of digits, or a
%% one-element list holding an atom of digits.
%% Returns `error' when the listen socket cannot be opened; otherwise it
%% returns only when the accept loop ends.
start(Port) ->
    case do_init(Port) of
        {ok, ListenSocket} ->
            accept_loop(ListenSocket);
        _Els ->
            error
    end.

%% Normalise Port to an integer and open the listen socket.
%% The [Atom] clause comes first: a one-element list also satisfies
%% is_list/1, so behind the string clause it could never match.
do_init([Port]) when is_atom(Port) ->
    do_init(list_to_integer(atom_to_list(Port)));
do_init(Port) when is_list(Port) ->
    do_init(list_to_integer(Port));
do_init(Port) when is_integer(Port) ->
    Options = [binary,
               {packet, 0},
               {reuseaddr, true},
               {backlog, 1024},
               {active, true}],
    gen_tcp:listen(Port, Options).

%% @doc Accept client connections, waiting at most 3000 ms per attempt.
%% The loop re-enters through a fully qualified call. A closed listen socket
%% ends the loop with {error, closed}; any other error (including the accept
%% timeout) is retried.
accept_loop(ListenSocket) ->
    case gen_tcp:accept(ListenSocket, 3000) of
        {ok, Socket} ->
            process_clientSocket(Socket),
            ?MODULE:accept_loop(ListenSocket);
        {error, closed} ->
            {error, closed};
        {error, _Reason} ->
            ?MODULE:accept_loop(ListenSocket)
    end.

%%
%% Local Functions
%%

%% Obtain a new id and session process from the chat room, then bind the
%% accepted socket to that session.
process_clientSocket(Socket) ->
    Record = chat_room:getPid(),
    chat_room:bindPid(Record, Socket),
    ok.
为了测试这个服务器程序,我用JAVA写了个简单的client端程序,如下:
MainForm.java
package com.kinglong.socket;

import java.awt.BorderLayout;
import java.awt.FlowLayout;
import java.awt.GridBagConstraints;
import java.awt.GridBagLayout;
import java.awt.Insets;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.awt.event.WindowEvent;
import java.awt.event.WindowListener;

import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JTextArea;
import javax.swing.JTextField;
import javax.swing.SwingUtilities;

/**
 * Minimal Swing chat window: a text area showing received messages, a text
 * field for input and a "send" button. Network traffic is delegated to a
 * {@link SocketClient} thread created in the constructor.
 */
public class MainForm {

    private JFrame frame;
    private JPanel toolbar;
    private JTextArea outfile;
    private JTextField inpfile;
    SocketClient clientThread;

    /** Builds the window, starts the socket client thread and shows the frame. */
    public MainForm() {
        final JButton send = new JButton("send");
        outfile = new JTextArea();
        inpfile = new JTextField();

        clientThread = new SocketClient("***.***.***", 3377, this);
        clientThread.start();

        send.setActionCommand("send");

        toolbar = new JPanel();
        toolbar.setLayout(new BorderLayout());
        toolbar.add(outfile, BorderLayout.CENTER);

        JPanel bottom = new JPanel();
        bottom.setLayout(new GridBagLayout());
        bottom.add(send, new GridBagConstraints(0, 0, 1, 1, 0.0, 0.0,
                GridBagConstraints.CENTER, GridBagConstraints.NONE,
                new Insets(2, 2, 2, 2), 0, 0));
        bottom.add(inpfile, new GridBagConstraints(1, 0, 1, 1, 0.0, 0.0,
                GridBagConstraints.CENTER, GridBagConstraints.NONE,
                new Insets(2, 2, 2, 2), 200, 0));
        toolbar.add(bottom, BorderLayout.SOUTH);

        ActionListener act = new ActionListener() {
            @Override
            public void actionPerformed(ActionEvent e) {
                // Send the current input and clear the field.
                if ("send".equals(e.getActionCommand())) {
                    clientThread.sendMsg(inpfile.getText());
                    inpfile.setText("");
                }
            }
        };
        send.addActionListener(act);

        frame = new JFrame();
        frame.getContentPane().add(toolbar);
        frame.setSize(500, 300);
        frame.setResizable(false);
        // Closing the window ends the application; otherwise the frame is only
        // hidden and the socket thread keeps the JVM running.
        frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
        frame.setVisible(true);
    }

    /** Placeholder; the connection is opened by the constructor. */
    public void connect() {
    }

    /**
     * Appends a received message, followed by a newline, to the output area.
     * This is called from the socket thread, so the update is posted to the
     * event dispatch thread.
     *
     * @param msgs the message text to display
     */
    public void recMsg(final String msgs) {
        SwingUtilities.invokeLater(new Runnable() {
            @Override
            public void run() {
                outfile.append(msgs + "\n");
            }
        });
    }

    public static void main(String args[]) {
        MainForm form = new MainForm();
        form.connect();
    }
}
SocketClient.java
package com.kinglong.socket;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
/**
 * Background thread that owns the TCP connection to the chat server.
 * Incoming data is forwarded to the {@link MainForm}; outgoing text is
 * written with {@link #sendMsg(String)}. Text is encoded and decoded as UTF-8.
 */
public class SocketClient extends Thread{
    Socket clientSocket;
    InputStream inst;
    OutputStream oust;
    MainForm mf;
    // Written by the reader thread and readable from others, hence volatile.
    volatile boolean isrunning=true;

    /**
     * Connects to the server. A connection failure is printed and leaves the
     * streams unset; {@link #run()} and {@link #sendMsg(String)} then do nothing.
     *
     * @param ip   server host name or address
     * @param port server TCP port
     * @param mf   form that receives incoming messages
     */
    public SocketClient(String ip,int port,MainForm mf){
        this.mf=mf;
        try {
            clientSocket= new Socket(ip,port);
            inst = clientSocket.getInputStream();
            oust = clientSocket.getOutputStream();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends one message to the server.
     *
     * @param msg text to send
     */
    public void sendMsg(String msg){
        if(oust==null){
            // The connection was never established.
            System.out.println("sendMsg error: not connected");
            return;
        }
        try {
            oust.write(msg.getBytes("UTF-8"));
            oust.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads from the socket until the stream ends or fails and hands every
     * chunk to the form. Only the bytes actually read are decoded, so leftovers
     * of an earlier, longer message never leak into a later one.
     */
    @Override
    public void run() {
        if(inst==null){
            return;
        }
        byte[] data =new byte[200];
        try {
            int len;
            while(isrunning && (len=inst.read(data))>0){
                String str=new String(data,0,len,"UTF-8").trim();
                System.out.println("msg recived:"+str);
                mf.recMsg(str);
            }
        } catch (IOException e) {
            System.out.println("recMsg error"+e.getMessage());
        } finally {
            // End of stream or error: stop instead of retrying on a dead socket.
            isrunning=false;
            try {
                clientSocket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
测试方法:
启动服务器端:
>echatServer:start().
启动两个客户端:
run as java Application
效果图:
发送:
发送:
服务器端打印出的日志:
Startedid generated 1 binding socket... chat_room:insert record {clientinfo,1,<0.102.0>,#Port<0.1778>,undefined, undefined,undefined,undefined,undefined, undefined,undefined} clientBinded client_session bind socket NewState {clientinfo,[1], undefined,#Port<0.1778>,undefined,undefined,undefined, undefined,undefined,undefined,undefined} id generated 2 binding socket... chat_room:insert record {clientinfo,2,<0.109.0>,#Port<0.1797>,undefined, undefined,undefined,undefined,undefined, undefined,undefined} clientBinded client_session bind socket NewState {clientinfo,[2], undefined,#Port<0.1797>,undefined,undefined,undefined, undefined,undefined,undefined,undefined} client_session tcp data recived <<"hello a">> feching talbe key is 1 Record found {clientinfo,1,<0.102.0>,#Port<0.1778>,undefined,undefined, undefined,undefined,undefined,undefined,undefined} send smg to client_session <0.102.0> Record found {clientinfo,2,<0.109.0>,#Port<0.1797>,undefined,undefined, undefined,undefined,undefined,undefined,undefined} client_session dwmsg recived {message,msg, [2], undefined,<<"hello a">>,undefined} send smg to client_session <0.109.0> client_session dwmsg sended no clientinfo found client_session dwmsg recived {message,msg, [2], undefined,<<"hello a">>,undefined} client_session dwmsg sended feching talbe key is 1 client_session tcp data recived <<"hello b">> Record found {clientinfo,1,<0.102.0>,#Port<0.1778>,undefined,undefined, undefined,undefined,undefined,undefined,undefined} send smg to client_session <0.102.0> Record found {clientinfo,2,<0.109.0>,#Port<0.1797>,undefined,undefined, undefined,undefined,undefined,undefined,undefined} send smg to client_session <0.109.0> no clientinfo found client_session dwmsg recived {message,msg, [1], undefined,<<"hello b">>,undefined} client_session dwmsg sended client_session dwmsg recived {message,msg, [1], undefined,<<"hello b">>,undefined} client_session dwmsg sended
至此基本的聊天功能实现了。顺便说下,以上的服务器端程序将监听到的socket连接交由单独的进程处理了。
目前还有一个不足:客户端退出时会引发服务器端异常退出,需要对socket连接断开的情况进行处理,下一步将对此进行修改。