Lucene源码系列:正排索引文件构建
背景
Lucene中根据term字典和倒排可以快速查找到相关文档的id,那怎么获取文档中的字段内容呢,这就是我们今天要讲的正排数据。
Lucene中对于某个文档的各个字段,可以通过配置来控制是否要存储进正排索引文件中,只有存储到正排索引文件中,查询的时候,有需要才能返回相应的字段。
如果理解了之前介绍过的词向量的索引文件构建,可以发现其实正排索引文件和词向量索引文件构建非常类似。
最终生成的正排索引文件有3个:
- fdt:按chunk存储doc的开启了store的字段
- fdx:chunk的索引文件,记录的是每个chunk的起始docID,以及每个chunk的起始位置,方便根据docID快速定位到chunk。
- fdm:正排索引文件的元信息,用来读取正排索引文件使用的。
前置知识
- 在对字段数据的存储时,对不同的数据类型,有不同的压缩算法,详见《单值编码压缩算法》。
- 构建chunk索引文件的时候会用到工具类FieldsIndexWriter,具体我们已经在《词向量索引文件构建》中详细介绍过了。
索引文件格式
fdm
fdm是正排索引文件的元信息,用来读取的时候使用。
字段详解
Header
文件头部信息,主要是包括:
- 文件头魔数(同一lucene版本所有文件相同)
- 该文件使用的codec名称:Lucene90FieldsIndexMeta
- codec版本
- segment id(也是Segment_N文件中的N)
- segment后缀名(一般为空)
ChunkSize
用来判断是否满足一个chunk的一种条件,如果chunk的大小超过了ChunkSize的限制,则可以构建一个chunk
NumDocs
doc总数
BlockShift
DirectMonotonicWriter需要的参数,DirectMonotonicWriter压缩存储会生成多个block,BlockShift决定了block的大小。
TotalChunks + 1
chunk总数 + 1,在生成fdx索引文件中ChunkStartDocIDs和ChunkOffsets两个字段时,使用DirectMonotonicWriter写入的值的总数。
fdxDocStartFP
fdx索引文件中ChunkStartDocIDs的起始位置
DocBlockMetas
fdx索引文件中ChunkStartDocIDs使用DirectMonotonicWriter编码存储,会生成多个block,这些block的元信息。
tvxOffsetStartFP
fdx中ChunkOffsets的起始位置
OffsetBlockMetas
fdx索引文件中ChunkOffsets使用DirectMonotonicWriter编码存储,会生成多个block,这些block的元信息。
SPEndPoint
fdx文件的结束位置,后面是fdx的footer信息。
MaxPointer
fdt文件的结束位置,后面fdt的footer信息。
NumChunks
chunk总数
NumDirtyChunks
dirtyChunk总数
NumDirtyDocs
dirtyChunk中的doc总数
Footer
文件尾,主要包括
- 文件尾魔数(同一个lucene版本所有文件一样)
- 0
- 校验码
fdt
fdt中按chunk存储各个doc所有的字段数据。
字段详解
Header
文件头部信息,主要是包括:
文件头魔数(同一lucene版本所有文件相同)
该文件使用的codec名称
根据压缩模式的不同有两种:
- Lucene90StoredFieldsFastData
- Lucene90StoredFieldsHighData
codec版本
segment id(也是Segment_N文件中的N)
segment后缀名(一般为空)
chunk
- DocBase:Chunk中Doc的起始编号,Chunk中所有doc的真实编号需要加上这个DocBase
- NumDocsCode:是NumDocs和IsDirty,IsSlice的int组合体
- NumDocs:chunk中的doc总数
- IsDirty:chunk是否是dirtyChunk
- IsSlice:chunk是否被分成多个slice
- DocNumFields:chunk中每个文档的字段个数。
- DocDataLengths:chunk中每个doc占用的存储空间大小。
- Doc:doc中每个store字段的信息
- Field:store的字段
- FieldNumAndTypeCode:FieldNumber和type的long组合体
- FieldNumber:字段的编号
- type:字段的类型
- FieldValue:根据不同的数值类型,有不同的存储方式
- 二进制/string:先存length,再存数据
- byte/short/int:zint存储
- long:tlong存储
- float:zfloat存储
- double:zdouble存储
Footer
文件尾,主要包括
- 文件尾魔数(同一个lucene版本所有文件一样)
- 0
- 校验码
fdx
fdt中所有chunk的索引信息,可以快速根据docID定位到chunk的位置。
字段详解
Header
文件头部信息,主要是包括:
- 文件头魔数(同一lucene版本所有文件相同)
- 该文件使用的codec名称:Lucene90FieldsIndexIdx
- codec版本
- segment id(也是Segment_N文件中的N)
- segment后缀名(一般为空)
ChunkStartDocIDs
所有chunk的起始docID,使用DirectMonotonicWriter编码存储,会生成多个block。
ChunkOffsets
所有chunk在fdt索引文件中的起始位置,使用DirectMonotonicWriter编码存储,会生成多个block。
Footer
文件尾,主要包括
- 文件尾魔数(同一个lucene版本所有文件一样)
- 0
- 校验码
构建源码
本文源码解析基于lucene-core-9.1.0。
- StoredFieldsConsumer负责调度正排索引文件的构建主要有:启动一个doc的处理,处理doc中的field,结束一个doc的处理,结束正排索引的构建。
- Lucene90CompressingStoredFieldsWriter负责持久化生成正排索引文件
构建涉及到的StoredFieldsConsumer和Lucene90CompressingStoredFieldsWriter逻辑其实非常简单,下面我们一起来看下。
StoredFieldsConsumer
class StoredFieldsConsumer {
final Codec codec;
// 索引文件的目录
final Directory directory;
final SegmentInfo info;
// 实现类是 Lucene90CompressingStoredFieldsWriter,负责正排索引文件的持久化
StoredFieldsWriter writer;
Accountable accountable = Accountable.NULL_ACCOUNTABLE;
// 前一个处理的docID
private int lastDoc;
StoredFieldsConsumer(Codec codec, Directory directory, SegmentInfo info) {
this.codec = codec;
this.directory = directory;
this.info = info;
this.lastDoc = -1;
}
// 创建 Lucene90CompressingStoredFieldsWriter
protected void initStoredFieldsWriter() throws IOException {
if (writer == null) {
this.writer = codec.storedFieldsFormat().fieldsWriter(directory, info, IOContext.DEFAULT);
accountable = writer;
}
}
// 开始处理一个doc
void startDocument(int docID) throws IOException {
assert lastDoc < docID;
initStoredFieldsWriter();
while (++lastDoc < docID) { // 确保doc是连续的
writer.startDocument();
writer.finishDocument();
}
// Lucene90CompressingStoredFieldsWriter中开始处理doc
writer.startDocument();
}
// 每个需要构建的正排字段都会被处理
void writeField(FieldInfo info, IndexableField field) throws IOException {
writer.writeField(info, field);
}
// 结束doc的处理
void finishDocument() throws IOException {
writer.finishDocument();
}
// 结束正排的构建
void finish(int maxDoc) throws IOException {
while (lastDoc < maxDoc - 1) {
startDocument(lastDoc);
finishDocument();
++lastDoc;
}
}
// 持久化正排索引文件
void flush(SegmentWriteState state, Sorter.DocMap sortMap) throws IOException {
try {
writer.finish(state.segmentInfo.maxDoc());
} finally {
IOUtils.close(writer);
}
}
void abort() {
IOUtils.closeWhileHandlingException(writer);
}
}
Lucene90CompressingStoredFieldsWriter
成员变量
// 数据文件
public static final String FIELDS_EXTENSION = "fdt";
// 索引文件
public static final String INDEX_EXTENSION = "fdx";
// 元信息文件
public static final String META_EXTENSION = "fdm";
/** Codec name for the index. */
public static final String INDEX_CODEC_NAME = "Lucene90FieldsIndex";
// 不同数据类型编码
static final int STRING = 0x00;
static final int BYTE_ARR = 0x01;
static final int NUMERIC_INT = 0x02;
static final int NUMERIC_FLOAT = 0x03;
static final int NUMERIC_LONG = 0x04;
static final int NUMERIC_DOUBLE = 0x05;
// 数据类型编码的bit数量
static final int TYPE_BITS = PackedInts.bitsRequired(NUMERIC_DOUBLE);
// 提取类型的掩码
static final int TYPE_MASK = (int) PackedInts.maxValue(TYPE_BITS);
static final int VERSION_START = 1;
static final int VERSION_CURRENT = VERSION_START;
static final int META_VERSION_START = 0;
private final String segment;
// 索引生成工具
private FieldsIndexWriter indexWriter;
private IndexOutput metaStream, fieldsStream;
private Compressor compressor;
private final CompressionMode compressionMode;
// chunk的大小
private final int chunkSize;
// 每个chunk最多可以存储多少个doc
private final int maxDocsPerChunk;
// 缓存所有的字段的值
private final ByteBuffersDataOutput bufferedDocs;
// 下标是当前chunk中的docID的偏移量,值是对应doc的字段个数
private int[] numStoredFields;
// 下标是当前chunk中的docID的偏移量,值是对应doc的所有需要store的数据在bufferedDocs中的结束位置
private int[] endOffsets;
// chunk中的起始docID
private int docBase;
// chunk中的doc个数
private int numBufferedDocs;
// chunk总数
private long numChunks;
// dirtyChunk总数,未满足生成chunk的条件时,强制生成的chunk是dirtyChunk
private long numDirtyChunks;
// dirtyDoc总数,dirtyChunk中的doc是dirtyDoc
private long numDirtyDocs;
// 在处理一个doc的时候,统计已经处理的field个数
private int numStoredFieldsInDoc;
核心方法
开始处理一个doc
当前实现中是空操作。
@Override
public void startDocument() throws IOException {}
处理一个field
处理一个field,就是读取field的值,根据值的类型按对应的值的存储方式存入bufferedDocs缓存中。
public void writeField(FieldInfo info, IndexableField field) throws IOException {
++numStoredFieldsInDoc;
int bits = 0;
final BytesRef bytes;
final String string;
Number number = field.numericValue();
if (number != null) { // 如果是数值类型
if (number instanceof Byte || number instanceof Short || number instanceof Integer) {
// byte,short,int都标记为int
bits = NUMERIC_INT;
} else if (number instanceof Long) {
// long
bits = NUMERIC_LONG;
} else if (number instanceof Float) {
// float
bits = NUMERIC_FLOAT;
} else if (number instanceof Double) {
// double
bits = NUMERIC_DOUBLE;
} else {
throw new IllegalArgumentException("cannot store numeric type " + number.getClass());
}
string = null;
bytes = null;
} else {
bytes = field.binaryValue();
if (bytes != null) { // 是二进制
bits = BYTE_ARR;
string = null;
} else { // 是字符串
bits = STRING;
string = field.stringValue();
if (string == null) {
throw new IllegalArgumentException(
"field "
+ field.name()
+ " is stored but does not have binaryValue, stringValue nor numericValue");
}
}
}
// 字段的编号和类型组合体
final long infoAndBits = (((long) info.number) << TYPE_BITS) | bits;
bufferedDocs.writeVLong(infoAndBits);
if (bytes != null) {
bufferedDocs.writeVInt(bytes.length);
bufferedDocs.writeBytes(bytes.bytes, bytes.offset, bytes.length);
} else if (string != null) {
bufferedDocs.writeString(string);
} else {
if (number instanceof Byte || number instanceof Short || number instanceof Integer) {
bufferedDocs.writeZInt(number.intValue());
} else if (number instanceof Long) {
writeTLong(bufferedDocs, number.longValue());
} else if (number instanceof Float) {
writeZFloat(bufferedDocs, number.floatValue());
} else if (number instanceof Double) {
writeZDouble(bufferedDocs, number.doubleValue());
} else {
throw new AssertionError("Cannot get here");
}
}
}
结束处理一个doc
结束doc的处理,需要做4件事:
- 如果numBufferedDocs空间不足了,需要扩容
- 记录doc对应的field个数
- 记录doc数据在bufferedDocs中的结束位置
- 判断如果满足一个chunk的生成,则生成chunk
public void finishDocument() throws IOException {
if (numBufferedDocs == this.numStoredFields.length) {
final int newLength = ArrayUtil.oversize(numBufferedDocs + 1, 4);
this.numStoredFields = ArrayUtil.growExact(this.numStoredFields, newLength);
endOffsets = ArrayUtil.growExact(endOffsets, newLength);
}
// 记录doc对应的field个数
this.numStoredFields[numBufferedDocs] = numStoredFieldsInDoc;
numStoredFieldsInDoc = 0;
// 记录当前doc在bufferedDocs中的结束位置
endOffsets[numBufferedDocs] = Math.toIntExact(bufferedDocs.size());
++numBufferedDocs;
if (triggerFlush()) {
flush(false);
}
}
生成一个chunk
生成一个chunk的条件:
- bufferDocs缓存超出了chunkSize
- chunk中收集的doc数量超出了maxDocsPerChunk
- 强制生成
// 生成一个chunk的条件
// 1.bufferDocs缓存超出了chunkSize
// 2.chunk中收集的doc数量超出了maxDocsPerChunk
private boolean triggerFlush() {
return bufferedDocs.size() >= chunkSize || numBufferedDocs >= maxDocsPerChunk;
}
private void flush(boolean force) throws IOException {
// chunk数+1
numChunks++;
if (force) { // 如果是强制构建chunk,可能是不满足chunk条件的,这种chunk被定义为dirtyChunk
numDirtyChunks++;
numDirtyDocs += numBufferedDocs;
}
// 生成chunk的索引
indexWriter.writeIndex(numBufferedDocs, fieldsStream.getFilePointer());
// 把各个doc在bufferedDocs中的endOffsets转成length
final int[] lengths = endOffsets;
for (int i = numBufferedDocs - 1; i > 0; --i) {
lengths[i] = endOffsets[i] - endOffsets[i - 1];
}
// 如果当前chunk的大小超出了2倍chunkSize,则需要分片
final boolean sliced = bufferedDocs.size() >= 2 * chunkSize;
final boolean dirtyChunk = force;
writeHeader(docBase, numBufferedDocs, numStoredFields, lengths, sliced, dirtyChunk);
// 下面是压缩处理
byte[] content = bufferedDocs.toArrayCopy();
bufferedDocs.reset();
if (sliced) {
// big chunk, slice it
for (int compressed = 0; compressed < content.length; compressed += chunkSize) {
compressor.compress(
content, compressed, Math.min(chunkSize, content.length - compressed), fieldsStream);
}
} else {
compressor.compress(content, 0, content.length, fieldsStream);
}
// 更新下一个chunk的起始docID
docBase += numBufferedDocs;
// 重置doc数统计
numBufferedDocs = 0;
bufferedDocs.reset();
}
private static void saveInts(int[] values, int length, DataOutput out) throws IOException {
if (length == 1) {
out.writeVInt(values[0]);
} else {
StoredFieldsInts.writeInts(values, 0, length, out);
}
}
private void writeHeader(
int docBase,
int numBufferedDocs,
int[] numStoredFields,
int[] lengths,
boolean sliced,
boolean dirtyChunk)
throws IOException {
final int slicedBit = sliced ? 1 : 0;
final int dirtyBit = dirtyChunk ? 2 : 0;
// save docBase and numBufferedDocs
fieldsStream.writeVInt(docBase);
fieldsStream.writeVInt((numBufferedDocs << 2) | dirtyBit | slicedBit);
// save numStoredFields
saveInts(numStoredFields, numBufferedDocs, fieldsStream);
// save lengths
saveInts(lengths, numBufferedDocs, fieldsStream);
}
结束构建
结束构建的时候最重要的就是生成fdx索引文件。
public void finish(int numDocs) throws IOException {
if (numBufferedDocs > 0) { // 如果还有未处理的doc,强制生成一个chunk
flush(true);
} else {
assert bufferedDocs.size() == 0;
}
if (docBase != numDocs) {
throw new RuntimeException(
"Wrote " + docBase + " docs, finish called with numDocs=" + numDocs);
}
// 构建fdx文件
indexWriter.finish(numDocs, fieldsStream.getFilePointer(), metaStream);
// 记录一些元信息
metaStream.writeVLong(numChunks);
metaStream.writeVLong(numDirtyChunks);
metaStream.writeVLong(numDirtyDocs);
CodecUtil.writeFooter(metaStream);
CodecUtil.writeFooter(fieldsStream);
assert bufferedDocs.size() == 0;
}
}
链接:https://juejin.cn/post/7166898028941410312
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。