オルタナティブ・ブログ > プログラマー社長のブログ >

プログラミングでメシが食えるか!?

高負荷システムではログ出力も大変!

»

引き続き技術ネタで。

どのようなシステムでもログ出力はとても大切なものです。プログラムの挙動を後から確認するのはもちろん、さまざまな統計処理に使ったりと、何らかのデータ加工処理を行うシステムでログが一切不要というシステムはまずないでしょう。

昨日書いたように、私が大好きな高負荷ネットワークシステムでも、当然ログは重要なのですが、ログ出力自体が性能の足を引っ張ることが実に良くあるのです。

一番苦労するのが、マルチスレッド&マルチプロセスのプログラムから1つのファイルにログを出力するようなケースです。マルチスレッドでも、ログの整合性を取るためにミューテックスで排他を取るなどの気配りが必要になります(高水準出力は排他が内部で行われているとしても)し、そもそもディスクリプタを大量に使っていると高水準入出力はまともに使えなくなり、低水準入出力を使う必要が出てきます。そうするとある程度バッファリング処理などを自作しないとI/O負荷が目立ってきてしまうこともあります。さらにマルチプロセスになると、プロセス間で排他が必要になり、スレッド間排他以上に面倒であると同時に速度も落ちます。そのような状態でさらにローテーション管理なども行うとなると、ログ出力だけでかなりのコーディングが必要になるばかりか、経験のない人ではお手上げになってしまうくらいなのです。

実際、高負荷システムを開発していて選択を迫られるものの一つが、「統計情報をどこまで取るか」という問題です。運用側からすれば統計情報が豊富なほど当然運用に必要な情報が増えて助かるのですが、プログラム側では統計処理自体のために排他が必要になり、さらにログ記録で排他が必要となるなど、性能への影響がとても大きいのです。

ログ出力などsyslogに任せれば?と考える人もいると思いますが、syslogはあまりにも高い頻度で書き込むと不具合が出ます。デッドロックしたりした経験が私も何度もあります。さらに、システムごとに要求される細かな出力制御に対応するのはかなり困難です。

昨日、あるシステムの高負荷状態での動きを観察していたのですが、そのシステムでは、あるプログラムが、2プロセス起動し、それぞれ2万スレッドずつフル稼働するような処理を行います。その状態で5種類のログファイルに各スレッドから記録を行うのですが、超高負荷になるとプロセス間の排他がスレッド間の排他に勝てなくなり、片方のプロセスはなかなか排他を取ることができなくなり、かなり稼働状況に偏りができてしまいました。原因切り分けをしていくと、ログ出力を完全に止めると問題が無くなることから、明らかにログの排他だということになり・・・昨晩対策を考えました。

自力でファイルに記録するのは諦め、別プログラムに記録を依頼する仕組みにし、ログ出力側は排他を行わない、という感じにしました。SystemVメッセージキューを使い、ログ記録を依頼し、ログ記録専用プログラムがそれを順次ファイルに記録していくという仕組みです。わざわざプロセスを分けてプロセス間通信するのは、逆にオーバーヘッドが増えるのでは?と思うかもしれませんが、メッセージキュー自体がキューの役割も果たし、実にスムーズに問題なくログが記録でき、さらにログ記録側もシンプルな処理のため、負荷も低く処理できるということもあるのです。

言葉だけでは良くわからないと思いますので、私が昨晩1時間ちょっとで作ったロガープログラムを公開しておきます。何か問題に気がついたら教えてくださいね!なお、Linux以外ではそのまま動かないかもしれません。簡単なものなので、流用するなりなんなりお好きに使ってかまいませんが、信用して重要なシステムに使って問題が出ても責任は取りません。。

まとめたもの:logger.tar.gzをダウンロード

[logger]メッセージキューを介したログ出力サーバ

【概要】
loggerはSystemVメッセージキューを介してログ出力を行うためのログ出力サーバである。INIファイルにより複数のログ出力を定義でき、メッセージキュー番号ごとにログ出力が行われる。
高水準出力を使い、I/O負荷を下げると共に、1秒に1回fflush()でフラッシュも念のため(tail -Fなどで使いやすいように)行っている。
ローテーションなし、サイズローテーション、定時ローテーションの機能も持つ。
1回のログ出力単位の最大サイズは8192バイトで、loggerにより自動的に末尾に改行(LF)が負荷されてファイルに記録される。
logrotateと組み合わせの為に、USR1でファイルの再オープンを行うことができる。

【INIファイル】
行頭'#'はコメント行としてスキップする。
qnoをメッセージキュー番号とし、そこに届いた内容を「path/name.log」ファイルに追記していく。
rotateTypeは、N:ローテーションなし、S:サイズローテーション、H:定時ローテーションが指定できる。
サイズローテーションの時のみ、sizeLimitに指定した値が有効である。
例:

# name,qno,path,rotateType(N:none,S:size,H:hour),sizeLimit(MB)
app1,5601,/usr/local/app1/log,N,0
app2,5602,/usr/local/app2/log,N,0
app3,5603,/usr/local/app3/log,H,0
app4,5604,/usr/local/app4/log,S,10
app5,5605,/usr/local/app5/log,S,10


【起動】
引数でINIファイルのパスを指定できる。無指定の場合は"./logger.ini"が使われる。
フォアグラウンドで起動する。

【シグナル】
SIGINT,SIGTERM,SIGQUITで終了。
SIGUSR1でファイルの再オープン:logrotate用。

【備考】
今どきSystemVメッセージキューを使う理由は、使い慣れているということと、このプログラムでは多重化の必要がないこと、POSIXメッセージキューより速いらしいという理由である。IPCは後始末が面倒という面もあるが、アプライアンス的な用途では問題ない。
ソケットを使わない理由は、キューサイズの大きさと、不揮発性からログ向きという判断と、ソケットを大量に使うアプリケーションと組み合わせて使うことを想定して。
多数のスレッド・プロセスで構成するアプリケーションで単一ファイルにログ出力を行いたい場合、排他が非常に大変であり、排他自体のオーバーヘッドも高いため、メッセージキューを介して独立させ、排他の簡素化とともに、ディスクリプタ数の問題からも開放して高水準出力を使うことが本来の目的である。

【送信側サンプル】
エラー処理などは省略したサンプルを示す。
msgsnd()のIPC_NOWAITは、万が一loggerが起動していないときに、ログ出力でブロックすることがないように指定しているが、ログ出力自体の確実性を優先するなら指定しない、あるいはリトライするのも良い。

#include        <stdio.h>
#include        <string.h>
#include        <sys/ipc.h>
#include        <sys/msg.h>

#define MAX_SIZE        (8192)
#define QNO             (4601)

int main()
{
int     qid;
struct msgbuf{
        long    mtype;
        char    mtext[MAX_SIZE];
}buf;

        qid=msgget(QNO,0666|IPC_CREAT);

        buf.mtype=1L;
        snprintf(buf.mtext,sizeof(buf.mtext),"test message");

        msgsnd(qid,&buf,strlen(buf.mtext),IPC_NOWAIT);

        return(0);
}

ソース

#include    <stdio.h>
#include    <stdlib.h>
#include    <string.h>
#include    <signal.h>
#include    <unistd.h>
#include    <errno.h>
#include    <sys/ipc.h>
#include    <sys/msg.h>
#include    <pthread.h>

#define    PARAM_PATH    "./logger.ini"
#define    MAX_SIZE    (8192)

typedef struct    {
    char    *name;
    int    qno;
    char    *path;
    char    rotateType;
    int    sizeLimit;
}PARAM;

PARAM    *Param=NULL;
int    ParamNo=0;

typedef struct    {
    pthread_t    tid;
    char    *fname;
    int    qid;
    FILE    *fp;
    int    reopen;
    int    year,mon,mday,hour;
}THREAD_DATA;

THREAD_DATA    *TD=NULL;
int    TDNo=0;

pthread_t    ParentTid;
int    EndFlag=0;

int ReadParam(char *fname)
{
FILE    *fp;
PARAM    p;
char    buf[2048],*ptr,*saveptr;

    if((fp=fopen(fname,"r"))==NULL){
        perror("fopen");
        return(-1);
    }

    while(1){
        if(fgets(buf,sizeof(buf),fp)==NULL){
            break;
        }
        if(buf[0]=='#'){
            continue;
        }
        if((ptr=strtok_r(buf,",",&saveptr))==NULL){
            continue;
        }
        p.name=ptr;
        if((ptr=strtok_r(NULL,",",&saveptr))==NULL){
            continue;
        }
        p.qno=atoi(ptr);
        if((ptr=strtok_r(NULL,",",&saveptr))==NULL){
            continue;
        }
        p.path=ptr;
        if((ptr=strtok_r(NULL,",",&saveptr))==NULL){
            continue;
        }
        if(*ptr=='N'||*ptr=='S'||*ptr=='H'){
            p.rotateType=*ptr;
        }
        else{
            continue;
        }
        if((ptr=strtok_r(NULL,"\r\n",&saveptr))==NULL){
            continue;
        }
        p.sizeLimit=atoi(ptr);
        if(ParamNo==0){
            Param=(PARAM *)malloc(sizeof(PARAM));
        }
        else{
            Param=(PARAM *)realloc(Param,(ParamNo+1)*sizeof(PARAM));
        }
        Param[ParamNo].name=strdup(p.name);
        Param[ParamNo].qno=p.qno;
        Param[ParamNo].path=strdup(p.path);
        Param[ParamNo].rotateType=p.rotateType;
        Param[ParamNo].sizeLimit=p.sizeLimit;
        printf("[%d]:%s,%d,%s,%c,%d\n",
            ParamNo,
            Param[ParamNo].name,
            Param[ParamNo].qno,
            Param[ParamNo].path,
            Param[ParamNo].rotateType,
            Param[ParamNo].sizeLimit);
        ParamNo++;
    }

    fclose(fp);

    return(0);
}

int checkRotatePre(int no)
{
THREAD_DATA    *td=&TD[no];
char    *bakname;

    if(Param[no].rotateType=='H'){
        time_t    t;
        struct tm    tm;
        t=time(NULL);
        localtime_r(&t,&tm);
        if(td->hour==-1){
            td->year=tm.tm_year+1900;
            td->mon=tm.tm_mon+1;
            td->mday=tm.tm_mday;
            td->hour=tm.tm_hour;
            return(0);
        }
        else if(td->hour!=tm.tm_hour){
            if(td->fp!=NULL){
                fclose(td->fp);
            }
            bakname=(char *)calloc(strlen(td->fname)+1+4+2+2+2+1,sizeof(char));
            sprintf(bakname,"%s.%04d%02d%02d%02d",td->fname,td->year,td->mon,td->mday,td->hour);
            rename(td->fname,bakname);
            free(bakname);
            td->year=tm.tm_year+1900;
            td->mon=tm.tm_mon+1;
            td->mday=tm.tm_mday;
            td->hour=tm.tm_hour;
            td->fp=fopen(td->fname,"a");
            return(1);
        }
        else{
            return(0);
        }
    }
    else{
        return(0);
    }
}

int checkRotatePost(int no)
{
THREAD_DATA    *td=&TD[no];
char    *bakname;

    if(Param[no].rotateType=='N'){
        return(0);
    }
    else if(Param[no].rotateType=='S'){
        long    size;
        size=ftell(td->fp);
        if(size>Param[no].sizeLimit*1024*1024){
            if(td->fp!=NULL){
                fclose(td->fp);
            }
            bakname=(char *)calloc(strlen(td->fname)+1+3+1,sizeof(char));
            sprintf(bakname,"%s.bak",td->fname);
            rename(td->fname,bakname);
            free(bakname);
            td->fp=fopen(td->fname,"a");
            return(1);
        }
        else{
            return(0);
        }
    }
    else{
        return(0);
    }
}

void *loggerThread(void *arg)
{
int    no=(int)arg;
THREAD_DATA    *td=&TD[no];
struct msgbuf{
    long    mtype;
    char    mtext[MAX_SIZE];
}buf;
ssize_t    size;

    while(EndFlag==0){
        if(td->reopen==1){
            if(td->fp!=NULL){
                fclose(td->fp);
            }
            td->fp=fopen(td->fname,"a");
            td->reopen=0;
        }
        if((size=msgrcv(td->qid,&buf,sizeof(buf.mtext),0L,MSG_NOERROR))==-1){
            perror("msgrcv");
        }
        else{
            checkRotatePre(no);
            if(td->fp!=NULL){
                fwrite(buf.mtext,sizeof(char),size,td->fp);
                fputc('\n',td->fp);
            }
            checkRotatePost(no);
        }
    }

    fclose(td->fp);

    return(0);
}

void ending(int sig)
{
    if(pthread_self()==ParentTid){
        EndFlag=1;
    }
}

void reopen(int sig)
{
int    i;

    if(pthread_self()==ParentTid){
        for(i=0;i<ParamNo;i++){
            TD[i].reopen=1;
            pthread_kill(TD[i].tid,SIGUSR1);
        }
    }
}

int main(int argc,char *argv[])
{
int    i,ret;

    if(argc==1){
        if(ReadParam(PARAM_PATH)==-1){
            return(-1);
        }
    }
    else{
        if(ReadParam(argv[1])==-1){
            return(-1);
        }
    }

    TD=(THREAD_DATA *)calloc(ParamNo,sizeof(THREAD_DATA));

    ParentTid=pthread_self();

    signal(SIGINT,ending);
    signal(SIGTERM,ending);
    signal(SIGQUIT,ending);
    signal(SIGUSR1,reopen);
    signal(SIGPIPE,SIG_IGN);

    for(i=0;i<ParamNo;i++){
        if((TD[i].qid=msgget(Param[i].qno,0666|IPC_CREAT))==-1){
            perror("msgget");
            return(-1);
        }
        TD[i].fname=(char *)calloc(strlen(Param[i].path)+1+strlen(Param[i].name)+1+3+1,sizeof(char));
        sprintf(TD[i].fname,"%s/%s.log",Param[i].path,Param[i].name);
        if((TD[i].fp=fopen(TD[i].fname,"a"))==NULL){
            perror(TD[i].fname);
            return(-1);
        }
        TD[i].hour=-1;
        if((ret=pthread_create(&TD[i].tid,NULL,loggerThread,i))!=0){
            errno=ret;
            perror("pthread_create");
            return(-1);
        }
    }

    while(EndFlag==0){
        for(i=0;i<ParamNo;i++){
            fflush(TD[i].fp);
        }
        sleep(1);
    }

    for(i=0;i<ParamNo;i++){
        pthread_kill(TD[i].tid,SIGINT);
    }

    for(i=0;i<ParamNo;i++){
        void    *retval;
        pthread_join(TD[i].tid,&retval);
    }

    return(0);
}

Makefile

OBJS=main.o
CFLAGS= -g -O2 -W -Wall -Wcast-align -Wcast-qual -Wcomment -Wformat -Wlong-long -Wno-import -Wparentheses -Wpointer-arith -Wredundant-decls -Wreturn-type -Wshadow -Wswitch -Wtrigraphs -Wunused -Wwrite-strings
SRCS=$(OBJS:%.o=%.c)
LDLIBS=-lpthread
TARGET=logger
$(TARGET): $(OBJS)
    $(CC) $(CFLAGS) $(LDFLAGS) -o $(TARGET) $(OBJS) $(LDLIBS)
clean:
    $(RM) $(RMFLAGS) $(OBJS) $(PROGRAM) $(DEPS) gmon.out y.tab.c y.tab.h

Comment(2)