hacking phoenix

因为微服务使用了springboot,并且用使用了phoenix 来读写Hbase,应用经常超过1000的线程数量。


分析 stack:

自己管理了400个线程的线程池,phoenix-1-thread 有128个,HConnection的线程有256个,再加上一些tomcat的http线程,轻松就能到1000个线程。

略多,希望把phoenix和HConnection相关线程干掉。官方文档没有提到怎么设置,网上也没有资料。

那么,看源码吧。


解剖代码:

  1. JobManager 创建了 phoenix-{index}-thread ,JobManager 初始化是由QueryServicesImpl完成的(使用了QueryServicesOptions)

    图片

  2. JobManager.JobCallable 被 ServerCacheClient 使用。

  3. ServerCacheClient 被 MutationState的方法send(Iterator<TableRef> tableRefIterator) 使用。
  4. MutationState 被 PhoenixConnection的方法commit()使用。

恩,到这里,逻辑已经串联起来了。


分析问题:

PhoenixDriver 获取 connection 的时候可以使用方法 Connection connect(String url, Properties info), info 按理是我们的自定义配置,但是实际却不能生效。配置项参考

但是我们发现 PhoenixDriver.getQueryServices() 创建了 QueryServicesImpl

  1. PhoenixEmbeddedDriver 定义了函数 abstract public QueryServices getQueryServices() throws SQLException; 参数列表竟然是空的!
  2. QueryServicesImpl使用了一个默认的参数配置!

图片


第一次尝试:

所以修改 PhoenixEmbeddedDriver 定义 , PhoenixDriver 的实现:

  1. PhoenixEmbeddedDriver

    abstract public QueryServices getQueryServices(Properties properties) throws SQLException;
    
  2. PhoenixDriver

    @Override
    public QueryServices getQueryServices(Properties properties) throws SQLException {
        try {
            lockInterruptibly(LockMode.READ);
            checkClosed();
            // Lazy initialize QueryServices so that we only attempt to create an HBase Configuration
            // object upon the first attempt to connect to any cluster. Otherwise, an attempt will be
            // made at driver initialization time which is too early for some systems.
            QueryServices result = services;
            if (result == null) {
                synchronized(this) {
                       result = services;
                    if(result == null) {
                        services = result = new QueryServicesImpl( getDefaultProps().addAll(properties) );
                    }
                }
            }
            return result;
        } finally {
            unlock(LockMode.READ);
        }
    }
    

恩,到目前为止,自定义的参数能够传进去了。这个时候我实验了下,还是没有设置成功!


第二次尝试:

继续看QueryServicesImpl是怎么处理我们传进去的自定义的 配置的。竟然是这样的:

super(defaultProps, QueryServicesOptions.withDefaults())

QueryServiceOptions也是默认的。传入的defaultProps啥用都没有。ok,我们改造下:

super(defaultProps, QueryServicesOptions.withDefaults().setAll(defaultProps));

这下看起来对了,在实验下,还是没有成功!这个坑真深。再看下setAll方法,看到ReadOnlyProps对象。发现内部包了propsoverrideProps。 同时isEmptyasMapiterator实现的时候都只考虑到了props, 但是我们可以看到getRaw方法其实把两个map都考虑到了,我姑且认为实现这个类的人,忘记了修正asMapiteratorisEmpty,那么我们自己来修改吧,实现好的如下:

public Map<String,String> asMap() {
    Map<String, String> mergedMap = new HashMap<String, String>();
    mergedMap.putAll(props);
    mergedMap.putAll(overrideProps);
    return mergedMap;
}

@Override
public Iterator<Entry<String, String>> iterator() {
    return this.asMap().entrySet().iterator();
}

public boolean isEmpty() {
    return props.isEmpty() && overrideProps.isEmpty();
}

好的,再实验一下,终于好了~~~~~

可配置参数请参考QueryServices,附带使用方式:

Properties properties = new Properties();
properties.setProperty(QueryServices.THREAD_POOL_SIZE_ATTRIB, "32");
conn = DriverManager.getConnection(phoenixUrl, properties);

那么现在也修改下HConnection的连接数。

HConnection是由HConnectionFactory创建,实际也是用了HConnectionManager。

一路跟下去,可以看到ConnectionManager管理了一个pool,通过getBatchPool可以看到参数有:

hbase.hconnection.threads.max
hbase.hconnection.threads.core

默认都是256。

好,那我们修改下,都改成 128:

properties.setProperty(QueryServices.HCONNECTION_POOL_CORE_SIZE, "128");
properties.setProperty(QueryServices.HCONNECTION_POOL_MAX_SIZE, "128");

最后都搞定了!


回过头来,既然已经解决问题了,就了解了下这两个线程池都是干嘛用的。

  1. phoenix-1-thread 用来:

    缓存了HTable 和 HRegionLocation 的关系, Callable 任务是 thrift 的RPC调用。

  2. HConnection 线程池用来:

    缓存复用,减少创建开销。