
review #1 #6

Open
darkdarkdragon opened this issue Aug 13, 2021 · 0 comments
Comments

@darkdarkdragon (Contributor)

Here

s.records = sync.Map{}

is a data race.
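A minimal sketch of one way to fix this (the `recordStore` type and its methods are illustrative, not from the repo): plain assignment over a `sync.Map` field is an unsynchronized write that races with concurrent `Load`/`Store` calls, while holding the map behind an `atomic.Value` lets a reset swap in a fresh map safely.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// recordStore holds its sync.Map behind an atomic.Value so that a reset
// swaps the map atomically instead of assigning over it (s.records = sync.Map{}),
// which would race with concurrent readers and writers.
type recordStore struct {
	records atomic.Value // always holds a *sync.Map
}

func newRecordStore() *recordStore {
	s := &recordStore{}
	s.records.Store(&sync.Map{})
	return s
}

func (s *recordStore) Set(key, val string) {
	s.records.Load().(*sync.Map).Store(key, val)
}

func (s *recordStore) Get(key string) (string, bool) {
	v, ok := s.records.Load().(*sync.Map).Load(key)
	if !ok {
		return "", false
	}
	return v.(string), true
}

// Reset installs a brand-new map; goroutines still using the old one
// finish safely against that old snapshot.
func (s *recordStore) Reset() {
	s.records.Store(&sync.Map{})
}

func main() {
	s := newRecordStore()
	s.Set("stream-1", "healthy")
	v, _ := s.Get("stream-1")
	fmt.Println(v) // healthy
	s.Reset()
	_, ok := s.Get("stream-1")
	fmt.Println(ok) // false
}
```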
Here
msgChan <- StreamMessage{consumerCtx, message}

the producer can write to msgChan after the channel was closed here

Here

return record.LastStatus, nil

and here
status := record.LastStatus

LastStatus is copied without holding the lock.
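This matches the fix adopted in the commit below: store a pointer and hand out immutable snapshots, so readers copy only the pointer under the lock. A minimal sketch (the `record` and `Status` shapes here are simplified stand-ins):

```go
package main

import (
	"fmt"
	"sync"
)

// Status is a stand-in for the (large) health status record.
type Status struct {
	Healthy bool
	Source  string
}

type record struct {
	mu         sync.RWMutex
	lastStatus *Status // pointer: readers share immutable snapshots
}

// LastStatus copies only the pointer under the lock. Copying a struct
// *value* outside the lock could observe a half-written mix of two updates;
// copying a pointer inside the lock always yields one consistent snapshot.
func (r *record) LastStatus() *Status {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.lastStatus
}

// SetStatus installs a brand-new Status; published snapshots are never mutated.
func (r *record) SetStatus(s *Status) {
	r.mu.Lock()
	r.lastStatus = s
	r.mu.Unlock()
}

func main() {
	r := &record{}
	r.SetStatus(&Status{Healthy: true, Source: "probe"})
	fmt.Println(r.LastStatus().Healthy) // true
}
```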

victorges added a commit that referenced this issue Aug 18, 2021
@darkdarkdragon pointed out on #6
that a copy is not atomic so could have issues when copying a value
without holding a lock. I don't want to increase the usage of the lock
either, so decided to change the status to a pointer instead so we will
have a consistent view of the status, not one in between a copy.

Also, this is probably better for perf anyway since we were copying that
big object multiple times on every event processing. Better like this I
guess.
victorges added a commit that referenced this issue Aug 26, 2021
* pkg/data: Store events time as millis, not nano

int64 nanosecond timestamps are not safely JSON serializable (they overflow JavaScript's 2^53-1 safe-integer range), so we better avoid them

* Create new WebhookEvent type

* pkg/data: Create MultistreamWebhookPayload type

* pkg/data: Make webhook event payload optional

* health: Create new multistream reducer for ms status

* health: Add multistream reducer to pipeline&health

* health: Use pointers of status to avoid racy copies


* health: Create root MultistreamHealthy condition

* WIP: Add TODO to fix Status recreation issue

* api: Accept timestamps only in unix milliseconds

(or RFC3339)
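The dual-format parsing this commit describes can be sketched like so (`parseTimestamp` is a hypothetical helper, not the repo's actual function): a bare integer is treated as Unix milliseconds, anything else must be RFC3339.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// parseTimestamp accepts either a Unix-milliseconds integer or an
// RFC3339 string, mirroring the API behavior described above.
func parseTimestamp(s string) (time.Time, error) {
	if ms, err := strconv.ParseInt(s, 10, 64); err == nil {
		return time.Unix(0, ms*int64(time.Millisecond)).UTC(), nil
	}
	return time.Parse(time.RFC3339, s)
}

func main() {
	t1, _ := parseTimestamp("1628812800000")
	t2, _ := parseTimestamp("2021-08-13T00:00:00Z")
	fmt.Println(t1.Equal(t2)) // true: both forms name the same instant
}
```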

* health: Fix all comments from self-review

* pkg/event: Update bindings on existing streams

Now we'll make our first deploy already with
a change to the queue bindings. It already feels
overkill to have to change the stream name to do
that, so I'm changing the logic on stream_consumer
to always set the bindings on the existing stream,
even if it already existed before. This works more
similarly to how other AMQP clients generally do
this.

* health: Add some helpers for Status immutability

Will reduce the boilerplate a bit. We should keep
iterating on this on the following code changes,
but I believe this is enough for this change.
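The "immutability helper" style this commit describes can be sketched as below (the `Status` fields and the `Clone`/`WithCondition` helpers are assumptions for illustration, not the repo's actual API): a shared `*Status` is never mutated in place; updates derive a modified copy.

```go
package main

import "fmt"

// Status stands in for the shared health status object.
type Status struct {
	Healthy    bool
	Conditions []string
}

// Clone copies the struct and its slice so the original stays untouched.
func (s *Status) Clone() *Status {
	c := *s
	c.Conditions = append([]string(nil), s.Conditions...)
	return &c
}

// WithCondition returns a new Status with one extra condition appended,
// leaving every previously published snapshot unchanged.
func (s *Status) WithCondition(cond string) *Status {
	c := s.Clone()
	c.Conditions = append(c.Conditions, cond)
	return c
}

func main() {
	base := &Status{Healthy: true, Conditions: []string{"Transcoding"}}
	next := base.WithCondition("Multistreaming")
	fmt.Println(len(base.Conditions), len(next.Conditions)) // 1 2
}
```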

* reducers: Namespace RealTime and NoErrors conditions

We might want to have other "no errors" and "real time"
flags in the future, so we better not start with too
generic names here.

* docker-compose: Create webhooks exchange as well

* health: Rename MultistreamHealthy to Multistreaming

Consistency with Transcoding condition.

* health: Fix webhook exchange name

* pkg/data: Change ManifestID field to StreamID

* api: Fix health status JSON response