Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/develop' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
nwoodward committed May 22, 2024
2 parents c5ab249 + 725bb42 commit 3f8c795
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ class BitCheckExecutionState {
private StorageProviderType storageProviderType;
private String manifestChecksum;
private String storeChecksum;

private String contentSize;
private Map<String, String> contentProperties;
private BitLogStore bitLogStore;
private TaskQueue bitErrorQueue;
Expand Down Expand Up @@ -134,6 +136,14 @@ public void setStoreChecksum(String storeChecksum) {
this.storeChecksum = storeChecksum;
}

public String getContentSize() {
return contentSize;
}

public void setContentSize(String contentSize) {
this.contentSize = contentSize;
}

public void setContentProperties(Map<String, String> contentProperties) {
this.contentProperties = contentProperties;
}
Expand Down
22 changes: 13 additions & 9 deletions workman/src/main/java/org/duracloud/mill/bit/BitCheckHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,15 @@ public final boolean handle(BitCheckExecutionState bitCheckState) throws TaskExe
}

writeResult(result.getResult(),
bitCheckState.getManifestChecksum(),
bitCheckState.getStoreChecksum(),
contentChecksum,
bitCheckState.getBitLogStore(),
bitCheckState.getStorageProviderType(),
bitCheckState.getTask(),
result.getMessage());
bitCheckState.getManifestChecksum(),
bitCheckState.getStoreChecksum(),
contentChecksum,
bitCheckState.getContentSize(),
bitCheckState.getBitLogStore(),
bitCheckState.getStorageProviderType(),
bitCheckState.getTask(),
result.getMessage()
);
}
return true;
} else {
Expand Down Expand Up @@ -152,6 +154,7 @@ private void writeResult(final BitIntegrityResult result,
final String manifestChecksum,
final String storeChecksum,
final String contentChecksum,
final String contentSize,
final BitLogStore bitLogStore,
final StorageProviderType storageProviderType,
final BitIntegrityCheckTask bitTask,
Expand Down Expand Up @@ -187,7 +190,7 @@ public String retry() throws Exception {
String message = MessageFormat
.format("Checksum result={0} account={1} storeId={2} storeType={3} space={4}"
+ " contentId={5} manifestChecksum={6} storeChecksum={7}"
+ " contentChecksum={8}",
+ " contentChecksum={8} contentSize={9}",
result,
bitTask.getAccount(),
bitTask.getStoreId(),
Expand All @@ -196,7 +199,8 @@ public String retry() throws Exception {
bitTask.getContentId(),
manifestChecksum,
storeChecksum,
contentChecksum);
contentChecksum,
contentSize);

if (result == BitIntegrityResult.SUCCESS) {
log.info(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,14 @@ protected void executeImpl() throws TaskExecutionFailedException {
checksumHelper,
manifestStore);

Map<String, String> contentProperties = getContentProperties();
state.setContentProperties(contentProperties);
String storeChecksum = null;
if (contentProperties != null) {
storeChecksum = contentProperties.get(StorageProvider.PROPERTIES_CONTENT_CHECKSUM);
final var cprops = getContentProperties();
state.setContentProperties(cprops);
if (cprops != null) {
state.setStoreChecksum(cprops.get(StorageProvider.PROPERTIES_CONTENT_CHECKSUM));
state.setContentSize(cprops.get(StorageProvider.PROPERTIES_CONTENT_SIZE));
}

state.setStoreChecksum(storeChecksum);

String manifestChecksum = getManifestChecksum();
state.setManifestChecksum(manifestChecksum);
state.setManifestChecksum(getManifestChecksum());
boolean handled = false;
for (BitCheckHandler handler : getHandlers()) {
if (handler.handle(state)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.ws.rs.core.HttpHeaders;

Expand Down Expand Up @@ -48,6 +50,8 @@ public class DuplicationTaskProcessor extends TaskProcessorBase {
private File workDir;
private ManifestStore manifestStore;

private List<File> cachedFiles = new ArrayList<>();

private final Logger log =
LoggerFactory.getLogger(DuplicationTaskProcessor.class);

Expand All @@ -64,6 +68,10 @@ public DuplicationTaskProcessor(DuplicationTask dupTask,
this.manifestStore = manifestStore;
}

public List<File> getCachedFiles() {
return this.cachedFiles;
}

@Override
protected void executeImpl() throws TaskExecutionFailedException {
// Read task
Expand Down Expand Up @@ -389,45 +397,53 @@ private void duplicateContent(final String spaceId,
ChecksumUtil checksumUtil = new ChecksumUtil(MD5);
boolean localChecksumMatch = false;
int attempt = 0;

File localFile = null;
while (!localChecksumMatch && attempt < 3) {
// Get content stream
try (InputStream sourceStream = getSourceContent(spaceId, contentId)) {
// Cache content locally
localFile = cacheContent(sourceStream);
// Check content
String localChecksum = checksumUtil.generateChecksum(localFile);
if (sourceChecksum.equals(localChecksum)) {
localChecksumMatch = true;
} else {
try {
while (!localChecksumMatch && attempt < 3) {
// Get content stream
try (InputStream sourceStream = getSourceContent(spaceId, contentId)) {
// Cache content locally
localFile = cacheContent(sourceStream);
// Check content
String localChecksum = checksumUtil.generateChecksum(localFile);
if (sourceChecksum.equals(localChecksum)) {
localChecksumMatch = true;
} else {
// if the local checksums don't match we need to clean up the local file
// since the next attempt will use a different file path.
cleanup(localFile);
}
} catch (Exception e) {
//if the local file failed to be cached for the checksum generation failed
//we'll need to remove the local file since it will be restreamed to a different
//file.
cleanup(localFile);
log.warn("Error generating checksum for source content: " + e.getMessage(), e);
}
} catch (Exception e) {
log.warn("Error generating checksum for source content: " + e.getMessage(), e);
attempt++;
}
attempt++;
}

// Put content
if (localChecksumMatch) {
putDestinationContent(spaceId,
contentId,
sourceChecksum,
sourceProperties,
localFile);
log.info(
"Successfully duplicated id={} dup_size={} space={} account={}",
contentId,
localFile.length(),
spaceId, dupTask.getAccount());
} else {
// Put content
if (localChecksumMatch) {
putDestinationContent(spaceId,
contentId,
sourceChecksum,
sourceProperties,
localFile);
log.info(
"Successfully duplicated id={} dup_size={} space={} account={}",
contentId,
localFile.length(),
spaceId, dupTask.getAccount());
} else {
String msg = "Unable to retrieve content which matches the" +
" expected source checksum of: " + sourceChecksum;
throw new DuplicationTaskExecutionFailedException(buildFailureMessage(msg));
}

} finally {
cleanup(localFile);
String msg = "Unable to retrieve content which matches the" +
" expected source checksum of: " + sourceChecksum;
throw new DuplicationTaskExecutionFailedException(buildFailureMessage(msg));
}
cleanup(localFile);
}

/*
Expand Down Expand Up @@ -458,16 +474,12 @@ private File cacheContent(InputStream inStream)
File localFile = null;
try {
localFile = File.createTempFile("content-item", ".tmp", workDir);
this.cachedFiles.add(localFile);
try (OutputStream outStream = FileUtils.openOutputStream(localFile)) {
IOUtils.copy(inStream, outStream);
}
inStream.close();
} catch (IOException e) {

if (localFile != null) {
cleanup(localFile);
}

} catch (Exception e) {
cleanup(localFile);
String msg = "Unable to cache content file due to: " + e.getMessage();
throw new DuplicationTaskExecutionFailedException(buildFailureMessage(msg), e);
}
Expand Down Expand Up @@ -507,15 +519,16 @@ public String retry() throws Exception {
}
});
} catch (Exception e) {
cleanup(file);
String msg = "Error attempting to add destination content: " + e.getMessage();
throw new DuplicationTaskExecutionFailedException(buildFailureMessage(msg), e);
}
}

private void cleanup(File file) {
try {
FileUtils.forceDelete(file);
if (file != null && file.exists()) {
FileUtils.forceDelete(file);
}
} catch (IOException e) {
log.info("Unable to delete temp file: " + file.getAbsolutePath() +
" due to: " + e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,4 +593,116 @@ public void testExecuteChecksumMismatch() throws Exception {
taskProcessor.execute();
}

@Test
public void testDuplicationFailsDueToCachedFileChecksumNotMatchingDestinationChecksum() throws Exception {
// Check space
destStore.createSpace(EasyMock.eq(spaceId));
EasyMock.expectLastCall().once();

// Prepare source content
String content = "source-content";
ChecksumUtil checksumUtil = new ChecksumUtil(ChecksumUtil.Algorithm.MD5);
final String srcChecksum = checksumUtil.generateChecksum(content);
final String destChecksum = "changed-checksum";
final String mimetype = "text/plain";

// Source properties
Map<String, String> srcProps = new HashMap<>();
srcProps.put(StorageProvider.PROPERTIES_CONTENT_CHECKSUM, srcChecksum);
srcProps.put(StorageProvider.PROPERTIES_CONTENT_MIMETYPE, mimetype);
EasyMock.expect(srcStore.getContentProperties(EasyMock.eq(spaceId),
EasyMock.eq(contentId)))
.andReturn(srcProps);

Map<String, String> destProps = new HashMap<>();
destProps.put(StorageProvider.PROPERTIES_CONTENT_CHECKSUM, destChecksum);
destProps.put(StorageProvider.PROPERTIES_CONTENT_MIMETYPE, mimetype);
EasyMock.expect(destStore.getContentProperties(EasyMock.eq(spaceId),
EasyMock.eq(contentId)))
.andReturn(destProps);

// Get source content
InputStream contentStream = IOUtil.writeStringToStream(content);
RetrievedContent retrievedContent = new RetrievedContent();
retrievedContent.setContentStream(contentStream);
EasyMock.expect(srcStore.getContent(EasyMock.eq(spaceId),
EasyMock.eq(contentId)))
.andReturn(retrievedContent);

// Add dest content
EasyMock.expect(destStore.addContent(EasyMock.eq(spaceId),
EasyMock.eq(contentId),
EasyMock.eq(mimetype),
EasyMock.eq(srcProps),
EasyMock.eq((long) content.length()),
EasyMock.eq(srcChecksum),
EasyMock.<InputStream>anyObject()))
.andReturn("bad-checksum").times(4);

replayMocks();

try {
taskProcessor.execute();
fail("Expected exception not thrown.");
} catch (DuplicationTaskExecutionFailedException ex) {
assertTrue(true);
}

assertTrue(!taskProcessor.getCachedFiles().isEmpty());
for (File file : taskProcessor.getCachedFiles()) {
assertTrue(!file.exists());
}
}

@Test
public void testDuplicationFailsDueToCachedFileChecksumDoesNotMatchSourceChecksum() throws Exception {
// Check space
destStore.createSpace(EasyMock.eq(spaceId));
EasyMock.expectLastCall().once();

// Prepare source content
String content = "source-content";
ChecksumUtil checksumUtil = new ChecksumUtil(ChecksumUtil.Algorithm.MD5);
final String srcChecksum = checksumUtil.generateChecksum(content);
final String destChecksum = "changed-checksum";
final String mimetype = "text/plain";

// Source properties
Map<String, String> srcProps = new HashMap<>();
srcProps.put(StorageProvider.PROPERTIES_CONTENT_CHECKSUM, srcChecksum);
srcProps.put(StorageProvider.PROPERTIES_CONTENT_MIMETYPE, mimetype);
EasyMock.expect(srcStore.getContentProperties(EasyMock.eq(spaceId),
EasyMock.eq(contentId)))
.andReturn(srcProps);

Map<String, String> destProps = new HashMap<>();
destProps.put(StorageProvider.PROPERTIES_CONTENT_CHECKSUM, destChecksum);
destProps.put(StorageProvider.PROPERTIES_CONTENT_MIMETYPE, mimetype);
EasyMock.expect(destStore.getContentProperties(EasyMock.eq(spaceId),
EasyMock.eq(contentId)))
.andReturn(destProps);

// Get mismatching source content
InputStream contentStream = IOUtil.writeStringToStream(content + "changed");
RetrievedContent retrievedContent = new RetrievedContent();
retrievedContent.setContentStream(contentStream);
EasyMock.expect(srcStore.getContent(EasyMock.eq(spaceId),
EasyMock.eq(contentId)))
.andReturn(retrievedContent).times(3);

replayMocks();

try {
taskProcessor.execute();
fail("Expected exception not thrown.");
} catch (DuplicationTaskExecutionFailedException ex) {
assertTrue(true);
}

assertTrue(!taskProcessor.getCachedFiles().isEmpty());
for (File file : taskProcessor.getCachedFiles()) {
assertTrue(!file.exists());
}
}

}

0 comments on commit 3f8c795

Please sign in to comment.