Compression engines used in Ceph
Published: 2019-05-26


Ceph supports snappy, zlib and zstd by default; to add lz4 support, the HAVE_LZ4 macro must be enabled, as the compressor build rules show:

set(compressor_srcs
  Compressor.cc)
add_library(compressor_objs OBJECT ${compressor_srcs})

## compressor plugins
set(compressor_plugin_dir ${CMAKE_INSTALL_PKGLIBDIR}/compressor)
add_subdirectory(snappy)
add_subdirectory(zlib)
add_subdirectory(zstd)
if (HAVE_LZ4)
  add_subdirectory(lz4)
endif()

Looking at the source, compression comes in two flavors, synchronous and asynchronous. Asynchronous compression calls the same synchronous compression interface underneath; it simply returns without waiting for the result. Here we take asynchronous compression as the example.

AsyncCompressor::AsyncCompressor(CephContext *c):
  // Create a compression engine through the synchronous Compressor factory
  compressor(Compressor::create(c, c->_conf->async_compressor_type)), cct(c),
  // Create a thread pool to handle compression and decompression
  compress_tp(cct, "AsyncCompressor::compressor_tp", "tp_async_compr", cct->_conf->async_compressor_threads, "async_compressor_threads"),
  // Lock protecting the job map
  job_lock("AsyncCompressor::job_lock"),
  // Work queue that receives the compression and decompression jobs
  compress_wq(this, c->_conf->async_compressor_thread_timeout, c->_conf->async_compressor_thread_suicide_timeout, &compress_tp) {}

Its init function starts the thread pool used for compression:

void AsyncCompressor::init()
{
  ldout(cct, 10) << __func__ << dendl;
  compress_tp.start();
}
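Before looking at the asynchronous entry points, here is a minimal sketch of driving the synchronous engine created in the constructor directly, using only the calls quoted in this post; the variable cct and the plugin name "snappy" are illustrative assumptions, not something taken from Ceph's sources.

// Minimal sketch of the synchronous path that AsyncCompressor wraps.
// Assumptions: cct is a valid CephContext*, "snappy" is just an example plugin name.
auto comp = Compressor::create(cct, "snappy");   // same factory call used in the constructor above
bufferlist in, out;
in.append("hello hello hello hello hello");
if (comp && comp->compress(in, out) == 0) {
  // out now holds the compressed bytes; the call blocks until compression is done
}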
Compression is started by calling the following interface:

uint64_t AsyncCompressor::async_compress(bufferlist &data)
{
  uint64_t id = ++job_id;
  pair<unordered_map<uint64_t, Job>::iterator, bool> it;
  {
    Mutex::Locker l(job_lock);
    // Each piece of data to be compressed gets its own job id
    it = jobs.insert(make_pair(id, Job(id, true)));
    it.first->second.data = data;
  }
  // The data is wrapped into a Job and put on the work queue, and the call returns
  // right away without waiting for the compression to complete; this is the biggest
  // difference between synchronous and asynchronous compression.
  compress_wq.queue(&it.first->second);
  ldout(cct, 10) << __func__ << " insert async compress job id=" << id << dendl;
  return id;
}
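The Job type itself is not shown in the post; the following is a rough reconstruction inferred purely from how it is used above (an id, a data bufferlist, an atomic status that moves between WAIT, DONE and ERROR, and an is_compress flag), so the exact field types and names in Ceph may differ.

#include <atomic>
#include <cstdint>
// bufferlist is ceph::buffer::list from Ceph's include/buffer.h

// Rough reconstruction of the Job record, inferred from its usage above;
// not the verbatim Ceph definition.
enum class status_t { WAIT, DONE, ERROR };

struct Job {
  uint64_t id;                    // handed back to the caller by async_compress
  std::atomic<status_t> status;   // WAIT -> DONE or ERROR, driven by the worker threads
  bool is_compress;               // true for compress jobs, false for decompress
  bufferlist data;                // input before the job runs, output afterwards
  Job(uint64_t i, bool compress) : id(i), status(status_t::WAIT), is_compress(compress) {}
};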
To retrieve the compressed data, call the interface below:

int AsyncCompressor::get_compress_data(uint64_t compress_id, bufferlist &data, bool blocking, bool *finished)
{
  assert(finished);
  Mutex::Locker l(job_lock);
  // Look the job up by the id that was returned when compression was started
  unordered_map<uint64_t, Job>::iterator it = jobs.find(compress_id);
  // If the id is not found, return an error
  if (it == jobs.end() || !it->second.is_compress) {
    ldout(cct, 10) << __func__ << " missing to get compress job id=" << compress_id << dendl;
    return -ENOENT;
  }

 retry:
  auto status = it->second.status.load();
  if (status == status_t::DONE) {
    // Compression has finished: hand back the data and erase the job
    ldout(cct, 20) << __func__ << " successfully getting compressed data, job id=" << compress_id << dendl;
    *finished = true;
    data.swap(it->second.data);
    jobs.erase(it);
  } else if (status == status_t::ERROR) {
    // Compression failed: erase the job as well and return an error
    ldout(cct, 20) << __func__ << " compressed data failed, job id=" << compress_id << dendl;
    jobs.erase(it);
    return -EIO;
  } else if (blocking) {
    auto expected = status_t::WAIT;
    if (it->second.status.compare_exchange_strong(expected, status_t::DONE)) {
      ldout(cct, 10) << __func__ << " compress job id=" << compress_id << " hasn't finished, abort!"<< dendl;
      // The job was still waiting in the queue, so the caller claims it and compresses
      // synchronously right here; asynchronous compression degenerates into synchronous
      // compression in this case.
      if (compressor->compress(it->second.data, data)) {
        ldout(cct, 1) << __func__ << " compress job id=" << compress_id << " failed!"<< dendl;
        it->second.status = status_t::ERROR;
        return -EIO;
      }
      *finished = true;
    } else {
      // A worker already owns the job: sleep 1 ms (usleep takes microseconds) and check again
      job_lock.Unlock();
      usleep(1000);
      job_lock.Lock();
      goto retry;
    }
  } else {
    ldout(cct, 10) << __func__ << " compress job id=" << compress_id << " hasn't finished."<< dendl;
    *finished = false;
  }
  return 0;
}
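Putting the two entry points together, a caller would look roughly like the sketch below; the variable names, the blocking=true choice and the surrounding setup are illustrative assumptions rather than code from Ceph.

// Sketch of a caller, using only the interfaces quoted above.
// Assumptions: cct is a valid CephContext* and in_bl already holds the payload.
AsyncCompressor ac(cct);
ac.init();                                         // start the worker thread pool

uint64_t id = ac.async_compress(in_bl);            // queue the job; returns immediately

// ... do other work while the thread pool compresses in the background ...

bufferlist out_bl;
bool finished = false;
int r = ac.get_compress_data(id, out_bl, true /* blocking */, &finished);
if (r == 0 && finished) {
  // out_bl now holds the compressed data and the job has been erased from the map
}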

Reposted from: http://ojnmi.baihongyu.com/
