Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: watch and filter #151

Merged
merged 59 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from 55 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
ebc20a7
chore: add a TODO
zach-robinson-dev Jul 8, 2024
f9d7d60
chore: add a TODO
zach-robinson-dev Jul 8, 2024
860e9f4
chore: add a TODO
zach-robinson-dev Jul 8, 2024
c641760
chore: add a TODO
zach-robinson-dev Jul 8, 2024
ea79612
chore: add a TODO
zach-robinson-dev Jul 8, 2024
22af0e1
chore: add a TODO
zach-robinson-dev Jul 9, 2024
7aaeb62
feat: watcher for remote resources
zach-robinson-dev Jul 9, 2024
df2618f
fix: an extra case here
zach-robinson-dev Jul 9, 2024
901f02b
fix: an extra case here
zach-robinson-dev Jul 9, 2024
309d4c5
chore: Add a TODO
zach-robinson-dev Jul 9, 2024
f1ea4b5
chore: Add a TODO
zach-robinson-dev Jul 9, 2024
5706852
fix: selector type
zach-robinson-dev Jul 10, 2024
624a08b
feat: filtering for targets
zach-robinson-dev Jul 10, 2024
88fce4a
feat: don't send reconcile if object was modified last by us
zach-robinson-dev Jul 10, 2024
639f74f
chore: cleanup
zach-robinson-dev Jul 10, 2024
5eafec4
task: use tokio versions of sender and rwlock
zach-robinson-dev Jul 10, 2024
f007600
chore: Add a TODO
zach-robinson-dev Jul 10, 2024
f7a6bdc
feat: exponential backoff in remote watcher
zach-robinson-dev Jul 11, 2024
4eb284b
chore: cleaner
zach-robinson-dev Jul 11, 2024
f543909
feat: use context and select
zach-robinson-dev Jul 11, 2024
88fe423
chore: fix redundancy
zach-robinson-dev Jul 11, 2024
b4b1694
chore: cleanup
zach-robinson-dev Jul 11, 2024
cbb317f
feat: expose cancel and use parent
zach-robinson-dev Jul 11, 2024
e43908f
chore: match the receiver type
zach-robinson-dev Jul 11, 2024
6638279
chore: use unbounded channel
zach-robinson-dev Jul 11, 2024
de22c26
chore: add clone to key type
zach-robinson-dev Jul 11, 2024
86d721c
fix: handle cancel externally
zach-robinson-dev Jul 11, 2024
dd39568
feat: manager for remote watchers
zach-robinson-dev Jul 11, 2024
d48bdb5
chore: cleanup variable name
zach-robinson-dev Jul 11, 2024
00563a4
fix: lock the map for duration of functions
zach-robinson-dev Jul 11, 2024
25a7c07
feat: pass client directly
zach-robinson-dev Jul 11, 2024
f6a6e10
feat: generation predicate on ResourceSync and initialize RemoteWatch…
zach-robinson-dev Jul 11, 2024
e05106b
feat: add watch control for source and target to controller
zach-robinson-dev Jul 11, 2024
55a186d
chore: add some debug logging
zach-robinson-dev Jul 12, 2024
ca8fd4d
fix: requeue faster after adding finalizer
zach-robinson-dev Jul 12, 2024
ee52cce
feat: add another version of a macro
zach-robinson-dev Jul 12, 2024
ab40473
fix: start from beginning if rv expired
zach-robinson-dev Jul 12, 2024
1715bec
fix: start from beginning if rv expired
zach-robinson-dev Jul 12, 2024
1b01183
fix: better logging and don't reconcile on bookmark
zach-robinson-dev Jul 12, 2024
7ac1f1f
fix: logging
zach-robinson-dev Jul 12, 2024
8bbfbf8
fix: filtering logic
zach-robinson-dev Jul 12, 2024
2186ef9
fix: logging
zach-robinson-dev Jul 12, 2024
0f85951
fix: branching logic
zach-robinson-dev Jul 12, 2024
e9079cb
fix: logging
zach-robinson-dev Jul 12, 2024
344cc7e
fix: logging
zach-robinson-dev Jul 12, 2024
60c89a5
fix: logging
zach-robinson-dev Jul 12, 2024
1e3cdf1
fix: don't filter target deletes
zach-robinson-dev Jul 12, 2024
ea65e60
chore: add TODOs
zach-robinson-dev Jul 12, 2024
76e01a5
chore: more debug logging
zach-robinson-dev Jul 12, 2024
5048d0f
fix: message consistency
zach-robinson-dev Jul 12, 2024
5a74902
fix: message consistency
zach-robinson-dev Jul 12, 2024
d3a3381
fix: message consistency
zach-robinson-dev Jul 12, 2024
024f365
chore: update TODOs
zach-robinson-dev Jul 12, 2024
9cc1813
fix: dependency scope
zach-robinson-dev Jul 12, 2024
1564a4b
chore: cleanup
zach-robinson-dev Jul 15, 2024
9d6abd1
fix: Mux lock being blocked on shared parent RefContext
zach-robinson-dev Jul 16, 2024
9a2f6b9
feat: clean terminate watchers on shutdown
zach-robinson-dev Jul 17, 2024
9b2710d
chore: cleaner stop_all
zach-robinson-dev Jul 17, 2024
bcbbbb1
chore: cleanup imports
zach-robinson-dev Jul 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 52 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ description = "Copy k8s resources (or parts thereof) across clusters"
[dependencies]
clap = { version = "4.4", features = ["derive", "help", "env", "std"] }
futures = "0.3"
kube = { version = "0.85.0", features = ["runtime", "derive"] }
kube = { version = "0.85.0", features = ["runtime", "derive", "unstable-runtime"] }
kube-derive = "0.84.0"
k8s-openapi = { version = "0.19.0", features = ["v1_26"] }
kubert = { version = "0.19.0", features = [
Expand All @@ -29,6 +29,12 @@ serde_json = "1"
serde_yaml = "0.9.34"
thiserror = "1"
serde_json_path = "0.6.7"
backoff = "0.4.*"
tokio-context = "0.1.*"
tokio-stream = "0.1.*"

[dev-dependencies]
rstest = "0.17.0"
once_cell = "1.19.*"
chrono = "0.4.*"
rand = "0.8.5"
74 changes: 58 additions & 16 deletions src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use futures::StreamExt;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
use kube::api::DeleteParams;
use kube::api::Patch::Merge;
use kube::runtime::{predicates, reflector, WatchStreamExt};
use kube::{
api::{ListParams, Patch, PatchParams},
runtime::{
Expand All @@ -13,24 +14,28 @@ use kube::{
Api, Client, Resource, ResourceExt,
};
use serde_json::json;
use tokio_context::context::RefContext;
#[allow(unused_imports)]
use tracing::{debug, error, info, warn};

use util::{WithItemAdded, WithItemRemoved};

use crate::mapping::{apply_mappings, clone_resource};
use crate::remote_watcher_manager::RemoteWatcherManager;
use crate::resource_extensions::NamespacedApi;
use crate::{requeue_after, resources::ResourceSync, util, Error, Result, FINALIZER};

pub struct Context {
pub client: Client,
pub remote_watcher_manager: RemoteWatcherManager,
}

async fn reconcile_deleted_resource(
resource_sync: Arc<ResourceSync>,
name: &str,
target_api: NamespacedApi,
parent_api: Api<ResourceSync>,
ctx: Arc<Context>,
) -> Result<Action> {
if !resource_sync.has_target_finalizer() {
// We have already removed our finalizer, so nothing more needs to be done
Expand All @@ -41,19 +46,24 @@ async fn reconcile_deleted_resource(

match target_api.get(target_name).await {
Ok(target) if target.metadata.deletion_timestamp.is_some() => {
// Target is being deleted, wait for it to be deleted
// For now we need a requeue after, but in the future we should try to watch the target if we can
requeue_after!()
resource_sync
.start_remote_watches_if_not_watching(ctx)
.await;
Ok(Action::await_change())
}
Ok(_) => {
target_api
.delete(target_name, &DeleteParams::foreground())
.await?;
// Deleted target, wait for it to be deleted
// For now we need a requeue after, but in the future we should try to watch the target if we can
requeue_after!()

resource_sync
.start_remote_watches_if_not_watching(ctx)
.await;
Ok(Action::await_change())
}
Err(kube::Error::Api(err)) if err.code == 404 => {
resource_sync.stop_remote_watches_if_watching(ctx).await;

let patched_finalizers = resource_sync
.finalizers_clone_or_empty()
.with_item_removed(&FINALIZER.to_string());
Expand Down Expand Up @@ -95,15 +105,15 @@ async fn add_target_finalizer(
.patch(name, &PatchParams::default(), &patch)
.await?;

// For now we are watching all events for the ResourceSync, so the patch will trigger a reconcile
Ok(Action::await_change())
requeue_after!(Duration::from_millis(500))
}

async fn reconcile_normally(
resource_sync: Arc<ResourceSync>,
name: &str,
source_api: NamespacedApi,
target_api: NamespacedApi,
ctx: Arc<Context>,
) -> Result<Action> {
let target_namespace = &target_api.namespace;
let target_ar = &target_api.ar;
Expand Down Expand Up @@ -157,11 +167,19 @@ async fn reconcile_normally(
.patch(&target_ref.name, &ssapply, &Patch::Apply(&target))
.await?;

resource_sync
.start_remote_watches_if_not_watching(ctx)
.await;

info!(?name, ?target_ref, "successfully reconciled");

requeue_after!()
Ok(Action::await_change())
}

// TODO: Until CDC finalizer is implemented (and even after that if a CDC is deleted using foreground propagation) CDC deletion could cause the cluster to be deleted or unreachable before we have a chance to clean up the resource(s) owned by the ResourceSync, we should be able to detect and handle this scenario gracefully
// TODO: Immutability on source/target (via CEL?)
// TODO: If secrets for remote clusters on target and source (when applicable) no longer exist then simply allow the ResourceSync to be deleted by removing the finalizer

async fn reconcile(resource_sync: Arc<ResourceSync>, ctx: Arc<Context>) -> Result<Action> {
let name = resource_sync
.metadata
Expand All @@ -176,26 +194,27 @@ async fn reconcile(resource_sync: Arc<ResourceSync>, ctx: Arc<Context>) -> Resul
let target_api = resource_sync
.spec
.target
.api_for(Arc::clone(&ctx), &local_ns)
.api_for(ctx.client.clone(), &local_ns)
.await?;
let source_api = resource_sync
.spec
.source
.api_for(Arc::clone(&ctx), &local_ns)
.api_for(ctx.client.clone(), &local_ns)
.await?;
let parent_api = resource_sync.api(Arc::clone(&ctx));
let parent_api = resource_sync.api(ctx.client.clone());

match resource_sync {
resource_sync if resource_sync.has_been_deleted() => {
reconcile_deleted_resource(resource_sync, &name, target_api, parent_api).await
reconcile_deleted_resource(resource_sync, &name, target_api, parent_api, ctx).await
}
resource_sync if !resource_sync.has_target_finalizer() => {
add_target_finalizer(resource_sync, &name, parent_api).await
}
_ => reconcile_normally(resource_sync, &name, source_api, target_api).await,
_ => reconcile_normally(resource_sync, &name, source_api, target_api, ctx).await,
}
}

// TODO: Exponential Backoff using DefaultBackoff for watcher
fn error_policy(resource_sync: Arc<ResourceSync>, error: &Error, _ctx: Arc<Context>) -> Action {
let name = resource_sync.name_any();
warn!(?name, %error, "reconcile failed");
Expand All @@ -209,13 +228,36 @@ pub async fn run(client: Client) -> Result<()> {
error!("CRD is not queryable; {e:?}. Is the CRD installed?");
std::process::exit(1);
}
Controller::new(docs, watcher::Config::default().any_semantic())

let (reader, writer) = reflector::store();
let resource_syncs = watcher(docs, watcher::Config::default().any_semantic())
.default_backoff()
.reflect(writer)
.applied_objects()
.predicate_filter(predicates::generation);

let (ctx, handle) = RefContext::new();

let (remote_watcher_manager, remote_objects_trigger) =
RemoteWatcherManager::new(ctx, client.clone());

Controller::for_stream(resource_syncs, reader)
.reconcile_on(remote_objects_trigger)
.shutdown_on_signal()
.run(reconcile, error_policy, Arc::new(Context { client }))
.run(
reconcile,
error_policy,
Arc::new(Context {
client,
remote_watcher_manager,
}),
)
.filter_map(|x| async move { Result::ok(x) })
.for_each(|_| futures::future::ready(()))
.await;

handle.cancel();

Ok(())
}

Expand Down
Loading
Loading