From 04e6a38d6e268a8cba3857eb3855c8b9c57e7122 Mon Sep 17 00:00:00 2001 From: Patrick Zhai Date: Sat, 24 Jun 2023 18:21:56 -0700 Subject: [PATCH 1/4] Make grouping deterministic by allocate groups in LineFileDocs rather than IndexThreads Introduce DocGrouper to do grouping for text and binary based LFD --- src/main/perf/DocGrouper.java | 438 ++++++++++++++++++++++++++++++ src/main/perf/IndexThreads.java | 172 +++++------- src/main/perf/Indexer.java | 2 +- src/main/perf/LineFileDocs.java | 170 +++++------- src/main/perf/NRTPerfTest.java | 2 +- src/main/perf/SearchPerfTest.java | 2 +- 6 files changed, 576 insertions(+), 210 deletions(-) create mode 100644 src/main/perf/DocGrouper.java diff --git a/src/main/perf/DocGrouper.java b/src/main/perf/DocGrouper.java new file mode 100644 index 000000000..51a1c0b6c --- /dev/null +++ b/src/main/perf/DocGrouper.java @@ -0,0 +1,438 @@ +package perf; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import static org.apache.lucene.tests.util.TestUtil.randomRealisticUnicodeString; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import org.apache.lucene.util.BytesRef; + +/** + * Consuming {@link perf.LineFileDocs.LineFileDoc}, group them and put the grouped docs into + * a thread-safe queue. + */ +public abstract class DocGrouper { + protected final int numDocs; + protected final int targetNumGroups; + protected final BlockingQueue outputQueue = new ArrayBlockingQueue<>(1024); + public static final DocGroups END = new DocGroups() { + @Override + public BytesRef getGroupId() { + return null; + } + + @Override + public LineFileDocs.LineFileDoc getNextLFD() { + return null; + } + + @Override + public int getNumOfDocsInGroup() { + return 0; + } + + @Override + public int getRemainingNumGroups() { + return 0; + } + }; + + public static BytesRef[] group100; + public static BytesRef[] group100K; + public static BytesRef[] group10K; + public static BytesRef[] group1M; + public static BytesRef[] group2M; + + DocGrouper(int numDocs, int targetNumGroups) { + this.numDocs = numDocs; + this.targetNumGroups = targetNumGroups; + } + + public DocGroups getNextDocGroups() throws InterruptedException { + return outputQueue.take(); + } + + public static void initGroupIds(Random random) { + assert group100 == null; + group100 = randomStrings(100, random); + group10K = randomStrings(10000, random); + group100K = randomStrings(100000, random); + group1M = randomStrings(1000000, random); + group2M = randomStrings(2000000, random); + } + + static int getTargetNumGroups(int numDocs) { + if (numDocs >= 5000000) { + return 1_000_000; + } else if (numDocs >= 500000) { + return 100_000; + } else { + return 10_000; + } + } + + // returned array will not have dups + private static BytesRef[] randomStrings(int count, Random random) { + final BytesRef[] strings = new BytesRef[count]; + HashSet idSet = new HashSet<>(count); + int i = 0; + while(i < count) { + String s = randomRealisticUnicodeString(random); + while (s.equals("") == false && idSet.contains(s)) { + s = randomRealisticUnicodeString(random); + } + strings[i++] = new BytesRef(s); + idSet.add(s); + } + + return strings; + } + + public abstract void add(LineFileDocs.LineFileDoc lfd) throws InterruptedException; + + /** + * A simple impl when we do not need grouping + */ + static final class NoGroupImpl extends DocGrouper { + + NoGroupImpl(int numDocs) { + super(numDocs, 0); + } + + @Override + public void add(LineFileDocs.LineFileDoc lfd) throws InterruptedException { + if (lfd == LineFileDocs.END) { + outputQueue.put(END); + } + outputQueue.put(new DocGroups.SingleLFD(lfd)); + } + } + + static final class TextGrouper extends DocGrouper { + + private int groupCounter; + private int docCounter; + private int nextNumDocs; + private LineFileDocs.LineFileDoc[] buffer; + private final BytesRef[] groupIds; + private final float docsPerGroupBlock; + + TextGrouper(int numDocs) { + super(numDocs, getTargetNumGroups(numDocs)); + assert group100 != null; + if (numDocs >= 5000000) { + groupIds = group1M; + } else if (numDocs >= 500000) { + groupIds = group100K; + } else { + groupIds = group10K; + } + docsPerGroupBlock = ((float) numDocs) / groupIds.length; + reset(); + } + + @Override + public void add(LineFileDocs.LineFileDoc lfd) throws InterruptedException { + if (lfd == LineFileDocs.END) { + assert docCounter == 0; + outputQueue.put(END); + } + buffer[docCounter++] = lfd; + if (docCounter == nextNumDocs) { + outputQueue.put(new DocGroups.TextBased(groupIds[groupCounter], buffer)); + groupCounter++; + reset(); + } + } + + /* Called when we move to next group */ + private void reset() { + nextNumDocs = calculateNextGroupDocNum(); + buffer = new LineFileDocs.LineFileDoc[nextNumDocs]; + } + + private int calculateNextGroupDocNum() { + // This will toggle between X and X+1 docs, + // converging over time on average to the + // floating point docsPerGroupBlock: + if (groupCounter == groupIds.length - 1) { + return numDocs - ((int) (groupCounter * docsPerGroupBlock)); + } else { + return ((int) ((1 + groupCounter) * docsPerGroupBlock)) - ((int) (groupCounter * docsPerGroupBlock)); + } + } + } + + /** + * The binary LFD is naturally grouped by the binary blob, so that if we have two groups sharing the same + * binary blob then the two groups cannot be indexed concurrently. + * This grouper will produce {@link DocGroups.BinaryBased}, which will contain either: + * 1. One group, consists of multiple LFD, or + * 2. One LFD, split into multiple groups + * Such that each {@link DocGroups.BinaryBased} can be indexed in parallel + *
+ * The algorithm will first calculate average number of documents in the group, then init first budget as the avg + * value, then tries to follow: + * 1. If the coming LFD has number of documents within 0.5 * budget and 1.5 * budget, then the LFD sole will form + * a group, we will then adjust next budget as: newBudget = 2 * budget - LFD.docNum() + * 2. If the coming LFD has number of documents less than 0.5 * budget, then we will try to accumulate the next LFDs + * until the total number of documents of the accumulated LFD reaches 0.5 * budget and form the group. + * The next budget will be adjusted as: newBudget = 2 * budget - total_number_doc_of_the_group + * 3. If the coming LFD has number of documents larger than 1.5 * budget, then we will evenly divide this LFD into + * round(LFD.docNum() / budget) number of groups. Then adjust the next budget as: + * newBudget = (num_group + 1) * budget - LFD.docNum() + * + * In addition to above rule, we will also calibrate the budget to be within 1 ~ 1.4 range of the initial budget + * such that we won't generate too big or small groups, and gives a theoretical upper bound number of groups of + * 2 * target number of groups. (Reality will be much less) + */ + static final class BinaryGrouper extends DocGrouper { + + private final List buffer = new ArrayList<>(); + private final BytesRef[] groupIds; + private final int avg; + private int budget; + private int accumDocNum; + private int groupCounter; + + BinaryGrouper(int numDocs) { + super(numDocs, getTargetNumGroups(numDocs)); + assert group100 != null; + if (numDocs >= 5000000) { + groupIds = group2M; + } else if (numDocs >= 500000) { + groupIds = group1M; + } else { + groupIds = group100K; + } + avg = numDocs / targetNumGroups; + budget = avg; + } + + @Override + public void add(LineFileDocs.LineFileDoc lfd) throws InterruptedException { + if (buffer.size() != 0) { + // case 2 + // we previously have some smallish document block + if (lfd != LineFileDocs.END) { + buffer.add(lfd); + accumDocNum += lfd.remainingDocs(); + } + if (accumDocNum >= 0.5 * budget || lfd == LineFileDocs.END) { + outputQueue.put(new DocGroups.BinaryBased( + buffer.toArray(new LineFileDocs.LineFileDoc[0]), + new BytesRef[]{groupIds[groupCounter++]}, + new int[] {accumDocNum})); + adjustBuffer(accumDocNum, 1); + reset(); + } + } else { + if (lfd == LineFileDocs.END) { + outputQueue.put(END); + return; + } + if (lfd.remainingDocs() >= 0.5 * budget && lfd.remainingDocs() <= 1.5 * budget) { + // case 1 + outputQueue.put(new DocGroups.BinaryBased( + new LineFileDocs.LineFileDoc[]{lfd}, + new BytesRef[]{groupIds[groupCounter++]}, + new int[] {lfd.remainingDocs()})); + adjustBuffer(lfd.remainingDocs(), 1); + } else if (lfd.remainingDocs() < 0.5 * budget) { + // case 2, accumulate but not form a group until we have enough documents + buffer.add(lfd); + accumDocNum += lfd.remainingDocs(); + } else { + // case 3 + int numGroups = lfd.remainingDocs() / budget; + if (lfd.remainingDocs() % budget >= 0.5 * budget) { + numGroups++; + } + int remainder = lfd.remainingDocs() % numGroups; + int base = lfd.remainingDocs() / numGroups; + BytesRef[] nextGroupIds = new BytesRef[numGroups]; + int[] numDocs = new int[numGroups]; + for (int i = 0; i < numGroups; i++) { + nextGroupIds[i] = groupIds[groupCounter++]; + numDocs[i] = base; + if (remainder > 0) { + numDocs[i]++; + remainder--; + } + } + outputQueue.put(new DocGroups.BinaryBased( + new LineFileDocs.LineFileDoc[]{lfd}, + nextGroupIds, + numDocs + )); + } + } + } + + private void adjustBuffer(int lastAccumDocNum, int groupNum) { + budget = groupNum * budget - lastAccumDocNum; + if (budget < avg) { + budget = avg; + } else if (budget > avg * 1.4) { + budget = (int) (avg * 1.4); + } + } + + private void reset() { + buffer.clear(); + accumDocNum = 0; + } + } + + /** + * The class represent one or more document groups + * Note only when we're consuming binary LFD there'll be more than one groups in + * the class + */ + public static abstract class DocGroups { + public abstract BytesRef getGroupId(); + public abstract LineFileDocs.LineFileDoc getNextLFD(); + public abstract int getNumOfDocsInGroup(); + public abstract int getRemainingNumGroups(); + + /** + * A wrapper for singleLFD, when we don't use group fields + */ + static final class SingleLFD extends DocGroups { + private final LineFileDocs.LineFileDoc lfd; + + SingleLFD(LineFileDocs.LineFileDoc lfd) { + this.lfd = lfd; + } + + @Override + public BytesRef getGroupId() { + throw new UnsupportedOperationException("We're not indexing groups"); + } + + @Override + public LineFileDocs.LineFileDoc getNextLFD() { + return lfd; + } + + @Override + public int getNumOfDocsInGroup() { + return 1; + } + + @Override + public int getRemainingNumGroups() { + return lfd.remainingDocs(); + } + } + + static final class BinaryBased extends DocGroups { + + private final LineFileDocs.LineFileDoc[] lfdArray; + private final BytesRef[] groupIds; + private final int[] numDocsPerGroup; + private int consumedDocNumInOneGroup; + private int groupIdx; + private int lfdIdx; + + BinaryBased(LineFileDocs.LineFileDoc[] lfdArray, BytesRef[] groupIds, int[] numDocsPerGroup) { + assert lfdArray.length == 1 || groupIds.length == 1; + assert groupIds.length == numDocsPerGroup.length; + + this.numDocsPerGroup = numDocsPerGroup; + this.lfdArray = lfdArray; + this.groupIds = groupIds; + } + + @Override + public BytesRef getGroupId() { + return groupIds[groupIdx]; + } + + @Override + public LineFileDocs.LineFileDoc getNextLFD() { + if (lfdArray[lfdIdx].remainingDocs() <= 0) { + if (lfdIdx == lfdArray.length - 1) { + throw new IllegalStateException("The group has no more document!"); + } + lfdIdx++; + } + consumeDoc(); + assert lfdArray[lfdIdx].remainingDocs() > 0; + return lfdArray[lfdIdx]; + } + + private void consumeDoc() { + consumedDocNumInOneGroup++; + if (consumedDocNumInOneGroup > numDocsPerGroup[groupIdx]) { + consumedDocNumInOneGroup = 1; + groupIdx++; + } + } + + @Override + public int getNumOfDocsInGroup() { + return numDocsPerGroup[groupIdx]; + } + + @Override + public int getRemainingNumGroups() { + return groupIds.length - groupIdx; + } + } + + static final class TextBased extends DocGroups { + private final BytesRef groupId; + private final LineFileDocs.LineFileDoc[] lfdArray; + private int cursor; + + TextBased(BytesRef groupId, LineFileDocs.LineFileDoc[] lfdArray) { + this.groupId = groupId; + this.lfdArray = lfdArray; + } + + @Override + public BytesRef getGroupId() { + return groupId; + } + + @Override + public LineFileDocs.LineFileDoc getNextLFD() { + return lfdArray[cursor++]; + } + + @Override + public int getNumOfDocsInGroup() { + return lfdArray.length; + } + + @Override + public int getRemainingNumGroups() { + if (cursor == lfdArray.length) { + return 0; + } + return 1; + } + } + } +} diff --git a/src/main/perf/IndexThreads.java b/src/main/perf/IndexThreads.java index 85a9c0c29..e5aa0b175 100644 --- a/src/main/perf/IndexThreads.java +++ b/src/main/perf/IndexThreads.java @@ -17,6 +17,11 @@ * limitations under the License. */ +import static perf.DocGrouper.group100; +import static perf.DocGrouper.group100K; +import static perf.DocGrouper.group10K; +import static perf.DocGrouper.group1M; + import java.io.IOException; import java.util.Iterator; import java.util.Locale; @@ -57,10 +62,7 @@ public IndexThreads(Random random, IndexWriter w, AtomicBoolean indexingFailed, this.docs = lineFileDocs; if (addGroupingFields) { - IndexThread.group100 = randomStrings(100, random); - IndexThread.group10K = randomStrings(10000, random); - IndexThread.group100K = randomStrings(100000, random); - IndexThread.group1M = randomStrings(1000000, random); + DocGrouper.initGroupIds(random); groupBlockIndex = new AtomicInteger(); } else { groupBlockIndex = null; @@ -128,10 +130,6 @@ public static interface UpdatesListener { } private static class IndexThread extends Thread { - public static BytesRef[] group100; - public static BytesRef[] group100K; - public static BytesRef[] group10K; - public static BytesRef[] group1M; private final LineFileDocs docs; private final int numTotalDocs; private final IndexWriter w; @@ -242,95 +240,67 @@ public void run() { final double docsPerGroupBlock = numTotalDocs / (double) groupBlocks.length; while (stop.get() == false) { - int groupCounter = -1; - if (groupBlockIndex.get() >= groupBlocks.length) { - docs.recycle(); + final int numDocs = docs.reserve(); + if (numDocs == 0) { break; - } else { - synchronized (groupBlockIndex) { - // we need to make sure we have more group index - // as well as more docs to index at the same time - if (groupBlockIndex.get() >= groupBlocks.length) { - docs.recycle(); - break; - } - if (docs.reserve() == false) { - break; - } - groupCounter = groupBlockIndex.getAndIncrement(); - } } - final int numDocs; - // This will toggle between X and X+1 docs, - // converging over time on average to the - // floating point docsPerGroupBlock: - if (groupCounter == groupBlocks.length-1) { - numDocs = numTotalDocs - ((int) (groupCounter*docsPerGroupBlock)); - } else { - numDocs = ((int) ((1+groupCounter)*docsPerGroupBlock)) - ((int) (groupCounter*docsPerGroupBlock)); - } - groupBlockField.setBytesValue(groupBlocks[groupCounter]); - - w.addDocuments(new Iterable() { - @Override - public Iterator iterator() { - return new Iterator() { - int upto; - Document doc; - - @SuppressWarnings("synthetic-access") - @Override - public boolean hasNext() { - if (upto < numDocs) { - upto++; - - Field extraField; - - try { - doc = docs.nextDoc(docState, true); - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - if (doc == null) { - throw new IllegalStateException("Expected more docs"); - } - - if (upto == numDocs) { - // Sneaky: we remove it down below, so that in the not-cloned case we don't accumulate this field: - doc.add(groupEndField); - } - - final int id = LineFileDocs.idToInt(idField.stringValue()); - if (id >= numTotalDocs) { - throw new IllegalStateException(); - } - if (((1+id) % 10000) == 0) { - System.out.println("Indexer: " + (1+id) + " docs... (" + (System.currentTimeMillis() - tStart) + " msec)"); - } - group100Field.setBytesValue(group100[id%100]); - group10KField.setBytesValue(group10K[id%10000]); - group100KField.setBytesValue(group100K[id%100000]); - group1MField.setBytesValue(group1M[id%1000000]); - count.incrementAndGet(); - return true; - } else { - doc = null; - return false; - } - } - - @Override - public Document next() { - return doc; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; + groupBlockField.setBytesValue(docs.getCurrentGroupId()); + + w.addDocuments((Iterable) () -> new Iterator<>() { + int upto; + Document doc; + + @SuppressWarnings("synthetic-access") + @Override + public boolean hasNext() { + if (upto < numDocs) { + upto++; + + Field extraField; + + try { + doc = docs.nextDoc(docState); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + if (doc == null) { + throw new IllegalStateException("Expected more docs"); + } + + if (upto == numDocs) { + // Sneaky: we remove it down below, so that in the not-cloned case we don't accumulate this field: + doc.add(groupEndField); + } + + final int id = LineFileDocs.idToInt(idField.stringValue()); + if (id >= numTotalDocs) { + throw new IllegalStateException(); + } + if (((1 + id) % 10000) == 0) { + System.out.println("Indexer: " + (1 + id) + " docs... (" + (System.currentTimeMillis() - tStart) + " msec)"); + } + group100Field.setBytesValue(group100[id % 100]); + group10KField.setBytesValue(group10K[id % 10000]); + group100KField.setBytesValue(group100K[id % 100000]); + group1MField.setBytesValue(group1M[id % 1000000]); + count.incrementAndGet(); + return true; + } else { + doc = null; + return false; } - }); + } + + @Override + public Document next() { + return doc; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }); docState.doc.removeField("groupend"); } @@ -487,20 +457,6 @@ public void run() { } } - // NOTE: returned array might have dups - private static BytesRef[] randomStrings(int count, Random random) { - final BytesRef[] strings = new BytesRef[count]; - int i = 0; - while(i < count) { - final String s = randomRealisticUnicodeString(random); - if (s.length() >= 7) { - strings[i++] = new BytesRef(s); - } - } - - return strings; - } - // NOTE: copied from Lucene's _TestUtil, so we don't have // a [dangerous] dep on test-framework: diff --git a/src/main/perf/Indexer.java b/src/main/perf/Indexer.java index b6778c8f1..b741b87dc 100644 --- a/src/main/perf/Indexer.java +++ b/src/main/perf/Indexer.java @@ -472,7 +472,7 @@ public DocValuesFormat getDocValuesFormatForField(String field) { LineFileDocs lineFileDocs = new LineFileDocs(lineFile, repeatDocs, storeBody, tvsBody, bodyPostingsOffsets, false, taxoWriter, facetDimMethods, facetsConfig, addDVFields, - vectorFile, vectorDimension, vectorEncoding); + vectorFile, vectorDimension, vectorEncoding, addGroupingFields, docCountLimit); float docsPerSecPerThread = -1f; //float docsPerSecPerThread = 100f; diff --git a/src/main/perf/LineFileDocs.java b/src/main/perf/LineFileDocs.java index 68dc5499f..9da65cf5a 100644 --- a/src/main/perf/LineFileDocs.java +++ b/src/main/perf/LineFileDocs.java @@ -43,9 +43,6 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.lucene.document.BinaryDocValuesField; @@ -78,7 +75,7 @@ public class LineFileDocs implements Closeable { // sentinel: - private final static LineFileDoc END = new LineFileDoc.TextBased("END", null, -1); + final static LineFileDoc END = new LineFileDoc.TextBased("END", null, -1); private BufferedReader reader; private SeekableByteChannel channel; @@ -96,21 +93,20 @@ public class LineFileDocs implements Closeable { private final FacetsConfig facetsConfig; private String[] extraFacetFields; private final boolean addDVFields; - private final BlockingQueue queue = new ArrayBlockingQueue<>(1024); - private final BlockingQueue recycleBin = new ArrayBlockingQueue<>(1024); private final Thread readerThread; final boolean isBinary; - private final ThreadLocal nextDocs = new ThreadLocal<>(); private final String[] months = DateFormatSymbols.getInstance(Locale.ROOT).getMonths(); private final String vectorFile; private final int vectorDimension; private final VectorEncoding vectorEncoding; private SeekableByteChannel vectorChannel; + private final DocGrouper docGrouper; + private final ThreadLocal nextDocGroups = new ThreadLocal<>(); public LineFileDocs(String path, boolean doRepeat, boolean storeBody, boolean tvsBody, boolean bodyPostingsOffsets, boolean doClone, TaxonomyWriter taxoWriter, Map facetFields, FacetsConfig facetsConfig, boolean addDVFields, String vectorFile, int vectorDimension, - VectorEncoding vectorEncoding) + VectorEncoding vectorEncoding, boolean addGroupingFields, int totalDocs) throws IOException { this.path = path; this.isBinary = path.endsWith(".bin"); @@ -126,6 +122,14 @@ public LineFileDocs(String path, boolean doRepeat, boolean storeBody, boolean tv this.vectorFile = vectorFile; this.vectorDimension = vectorDimension; this.vectorEncoding = vectorEncoding; + if (addGroupingFields == false) { + // NOTE: totalDocs can be -1 if we don't add group fields + docGrouper = new DocGrouper.NoGroupImpl(totalDocs); + } else if (isBinary) { + docGrouper = new DocGrouper.BinaryGrouper(totalDocs); + } else { + docGrouper = new DocGrouper.TextGrouper(totalDocs); + } open(); readerThread = new Thread() { @@ -175,7 +179,7 @@ private void readDocs() throws Exception { throw new RuntimeException("expected " + length + " document bytes but read " + x); } buffer.position(0); - queue.put(new LineFileDoc.BinaryBased(buffer, readVector(docCountInBlock), totalDocCount, docCountInBlock)); + docGrouper.add(new LineFileDoc.BinaryBased(buffer, readVector(docCountInBlock), totalDocCount, docCountInBlock)); totalDocCount += docCountInBlock; } } else { @@ -192,11 +196,11 @@ private void readDocs() throws Exception { break; } } - queue.put(new LineFileDoc.TextBased(line, readVector(1), id++)); + docGrouper.add(new LineFileDoc.TextBased(line, readVector(1), id++)); } } for(int i=0;i<128;i++) { - queue.put(END); + docGrouper.add(END); } } @@ -510,47 +514,38 @@ static Document cloneDoc(Document doc1) { return doc2; } - /* Call this function to put the remaining doc block into recycle queue */ - public void recycle() { - if (isBinary && nextDocs.get() != null && nextDocs.get().getBlockByteText().hasRemaining()) { - try { - recycleBin.put(nextDocs.get()); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new RuntimeException(ie); - } - nextDocs.set(null); - } - } - - /* Call this function to make sure the calling thread will have something to index */ - public boolean reserve() { - if (isBinary == false) { - return true; // don't need to reserve anything with text based LFD - } - LineFileDoc lfd = nextDocs.get(); - if (lfd != null && lfd.getBlockByteText().hasRemaining()) { - return true; // we have next document + /** + * Call this function to make sure the calling thread will have something to index + * @return number of documents in the next group to be indexed, 0 means nothing and the + * thread should stop indexing. + */ + public int reserve() { + DocGrouper.DocGroups docGroups = nextDocGroups.get(); + if (docGroups != null && docGroups.getRemainingNumGroups() > 0) { + return docGroups.getNumOfDocsInGroup(); } try { - lfd = queue.take(); - } catch (InterruptedException ie) { + docGroups = docGrouper.getNextDocGroups(); + } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new RuntimeException(ie); + throw new RuntimeException(e); } - if (lfd == END) { - return false; + if (docGroups == DocGrouper.END) { + return 0; } - nextDocs.set(lfd); - return true; + nextDocGroups.set(docGroups); + return docGroups.getNumOfDocsInGroup(); } - public Document nextDoc(DocState doc) throws IOException { - return nextDoc(doc, false); + /** + * Should only call this method after {@link #reserve()} return positive value + */ + public BytesRef getCurrentGroupId() { + return nextDocGroups.get().getGroupId(); } @SuppressWarnings({"rawtypes", "unchecked"}) - public Document nextDoc(DocState doc, boolean expected) throws IOException { + public Document nextDoc(DocState doc) throws IOException { long msecSinceEpoch; int timeSec; @@ -560,34 +555,16 @@ public Document nextDoc(DocState doc, boolean expected) throws IOException { String body; String randomLabel; int myID = -1; + LineFileDoc lfd; - if (isBinary) { - - float[] vector = new float[vectorDimension]; - FloatBuffer vectorBuffer = null; - - LineFileDoc lfd; + // reserve() is okay to be called multiple times + if (reserve() == 0) { + return null; + } else { + lfd = nextDocGroups.get().getNextLFD(); + } - // reserve() is okay to be called multiple times - if (reserve() == false) { - if (expected == false) { - return null; - } else { - // the caller expects there are more documents, we will be blocking on recycleBin for 10 seconds - try { - lfd = recycleBin.poll(10, TimeUnit.SECONDS); - if (lfd == null) { - throw new IllegalStateException("Expected docs in recycleBin but not found anything"); - } - nextDocs.set(lfd); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new RuntimeException(ie); - } - } - } else { - lfd = nextDocs.get(); - } + if (isBinary) { assert lfd != null && lfd != END && lfd.getBlockByteText().hasRemaining(); // buffer format described in buildBinaryLineDocs.py ByteBuffer buffer = lfd.getBlockByteText(); @@ -632,16 +609,6 @@ public Document nextDoc(DocState doc, boolean expected) throws IOException { } } else { - LineFileDoc lfd; - try { - lfd = queue.take(); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new RuntimeException(ie); - } - if (lfd == END) { - return null; - } line = lfd.getStringText(); myID = lfd.getNextId(); @@ -819,7 +786,7 @@ public Document nextDoc(DocState doc, boolean expected) throws IOException { } } - private static abstract class LineFileDoc { + static abstract class LineFileDoc { // This vector can be vector value for one or more documents // more specifically, for text based LFD the vector is single valued @@ -842,6 +809,12 @@ private static abstract class LineFileDoc { */ abstract int getNextId(); + /** + * How many docs in this LFD is not yet read, mainly for BinaryBased, as TextBased LFD will + * only have 1 doc always + */ + abstract int remainingDocs(); + /** * This method is only for txt based LFD, should only return value for 1 document */ @@ -862,6 +835,7 @@ private static final class TextBased extends LineFileDoc { final String stringText; final int id; // This is the exact id since it is txt based LFD + boolean consumed; TextBased(String text, float[] vector, int id) { super(vector); @@ -871,9 +845,18 @@ private static final class TextBased extends LineFileDoc { @Override int getNextId() { + consumed = true; return id; } + @Override + int remainingDocs() { + if (consumed) { + return 0; + } + return 1; + } + @Override String getStringText() { return stringText; @@ -884,42 +867,31 @@ private static final class BinaryBased extends LineFileDoc { final ByteBuffer blockByteText; private int nextId; // will have multiple doc in a same LFD so we'll determine id using a base - private int docCount; // and a count + private int remainingDocs; // and a count BinaryBased(ByteBuffer bytes, float[] vector, int idBase, int docCount) { super(vector); blockByteText = bytes; this.nextId = idBase; - this.docCount = docCount; + this.remainingDocs = docCount; } @Override int getNextId() { - if (docCount-- == 0) { - throw new IllegalStateException("Calling getId more than docCount"); + if (remainingDocs-- == 0) { + throw new IllegalStateException("Calling getId more than number of docs in a block"); } return nextId++; } @Override - ByteBuffer getBlockByteText() { - return blockByteText; + int remainingDocs() { + return remainingDocs; } - } - LineFileDoc(String text, byte[] vector) { - if (vector == null) { - this.vector = null; - } else { - this.vector = ByteBuffer.wrap(vector); - } - } - - LineFileDoc(ByteBuffer bytes, byte[] vector) { - if (vector == null) { - this.vector = null; - } else { - this.vector = ByteBuffer.wrap(vector); + @Override + ByteBuffer getBlockByteText() { + return blockByteText; } } diff --git a/src/main/perf/NRTPerfTest.java b/src/main/perf/NRTPerfTest.java index 11fead943..0d0185e68 100644 --- a/src/main/perf/NRTPerfTest.java +++ b/src/main/perf/NRTPerfTest.java @@ -272,7 +272,7 @@ public static void main(String[] args) throws Exception { System.out.println("Max merge MB/sec = " + (mergeMaxWriteMBPerSec <= 0.0 ? "unlimited" : mergeMaxWriteMBPerSec)); final Random random = new Random(seed); - final LineFileDocs docs = new LineFileDocs(lineDocFile, true, false, false, false, false, null, Collections.emptyMap(), null, true, null, 0, null); + final LineFileDocs docs = new LineFileDocs(lineDocFile, true, false, false, false, false, null, Collections.emptyMap(), null, true, null, 0, null, false, -1); final Directory dir0; if (dirImpl.equals("MMapDirectory")) { diff --git a/src/main/perf/SearchPerfTest.java b/src/main/perf/SearchPerfTest.java index deb4ec491..25e01c72b 100755 --- a/src/main/perf/SearchPerfTest.java +++ b/src/main/perf/SearchPerfTest.java @@ -356,7 +356,7 @@ public void warm(LeafReader reader) throws IOException { // TODO: add -nrtBodyPostingsOffsets instead of // hardwired false: boolean addDVFields = mode == Mode.BDV_UPDATE || mode == Mode.NDV_UPDATE; - LineFileDocs lineFileDocs = new LineFileDocs(lineDocsFile, false, storeBody, tvsBody, false, cloneDocs, null, null, null, addDVFields, null, 0, null); + LineFileDocs lineFileDocs = new LineFileDocs(lineDocsFile, false, storeBody, tvsBody, false, cloneDocs, null, null, null, addDVFields, null, 0, null, false, -1); IndexThreads threads = new IndexThreads(new Random(17), writer, new AtomicBoolean(false), lineFileDocs, indexThreadCount, -1, false, false, mode, docsPerSecPerThread, null, -1.0, -1); threads.start(); From e2e3735a4cf3115ac7901a3036ab9dab9cca35d9 Mon Sep 17 00:00:00 2001 From: Patrick Zhai Date: Wed, 2 Aug 2023 22:39:40 -0700 Subject: [PATCH 2/4] Add to compile file list --- src/python/competition.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/python/competition.py b/src/python/competition.py index b5cc6eafe..5cbe609eb 100644 --- a/src/python/competition.py +++ b/src/python/competition.py @@ -387,6 +387,7 @@ def compile(self, cp): 'StringFieldDocSelector.java', 'UnparsedTask.java', 'TaskParserFactory.java', + 'DocGrouper.java', )] print('files %s' % files) From b8ecd8208bf7ba46cbc11b15e27c0d4cb5be238b Mon Sep 17 00:00:00 2001 From: Patrick Zhai Date: Mon, 4 Sep 2023 22:29:51 -0700 Subject: [PATCH 3/4] Address easy comments --- src/main/perf/DocGrouper.java | 17 ++++++++++------- src/main/perf/LineFileDocs.java | 2 +- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/main/perf/DocGrouper.java b/src/main/perf/DocGrouper.java index 51a1c0b6c..d1750a048 100644 --- a/src/main/perf/DocGrouper.java +++ b/src/main/perf/DocGrouper.java @@ -72,8 +72,10 @@ public DocGroups getNextDocGroups() throws InterruptedException { return outputQueue.take(); } - public static void initGroupIds(Random random) { - assert group100 == null; + public static synchronized void initGroupIds(Random random) { + if (group100 != null) { + throw new IllegalStateException("Cannot init group ids twice"); + } group100 = randomStrings(100, random); group10K = randomStrings(10000, random); group100K = randomStrings(100000, random); @@ -167,17 +169,18 @@ public void add(LineFileDocs.LineFileDoc lfd) throws InterruptedException { /* Called when we move to next group */ private void reset() { - nextNumDocs = calculateNextGroupDocNum(); + nextNumDocs = calculateNextGroupDocCount(); buffer = new LineFileDocs.LineFileDoc[nextNumDocs]; } - private int calculateNextGroupDocNum() { - // This will toggle between X and X+1 docs, - // converging over time on average to the - // floating point docsPerGroupBlock: + private int calculateNextGroupDocCount() { if (groupCounter == groupIds.length - 1) { + // The last group, we make sure the sum matches the total doc count return numDocs - ((int) (groupCounter * docsPerGroupBlock)); } else { + // This will toggle between X and X+1 docs, + // converging over time on average to the + // floating point docsPerGroupBlock: return ((int) ((1 + groupCounter) * docsPerGroupBlock)) - ((int) (groupCounter * docsPerGroupBlock)); } } diff --git a/src/main/perf/LineFileDocs.java b/src/main/perf/LineFileDocs.java index 9da65cf5a..8ad0a81d8 100644 --- a/src/main/perf/LineFileDocs.java +++ b/src/main/perf/LineFileDocs.java @@ -878,7 +878,7 @@ private static final class BinaryBased extends LineFileDoc { @Override int getNextId() { - if (remainingDocs-- == 0) { + if (remainingDocs-- <= 0) { throw new IllegalStateException("Calling getId more than number of docs in a block"); } return nextId++; From 41d76c2c975b4b6909dd3177ebc49addc9019382 Mon Sep 17 00:00:00 2001 From: Patrick Zhai Date: Mon, 4 Sep 2023 22:48:03 -0700 Subject: [PATCH 4/4] Refactor, disable binaryLFD with grouping --- src/main/perf/DocGrouper.java | 196 +------------------------------- src/main/perf/IndexThreads.java | 16 +-- src/main/perf/LineFileDocs.java | 12 +- 3 files changed, 11 insertions(+), 213 deletions(-) diff --git a/src/main/perf/DocGrouper.java b/src/main/perf/DocGrouper.java index d1750a048..0b7b4a950 100644 --- a/src/main/perf/DocGrouper.java +++ b/src/main/perf/DocGrouper.java @@ -33,7 +33,6 @@ */ public abstract class DocGrouper { protected final int numDocs; - protected final int targetNumGroups; protected final BlockingQueue outputQueue = new ArrayBlockingQueue<>(1024); public static final DocGroups END = new DocGroups() { @Override @@ -61,11 +60,9 @@ public int getRemainingNumGroups() { public static BytesRef[] group100K; public static BytesRef[] group10K; public static BytesRef[] group1M; - public static BytesRef[] group2M; - DocGrouper(int numDocs, int targetNumGroups) { + DocGrouper(int numDocs) { this.numDocs = numDocs; - this.targetNumGroups = targetNumGroups; } public DocGroups getNextDocGroups() throws InterruptedException { @@ -80,17 +77,6 @@ public static synchronized void initGroupIds(Random random) { group10K = randomStrings(10000, random); group100K = randomStrings(100000, random); group1M = randomStrings(1000000, random); - group2M = randomStrings(2000000, random); - } - - static int getTargetNumGroups(int numDocs) { - if (numDocs >= 5000000) { - return 1_000_000; - } else if (numDocs >= 500000) { - return 100_000; - } else { - return 10_000; - } } // returned array will not have dups @@ -118,7 +104,7 @@ private static BytesRef[] randomStrings(int count, Random random) { static final class NoGroupImpl extends DocGrouper { NoGroupImpl(int numDocs) { - super(numDocs, 0); + super(numDocs); } @Override @@ -140,7 +126,7 @@ static final class TextGrouper extends DocGrouper { private final float docsPerGroupBlock; TextGrouper(int numDocs) { - super(numDocs, getTargetNumGroups(numDocs)); + super(numDocs); assert group100 != null; if (numDocs >= 5000000) { groupIds = group1M; @@ -186,127 +172,6 @@ private int calculateNextGroupDocCount() { } } - /** - * The binary LFD is naturally grouped by the binary blob, so that if we have two groups sharing the same - * binary blob then the two groups cannot be indexed concurrently. - * This grouper will produce {@link DocGroups.BinaryBased}, which will contain either: - * 1. One group, consists of multiple LFD, or - * 2. One LFD, split into multiple groups - * Such that each {@link DocGroups.BinaryBased} can be indexed in parallel - *
- * The algorithm will first calculate average number of documents in the group, then init first budget as the avg - * value, then tries to follow: - * 1. If the coming LFD has number of documents within 0.5 * budget and 1.5 * budget, then the LFD sole will form - * a group, we will then adjust next budget as: newBudget = 2 * budget - LFD.docNum() - * 2. If the coming LFD has number of documents less than 0.5 * budget, then we will try to accumulate the next LFDs - * until the total number of documents of the accumulated LFD reaches 0.5 * budget and form the group. - * The next budget will be adjusted as: newBudget = 2 * budget - total_number_doc_of_the_group - * 3. If the coming LFD has number of documents larger than 1.5 * budget, then we will evenly divide this LFD into - * round(LFD.docNum() / budget) number of groups. Then adjust the next budget as: - * newBudget = (num_group + 1) * budget - LFD.docNum() - * - * In addition to above rule, we will also calibrate the budget to be within 1 ~ 1.4 range of the initial budget - * such that we won't generate too big or small groups, and gives a theoretical upper bound number of groups of - * 2 * target number of groups. (Reality will be much less) - */ - static final class BinaryGrouper extends DocGrouper { - - private final List buffer = new ArrayList<>(); - private final BytesRef[] groupIds; - private final int avg; - private int budget; - private int accumDocNum; - private int groupCounter; - - BinaryGrouper(int numDocs) { - super(numDocs, getTargetNumGroups(numDocs)); - assert group100 != null; - if (numDocs >= 5000000) { - groupIds = group2M; - } else if (numDocs >= 500000) { - groupIds = group1M; - } else { - groupIds = group100K; - } - avg = numDocs / targetNumGroups; - budget = avg; - } - - @Override - public void add(LineFileDocs.LineFileDoc lfd) throws InterruptedException { - if (buffer.size() != 0) { - // case 2 - // we previously have some smallish document block - if (lfd != LineFileDocs.END) { - buffer.add(lfd); - accumDocNum += lfd.remainingDocs(); - } - if (accumDocNum >= 0.5 * budget || lfd == LineFileDocs.END) { - outputQueue.put(new DocGroups.BinaryBased( - buffer.toArray(new LineFileDocs.LineFileDoc[0]), - new BytesRef[]{groupIds[groupCounter++]}, - new int[] {accumDocNum})); - adjustBuffer(accumDocNum, 1); - reset(); - } - } else { - if (lfd == LineFileDocs.END) { - outputQueue.put(END); - return; - } - if (lfd.remainingDocs() >= 0.5 * budget && lfd.remainingDocs() <= 1.5 * budget) { - // case 1 - outputQueue.put(new DocGroups.BinaryBased( - new LineFileDocs.LineFileDoc[]{lfd}, - new BytesRef[]{groupIds[groupCounter++]}, - new int[] {lfd.remainingDocs()})); - adjustBuffer(lfd.remainingDocs(), 1); - } else if (lfd.remainingDocs() < 0.5 * budget) { - // case 2, accumulate but not form a group until we have enough documents - buffer.add(lfd); - accumDocNum += lfd.remainingDocs(); - } else { - // case 3 - int numGroups = lfd.remainingDocs() / budget; - if (lfd.remainingDocs() % budget >= 0.5 * budget) { - numGroups++; - } - int remainder = lfd.remainingDocs() % numGroups; - int base = lfd.remainingDocs() / numGroups; - BytesRef[] nextGroupIds = new BytesRef[numGroups]; - int[] numDocs = new int[numGroups]; - for (int i = 0; i < numGroups; i++) { - nextGroupIds[i] = groupIds[groupCounter++]; - numDocs[i] = base; - if (remainder > 0) { - numDocs[i]++; - remainder--; - } - } - outputQueue.put(new DocGroups.BinaryBased( - new LineFileDocs.LineFileDoc[]{lfd}, - nextGroupIds, - numDocs - )); - } - } - } - - private void adjustBuffer(int lastAccumDocNum, int groupNum) { - budget = groupNum * budget - lastAccumDocNum; - if (budget < avg) { - budget = avg; - } else if (budget > avg * 1.4) { - budget = (int) (avg * 1.4); - } - } - - private void reset() { - buffer.clear(); - accumDocNum = 0; - } - } - /** * The class represent one or more document groups * Note only when we're consuming binary LFD there'll be more than one groups in @@ -349,61 +214,6 @@ public int getRemainingNumGroups() { } } - static final class BinaryBased extends DocGroups { - - private final LineFileDocs.LineFileDoc[] lfdArray; - private final BytesRef[] groupIds; - private final int[] numDocsPerGroup; - private int consumedDocNumInOneGroup; - private int groupIdx; - private int lfdIdx; - - BinaryBased(LineFileDocs.LineFileDoc[] lfdArray, BytesRef[] groupIds, int[] numDocsPerGroup) { - assert lfdArray.length == 1 || groupIds.length == 1; - assert groupIds.length == numDocsPerGroup.length; - - this.numDocsPerGroup = numDocsPerGroup; - this.lfdArray = lfdArray; - this.groupIds = groupIds; - } - - @Override - public BytesRef getGroupId() { - return groupIds[groupIdx]; - } - - @Override - public LineFileDocs.LineFileDoc getNextLFD() { - if (lfdArray[lfdIdx].remainingDocs() <= 0) { - if (lfdIdx == lfdArray.length - 1) { - throw new IllegalStateException("The group has no more document!"); - } - lfdIdx++; - } - consumeDoc(); - assert lfdArray[lfdIdx].remainingDocs() > 0; - return lfdArray[lfdIdx]; - } - - private void consumeDoc() { - consumedDocNumInOneGroup++; - if (consumedDocNumInOneGroup > numDocsPerGroup[groupIdx]) { - consumedDocNumInOneGroup = 1; - groupIdx++; - } - } - - @Override - public int getNumOfDocsInGroup() { - return numDocsPerGroup[groupIdx]; - } - - @Override - public int getRemainingNumGroups() { - return groupIds.length - groupIdx; - } - } - static final class TextBased extends DocGroups { private final BytesRef groupId; private final LineFileDocs.LineFileDoc[] lfdArray; diff --git a/src/main/perf/IndexThreads.java b/src/main/perf/IndexThreads.java index e5aa0b175..b224fd3c7 100644 --- a/src/main/perf/IndexThreads.java +++ b/src/main/perf/IndexThreads.java @@ -222,25 +222,13 @@ public void run() { } if (group100 != null) { - + // We're indexing with grouping enabled if (numTotalDocs == -1) { throw new IllegalStateException("must specify numTotalDocs when indexing doc blocks for grouping"); } - // Add docs in blocks: - - final BytesRef[] groupBlocks; - if (numTotalDocs >= 5000000) { - groupBlocks = group1M; - } else if (numTotalDocs >= 500000) { - groupBlocks = group100K; - } else { - groupBlocks = group10K; - } - final double docsPerGroupBlock = numTotalDocs / (double) groupBlocks.length; - while (stop.get() == false) { - final int numDocs = docs.reserve(); + final int numDocs = docs.reserveNextGroup(); if (numDocs == 0) { break; } diff --git a/src/main/perf/LineFileDocs.java b/src/main/perf/LineFileDocs.java index 8ad0a81d8..e0ab7e9e8 100644 --- a/src/main/perf/LineFileDocs.java +++ b/src/main/perf/LineFileDocs.java @@ -126,7 +126,7 @@ public LineFileDocs(String path, boolean doRepeat, boolean storeBody, boolean tv // NOTE: totalDocs can be -1 if we don't add group fields docGrouper = new DocGrouper.NoGroupImpl(totalDocs); } else if (isBinary) { - docGrouper = new DocGrouper.BinaryGrouper(totalDocs); + throw new IllegalArgumentException("We don't support group field with binary LFD"); } else { docGrouper = new DocGrouper.TextGrouper(totalDocs); } @@ -519,7 +519,7 @@ static Document cloneDoc(Document doc1) { * @return number of documents in the next group to be indexed, 0 means nothing and the * thread should stop indexing. */ - public int reserve() { + public int reserveNextGroup() { DocGrouper.DocGroups docGroups = nextDocGroups.get(); if (docGroups != null && docGroups.getRemainingNumGroups() > 0) { return docGroups.getNumOfDocsInGroup(); @@ -538,7 +538,7 @@ public int reserve() { } /** - * Should only call this method after {@link #reserve()} return positive value + * Should only call this method after {@link #reserveNextGroup()} return positive value */ public BytesRef getCurrentGroupId() { return nextDocGroups.get().getGroupId(); @@ -554,11 +554,11 @@ public Document nextDoc(DocState doc) throws IOException { String title; String body; String randomLabel; - int myID = -1; + int myID; LineFileDoc lfd; - // reserve() is okay to be called multiple times - if (reserve() == 0) { + // reserveNextGroup() is okay to be called multiple times + if (reserveNextGroup() == 0) { return null; } else { lfd = nextDocGroups.get().getNextLFD();