Ganglia 源码剖析之 gmetad

gmetad 负责汇总 gmond 节点的指标数据,相比 gmond 来说要简单很多。

1. 主流程

gmetad 的主要工作如下:

  • 收集 gmond 数据
  • 指标数据落盘
  • 提供指标数据查询(xml_port 和 interactive port)
int
main ( int argc, char *argv[] )
{
    ...
    // 命令行参数解析
    if (cmdline_parser(argc, argv, &args_info) != 0)
        err_quit("command-line parser error");
    
    
}

2. gmond 数据收集

在 gmetad 的 conf 会定义 datasource,类似如下。

data_source "my cluster" 10 localhost my.machine.edu:8649 1.2.3.5:8655 
data_source "my grid" 50 1.3.4.7:8655 grid.org:8651 grid-backup.org:8651 
data_source "another source" 1.3.4.8:8655 1.3.4.8

data_source 会指定 gmetad 采集的 gmond 的主机信息:server_ip 或者 server_hostname 和 port。如果 port 不指定则默认为 8649。data_source 后面可以指定多个 server,gmetad 采集的时候顺序采集 server list,指定多个 server list 的好处在于可以避免单点故障。收集 data source 的 metric 的主要代码如下。

int
main ( int argc, char *argv[] )
{
    num_sources = number_of_datasources( args_info.conf_arg );
    ...
    /* Get the real number of data sources later */
    sources = hash_create( num_sources + 10 );
  
    for(;;)
    {
        // 随机时间间隔采集
        /* Do at a random interval, between 
                 (shortest_step/2) +/- METADATA_SLEEP_RANDOMIZE percent */
        random_sleep_factor = (1 + (METADATA_SLEEP_RANDOMIZE / 50.0) * ((rand_r(&rand_seed) - RAND_MAX/2)/(float)RAND_MAX));
        sleep_time = random_sleep_factor * apr_time_from_sec(c->shortest_step) / 2;
        /* Make sure the sleep time is at least 1 second */
        if(apr_time_sec(apr_time_now() + sleep_time) < (METADATA_MINIMUM_SLEEP + apr_time_sec(apr_time_now())))
            sleep_time += apr_time_from_sec(METADATA_MINIMUM_SLEEP);
        apr_sleep(sleep_time);
      
        // 收集数据
        /* Sum the new values */
        hash_foreach(root.authority, do_root_summary, NULL );
    }
}

// 类似 C++ 的 STL 中的 for_each
int
hash_foreach (hash_t * hash, int (*func)(datum_t *, datum_t *, void *), void *arg)
{
    for (i = 0; i < hash->size && !stop; i++)
    {
        apr_thread_rwlock_rdlock(hash->lock[i]);
        for (bucket = &hash->node[i]; bucket != NULL && bucket->key != NULL; bucket = bucket->next)
        {
            if (bucket->key == NULL) continue;
            stop = func(bucket->key, bucket->val, arg);
            if (stop) break;
        }
        apr_thread_rwlock_unlock(hash->lock[i]);
    }
    return stop;
}

/* Sums the metric summaries from all data sources. */
static int
do_root_summary( datum_t *key, datum_t *val, void *arg ) {}

3. tcp server: xml_port

gmetad 会根据配置文件新建两种 tcp server,一种是根据配置 xml_port(默认为 8651),一种是根据配置 interactive_port(默认为 8652)。根据 xml_port 创建的 tcp server 会以 xml 格式返回 gmetad 收集的所有集群数据,多层 gmetad 结构的话可以使用这些数据。根据 interactive_port 创建的 tcp server 一般是用来和 gweb 交互的,并且可以指定一些信息。先看 xml_port 的处理逻辑。

int
main ( int argc, char *argv[] )
{
    parse_config_file ( args_info.conf_arg );
  
    server_socket = g_tcp_socket_server_new( c->xml_port );
    ...
    
    interactive_socket = g_tcp_socket_server_new( c->interactive_port );
    ...
      
    /* Spin off the non-interactive server threads. (Half as many as interactive). */
    for (i=0; i < c->server_threads/2; i++)
        pthread_create(&pid, &attr, server_thread, (void*) 0);

    /* Spin off the interactive server threads. */
    for (i=0; i < c->server_threads; i++)
        pthread_create(&pid, &attr, server_thread, (void*) 1);
}

从上面代码中可以看出 gmetad 会使用 pthread_create 创建进程负责 tcp server 的请求处理。server_threads 是 配置文件中 server_threads 指定的,默认为4。xml_port server 会使用 server_threads/2 个进程,interactive_port server 会使用 server_threads 个进程。

void *
server_thread (void *arg)
{
    int interactive = (arg != NULL);
  
    for (;;)
    {
        if (interactive)
        {
            pthread_mutex_lock(&server_interactive_mutex);
            SYS_CALL( client.fd, accept(interactive_socket->sockfd, (struct sockaddr *) &(client.addr), &len));
            pthread_mutex_unlock(&server_interactive_mutex);
            ganglia_scoreboard_inc(TCP_REQS_ALL);
            ganglia_scoreboard_inc(TCP_REQS_INTXML);
            now = apr_time_now();
        }
        else
        {
            pthread_mutex_lock  ( &server_socket_mutex );
            SYS_CALL( client.fd, accept(server_socket->sockfd, (struct sockaddr *) &(client.addr), &len));
            pthread_mutex_unlock( &server_socket_mutex );
            ganglia_scoreboard_inc(TCP_REQS_ALL);
            ganglia_scoreboard_inc(TCP_REQS_XML);
            now = apr_time_now();
        }
      
        if (interactive)
        {
            request_len = readline(client.fd, request, REQUESTLEN);
            // interactive 模式要处理 request
        }
        else
        {
            ganglia_scoreboard_inc(TCP_REQS_ALL);
            strcpy(request, "/");
        }
      
        // 输出 xml 数据
        if(root_report_start(&client))
        {
            ...
        }
        // 输出集群的状态信息
        if (process_path(&client, request, &rootdatum, NULL)) 
        {
            ...
        }
        // 输出 xml 数据
        if(root_report_end(&client)) 
        {
            ...
        }
    }
}

4. tcp server: interactive port

类似 xml_port 的 tcp server 处理,不再赘述。

5. 指标数据落盘

我们知道 gmetad 会将机器收集的指标数据存放在 rrd 文件中。

int
main ( int argc, char *argv[] )
{
    for(;;)
    {
        ...
        /* Sum the new values */
        hash_foreach(root.authority, do_root_summary, NULL );

        /* summary completed */
        pthread_mutex_unlock(root.sum_finished);
	
        // 落盘  
        /* Save them to RRD */
        hash_foreach(root.metric_summary, write_root_summary, NULL);
    }
}
我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章