注册

Lucene源码系列:正排索引文件构建

背景


Lucene中根据term字典和倒排可以快速查找到相关文档的id,那怎么获取文档中的字段内容呢,这就是我们今天要讲的正排数据。


Lucene中对于某个文档的各个字段,可以通过配置来控制是否要存储进正排索引文件中,只有存储到正排索引文件中,查询的时候,有需要才能返回相应的字段。


如果理解了之前介绍过的词向量的索引文件构建,可以发现其实正排索引文件和词向量索引文件构建非常类似。


最终生成的正排索引文件有3个:



  • fdt:按chunk存储doc的开启了store的字段
  • fdx:chunk的索引文件,记录的是每个chunk的起始docID,以及每个chunk的起始位置,方便根据docID快速定位到chunk。
  • fdm:正排索引文件的元信息,用来读取正排索引文件使用的。

前置知识



索引文件格式


fdm


fdm是正排索引文件的元信息,用来读取的时候使用。


fdm.png


字段详解


Header

文件头部信息,主要是包括:



  • 文件头魔数(同一lucene版本所有文件相同)
  • 该文件使用的codec名称:Lucene90FieldsIndexMeta
  • codec版本
  • segment id(也是Segment_N文件中的N)
  • segment后缀名(一般为空)

ChunkSize

用来判断是否满足一个chunk的一种条件,如果chunk的大小超过了ChunkSize的限制,则可以构建一个chunk


NumDocs

doc总数


BlockShift

DirectMonotonicWriter需要的参数,DirectMonotonicWriter压缩存储会生成多个block,BlockShift决定了block的大小。


TotalChunks + 1

chunk总数 + 1,在生成fdx索引文件中ChunkStartDocIDs和ChunkOffsets两个字段时,使用DirectMonotonicWriter写入的值的总数。


fdxDocStartFP

fdx索引文件中ChunkStartDocIDs的起始位置


DocBlockMetas

fdx索引文件中ChunkStartDocIDs使用DirectMonotonicWriter编码存储,会生成多个block,这些block的元信息。


tvxOffsetStartFP

fdx中ChunkOffsets的起始位置


OffsetBlockMetas

fdx索引文件中ChunkOffsets使用DirectMonotonicWriter编码存储,会生成多个block,这些block的元信息。


SPEndPoint

fdx文件的结束位置,后面是fdx的footer信息。


MaxPointer

fdt文件的结束位置,后面fdt的footer信息。


NumChunks

chunk总数


NumDirtyChunks

dirtyChunk总数


NumDirtyDocs

dirtyChunk中的doc总数


Footer

文件尾,主要包括



  • 文件尾魔数(同一个lucene版本所有文件一样)
  • 0
  • 校验码

fdt


fdt中按chunk存储各个doc所有的字段数据。


fdt.png


字段详解


Header

文件头部信息,主要是包括:




  • 文件头魔数(同一lucene版本所有文件相同)




  • 该文件使用的codec名称


    根据压缩模式的不同有两种:



    • Lucene90StoredFieldsFastData
    • Lucene90StoredFieldsHighData



  • codec版本




  • segment id(也是Segment_N文件中的N)




  • segment后缀名(一般为空)




chunk


  • DocBase:Chunk中Doc的起始编号,Chunk中所有doc的真实编号需要加上这个DocBase
  • NumDocsCode:是NumDocs和IsDirty,IsSlice的int组合体

    • NumDocs:chunk中的doc总数
    • IsDirty:chunk是否是dirtyChunk
    • IsSlice:chunk是否被分成多个slice


  • DocNumFields:chunk中每个文档的字段个数。
  • DocDataLengths:chunk中每个doc占用的存储空间大小。
  • Doc:doc中每个store字段的信息

    • Field:store的字段

      • FieldNumAndTypeCode:FieldNumber和type的long组合体

        • FieldNumber:字段的编号
        • type:字段的类型


      • FieldValue:根据不同的数值类型,有不同的存储方式

        • 二进制/string:先存length,再存数据
        • byte/short/int:zint存储
        • long:tlong存储
        • float:zfloat存储
        • double:zdouble存储







Footer

文件尾,主要包括



  • 文件尾魔数(同一个lucene版本所有文件一样)
  • 0
  • 校验码

fdx


fdt中所有chunk的索引信息,可以快速根据docID定位到chunk的位置。


fdx.png


字段详解


Header

文件头部信息,主要是包括:



  • 文件头魔数(同一lucene版本所有文件相同)
  • 该文件使用的codec名称:Lucene90FieldsIndexIdx
  • codec版本
  • segment id(也是Segment_N文件中的N)
  • segment后缀名(一般为空)

ChunkStartDocIDs

所有chunk的起始docID,使用DirectMonotonicWriter编码存储,会生成多个block。


ChunkOffsets

所有chunk在fdt索引文件中的起始位置,使用DirectMonotonicWriter编码存储,会生成多个block。


Footer

文件尾,主要包括



  • 文件尾魔数(同一个lucene版本所有文件一样)
  • 0
  • 校验码

构建源码


本文源码解析基于lucene-core-9.1.0。



  • StoredFieldsConsumer负责调度正排索引文件的构建主要有:启动一个doc的处理,处理doc中的field,结束一个doc的处理,结束正排索引的构建。
  • Lucene90CompressingStoredFieldsWriter负责持久化生成正排索引文件

构建涉及到的StoredFieldsConsumer和Lucene90CompressingStoredFieldsWriter逻辑其实非常简单,下面我们一起来看下。


StoredFieldsConsumer


class StoredFieldsConsumer {
final Codec codec;
// 索引文件的目录
final Directory directory;
final SegmentInfo info;
// 实现类是 Lucene90CompressingStoredFieldsWriter,负责正排索引文件的持久化
StoredFieldsWriter writer;
Accountable accountable = Accountable.NULL_ACCOUNTABLE;
// 前一个处理的docID
private int lastDoc;

StoredFieldsConsumer(Codec codec, Directory directory, SegmentInfo info) {
this.codec = codec;
this.directory = directory;
this.info = info;
this.lastDoc = -1;
}

// 创建 Lucene90CompressingStoredFieldsWriter
protected void initStoredFieldsWriter() throws IOException {
if (writer == null) {
this.writer = codec.storedFieldsFormat().fieldsWriter(directory, info, IOContext.DEFAULT);
accountable = writer;
}
}

// 开始处理一个doc
void startDocument(int docID) throws IOException {
assert lastDoc < docID;
initStoredFieldsWriter();
while (++lastDoc < docID) { // 确保doc是连续的
writer.startDocument();
writer.finishDocument();
}
// Lucene90CompressingStoredFieldsWriter中开始处理doc
writer.startDocument();
}

// 每个需要构建的正排字段都会被处理
void writeField(FieldInfo info, IndexableField field) throws IOException {
writer.writeField(info, field);
}

// 结束doc的处理
void finishDocument() throws IOException {
writer.finishDocument();
}

// 结束正排的构建
void finish(int maxDoc) throws IOException {
while (lastDoc < maxDoc - 1) {
startDocument(lastDoc);
finishDocument();
++lastDoc;
}
}

// 持久化正排索引文件
void flush(SegmentWriteState state, Sorter.DocMap sortMap) throws IOException {
try {
writer.finish(state.segmentInfo.maxDoc());
} finally {
IOUtils.close(writer);
}
}

void abort() {
IOUtils.closeWhileHandlingException(writer);
}
}

Lucene90CompressingStoredFieldsWriter


成员变量


  // 数据文件
public static final String FIELDS_EXTENSION = "fdt";
// 索引文件
public static final String INDEX_EXTENSION = "fdx";
// 元信息文件
public static final String META_EXTENSION = "fdm";
/** Codec name for the index. */
public static final String INDEX_CODEC_NAME = "Lucene90FieldsIndex";

// 不同数据类型编码
static final int STRING = 0x00;
static final int BYTE_ARR = 0x01;
static final int NUMERIC_INT = 0x02;
static final int NUMERIC_FLOAT = 0x03;
static final int NUMERIC_LONG = 0x04;
static final int NUMERIC_DOUBLE = 0x05;
// 数据类型编码的bit数量
static final int TYPE_BITS = PackedInts.bitsRequired(NUMERIC_DOUBLE);
// 提取类型的掩码
static final int TYPE_MASK = (int) PackedInts.maxValue(TYPE_BITS);

static final int VERSION_START = 1;
static final int VERSION_CURRENT = VERSION_START;
static final int META_VERSION_START = 0;

private final String segment;
// 索引生成工具
private FieldsIndexWriter indexWriter;
private IndexOutput metaStream, fieldsStream;

private Compressor compressor;
private final CompressionMode compressionMode;
// chunk的大小
private final int chunkSize;
// 每个chunk最多可以存储多少个doc
private final int maxDocsPerChunk;

// 缓存所有的字段的值
private final ByteBuffersDataOutput bufferedDocs;
// 下标是当前chunk中的docID的偏移量,值是对应doc的字段个数
private int[] numStoredFields;
// 下标是当前chunk中的docID的偏移量,值是对应doc的所有需要store的数据在bufferedDocs中的结束位置
private int[] endOffsets;
// chunk中的起始docID
private int docBase;
// chunk中的doc个数
private int numBufferedDocs;
// chunk总数
private long numChunks;
// dirtyChunk总数,未满足生成chunk的条件时,强制生成的chunk是dirtyChunk
private long numDirtyChunks;
// dirtyDoc总数,dirtyChunk中的doc是dirtyDoc
private long numDirtyDocs;

// 在处理一个doc的时候,统计已经处理的field个数
private int numStoredFieldsInDoc;

核心方法


开始处理一个doc

当前实现中是空操作。


  @Override
public void startDocument() throws IOException {}

处理一个field

处理一个field,就是读取field的值,根据值的类型按对应的值的存储方式存入bufferedDocs缓存中。


  public void writeField(FieldInfo info, IndexableField field) throws IOException {

++numStoredFieldsInDoc;

int bits = 0;
final BytesRef bytes;
final String string;

Number number = field.numericValue();
if (number != null) { // 如果是数值类型
if (number instanceof Byte || number instanceof Short || number instanceof Integer) {
// byte,short,int都标记为int
bits = NUMERIC_INT;
} else if (number instanceof Long) {
// long
bits = NUMERIC_LONG;
} else if (number instanceof Float) {
// float
bits = NUMERIC_FLOAT;
} else if (number instanceof Double) {
// double
bits = NUMERIC_DOUBLE;
} else {
throw new IllegalArgumentException("cannot store numeric type " + number.getClass());
}
string = null;
bytes = null;
} else {
bytes = field.binaryValue();
if (bytes != null) { // 是二进制
bits = BYTE_ARR;
string = null;
} else { // 是字符串
bits = STRING;
string = field.stringValue();
if (string == null) {
throw new IllegalArgumentException(
"field "
+ field.name()
+ " is stored but does not have binaryValue, stringValue nor numericValue");
}
}
}
// 字段的编号和类型组合体
final long infoAndBits = (((long) info.number) << TYPE_BITS) | bits;
bufferedDocs.writeVLong(infoAndBits);

if (bytes != null) {
bufferedDocs.writeVInt(bytes.length);
bufferedDocs.writeBytes(bytes.bytes, bytes.offset, bytes.length);
} else if (string != null) {
bufferedDocs.writeString(string);
} else {
if (number instanceof Byte || number instanceof Short || number instanceof Integer) {
bufferedDocs.writeZInt(number.intValue());
} else if (number instanceof Long) {
writeTLong(bufferedDocs, number.longValue());
} else if (number instanceof Float) {
writeZFloat(bufferedDocs, number.floatValue());
} else if (number instanceof Double) {
writeZDouble(bufferedDocs, number.doubleValue());
} else {
throw new AssertionError("Cannot get here");
}
}
}

结束处理一个doc

结束doc的处理,需要做4件事:



  • 如果numBufferedDocs空间不足了,需要扩容
  • 记录doc对应的field个数
  • 记录doc数据在bufferedDocs中的结束位置
  • 判断如果满足一个chunk的生成,则生成chunk

  public void finishDocument() throws IOException {
if (numBufferedDocs == this.numStoredFields.length) {
final int newLength = ArrayUtil.oversize(numBufferedDocs + 1, 4);
this.numStoredFields = ArrayUtil.growExact(this.numStoredFields, newLength);
endOffsets = ArrayUtil.growExact(endOffsets, newLength);
}
// 记录doc对应的field个数
this.numStoredFields[numBufferedDocs] = numStoredFieldsInDoc;
numStoredFieldsInDoc = 0;
// 记录当前doc在bufferedDocs中的结束位置
endOffsets[numBufferedDocs] = Math.toIntExact(bufferedDocs.size());
++numBufferedDocs;
if (triggerFlush()) {
flush(false);
}
}

生成一个chunk

生成一个chunk的条件:



  • bufferDocs缓存超出了chunkSize
  • chunk中收集的doc数量超出了maxDocsPerChunk
  • 强制生成

  // 生成一个chunk的条件
// 1.bufferDocs缓存超出了chunkSize
// 2.chunk中收集的doc数量超出了maxDocsPerChunk
private boolean triggerFlush() {
return bufferedDocs.size() >= chunkSize || numBufferedDocs >= maxDocsPerChunk;
}

private void flush(boolean force) throws IOException {
// chunk数+1
numChunks++;
if (force) { // 如果是强制构建chunk,可能是不满足chunk条件的,这种chunk被定义为dirtyChunk
numDirtyChunks++;
numDirtyDocs += numBufferedDocs;
}
// 生成chunk的索引
indexWriter.writeIndex(numBufferedDocs, fieldsStream.getFilePointer());

// 把各个doc在bufferedDocs中的endOffsets转成length
final int[] lengths = endOffsets;
for (int i = numBufferedDocs - 1; i > 0; --i) {
lengths[i] = endOffsets[i] - endOffsets[i - 1];
}
// 如果当前chunk的大小超出了2倍chunkSize,则需要分片
final boolean sliced = bufferedDocs.size() >= 2 * chunkSize;
final boolean dirtyChunk = force;
writeHeader(docBase, numBufferedDocs, numStoredFields, lengths, sliced, dirtyChunk);

// 下面是压缩处理
byte[] content = bufferedDocs.toArrayCopy();
bufferedDocs.reset();

if (sliced) {
// big chunk, slice it
for (int compressed = 0; compressed < content.length; compressed += chunkSize) {
compressor.compress(
content, compressed, Math.min(chunkSize, content.length - compressed), fieldsStream);
}
} else {
compressor.compress(content, 0, content.length, fieldsStream);
}

// 更新下一个chunk的起始docID
docBase += numBufferedDocs;
// 重置doc数统计
numBufferedDocs = 0;
bufferedDocs.reset();
}

private static void saveInts(int[] values, int length, DataOutput out) throws IOException {
if (length == 1) {
out.writeVInt(values[0]);
} else {
StoredFieldsInts.writeInts(values, 0, length, out);
}
}

private void writeHeader(
int docBase,
int numBufferedDocs,
int[] numStoredFields,
int[] lengths,
boolean sliced,
boolean dirtyChunk)
throws IOException {
final int slicedBit = sliced ? 1 : 0;
final int dirtyBit = dirtyChunk ? 2 : 0;
// save docBase and numBufferedDocs
fieldsStream.writeVInt(docBase);
fieldsStream.writeVInt((numBufferedDocs << 2) | dirtyBit | slicedBit);

// save numStoredFields
saveInts(numStoredFields, numBufferedDocs, fieldsStream);

// save lengths
saveInts(lengths, numBufferedDocs, fieldsStream);
}

结束构建

结束构建的时候最重要的就是生成fdx索引文件。


  public void finish(int numDocs) throws IOException {
if (numBufferedDocs > 0) { // 如果还有未处理的doc,强制生成一个chunk
flush(true);
} else {
assert bufferedDocs.size() == 0;
}
if (docBase != numDocs) {
throw new RuntimeException(
"Wrote " + docBase + " docs, finish called with numDocs=" + numDocs);
}
// 构建fdx文件
indexWriter.finish(numDocs, fieldsStream.getFilePointer(), metaStream);
// 记录一些元信息
metaStream.writeVLong(numChunks);
metaStream.writeVLong(numDirtyChunks);
metaStream.writeVLong(numDirtyDocs);
CodecUtil.writeFooter(metaStream);
CodecUtil.writeFooter(fieldsStream);
assert bufferedDocs.size() == 0;
}
}

作者:沧叔解码
链接:https://juejin.cn/post/7166898028941410312
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

0 个评论

要回复文章请先登录注册