Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a MultipartUploadStream IO object #46

Merged
merged 30 commits into from
Nov 16, 2023
Merged

Conversation

nantiamak
Copy link
Collaborator

This PR adds a MultipartUploadStream IO object in CloudStore.jl, which facilitates uploading a stream of data chunks to blob storage. After creating a MultipartUploadStream object, we can repeatedly call write() and upload each part till there are no more chunks, when we can complete uploading and close the stream.

@nantiamak
Copy link
Collaborator Author

@Drvi I pushed a first version of the MultipartUploadStream struct with two tests, one for S3 and one for Azure. I guess we could also integrate with GzipCompressorStream, but I think it's not necessary for starters.

@nantiamak nantiamak requested a review from Drvi October 17, 2023 12:22
@codecov
Copy link

codecov bot commented Oct 17, 2023

Codecov Report

Attention: 3 lines in your changes are missing coverage. Please review.

Comparison is base (328b427) 83.13% compared to head (0790f79) 84.21%.

❗ Current head 0790f79 differs from pull request most recent head c18952b. Consider uploading reports for the commit c18952b to get more accurate results

Files Patch % Lines
src/object.jl 94.91% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main      #46      +/-   ##
==========================================
+ Coverage   83.13%   84.21%   +1.07%     
==========================================
  Files           7        7              
  Lines         587      646      +59     
==========================================
+ Hits          488      544      +56     
- Misses         99      102       +3     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Member

@Drvi Drvi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey Nantia and thanks for the PR. I think the implementation would work, but it will probably be noticeably slower than CloudStore.put because we don't attempt any parallelism here. Even though I think your main concern was memory overhead, it shouldn't be that hard to get even faster than CloudStore.put since we should be able to overlap arrow serialization and uploading.

An approach inspired by PrefetchedDownloadStream would be:
a) When creating MultipartUploadStream, you'd also spawn multiple tasks
b) The spawned tasks would try to take buffers from a new Channel field we'd add to the MultipartUploadStream, and upload them
c) When we call Base.write(io::MultipartUploadStream, bytes) we put bytes to the Channel and let the tasks handle it.
d) the close method would have to block until all submitted buffers were successfully uploaded and shutdown the tasks.

For bonus points, once we're done uploading a buffer, we'd give it back to the caller, maybe to a user-provided channel, so you could safely reuse them.

@nantiamak
Copy link
Collaborator Author

Hi @Drvi, thanks for the feedback! It's very helpful.
One question I have is about the number of spawned tasks for the upload. For download/prefetching this is dependent on prefetch_multipart_size and the size of input, but for upload we cannot know upfront the total upload size. How should the number of spawned tasks be determined in this case?
I think this is also related to how we'll know when all the tasks are done and the parts uploaded, as for _download_task we decrease io.cond.ntasks by 1 each time a task is done, if I understand correctly.

@Drvi
Copy link
Member

Drvi commented Oct 23, 2023

@nantiamak Ah, good point. I think the design space is a bit larger than I initially thought:) A couple of options

How many tasks to spawn?

  • Pre-spawn a fixed amount / an amount based on some estimate input size. Relatively easy, but you might overspawn for small files...
  • Start spawning tasks dynamically in write up to a limit as needed (e.g. uploading tasks grab a semaphore, if there is a new buffer to upload and semafore if not fully acquired we check if we spawned max of tasks). This is probably trickier to implement.
  • Or we can do that the CloudStore.jl.put does and just always spawn a new task for each buffer during write, probably the simplest

and we should target each buffer being 8MiB (MULTIPART_SIZE). For PrefetchedDownloadStream a had to experiment quite a bit on EC2 to figure out which buffer sizes and number of task combos worked well, usually it was a good idea to follow the behavior of CloudStore.get.

I think this is also related to how we'll know when all the tasks are done and the parts uploaded

Yes, so for PrefetchedDownloadStream I used TaskCondition as a counter, but since you are already using the OrderedSynchronizer, I think you could use their counter that is used internally (https://github.com/JuliaServices/ConcurrentUtilities.jl/blob/main/src/synchronizer.jl#L120C1-L120C20) together with the condition. Maybe we should rethink the API of OrderedSynchronizer so we wouldn't have to touch internals like this...

Copy link
Member

@Drvi Drvi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey Nantia, exciting progress! Left a couple of concurrency related notes. Thanks for working on this!

src/object.jl Outdated Show resolved Hide resolved
src/object.jl Outdated Show resolved Hide resolved
src/object.jl Outdated
Comment on lines 485 to 488
if x.ntasks == 0
break
end
wait(x.cond_wait)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think before we check ntasks values, we should confirm that we're in a good state. If an upload task failed for whatever reason before it decremented here and if it notified before we grabbed the lock, we'd wait forever. The upload task grabs the lock to error notify -- if you in the same block also set some state that singals error, then it would be safe, because here in close
a) we didn't held the lock, so we'd wait for the upload task to error notify and set the error state, then we'd grab the lock and checked the error state
b) we did held the lock, so when we'd get to wait and the upload task would succesfully error-notify us

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added an is_error flag in MultipartUploadStream, which I'm setting to true here, when catching an exception. I'm looking for a test case checking this code path. Any thoughts?

src/object.jl Outdated
end
end
# atomically increment our part counter
@atomic io.cur_part_id += 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we increment after we call put!, I think it could happen that two tasks could reach the put! call with the same value

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed I've seen this happening. On the other hand if we increment before put! two tasks might have incremented io_cur_part_id before reaching put! and io.sync.i, which is incremented inside put! will be != io.cur_part_id` resulting in a deadlock.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I fixed this by assigning io.cur_part_idto a variable and passing it as a parameter to _upload_task rather than getting its value from io.cur_part_id directly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, making this the callers problem seems like a nice solution to me 👍

@nantiamak
Copy link
Collaborator Author

CI/Julia 1.6 is failing with UndefVarError: @atomic not defined. How to go past this error? Is it an incompatibility issue with older Julia versions?

@nantiamak
Copy link
Collaborator Author

@Drvi About your comment "we should target each buffer being 8MiB (MULTIPART_SIZE)", the buffer is currently constructed outside write(). Inside write we only put it in the channel. Do you mean that buffers should be created inside write() or that wherever they're created, the batch size passed to a buffer should be of size MULTIPART_SIZE?

@Drvi
Copy link
Member

Drvi commented Oct 31, 2023

Hey @nantiamak, sorry for the delay.

CI/Julia 1.6 is failing with UndefVarError: @atomic not defined. How to go past this error? Is it an incompatibility issue with older Julia versions?

You can see e.g. in the OrderedSynchronizer code how Jacob dealt with the issue -- https://github.com/JuliaServices/ConcurrentUtilities.jl/blob/main/src/synchronizer.jl
There are @static if VERSION < v"1.7" version checks which are using the older API closed::Threads.Atomic{Bool} instead of @atomic closed::Bool etc.

@Drvi About your comment "we should target each buffer being 8MiB (MULTIPART_SIZE)", the buffer is currently constructed outside write(). Inside write we only put it in the channel. Do you mean that buffers should be created inside write() or that wherever they're created, the batch size passed to a buffer should be of size MULTIPART_SIZE?

I meant in our usage code we should target cca 8MiB chunked to be given to the MultipartUploadStream. On the other hand, I agree it would be useful for the MultipartUploadStream to have the ability do the chunking internally, but I don't know how to do that without being more complicated than just chunking externally. I'd say this is an open design question worth some experimenting.

@nantiamak
Copy link
Collaborator Author

@Drvi Thanks for the pointer!
Indeed, I agree that chunking is more straightforward to be done externally for the time being.

@nantiamak
Copy link
Collaborator Author

@Drvi Could you please take another look on this PR to see if we can merge it?

@Drvi
Copy link
Member

Drvi commented Nov 13, 2023

@nantiamak Sorry for the late reply, in short:

  • We should either document that this should be called behind a semaphore or implement the semaphore throttling ourselves (put! does throttle the number of tasks in flight and we should not deviate from that)
  • I think we should document that the chunks need to be written in order, which makes me think that we don't really need the OrderedSynchronizer, when we get the (part_n, parteTag) all we need to do is io.eTags[part_n] = parteTag behind the lock (and making sure the io.eTags is grown as needed).
  • Our close method does two things -- it waits for the submitted chunks to be written and then it closes the channel and calls completeMultipartUpload. I think we should separate the two into wait and close.
  • The wait should actually throw the error if there was one.

Also, could you try larger files for the benchmark, say 700MiB, and use a semaphore?

notify(io.cond_wait)
end
catch e
isopen(io.upload_queue) && close(io.upload_queue, e)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Drvi Could you help me with adding a test case that triggers an error during uploading and covers these lines? I tried with uploading a part that was smaller than the minimum S3 size and got the following error, but it didn't exercise this catch block. 🤔

HTTP.Exceptions.StatusError(400, "POST", "/jl-minio-23869/test.csv?uploadId=7e57586f-e75d-4cf6-a14e-f480ebe655cd", HTTP.Messages.Response:
  """
  HTTP/1.1 400 Bad Request
  Accept-Ranges: bytes
  Cache-Control: no-cache
  Content-Length: 364
  Content-Security-Policy: block-all-mixed-content
  Content-Type: application/xml
  Server: MinIO
  Strict-Transport-Security: max-age=31536000; includeSubDomains
  Vary: Origin, Accept-Encoding
  X-Accel-Buffering: no
  X-Amz-Request-Id: 179736FE62556FB8
  X-Content-Type-Options: nosniff
  X-Xss-Protection: 1; mode=block
  Date: Mon, 13 Nov 2023 15:04:10 GMT
 
  <?xml version="1.0" encoding="UTF-8"?>
  <Error><Code>EntityTooSmall</Code><Message>Your proposed upload is smaller than the minimum allowed object size.</Message><Key>test.csv</Key><BucketName>jl-minio-23869</BucketName><Resource>/jl-minio-23869/test.csv</Resource><RequestId>179736FE62556FB8</RequestId><HostId>7b2b4a12-8baf-4d22-bc30-e4c3f2f12bff</HostId></Error>""")
  Stacktrace:
    [1] (::HTTP.ConnectionRequest.var"#connections#4"{HTTP.ConnectionRequest.var"#connections#1#5"{HTTP.TimeoutRequest.var"#timeouts#3"{HTTP.TimeoutRequest.var"#timeouts#1#4"{HTTP.ExceptionRequest.var"#exceptions#2"
...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you had a good idea, try writing a small file and then testing, that the channel is closed and that the exp field is populated.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried that, but when the errors happens I'm getting Exiting on signal: TERMINATED and the test terminates before I get to check the channel and the exception.

src/object.jl Outdated
Base.@lock io.cond_wait begin
while true
io.ntasks == 0 && !io.is_error && break
if io.is_error
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Drvi I added this if check here to account for the case when the upload task has error notified before we enter wait() here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you don't need is_error field, checking for !isnothing(io.exc). Then you can just check for error first and only if we don't throw, you'd io.ntasks == 0 && break

src/object.jl Outdated
Base.@lock io.cond_wait begin
while true
io.ntasks == 0 && !io.is_error && break
if io.is_error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you don't need is_error field, checking for !isnothing(io.exc). Then you can just check for error first and only if we don't throw, you'd io.ntasks == 0 && break

notify(io.cond_wait)
end
catch e
isopen(io.upload_queue) && close(io.upload_queue, e)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you had a good idea, try writing a small file and then testing, that the channel is closed and that the exp field is populated.

end
end
catch e
rethrow()
Copy link
Member

@Drvi Drvi Nov 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are missing a way to signal to S3/Blobs that we're aborting the multi-part upload a la AbortMultipartUpload... can you open an issue about it?

Also, I think we should consider the following syntax to the user

MultipartUploadStream(...) do io 
    ...
end

something like

function MultipartUploadStream(f::Function, args...; kwargs...)
    io = MultipartUploadStream(args...; kwargs...)
    try
        f(io)
        wait(io)
        close(io)
    catch e
        abort(io, e) # todo, we don't have this yet
        rethrow()
    end
end

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already an issue for adding more low-level functionality regarding multipart uploads including list parts and and list parts that are in progress, which are related to aborting: #3. Should I add a comment there about abort?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the alternative syntax, f would need to encapsulate a call to MultipartUploadStream.write() though, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#3

ah, sorry, I didn't realize there was an issue already

For the alternative syntax, f would need to encapsulate a call to MultipartUploadStream.write() though, right?

Yes, similar to what one would do with a local file

open("file") do io
    write(io, "my stuff")
end

src/object.jl Outdated
Comment on lines 427 to 431
mutable struct MultipartUploadStream <: IO
store::AbstractStore
url::String
credentials::Union{Nothing, AWS.Credentials, Azure.Credentials}
uploadState
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we specialize on the store type? What is the type of uploadState? Ideally, we'd specialize so that touching the credentials and store is inferrable

Copy link
Collaborator Author

@nantiamak nantiamak Nov 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I replaced AbstractStore for store with Union{AWS.Bucket, Azure.Container}. uploadState is either String or nothing.

Copy link
Member

@Drvi Drvi Nov 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But do we want to specialize MultipartUploadStream{T<:AbstractStore}, similar to PrefetchedDownloadStream? It's possible that Julia will manage to union split on small unions like these but it would be nice to make sure we don't box unnecessarily. Also to enforce that we have compatible Credetial and Store objects (i.e. both Azure or both AWS specific).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind making MultipartUploadStream being parametrized on AbstractStore, but I'm not quite following what's the benefit in comparison to Union{AWS.Bucket, Azure.Container}, as I'm not very familiar with the details of boxing. We do use a union for credentials. Is this because there isn't an abstract struct for those?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe boxing is not the right CS term, but the idea is to make the code as inferrable as possible, so Julia can optimize things easily without having to do dynamic dispatch and so on. In these type-unstable cases, Julia tends to allocate defensively because it doesn't know which types it will encounter. In this specific example, it might be ok since the unions are small, but there is no harm in specializing these since people tend to know in advance if they want to talk to Azure or S3 anyway.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the clarification! I admit I like the flexibility of Union 😄, but I see your point.

@nantiamak
Copy link
Collaborator Author

A couple more results on a larger file and with adding a semaphore for MultipartUploadStream.

Method Filename Schema Size (MB) Time (s) Allocations
MultipartUploadStream csv_various_larger.csv Tuple{Int64,Int64,Float64,Float64,VS,VS} 860.3 140.94 (362.30 k allocations: 834.048 MiB, 0.01% gc time)
Regular Put csv_various_larger.csv Tuple{Int64,Int64,Float64,Float64,VS,VS} 860.3 21.16 (356.52 k allocations: 831.520 MiB, 0.04% gc time)

There is a big difference now between MultipartUploadStream and put, which could be because I'm not configuring the semaphore correctly. I've currently set the default value to 4 * Threads.nthreads() similar to defaultBatchSize().

@Drvi
Copy link
Member

Drvi commented Nov 14, 2023

Hmm, the performance difference seems problematic, we should investigate. Can you share the benchmarking code again?

@nantiamak
Copy link
Collaborator Author

@Drvi Regarding the following:

I think we should document that the chunks need to be written in order, which makes me think that we don't really need the OrderedSynchronizer, when we get the (part_n, parteTag) all we need to do is io.eTags[part_n] = parteTag behind the lock (and making sure the io.eTags is grown as needed).

Why should we change this behaviour for MultipleUploadStream? putObjectImpl() that also does a multipart upload works with an OrderedSynchronizer.

@Drvi
Copy link
Member

Drvi commented Nov 15, 2023

Why should we change this behaviour for MultipleUploadStream? putObjectImpl() that also does a multipart upload works with an OrderedSynchronizer.

I just think it is simple to use 1 synchronization mechanism than 2, Since we already do the locking for the condition, we might as well assign the eTag to the eTags vector without involving the OrderedSynchornizer

@nantiamak
Copy link
Collaborator Author

Ah I get your point now. But what do you mean by "making sure the io.eTags is grown as needed"? I only know of push! to grow a vector without knowing its size beforehand, but if I'm not mistaken you mean something different here.

@nantiamak
Copy link
Collaborator Author

@Drvi Do you maybe mean to use resize!?

src/object.jl Outdated Show resolved Hide resolved
src/object.jl Show resolved Hide resolved
@nantiamak
Copy link
Collaborator Author

@Drvi I think I've addressed all of your feedback.

@nantiamak nantiamak requested a review from Drvi November 16, 2023 11:59
Copy link
Member

@Drvi Drvi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this looks good, I've left a couple of docs improvement ideas but apart from that, I think this is ready to be used. I'm tempted to mark this API as experimental since we'd probably want a story about reusing buffers and handing them back to the caller via a channel and also a way to pre-spawn uploader tasks.

src/object.jl Outdated Show resolved Hide resolved
src/object.jl Outdated Show resolved Hide resolved
src/object.jl Outdated
Comment on lines 407 to 408
Data chunks are written to a channel and spawned tasks read buffers from this channel.
We expect the chunks to be written in order.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd mention this is currently spawns one task per chunk and lets explicitly mention the write method and also put as an alternative.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please elaborate a bit more on the use of put as an alternative here? Doesn't put require the total input to be known upfront?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to mention it, that if you don't need to upload files by part (or if you simply cannot, because your file is too small), you can always use put

src/object.jl Outdated Show resolved Hide resolved
src/object.jl Outdated Show resolved Hide resolved
@Drvi
Copy link
Member

Drvi commented Nov 16, 2023

Btw, I think the benchnark results are heavily influenced by the performance of copy!. I added some logging to it:

@time "copy " copyto!(buf, 1, csv, i, nb)

And the copy got progressively slower over time:

copy : 0.016634 seconds
copy : 0.028683 seconds
copy : 0.041162 seconds
copy : 0.052870 seconds
copy : 0.065678 seconds
copy : 0.077296 seconds
copy : 0.089498 seconds
copy : 0.101765 seconds
copy : 0.113838 seconds
copy : 0.126034 seconds
copy : 0.139463 seconds
copy : 0.151689 seconds
copy : 0.163353 seconds
copy : 0.175153 seconds
copy : 0.187306 seconds
copy : 0.199294 seconds
copy : 0.212154 seconds
copy : 0.223732 seconds
copy : 0.236243 seconds
copy : 0.248252 seconds
copy : 0.260920 seconds
copy : 0.272577 seconds
copy : 0.284779 seconds
copy : 0.297669 seconds
copy : 0.309319 seconds
copy : 0.322311 seconds
copy : 0.333299 seconds
copy : 0.347037 seconds
copy : 0.357561 seconds
copy : 0.371530 seconds
copy : 0.382138 seconds
copy : 0.395172 seconds
copy : 0.408420 seconds
copy : 0.420083 seconds
copy : 0.430917 seconds
copy : 0.445276 seconds
copy : 0.455702 seconds
copy : 0.468660 seconds
copy : 0.481685 seconds
copy : 0.493671 seconds
copy : 0.504613 seconds
copy : 0.516266 seconds
copy : 0.528877 seconds
copy : 0.550385 seconds
copy : 0.554715 seconds
copy : 0.565004 seconds
copy : 0.577645 seconds
copy : 0.590031 seconds
copy : 0.603796 seconds
copy : 0.615178 seconds
copy : 0.625942 seconds
copy : 0.638203 seconds
copy : 0.650485 seconds
copy : 0.663207 seconds
copy : 0.674765 seconds
copy : 0.688626 seconds
copy : 0.699776 seconds
copy : 0.712376 seconds
copy : 0.724644 seconds
copy : 0.737175 seconds
copy : 0.748462 seconds
copy : 0.760385 seconds
copy : 0.777539 seconds
copy : 0.784269 seconds

@nantiamak
Copy link
Collaborator Author

Wow! Good catch. I didn't expect it to be something in the benchmark code.

@nantiamak
Copy link
Collaborator Author

nantiamak commented Nov 16, 2023

Then, I'll remove the comment about performance getting worse with larger files for now, as it might be misleading, but I've mentioned that the API is experimental.

@nantiamak nantiamak merged commit d6dee8c into main Nov 16, 2023
5 checks passed
@nantiamak nantiamak deleted the nm-add-multipart-upload-stream branch November 16, 2023 14:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants