
Add SimpleS3Client::copy #1592

Open · wants to merge 17 commits into base: master
3 changes: 3 additions & 0 deletions docs/integration/simple-s3.md

@@ -32,6 +32,9 @@ $resource = \fopen('/path/to/cat/image.jpg', 'r');
$s3->upload('my-image-bucket', 'photos/cat_2.jpg', $resource);
$s3->upload('my-image-bucket', 'photos/cat_2.txt', 'I like this cat');

// Copy objects between buckets
$s3->copy('source-bucket', 'source-key', 'destination-bucket', 'destination-key');
Member: The options parameters deserve a bit of documentation, IMHO.
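As a hedged illustration of what that could look like in the snippet (option names are taken from the new copy() docblock; the values are only examples, not recommendations):

// Copy with optional settings (all keys are optional)
$s3->copy('source-bucket', 'source-key', 'destination-bucket', 'destination-key', [
    'ContentType' => 'image/jpeg',        // metadata applied to the destination object
    'ContentLength' => 256 * 1024 * 1024, // skips the HeadObject lookup when the size is already known
    'PartSize' => 128,                    // multipart part size in MB, used for objects of 5 GB and above
    'Concurrency' => 4,                   // number of UploadPartCopy requests sent per batch
]);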


// Check if a file exists
$s3->has('my-image-bucket', 'photos/cat_2.jpg'); // true

4 changes: 4 additions & 0 deletions src/Integration/Aws/SimpleS3/CHANGELOG.md

@@ -8,6 +8,10 @@

- Upgrade to `async-aws/s3` 2.0

### Added

- Added `SimpleS3Client::copy()` method

## 1.1.1

### Changed
2 changes: 1 addition & 1 deletion src/Integration/Aws/SimpleS3/composer.json

@@ -13,7 +13,7 @@
"require": {
"php": "^7.2.5 || ^8.0",
"ext-json": "*",
"async-aws/s3": "^2.0"
"async-aws/s3": "^2.1"
},
"autoload": {
"psr-4": {
117 changes: 117 additions & 0 deletions src/Integration/Aws/SimpleS3/src/SimpleS3Client.php

@@ -7,10 +7,16 @@
use AsyncAws\Core\Stream\FixedSizeStream;
use AsyncAws\Core\Stream\ResultStream;
use AsyncAws\Core\Stream\StreamFactory;
use AsyncAws\S3\Input\AbortMultipartUploadRequest;
use AsyncAws\S3\Input\CompleteMultipartUploadRequest;
use AsyncAws\S3\Input\CopyObjectRequest;
use AsyncAws\S3\Input\CreateMultipartUploadRequest;
use AsyncAws\S3\Input\GetObjectRequest;
use AsyncAws\S3\Input\UploadPartCopyRequest;
use AsyncAws\S3\S3Client;
use AsyncAws\S3\ValueObject\CompletedMultipartUpload;
use AsyncAws\S3\ValueObject\CompletedPart;
use AsyncAws\S3\ValueObject\CopyPartResult;

/**
* A simplified S3 client that hides some of the complexity of working with S3.
@@ -47,6 +53,117 @@
return $this->objectExists(['Bucket' => $bucket, 'Key' => $key])->isSuccess();
}

/**
* @param array{
* ACL?: \AsyncAws\S3\Enum\ObjectCannedACL::*,
* CacheControl?: string,
* ContentLength?: int,
* ContentType?: string,
* Metadata?: array<string, string>,
* PartSize?: positive-int,
* Concurrency?: positive-int,
Member: Please use int; this syntax is not standard and is therefore not understood by all IDEs.
* } $options
*/
public function copy(string $srcBucket, string $srcKey, string $destBucket, string $destKey, array $options = []): void
{
$megabyte = 1024 * 1024;
if (!empty($options['ContentLength'])) {
$contentLength = (int) $options['ContentLength'];
unset($options['ContentLength']);
} else {
$contentLength = (int) $this->headObject(['Bucket' => $srcBucket, 'Key' => $srcKey])->getContentLength();
}

$concurrency = (int) ($options['Concurrency'] ?? 10);
Member: No need to cast, as it's an int by contract. Let people deal with PHP runtime errors if they misuse the method.
unset($options['Concurrency']);
if ($concurrency < 1) {
$concurrency = 1;
}

/*
 * The maximum number of parts is 10,000 and the part size must be a power of 2.
 * We default to 64 MB per part, which means we only support copying files smaller
 * than 64 MB * 10,000 = 640 GB. If you are copying larger files, set PartSize to a
 * higher number, such as 128, 256 or 512 (max 4096).
 */
$partSize = ($options['PartSize'] ?? 64) * $megabyte;
Member: If PartSize is not defined, I'd rather default to `max(64 * $megabyte, 2 ** ceil(log($contentLength / 10000, 2)))`.
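For illustration, a hedged sketch of how that suggested default behaves (it reuses $megabyte and $contentLength from the surrounding method; the 1 TB figure is only an example):

// Smallest power-of-two part size (in bytes) that keeps the object within 10,000 parts,
// never dropping below the current 64 MB default.
$partSize = max(64 * $megabyte, 2 ** ceil(log($contentLength / 10000, 2)));

// Example: for a ~1 TB object, $contentLength / 10000 is roughly 105 MiB,
// so the next power of two yields 128 MiB parts instead of hitting the 640 GB ceiling.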

unset($options['PartSize']);

// If file is less than 5GB, use normal atomic copy
if ($contentLength < 5120 * $megabyte) {
$this->copyObject(
CopyObjectRequest::create(
array_merge($options, ['Bucket' => $destBucket, 'Key' => $destKey, 'CopySource' => "{$srcBucket}/{$srcKey}"])
)
);

return;
}

$uploadId = $this->createMultipartUpload(
CreateMultipartUploadRequest::create(
array_merge($options, ['Bucket' => $destBucket, 'Key' => $destKey])
)
)->getUploadId();

$partNumber = 1;
$startByte = 0;
$parts = [];
while ($startByte < $contentLength) {
$parallelChunks = $concurrency;
$responses = [];
while ($startByte < $contentLength && $parallelChunks > 0) {
$endByte = min($startByte + $partSize, $contentLength) - 1;
$responses[$partNumber] = $this->uploadPartCopy(
UploadPartCopyRequest::create([

Check failure (GitHub Actions / Psalm) on line 118 in src/Integration/Aws/SimpleS3/src/SimpleS3Client.php:
InvalidArgument: src/Integration/Aws/SimpleS3/src/SimpleS3Client.php:118:51: Argument 1 of AsyncAws\S3\Input\UploadPartCopyRequest::create expects AsyncAws\S3\Input\UploadPartCopyRequest|array{'@region'?: null|string, Bucket?: string, CopySource?: string, CopySourceIfMatch?: null|string, CopySourceIfModifiedSince?: DateTimeImmutable|null|string, CopySourceIfNoneMatch?: null|string, CopySourceIfUnmodifiedSince?: DateTimeImmutable|null|string, CopySourceRange?: null|string, CopySourceSSECustomerAlgorithm?: null|string, CopySourceSSECustomerKey?: null|string, CopySourceSSECustomerKeyMD5?: null|string, ExpectedBucketOwner?: null|string, ExpectedSourceBucketOwner?: null|string, Key?: string, PartNumber?: int, RequestPayer?: 'requester'|null, SSECustomerAlgorithm?: null|string, SSECustomerKey?: null|string, SSECustomerKeyMD5?: null|string, UploadId?: string}, but array{Bucket: string, CopySource: non-empty-string, CopySourceRange: non-empty-string, Key: string, PartNumber: int<1, max>, UploadId: null|string} provided (see https://psalm.dev/004)
stof marked this conversation as resolved.
'Bucket' => $destBucket,
'Key' => $destKey,
'UploadId' => $uploadId,
'CopySource' => "{$srcBucket}/{$srcKey}",
'CopySourceRange' => "bytes={$startByte}-{$endByte}",
'PartNumber' => $partNumber,
])
);

$startByte += $partSize;
++$partNumber;
--$parallelChunks;
}
$error = null;
foreach ($responses as $idx => $response) {
Member: This is not efficient. We define 10 concurrent uploads, but in fact we upload 10, wait for all 10 to finish, and then upload 10 more, and so on.
That means:

  • we wait for the 10th to finish before starting anything new, even though the 9 previous ones may already be done.
  • if 1 upload takes a long time to finish, we don't leverage the 9 others.

A better implementation would be to use a pool (or buffer): fill the pool with 10 responses and, once one response is done, add a new one before checking the other responses.

Better: leverage async processing by not waiting for a response to end, but checking whether it is done and, if not, checking the next one.

while not all parts uploaded
  while pool not full
    => start new upload
  while pool is full
    foreach response in pool
      if response not finished
        continue
      else
        => process response
        => remove response from pool

foreach response in pool
  => wait for response to finish
  => process response
  => remove response from pool
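A minimal PHP sketch of that pooled loop, under the assumption that a non-blocking completeness check is available (isFinished() below is hypothetical; it is not part of the PR or of async-aws as shown here):

// Hedged sketch of the pooled approach, not the PR's implementation.
$pool = [];
$parts = [];
$partNumber = 1;

while ($startByte < $contentLength || [] !== $pool) {
    // Keep the pool topped up while there are parts left to copy.
    while ($startByte < $contentLength && \count($pool) < $concurrency) {
        $endByte = min($startByte + $partSize, $contentLength) - 1;
        $pool[$partNumber] = $this->uploadPartCopy(UploadPartCopyRequest::create([
            'Bucket' => $destBucket,
            'Key' => $destKey,
            'UploadId' => $uploadId,
            'CopySource' => "{$srcBucket}/{$srcKey}",
            'CopySourceRange' => "bytes={$startByte}-{$endByte}",
            'PartNumber' => $partNumber,
        ]));
        $startByte += $partSize;
        ++$partNumber;
    }

    // Collect whatever is already finished; slow parts stay in the pool.
    // (A real implementation would wait briefly here instead of busy-polling.)
    foreach ($pool as $idx => $response) {
        if (!isFinished($response)) { // hypothetical non-blocking check
            continue;
        }
        $parts[] = new CompletedPart(['ETag' => $response->getCopyPartResult()->getEtag(), 'PartNumber' => $idx]);
        unset($pool[$idx]);
    }
}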

try {
$copyPartResult = $response->getCopyPartResult();
$parts[] = new CompletedPart(['ETag' => $copyPartResult->getEtag(), 'PartNumber' => $idx]);

Check failure (GitHub Actions / Psalm) on line 136 in src/Integration/Aws/SimpleS3/src/SimpleS3Client.php:
PossiblyNullReference: src/Integration/Aws/SimpleS3/src/SimpleS3Client.php:136:78: Cannot call method getEtag on possibly null value (see https://psalm.dev/083)
Member: The weird thing about the PossiblyNullReference error reported by Psalm here is that https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html#API_UploadPartCopy_ResponseElements documents CopyPartResult as required in the output. So it is odd that the operation was generated with a nullable type. Maybe the AWS SDK has metadata that does not match the documentation there.


Member: I suggest reporting this to AWS, as it looks like a mismatch between the human-readable documentation and the machine-readable data used to generate the SDKs.


Contributor Author (@nmuntyanov, Oct 26, 2023): What should I do? Add an additional check, or wait for an AWS fix? We are waiting for this functionality.
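For reference, a hedged sketch of what that additional check could look like inside the loop (the exception message is illustrative, not taken from the PR):

$copyPartResult = $response->getCopyPartResult();
if (null === $copyPartResult) {
    // Defensive guard: AWS documents CopyPartResult as required, but the generated
    // type allows null, so fail loudly rather than passing a null ETag along.
    throw new \RuntimeException(sprintf('UploadPartCopy returned no CopyPartResult for part %d.', $idx));
}
$parts[] = new CompletedPart(['ETag' => $copyPartResult->getEtag(), 'PartNumber' => $idx]);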

Contributor Author: @stof, any updates on this?

Contributor Author: Hi @stof,
I wanted to bring to your attention an ongoing issue regarding AWS that was posted approximately 4 months ago. It seems that progress on resolving this matter has been slower than anticipated.

Considering the delay in fixing the manifests, I'd like to propose a potential solution. Would it be feasible to remove the annotations related to this issue from the code and handle the error suppression in the Psalm baseline instead? I've come across a few similar issues in the baseline that could potentially benefit from this approach.

I believe this adjustment could provide a temporary workaround while we await a resolution from the AWS side. However, I'm open to discussing alternative solutions or any concerns you may have regarding this proposal.

} catch (\Throwable $e) {
$error = $e;

break;
}
Comment on lines +136 to +140
Member: You could move the try/catch outside the loop for readability and remove the error variable:

try {
    foreach ($responses as $idx => $response) {
        $parts[] = new CompletedPart(['ETag' => $response->getCopyPartResult()->getEtag(), 'PartNumber' => $idx]);
    }
} catch (\Throwable $e) {
    foreach ($responses as $response) {
        try {
            $response->cancel();
        } catch (\Throwable $cancelException) {
            continue;
        }
    }
    $this->abortMultipartUpload(AbortMultipartUploadRequest::create(['Bucket' => $destBucket, 'Key' => $destKey, 'UploadId' => $uploadId]));

    throw $e;
}

}
if ($error) {
foreach ($responses as $response) {
try {
$response->cancel();
} catch (\Throwable $e) {
continue;
}
}
$this->abortMultipartUpload(AbortMultipartUploadRequest::create(['Bucket' => $destBucket, 'Key' => $destKey, 'UploadId' => $uploadId]));

Check failure (GitHub Actions / Psalm) on line 151 in src/Integration/Aws/SimpleS3/src/SimpleS3Client.php:
InvalidArgument: src/Integration/Aws/SimpleS3/src/SimpleS3Client.php:151:81: Argument 1 of AsyncAws\S3\Input\AbortMultipartUploadRequest::create expects AsyncAws\S3\Input\AbortMultipartUploadRequest|array{'@region'?: null|string, Bucket?: string, ExpectedBucketOwner?: null|string, Key?: string, RequestPayer?: 'requester'|null, UploadId?: string}, but array{Bucket: string, Key: string, UploadId: null|string} provided (see https://psalm.dev/004)

throw $error;
}
}

$this->completeMultipartUpload(
CompleteMultipartUploadRequest::create([

Check failure (GitHub Actions / Psalm) on line 158 in src/Integration/Aws/SimpleS3/src/SimpleS3Client.php:
InvalidArgument: src/Integration/Aws/SimpleS3/src/SimpleS3Client.php:158:52: Argument 1 of AsyncAws\S3\Input\CompleteMultipartUploadRequest::create expects AsyncAws\S3\Input\CompleteMultipartUploadRequest|array{'@region'?: null|string, Bucket?: string, ChecksumCRC32?: null|string, ChecksumCRC32C?: null|string, ChecksumSHA1?: null|string, ChecksumSHA256?: null|string, ExpectedBucketOwner?: null|string, Key?: string, MultipartUpload?: AsyncAws\S3\ValueObject\CompletedMultipartUpload|array<array-key, mixed>|null, RequestPayer?: 'requester'|null, SSECustomerAlgorithm?: null|string, SSECustomerKey?: null|string, SSECustomerKeyMD5?: null|string, UploadId?: string}, but array{Bucket: string, Key: string, MultipartUpload: AsyncAws\S3\ValueObject\CompletedMultipartUpload, UploadId: null|string} provided (see https://psalm.dev/004)
'Bucket' => $destBucket,
'Key' => $destKey,
'UploadId' => $uploadId,
'MultipartUpload' => new CompletedMultipartUpload(['Parts' => $parts]),
])
);
}

/**
* @param string|resource|(callable(int): string)|iterable<string> $object
* @param array{
Expand Down
71 changes: 71 additions & 0 deletions src/Integration/Aws/SimpleS3/tests/Unit/SimpleS3ClientTest.php

@@ -6,7 +6,11 @@

use AsyncAws\Core\Credentials\NullProvider;
use AsyncAws\Core\Test\ResultMockFactory;
use AsyncAws\S3\Input\CompleteMultipartUploadRequest;
use AsyncAws\S3\Result\CreateMultipartUploadOutput;
use AsyncAws\S3\Result\HeadObjectOutput;
use AsyncAws\S3\Result\UploadPartCopyOutput;
use AsyncAws\S3\ValueObject\CopyPartResult;
use AsyncAws\SimpleS3\SimpleS3Client;
use PHPUnit\Framework\TestCase;
use Symfony\Component\HttpClient\MockHttpClient;
@@ -137,6 +141,73 @@ public function testUploadSmallFileEmptyClosure()
});
}

public function testCopySmallFileWithProvidedLength()
{
$megabyte = 1024 * 1024;
$s3 = $this->getMockBuilder(SimpleS3Client::class)
->disableOriginalConstructor()
->onlyMethods(['createMultipartUpload', 'abortMultipartUpload', 'copyObject', 'completeMultipartUpload'])
->getMock();

$s3->expects(self::never())->method('createMultipartUpload');
$s3->expects(self::never())->method('abortMultipartUpload');
$s3->expects(self::never())->method('completeMultipartUpload');
$s3->expects(self::once())->method('copyObject');

$s3->copy('bucket', 'robots.txt', 'bucket', 'copy-robots.txt', ['ContentLength' => 5 * $megabyte]);
}

public function testCopySmallFileWithoutProvidedLength()
{
$megabyte = 1024 * 1024;
$s3 = $this->getMockBuilder(SimpleS3Client::class)
->disableOriginalConstructor()
->onlyMethods(['createMultipartUpload', 'abortMultipartUpload', 'copyObject', 'completeMultipartUpload', 'headObject'])
->getMock();

$s3->expects(self::never())->method('createMultipartUpload');
$s3->expects(self::never())->method('abortMultipartUpload');
$s3->expects(self::never())->method('completeMultipartUpload');
$s3->expects(self::once())->method('copyObject');
$s3->expects(self::once())->method('headObject')
->willReturn(ResultMockFactory::create(HeadObjectOutput::class, ['ContentLength' => 50 * $megabyte]));

$s3->copy('bucket', 'robots.txt', 'bucket', 'copy-robots.txt');
}

public function testCopyLargeFile()
{
$megabyte = 1024 * 1024;
$uploadedParts = 0;
$completedParts = 0;

$s3 = $this->getMockBuilder(SimpleS3Client::class)
->disableOriginalConstructor()
->onlyMethods(['createMultipartUpload', 'abortMultipartUpload', 'copyObject', 'completeMultipartUpload', 'uploadPartCopy'])
->getMock();

$s3->expects(self::once())->method('createMultipartUpload')
->willReturn(ResultMockFactory::create(CreateMultipartUploadOutput::class, ['UploadId' => '4711']));
$s3->expects(self::never())->method('abortMultipartUpload');
$s3->expects(self::never())->method('copyObject');
$s3->expects(self::any())->method('uploadPartCopy')
->with(self::callback(function () use (&$uploadedParts) {
++$uploadedParts;

return true;
}))
->willReturn(ResultMockFactory::create(UploadPartCopyOutput::class, ['copyPartResult' => new CopyPartResult(['ETag' => 'etag-4711'])]));
$s3->expects(self::once())->method('completeMultipartUpload')->with(self::callback(function (CompleteMultipartUploadRequest $request) use (&$completedParts) {
$completedParts = \count($request->getMultipartUpload()->getParts());

return true;
}));

$s3->copy('bucket', 'robots.txt', 'bucket', 'copy-robots.txt', ['ContentLength' => 6144 * $megabyte]);

self::assertEquals($completedParts, $uploadedParts);
}

private function assertSmallFileUpload(\Closure $callback, string $bucket, string $file, $object): void
{
$s3 = $this->getMockBuilder(SimpleS3Client::class)