Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make grouping deterministic #225

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 251 additions & 0 deletions src/main/perf/DocGrouper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
package perf;

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import static org.apache.lucene.tests.util.TestUtil.randomRealisticUnicodeString;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.lucene.util.BytesRef;

/**
* Consuming {@link perf.LineFileDocs.LineFileDoc}, group them and put the grouped docs into
* a thread-safe queue.
*/
public abstract class DocGrouper {
zhaih marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this final, non-abstract, and rename it to TextDocGrouper maybe? No need for separate subclass since we have no binary case anymore?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still need a NoGroupImpl to deal with the case where we don't need groups, or we can handle that difference in LineFileDocs which requires several if/else there... I'm ok with either but think this might be (slightly) cleaner?

protected final int numDocs;
zhaih marked this conversation as resolved.
Show resolved Hide resolved
protected final BlockingQueue<DocGroups> outputQueue = new ArrayBlockingQueue<>(1024);
public static final DocGroups END = new DocGroups() {
@Override
public BytesRef getGroupId() {
return null;
}

@Override
public LineFileDocs.LineFileDoc getNextLFD() {
return null;
}

@Override
public int getNumOfDocsInGroup() {
return 0;
}

@Override
public int getRemainingNumGroups() {
return 0;
}
};

public static BytesRef[] group100;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comment that this is lazily initialized only 1) during indexing, and 2) when grouping is enabled?

public static BytesRef[] group100K;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could these maybe become non-static now? Init them on construction of TextDocGrouper class? And fix indexer threads to reference them in this instance?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually why they are static previously?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not certain :) I think because there was no obvious singleton instantiated class to store them on (though, IndexThreads could've been used, hmm)? But this new DocGrouper is instantiated once, and is all about grouping, so it seems like the right place to put these compute-once group values?

public static BytesRef[] group10K;
public static BytesRef[] group1M;

DocGrouper(int numDocs) {
this.numDocs = numDocs;
zhaih marked this conversation as resolved.
Show resolved Hide resolved
}

public DocGroups getNextDocGroups() throws InterruptedException {
return outputQueue.take();
}

public static synchronized void initGroupIds(Random random) {
if (group100 != null) {
throw new IllegalStateException("Cannot init group ids twice");
}
group100 = randomStrings(100, random);
group10K = randomStrings(10000, random);
group100K = randomStrings(100000, random);
group1M = randomStrings(1000000, random);
}

// returned array will not have dups
private static BytesRef[] randomStrings(int count, Random random) {
final BytesRef[] strings = new BytesRef[count];
HashSet<String> idSet = new HashSet<>(count);
int i = 0;
while(i < count) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe remove i and change to while (idSet.size() < count) { ... }?

String s = randomRealisticUnicodeString(random);
while (s.equals("") == false && idSet.contains(s)) {
s = randomRealisticUnicodeString(random);
}
strings[i++] = new BytesRef(s);
idSet.add(s);
}

return strings;
}

public abstract void add(LineFileDocs.LineFileDoc lfd) throws InterruptedException;

/**
* A simple impl when we do not need grouping
*/
static final class NoGroupImpl extends DocGrouper {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm can we eliminate this? It seems wasteful way to do nothing? (putting a single doc into a new DocGroup into the queue for threads to then read? Can we just add if (addGroupingFields == false) and skip adding grouping fields in index threads?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we need to retrieve lfd in two different ways, one with group one without, means we need to keep 2 blocking queues in LineFileDocs and have 2 ways to retrieve them..

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I see. Then I think you're right -- let's keep the abstract base class and the no-op subclass?


NoGroupImpl(int numDocs) {
super(numDocs);
}

@Override
public void add(LineFileDocs.LineFileDoc lfd) throws InterruptedException {
if (lfd == LineFileDocs.END) {
outputQueue.put(END);
}
outputQueue.put(new DocGroups.SingleLFD(lfd));
}
}

static final class TextGrouper extends DocGrouper {

private int groupCounter;
private int docCounter;
private int nextNumDocs;
private LineFileDocs.LineFileDoc[] buffer;
private final BytesRef[] groupIds;
private final float docsPerGroupBlock;

TextGrouper(int numDocs) {
super(numDocs);
assert group100 != null;
if (numDocs >= 5000000) {
groupIds = group1M;
zhaih marked this conversation as resolved.
Show resolved Hide resolved
} else if (numDocs >= 500000) {
groupIds = group100K;
} else {
groupIds = group10K;
}
docsPerGroupBlock = ((float) numDocs) / groupIds.length;
reset();
}

@Override
public void add(LineFileDocs.LineFileDoc lfd) throws InterruptedException {
if (lfd == LineFileDocs.END) {
assert docCounter == 0;
outputQueue.put(END);
}
buffer[docCounter++] = lfd;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm are LineFileDoc instances reused (it's unsafe to buffer/hold onto more than one at a time in a thread)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No I don't think they're reused?

if (docCounter == nextNumDocs) {
outputQueue.put(new DocGroups.TextBased(groupIds[groupCounter], buffer));
groupCounter++;
reset();
}
}

/* Called when we move to next group */
private void reset() {
nextNumDocs = calculateNextGroupDocCount();
buffer = new LineFileDocs.LineFileDoc[nextNumDocs];
}

private int calculateNextGroupDocCount() {
if (groupCounter == groupIds.length - 1) {
// The last group, we make sure the sum matches the total doc count
return numDocs - ((int) (groupCounter * docsPerGroupBlock));
} else {
// This will toggle between X and X+1 docs,
// converging over time on average to the
// floating point docsPerGroupBlock:
return ((int) ((1 + groupCounter) * docsPerGroupBlock)) - ((int) (groupCounter * docsPerGroupBlock));
}
}
}

/**
* The class represent one or more document groups
* Note only when we're consuming binary LFD there'll be more than one groups in
* the class
*/
public static abstract class DocGroups {
public abstract BytesRef getGroupId();
public abstract LineFileDocs.LineFileDoc getNextLFD();
public abstract int getNumOfDocsInGroup();
public abstract int getRemainingNumGroups();

/**
* A wrapper for singleLFD, when we don't use group fields
*/
static final class SingleLFD extends DocGroups {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we must also keep this, since we're going with the NoGroupImpl approach?

private final LineFileDocs.LineFileDoc lfd;

SingleLFD(LineFileDocs.LineFileDoc lfd) {
this.lfd = lfd;
}

@Override
public BytesRef getGroupId() {
throw new UnsupportedOperationException("We're not indexing groups");
}

@Override
public LineFileDocs.LineFileDoc getNextLFD() {
return lfd;
}

@Override
public int getNumOfDocsInGroup() {
return 1;
}

@Override
public int getRemainingNumGroups() {
return lfd.remainingDocs();
}
}

static final class TextBased extends DocGroups {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also elide this class up into super class, and have only a TextDocGroups?

private final BytesRef groupId;
private final LineFileDocs.LineFileDoc[] lfdArray;
private int cursor;

TextBased(BytesRef groupId, LineFileDocs.LineFileDoc[] lfdArray) {
this.groupId = groupId;
this.lfdArray = lfdArray;
}

@Override
public BytesRef getGroupId() {
return groupId;
}

@Override
public LineFileDocs.LineFileDoc getNextLFD() {
return lfdArray[cursor++];
}

@Override
public int getNumOfDocsInGroup() {
return lfdArray.length;
}

@Override
public int getRemainingNumGroups() {
if (cursor == lfdArray.length) {
return 0;
}
return 1;
}
}
}
}
Loading