オルタナティブ・ブログ > プログラマー社長のブログ >

プログラミングでメシが食えるか!?

LinuxでのTCP/IP:send()の挙動

»

TCP/IPは簡単に信頼性の高い通信を実現するためのプロトコルで、接続して送受信するだけで普通に使えるようなプログラムを作ることができます。が、実際に回線を流れる際にはパケットの遅延・ロスなどが発生するため、再送などの複雑な仕組みがあり、それらを多少は理解していないと仕事で使えるレベルのシステムが構築できないこともあるものです。

受信は届いたものを受け取るだけなので、それほど気にしなくても良いのですが、送信は気にしはじめるといろいろと不可解な動きに出会います。簡単な実験用プログラムを作り、動かして確認してみましょう。ソースは最後に張り付けてあります。

まず、送信で難しいのが、相手側が不調になった場合です。相手側のプログラムが終了した場合には割とすぐに送信エラーになるのですが(それでも1発でエラーにならないなど難しいのですが)、ネットワークケーブルが抜けたり、プログラムがハングアップや暴走しているとそう簡単にはエラーになりません。

まず、相手側が全く受信をしない場合にどのくらい送信できるかを見てみましょう。

スクリーンショット 2014-11-28 14.36.59.png

size.pdf

結構なサイズが送信できてしまいます。しかも、ノンブロッキングや送信タイムアウトを指定しないデフォルトの状態ではいつまでたってもエラーにならず、send()が待ち続けてしまいます。

送信しながら他の処理を行うようなプログラムだと、送信がブロックしてしまうと他の処理も止まってしまいます。ノンブロッキングにするか送信タイムアウトを指定しないとまずい場合も多いのです。

ノンブロッキングや送信タイムアウトを使うと難しいのが、CPU使用量です。ノンブロッキングでは送信できてもできなくてもすぐにsend()から戻ってくるので、CPUがどんどん消費されます。

スクリーンショット 2014-11-28 14.36.32.png

tp.pdf

送信レディを見ながら送るようにしてCPU使用量を低くしないとスループットが下がるばかりか、マシン全体が遅くなってしまうので注意が必要です。

ということで、細かい説明は書いているとボリュームが大きくなってしまいますので、ソースを見て、動かして自分でやってみるのが一番!

#include        <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <signal.h>
#include <time.h>
#include <poll.h>
#include <sys/resource.h>
#include <sys/epoll.h>
#include <sys/param.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/resource.h>

#define BUFSIZE (4096)

#define TOTAL_SIZE ((size_t)10*1024*1024*1024)

//#define F_NODELAY 1

//#define F_SNDTIMEO 1
#define SNDTIMEO_S 0
#define SNDTIMEO_U (1*1000)

//#define F_SNDBUF 1
#define SNDBUF_SIZE (4*1024*1024)

//#define F_NONBLOCK 1

//#define D_EPOLL 1
//#define D_POLL 1
//#define D_SELECT 1
#define D_NOWAIT 1

int EndFlag=0;

void ending(int sig)
{
EndFlag=1;
}

int main(int argc,char *argv[])
{
int soc;
struct in_addr addr;
struct sockaddr_in server;
int flag,size,optlen;
struct timeval tv;
struct rusage usage;

if(argc<=2){
fprintf(stderr,"client target-ip target-port\n");
return(-1);
}

if((addr.s_addr=inet_addr(argv[1]))==-1){
perror("inet_addr");
return(-1);
}
memset(&server,0,sizeof(server));
server.sin_family=AF_INET;
server.sin_addr=addr;
server.sin_port=htons(atoi(argv[2]));

if((soc=socket(AF_INET,SOCK_STREAM,0))<0){
perror("socket");
return(-1);
}

if(connect(soc,(struct sockaddr *)&server,sizeof(server))==-1){
perror("connect");
return(-1);
}

fprintf(stderr,"send-size:%dbytes\n",BUFSIZE);

optlen=sizeof(flag);
if(getsockopt(soc,IPPROTO_TCP,TCP_NODELAY,&flag,&optlen)==-1){
perror("getsockopt");
return(-1);
}
fprintf(stderr,"default:TCP_NODELAY=%d\n",flag);
#ifdef F_NODELAY
flag=1;
fprintf(stderr,"set:TCP_NODELAY=%d\n",flag);
if(setsockopt(soc,IPPROTO_TCP,TCP_NODELAY,&flag,optlen)==-1){
perror("setsockopt");
return(-1);
}
if(getsockopt(soc,IPPROTO_TCP,TCP_NODELAY,&flag,&optlen)==-1){
perror("getsockopt");
return(-1);
}
fprintf(stderr,"after-set:TCP_NODELAY=%d\n",flag);
#endif

optlen=sizeof(tv);
if(getsockopt(soc,SOL_SOCKET,SO_SNDTIMEO,&tv,&optlen)==-1){
perror("getsockopt");
return(-1);
}
fprintf(stderr,"default:SO_SNDTIMEO=%u.%06u\n",tv.tv_sec,tv.tv_usec);
#ifdef F_SNDTIMEO
tv.tv_sec=SNDTIMEO_S;
tv.tv_usec=SNDTIMEO_U;
fprintf(stderr,"set:SO_SNDTIMEO=%u.%06u\n",tv.tv_sec,tv.tv_usec);
if(setsockopt(soc,SOL_SOCKET,SO_SNDTIMEO,&tv,optlen)==-1){
perror("setsockopt");
return(-1);
}
if(getsockopt(soc,SOL_SOCKET,SO_SNDTIMEO,&tv,&optlen)==-1){
perror("getsockopt");
return(-1);
}
fprintf(stderr,"after-set:SO_SNDTIMEO=%u.%06u\n",tv.tv_sec,tv.tv_usec);
#endif

optlen=sizeof(size);
if(getsockopt(soc,SOL_SOCKET,SO_SNDBUF,&size,&optlen)==-1){
perror("getsockopt");
return(-1);
}
fprintf(stderr,"default:SO_SNDBUF=%d\n",size);
#ifdef F_SNDBUF
size=SNDBUF_SIZE;
fprintf(stderr,"set:SO_SNDBUF=%d\n",size);
if(setsockopt(soc,SOL_SOCKET,SO_SNDBUF,&size,optlen)==-1){
perror("getsockopt");
return(-1);
}
if(getsockopt(soc,SOL_SOCKET,SO_SNDBUF,&size,&optlen)==-1){
perror("getsockopt");
return(-1);
}
fprintf(stderr,"after-set:SO_SNDBUF=%d\n",size);
#endif

if((flag=fcntl(soc,F_GETFL,0))==-1) {
perror("fcntl");
return(-1);
}
fprintf(stderr,"default:O_NONBLOCK=%d\n",(flag&O_NONBLOCK)?1:0);
#ifdef F_NONBLOCK
fprintf(stderr,"set:O_NONBLOCK=%d\n",((flag|O_NONBLOCK)&O_NONBLOCK)?1:0);
fcntl(soc,F_SETFL,flag|O_NONBLOCK);
if((flag=fcntl(soc,F_GETFL,0))==-1) {
perror("fcntl");
return(-1);
}
fprintf(stderr,"after-set:O_NONBLOCK=%d\n",(flag&O_NONBLOCK)?1:0);
#endif

signal(SIGINT,ending);

#ifdef D_EPOLL
fprintf(stderr,"EPOLL\n");
fprintf(stderr,"sec,send-count,send-size,throughput(again-count,block-count,small-send-count)\n");
do_epoll(soc);
#endif
#ifdef D_POLL
fprintf(stderr,"POLL\n");
fprintf(stderr,"sec,send-count,send-size,throughput(again-count,block-count,small-send-count)\n");
do_poll(soc);
#endif
#ifdef D_SELECT
fprintf(stderr,"SELECT\n");
fprintf(stderr,"sec,send-count,send-size,throughput(again-count,block-count,small-send-count)\n");
do_select(soc);
#endif
#ifdef D_NOWAIT
fprintf(stderr,"NOWAIT\n");
fprintf(stderr,"sec,send-count,send-size,throughput(again-count,block-count,small-send-count)\n");
do_nowait(soc);
#endif

close(soc);

getrusage(RUSAGE_SELF,&usage);
fprintf(stderr,"utime=%u.%06u\n",usage.ru_utime.tv_sec,usage.ru_utime.tv_usec);
fprintf(stderr,"stime=%u.%06u\n",usage.ru_stime.tv_sec,usage.ru_stime.tv_usec);
fprintf(stderr,"maxrss=%lu\n",usage.ru_maxrss);
fprintf(stderr,"nvcsw=%lu\n",usage.ru_nvcsw);
fprintf(stderr,"nivcsw=%lu\n",usage.ru_nivcsw);

return(0);
}

int do_epoll(int soc)
{
char buf[BUFSIZE];
int ret,end;
size_t total=0,b_total=0,count=0,acount=0,bcount=0,scount=0,size;
time_t start_t,now_t;
double dval;
int epollfd;
struct epoll_event ev,events[1];

epollfd=epoll_create(1);
ev.events=EPOLLOUT;
ev.data.fd=soc;
epoll_ctl(epollfd,EPOLL_CTL_ADD,soc,&ev);

start_t=time(NULL);
b_total=0;
end=0;
while(total<TOTAL_SIZE){
switch(epoll_wait(epollfd,events,1,-1)){
case -1:
perror("epoll");
end=1;
break;
case 0:
fprintf(stderr,"epoll:timeout\n");
break;
default:
if(total+sizeof(buf)>TOTAL_SIZE){
size=TOTAL_SIZE-total;
}
else{
size=sizeof(buf);
}
ret=send(soc,buf,size,0);
if(ret==-1){
if(errno==EINTR){
}
else if(errno==EAGAIN){
acount++;
}
else if(errno==EWOULDBLOCK){
bcount++;
}
else{
perror("send");
end=1;
}
}
else{
total+=ret;
count++;
if(ret!=sizeof(buf)){
scount++;
}
if(total-b_total>1024*1024*1024){
b_total=total;
now_t=time(NULL);
dval=(double)total/(double)(now_t-start_t)/(1024.0*1024.0)*8.0;
fprintf(stderr,"%d,%lu,%lubytes:%gkbps(%lu,%lu,%lu)\n",now_t-start_t,count,total,dval,acount,bcount,scount);
}
}
break;
}
if(EndFlag||end){
break;
}
}

now_t=time(NULL);
dval=(double)total/(double)(now_t-start_t)/(1024.0*1024.0)*8.0;
fprintf(stderr,"end:%d,%lu,%lubytes:%gkbps(%lu,%lu,%lu)\n",now_t-start_t,count,total,dval,acount,bcount,scount);

return(0);
}

int do_poll(int soc)
{
char buf[BUFSIZE];
int ret,end;
size_t total=0,b_total=0,count=0,acount=0,bcount=0,scount=0,size;
time_t start_t,now_t;
double dval;
struct pollfd targets[1];

targets[0].fd=soc;
targets[0].events=POLLOUT;

start_t=time(NULL);
b_total=0;
end=0;
while(total<TOTAL_SIZE){
switch(poll(targets,1,-1)){
case -1:
perror("poll");
end=1;
break;
case 0:
fprintf(stderr,"poll:timeout\n");
break;
default:
if(total+sizeof(buf)>TOTAL_SIZE){
size=TOTAL_SIZE-total;
}
else{
size=sizeof(buf);
}
ret=send(soc,buf,size,0);
if(ret==-1){
if(errno==EINTR){
}
else if(errno==EAGAIN){
acount++;
}
else if(errno==EWOULDBLOCK){
bcount++;
}
else{
perror("send");
end=1;
}
}
else{
total+=ret;
count++;
if(ret!=sizeof(buf)){
scount++;
}
if(total-b_total>1024*1024*1024){
b_total=total;
now_t=time(NULL);
dval=(double)total/(double)(now_t-start_t)/(1024.0*1024.0)*8.0;
fprintf(stderr,"%d,%lu,%lubytes:%gkbps(%lu,%lu,%lu)\n",now_t-start_t,count,total,dval,acount,bcount,scount);
}
}
break;
}
if(EndFlag||end){
break;
}
}

now_t=time(NULL);
dval=(double)total/(double)(now_t-start_t)/(1024.0*1024.0)*8.0;
fprintf(stderr,"end:%d,%lu,%lubytes:%gkbps(%lu,%lu,%lu)\n",now_t-start_t,count,total,dval,acount,bcount,scount);

return(0);
}

int do_select(int soc)
{
char buf[BUFSIZE];
int ret,end;
size_t total=0,b_total=0,count=0,acount=0,bcount=0,scount=0,size;
time_t start_t,now_t;
double dval;
fd_set writeOk,mask;

FD_ZERO(&mask);
FD_SET(soc,&mask);

start_t=time(NULL);
b_total=0;
end=0;
while(total<TOTAL_SIZE){
writeOk=mask;
switch(select(soc+1,NULL,&writeOk,NULL,NULL)){
case -1:
perror("select");
end=1;
break;
case 0:
fprintf(stderr,"select:timeout\n");
break;
default:
if(total+sizeof(buf)>TOTAL_SIZE){
size=TOTAL_SIZE-total;
}
else{
size=sizeof(buf);
}
ret=send(soc,buf,size,0);
if(ret==-1){
if(errno==EINTR){
}
else if(errno==EAGAIN){
acount++;
}
else if(errno==EWOULDBLOCK){
bcount++;
}
else{
perror("send");
end=1;
}
}
else{
total+=ret;
count++;
if(ret!=sizeof(buf)){
scount++;
}
if(total-b_total>1024*1024*1024){
b_total=total;
now_t=time(NULL);
dval=(double)total/(double)(now_t-start_t)/(1024.0*1024.0)*8.0;
fprintf(stderr,"%d,%lu,%lubytes:%gkbps(%lu,%lu,%lu)\n",now_t-start_t,count,total,dval,acount,bcount,scount);
}
}
break;
}
if(EndFlag||end){
break;
}
}

now_t=time(NULL);
dval=(double)total/(double)(now_t-start_t)/(1024.0*1024.0)*8.0;
fprintf(stderr,"end:%d,%lu,%lubytes:%gkbps(%lu,%lu,%lu)\n",now_t-start_t,count,total,dval,acount,bcount,scount);

return(0);
}

int do_nowait(int soc)
{
char buf[BUFSIZE];
int ret,end;
size_t total=0,b_total=0,count=0,acount=0,bcount=0,scount=0,size;
time_t start_t,now_t;
double dval;

start_t=time(NULL);
b_total=0;
end=0;
while(total<TOTAL_SIZE){
if(total+sizeof(buf)>TOTAL_SIZE){
size=TOTAL_SIZE-total;
}
else{
size=sizeof(buf);
}
ret=send(soc,buf,size,0);
if(ret==-1){
if(errno==EINTR){
}
else if(errno==EAGAIN){
acount++;
}
else if(errno==EWOULDBLOCK){
bcount++;
}
else{
perror("send");
}
end=1;
}
else{
total+=ret;
count++;
if(ret!=sizeof(buf)){
scount++;
}
if(total-b_total>1024*1024*1024){
b_total=total;
now_t=time(NULL);
dval=(double)total/(double)(now_t-start_t)/(1024.0*1024.0)*8.0;
fprintf(stderr,"%d,%lu,%lubytes:%gkbps(%lu,%lu,%lu)\n",now_t-start_t,count,total,dval,acount,bcount,scount);
}
}
if(EndFlag||end){
break;
}
}

now_t=time(NULL);
dval=(double)total/(double)(now_t-start_t)/(1024.0*1024.0)*8.0;
fprintf(stderr,"end:%d,%lu,%lubytes:%gkbps(%lu,%lu,%lu)\n",now_t-start_t,count,total,dval,acount,bcount,scount);

return(0);
}

test.c

Comment(4)