本地缓存组件选型

需求: 记录系统执行数据,能持久化,容错。

Guava

官网就没有说有persistent的功能,直接略过。

ehcache

企业版需要缴费,然后持久化功能和保证重启可用的功能都需要缴费。

看看开源版ehcache的是否可以满足,现状是:可以在只是DiskStore相关配置,让cache把内容序列化后直接overflow到文件。
现在问题是:

重启

重启以后,是重新新建文件并覆盖同名文件—也就是之前同名cache生成的文件。尝试出使用自己实现的RebootCacheLoaderFactory implemets BootstrapCacheLoaderFactory可以来创建 RebootCacheLoader implements BootstrapCacheLoader来实现内容。

RootbootCacheLoaderFactory:

public class RebootCacheLoaderFactory extends BootstrapCacheLoaderFactory {
    @Override
    public BootstrapCacheLoader createBootstrapCacheLoader(Properties properties) {
        return new RebootCacheLoader();
    }
}

RebootCacheLoader:

public class RebootCacheLoader implements BootstrapCacheLoader {

    @Override
    public void load(Ehcache cache) throws CacheException {
        cache.put(new Element("haha"+Math.random(), Math.random()));
    }

    @Override
    public boolean isAsynchronous() {
        return true;
    }

    public Object clone(){
        return new RebootCacheLoader();
    }
}

Main:

String path = "/tmp/ehcache/";
String cacheManagerName = "cacheManagerName";
Long diskSizeBytes = 4 * 1024 * 1024 * 1024L;
String cache1Name = "40006_1232131231312";
String cache2Name = "40002_1232131231312";

Configuration configuration = new Configuration();
configuration.setName(cacheManagerName);
CacheConfiguration defaultCacheConfiguration = new CacheConfiguration();
defaultCacheConfiguration.overflowToOffHeap(false);
defaultCacheConfiguration.overflowToDisk(true);
defaultCacheConfiguration.maxEntriesLocalHeap(1); //0 is not limited, DiskStore, MemStore will all have the element; 1 means just the last element will be in memory.
defaultCacheConfiguration.maxBytesLocalDisk(2, MemoryUnit.MEGABYTES); //模糊的界定,异步写入,主线程结束了,还有一些记录没有记录的,会丢失数据。
defaultCacheConfiguration.addBootstrapCacheLoaderFactory(new CacheConfiguration.BootstrapCacheLoaderFactoryConfiguration().className(
            "com.daniel.test.RebootCacheLoaderFactory"
    ));

configuration.setDefaultCacheConfiguration(defaultCacheConfiguration);

CacheManager cacheManager = new CacheManager(configuration);

cacheManager.addCache(cache1Name);

cacheManager.addCache(cache1Name)方法里面clone一个defaultCache,然后改个名字,然后initialize cache完成之后,调用cache.bootstrap()方法:

public void bootstrap() {
    if (!disabled && bootstrapCacheLoader != null) {
        bootstrapCacheLoader.load(this);
    }

}

然后就可以load到自定义信息了,因为底层就是用了java自带序列化来存储Element对象,方法在:DiskStorageFactory.serializeElement()

持久化

从最近本的cache.put()方法不断debug进去,发现DiskStore.put()引用了segment.put(),然后Element的行记录的包装类PlaceHolder里面的install方法调用了如下内容:

public void installed() {
    DiskStorageFactory.this.schedule(new PersistentDiskWriteTask(this));
}

看着感觉很悲剧,果然看到里面是一个ScheduledThreadPoolExecutor在执行,实体是DiskStorageFactory里面的diskWriter,然后是默认生成的,还不能修改队列啥的:

diskWriter = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, file.getName());
        t.setDaemon(false);
        return t;
    }
});

这个构造函数,消息队列是默认11个的PriorityQueue,所以机器中断可能导致数据还在queue里面没被写入到磁盘。这个组件不能用了!!!

OSCache

名声挺好的,搜了下,已经不维护了,略过

JCS

JCS是插件式管看着很棒,persistent功能介绍也很强大,符合我的需求,看了一眼bugfix清单,关于persistet的bug那么多,最新的版本还是beta的,完全不敢用,不想自找麻烦。

FQueue

转换下思路了。如果说持久化要做到可靠性最佳,理论上面就应该是同步插入数据到磁盘;如果还要保证消费的顺序性,想了一种方案,如下:

思路

虽然很开心的想出来了,但是可靠性却不能保证啊。我们的存储使用的是HBASE,scheme设计是:rowkey用index,所有相同index的记录都需要记录到一行记录里面。如果我再搞一个持久化的重试队列,Queue的Element先poll出来,push到retryQueue extends LinkedList,并在内存存储retryQueue的Element和位置的关系Map,然后等异步插入HBase数据完成之后,利用内存关系来做删除,这个时候有两个问题:

  1. fQueue使用是FIFO的Queue,如何在记录位置并删除呢,效率是否有影响?想象下多个线程同时操作一个文件。如果要做, 可能需要学ehcache的方式,在内存里面映射文件为一个map,做segment分段加锁,然后在内存中删除,同时交给一个只有一个线程的线程池异步删除对应文件数据,那这样最多就2个线程抢占一个文件,一个写入,一个删除;那么问题来了,因为是异步,所以可能存在crash的时候内存删除了,文件数据没删除的情况,对异常恢复造成影响,类似之后提到的2问题。

  2. 插入HBase的时候,机器crash了,我们不知道是否真的插入成功了,就需要在机器重启的时候重新插入,这个时候会在同一行记录里面append内容一样的column了。

换一种思路:
我每一个index下的内容都生成一个持久化文件,并在内存中存储一个mark标记,标记是否index文件生成完了,监听器监听 生成完的状态,然后交给一个线程处理这个index相关的一个文件,因为内容是顺序的,所以直接拼装成一条记录,并且写入,写入成功之后,删除这个index对应的文件。异常状况处理如下:

  1. index下内容生成一半,crash,重启以后首先消费掉这些数据并做插入到HBase,保证异常数据也能够被系统得到。

  2. 插入HBase时,crash,重启以后同样重复消费文件数据,构造一条记录,插入。如果上次已经插入通用的rowkey数据,那么HBase自己会做update,那么业务上面完全没有影响,就是DB方面多update了一次。

恩,现在感觉完美了。

FQueue默认文件大小是300M,用400个线程同时创建文件,ssd 硬盘,2.5GCPU的配置直接卡爆。然后调整为默认为1M。重写了三个类:FQueue.java, FSQueue.java, LogEntity.java

HMFQueue.java

public HMFQueue(String path) throws Exception {
    fsQueue = new HMFSQueue(path, 1024 * 1024 * 1); //默认是1M大小
}

@Override
//not thread safe, be careful , 添加了iterator的支持,需要修改FSQueue和LogEntity
public Iterator<byte[]> iterator() {
    try {
        fsQueue.curseToHead();
    } catch (IOException e) {
        log.error(" cannot open the file!",e );
        throw new UnsupportedOperationException("failed to move to head!");
    } catch (FileFormatException e) {
        log.error("failed to move to head! the file format is wrong",e );
        throw new UnsupportedOperationException("failed to move to head!");
    }

    return new Iterator<byte[]>() {

        @Override
         public boolean hasNext() {
            return fsQueue.hasNext();
        }

        @Override
        public byte[] next() {
            return fsQueue.readNextAndMove();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException("remove operation is not supported here!");
        }
    };
}

HMFSQueue.java 添加如下方法支持文件的重头读取和遍历

//not thread safe 定位到文件的头部,更新index
public void curseToHead()  throws IOException, FileFormatException {
    int fileNum = -1;
    File dir = new File(path);
    if(dir.exists() && dir.isDirectory()){
        if(dir.listFiles() != null && dir.listFiles().length > 0){
            for(int i = 1; i < Integer.MAX_VALUE; i++){
                String fileName = path + fileSeparator + filePrefix + "data_" + i + ".idb";
                File file = new File(fileName);
                if(file.exists()){
                    fileNum = i;
                    break;
                }
            }
            if(fileNum != -1){
                if (writerHandle.getCurrentFileNumber() == fileNum) {
                    readerHandle = writerHandle;
                }else{
                    readerHandle = createLogEntity(path + fileSeparator + filePrefix + "data_" + fileNum + ".idb", db,
                            fileNum);
                }
                readerHandle.resetReaderPosition();
            }
        }
    }
}

//not thread safe 判断是否有下一条数据
public boolean hasNext(){
    byte[] b = null;
    try {
        b = readerHandle.readNext();
    } catch (FileEOFException e) {
        int nextfile = readerHandle.getNextFile();
        readerHandle.close();
        // 更新下一次读取的位置和索引
        db.putReaderPosition(HMLogEntity.messageStartPosition);
        db.putReaderIndex(nextfile);
        if (writerHandle.getCurrentFileNumber() == nextfile) {
            readerHandle = writerHandle;
        } else {
            try {
                readerHandle = createLogEntity(path + fileSeparator + filePrefix + "data_" + nextfile + ".idb", db,
                        nextfile);
            } catch (IOException e1) {
                log.error("failed to read file: " + path + fileSeparator + filePrefix + "data_" + nextfile + ".idb",e1);
            } catch (FileFormatException e1) {
                log.error("the file has wrong format: " + path + fileSeparator + filePrefix + "data_" + nextfile + ".idb", e1);
            }
        }
        try {
            b = readerHandle.readNext();
        } catch (FileEOFException e1) {
            log.error("read new log file FileEOFException error occurred",e1);
        }
    }
    if (b != null) {
        return true;
    }
    return false;
}

//not thread safe 读取下一个并且更新索引
public byte[] readNextAndMove(){
    byte[] b = null;
    try {
        b = readerHandle.readNextAndMove();
    } catch (FileEOFException e) {
        int nextfile = readerHandle.getNextFile();
        readerHandle.close();
        // 更新下一次读取的位置和索引
        db.putReaderPosition(HMLogEntity.messageStartPosition);
        db.putReaderIndex(nextfile);
        if (writerHandle.getCurrentFileNumber() == nextfile) {
            readerHandle = writerHandle;
        } else {
            try {
                readerHandle = createLogEntity(path + fileSeparator + filePrefix + "data_" + nextfile + ".idb", db,
                        nextfile);
            } catch (IOException e1) {
                log.error("failed to read file: " + path + fileSeparator + filePrefix + "data_" + nextfile + ".idb", e1);
            } catch (FileFormatException e1) {
                log.error("the file has wrong format: " + path + fileSeparator + filePrefix + "data_" + nextfile + ".idb", e1);
            }
        }
        try {
            b = readerHandle.readNextAndMove();
        } catch (FileEOFException e1) {
            log.error("read new log file FileEOFException error occurred",e1);
        }
    }
    return b;
}

//删除消息队列相关数据文件
public void delQueue() throws IOException, FileFormatException {
    FileUtils.deleteDirectory(new File(path));
}

HMLogEntity.java

public HMLogEntity(String path, LogIndex db, int fileNumber,
                 int fileLimitLength) throws IOException, FileFormatException {
    ........
    if (file.exists() == false) {
        createLogEntity();
        //FileRunner.addCreateFile(Integer.toString(fileNumber + 1)); //注释掉,如果文件切片为1M时候,提前创建有问题:由于是异步的,消费的时候会存在还没有创建成功,写入失败的情况。
    } else {
        ........
    }      
}

测试下性能,单线程写20W的数据花了9s;

但是多线程写入多个文件性能和稳定性可能有问题了:

  1. 400个线程,每个线程创建自己的文件,写入300条数据,遍历300条数据,删除自己创建写入的文件总共要6.1s,且一切正常。

  2. 400个线程,每个线程创建自己的文件,写入400条数据,遍历400条数据,删除自己创建写入的文件出现写入失败和读取失败的情况:240个线程写入完成,133个线程遍历完成,且没有异常抛出。

  3. 600个线程,每个线程创建自己的文件,写入300条数据,遍历300条数据,删除自己创建写入的文件出现写入失败和读取失败的情况:244个线程写入完成,160个线程遍历完成,且没有异常抛出。

1的情况已经是达到了临界值。2,3都会出现不稳定的情况,我去,这个是啥原因呢??百思不得其解。