Skip to content

Commit

Permalink
TCP support in dataplane (#86)
Browse files Browse the repository at this point in the history
This adds TCP support to the dataplane and includes integration tests to
verify `TCPRoute` functionality.

Checksums and conntrack cleanup will be handled by follow-up issues:

- #25
- #85
  • Loading branch information
shaneutt committed May 3, 2023
1 parent 8ec82fd commit cfde694
Show file tree
Hide file tree
Showing 11 changed files with 188 additions and 15 deletions.
3 changes: 2 additions & 1 deletion config/samples/tcproute/gateway.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
---
kind: GatewayClass
apiVersion: gateway.networking.k8s.io/v1beta1
metadata:
Expand All @@ -14,4 +15,4 @@ spec:
listeners:
- name: tcp
protocol: TCP
port: 9080
port: 8080
8 changes: 4 additions & 4 deletions config/samples/tcproute/server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ spec:
imagePullPolicy: IfNotPresent
env:
- name: LISTEN_PORT
value: "9080"
value: "8080"
ports:
- containerPort: 9080
- containerPort: 8080
protocol: TCP
---
apiVersion: v1
Expand All @@ -34,9 +34,9 @@ metadata:
spec:
ports:
- name: tcp
port: 9080
port: 8080
protocol: TCP
targetPort: 9080
targetPort: 8080
selector:
app: blixt-tcproute-sample
type: ClusterIP
4 changes: 2 additions & 2 deletions config/samples/tcproute/tcproute.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ metadata:
spec:
parentRefs:
- name: blixt-tcproute-sample
port: 9080
port: 8080
rules:
- backendRefs:
- name: blixt-tcproute-sample
port: 9080
port: 8080
2 changes: 2 additions & 0 deletions config/tests/tcproute/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
bases:
- ../../samples/tcproute
4 changes: 2 additions & 2 deletions dataplane/ebpf/src/egress/icmp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub fn handle_icmp_egress(ctx: TcContext) -> Result<i32, i64> {

// redirect icmp unreachable message back to client
unsafe {
(*ip_hdr).saddr = *new_src;
(*ip_hdr).saddr = new_src.0;
(*ip_hdr).check = 0;
}

Expand All @@ -56,7 +56,7 @@ pub fn handle_icmp_egress(ctx: TcContext) -> Result<i32, i64> {
let icmp_inner_ip_hdr: *mut iphdr = unsafe { ptr_at(&ctx, icmp_header_offset + ICMP_HDR_LEN) }?;

unsafe {
(*icmp_inner_ip_hdr).daddr = *new_src;
(*icmp_inner_ip_hdr).daddr = new_src.0;
(*icmp_inner_ip_hdr).check = 0;
}

Expand Down
1 change: 1 addition & 0 deletions dataplane/ebpf/src/egress/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod icmp;
pub mod tcp;
65 changes: 65 additions & 0 deletions dataplane/ebpf/src/egress/tcp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use core::mem;

use aya_bpf::{
bindings::{TC_ACT_OK, TC_ACT_PIPE},
helpers::bpf_csum_diff,
programs::TcContext,
};
use aya_log_ebpf::info;

use crate::{
bindings::{iphdr, tcphdr},
utils::{csum_fold_helper, ptr_at, ETH_HDR_LEN, IP_HDR_LEN},
BLIXT_CONNTRACK,
};

/// Rewrites the source address of an egress TCP packet that belongs to a
/// tracked connection, so the reply appears to come from the address that
/// was recorded for that client in `BLIXT_CONNTRACK`.
///
/// The packet's destination address is used as the conntrack key. The stored
/// value is a `(address, port)` tuple; the packet is only rewritten when its
/// TCP destination port matches the stored port.
///
/// Returns `Ok(TC_ACT_PIPE)` when the packet is left untouched because of a
/// port mismatch and also after a successful rewrite, and `Ok(TC_ACT_OK)` if
/// the IP header fails the explicit bounds check.
///
/// # Errors
///
/// Propagates the error from `ptr_at` when a header cannot be read, and
/// yields `Err(TC_ACT_PIPE)` (widened to `i64`) when the destination address
/// has no entry in `BLIXT_CONNTRACK`.
pub fn handle_tcp_egress(ctx: TcContext) -> Result<i32, i64> {
    // Locate the IPv4 header and the TCP header that follows it.
    // NOTE(review): the TCP offset assumes the IP header is exactly
    // IP_HDR_LEN bytes (no IP options) -- confirm.
    let ip_hdr: *mut iphdr = unsafe { ptr_at(&ctx, ETH_HDR_LEN) }?;
    let tcp_hdr: *mut tcphdr = unsafe { ptr_at(&ctx, ETH_HDR_LEN + IP_HDR_LEN) }?;

    // The destination of an egress packet is the client we may be tracking.
    // SAFETY: both pointers were just returned by `ptr_at`, which presumably
    // validates them against the packet bounds -- confirm in `utils`.
    let client_addr = unsafe { (*ip_hdr).daddr };
    // Header fields are big-endian on the wire; convert to host order so the
    // port can be compared with the value stored in the conntrack map.
    let dest_port = u16::from_be(unsafe { (*tcp_hdr).dest });

    // Untracked destinations are handed back with TC_ACT_PIPE as the error.
    let &(vip_addr, tracked_port) =
        unsafe { BLIXT_CONNTRACK.get(&client_addr) }.ok_or(TC_ACT_PIPE)?;
    // The map stores the port in a u32 slot; only the low 16 bits are compared.
    let tracked_port = tracked_port as u16;

    // verify traffic destination
    if tracked_port != dest_port {
        return Ok(TC_ACT_PIPE);
    }

    info!(
        &ctx,
        "Received TCP packet destined for tracked IP {:ipv4}:{} setting source IP to VIP {:ipv4}",
        u32::from_be(client_addr),
        tracked_port,
        u32::from_be(vip_addr),
    );

    // Replace the source address with the tracked address.
    unsafe {
        (*ip_hdr).saddr = vip_addr;
    }

    // Explicit bounds check ahead of the checksum helper call.
    // NOTE(review): presumably required so the eBPF verifier accepts the
    // `bpf_csum_diff` access below -- keep it ahead of that call.
    if (ctx.data() + ETH_HDR_LEN + mem::size_of::<iphdr>()) > ctx.data_end() {
        info!(&ctx, "Iphdr is out of bounds");
        return Ok(TC_ACT_OK);
    }

    // Recompute the IPv4 header checksum: zero the field, sum the whole
    // header (empty "from" buffer, zero seed), then fold the result.
    unsafe { (*ip_hdr).check = 0 };
    let full_cksum = unsafe {
        bpf_csum_diff(
            core::ptr::null_mut(),
            0,
            ip_hdr as *mut u32,
            mem::size_of::<iphdr>() as u32,
            0,
        )
    } as u64;
    unsafe { (*ip_hdr).check = csum_fold_helper(full_cksum) };
    // The TCP checksum is cleared rather than recomputed here.
    unsafe { (*tcp_hdr).check = 0 };

    // TODO: connection tracking cleanup https://github.com/kong/blixt/issues/85

    Ok(TC_ACT_PIPE)
}
16 changes: 13 additions & 3 deletions dataplane/ebpf/src/ingress/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use aya_log_ebpf::info;
use crate::{
bindings::{iphdr, tcphdr},
utils::{csum_fold_helper, ptr_at, ETH_HDR_LEN, IP_HDR_LEN},
BACKENDS,
BACKENDS, BLIXT_CONNTRACK,
};
use common::BackendKey;

Expand All @@ -21,8 +21,10 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result<i32, i64> {

let tcp_hdr: *mut tcphdr = unsafe { ptr_at(&ctx, tcp_header_offset)? };

let original_daddr = unsafe { (*ip_hdr).daddr };

let key = BackendKey {
ip: u32::from_be(unsafe { (*ip_hdr).daddr }),
ip: u32::from_be(original_daddr),
port: (u16::from_be(unsafe { (*tcp_hdr).dest })) as u32,
};

Expand All @@ -31,7 +33,7 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result<i32, i64> {
info!(
&ctx,
"Received a TCP packet destined for svc ip: {:ipv4} at Port: {} ",
u32::from_be(unsafe { (*ip_hdr).daddr }),
u32::from_be(original_daddr),
u16::from_be(unsafe { (*tcp_hdr).dest })
);

Expand Down Expand Up @@ -72,6 +74,14 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result<i32, i64> {
)
};

unsafe {
BLIXT_CONNTRACK.insert(
&(*ip_hdr).saddr,
&(original_daddr, (*tcp_hdr).source.to_be() as u32),
0 as u64,
)?;
};

info!(&ctx, "redirect action: {}", action);

Ok(action as i32)
Expand Down
6 changes: 5 additions & 1 deletion dataplane/ebpf/src/ingress/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ pub fn handle_udp_ingress(ctx: TcContext) -> Result<i32, i64> {
);

unsafe {
BLIXT_CONNTRACK.insert(&(*ip_hdr).saddr, &original_daddr, 0 as u64)?;
BLIXT_CONNTRACK.insert(
&(*ip_hdr).saddr,
&(original_daddr, (*udp_hdr).dest as u32),
0 as u64,
)?;
(*ip_hdr).daddr = backend.daddr.to_be();
};

Expand Down
6 changes: 4 additions & 2 deletions dataplane/ebpf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use aya_bpf::{

use bindings::{ethhdr, iphdr};
use common::{Backend, BackendKey};
use egress::icmp::handle_icmp_egress;
use egress::{icmp::handle_icmp_egress, tcp::handle_tcp_egress};
use ingress::{tcp::handle_tcp_ingress, udp::handle_udp_ingress};
use utils::{ETH_HDR_LEN, ETH_P_IP, IPPROTO_ICMP, IPPROTO_TCP, IPPROTO_UDP};

Expand All @@ -34,7 +34,8 @@ static mut BACKENDS: HashMap<BackendKey, Backend> =
HashMap::<BackendKey, Backend>::with_max_entries(128, 0);

#[map(name = "BLIXT_CONNTRACK")]
static mut BLIXT_CONNTRACK: HashMap<u32, u32> = HashMap::<u32, u32>::with_max_entries(128, 0);
static mut BLIXT_CONNTRACK: HashMap<u32, (u32, u32)> =
HashMap::<u32, (u32, u32)>::with_max_entries(128, 0);

// -----------------------------------------------------------------------------
// Ingress
Expand Down Expand Up @@ -104,6 +105,7 @@ fn try_tc_egress(ctx: TcContext) -> Result<i32, i64> {

match protocol {
IPPROTO_ICMP => handle_icmp_egress(ctx),
IPPROTO_TCP => handle_tcp_egress(ctx),
_ => Ok(TC_ACT_PIPE),
}
}
Expand Down
88 changes: 88 additions & 0 deletions test/integration/tcproute_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//go:build integration_tests
// +build integration_tests

package integration

import (
"context"
"fmt"
"net/http"
"strings"
"testing"
"time"

"github.com/kong/kubernetes-testing-framework/pkg/clusters"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"

testutils "github.com/kong/blixt/internal/test/utils"
)

// Fixtures shared by the TCPRoute integration test in this file.
const (
// tcprouteSampleKustomize is the kustomize directory that the test deploys
// to the cluster and deletes again during cleanup.
tcprouteSampleKustomize = "../../config/tests/tcproute"
// tcprouteSampleName is the object name the test uses to fetch the Gateway
// and the Deployment, and to delete the TCPRoute.
tcprouteSampleName = "blixt-tcproute-sample"
)

// TestTCPRouteBasics deploys the TCPRoute sample manifests, waits for the
// Gateway to receive an address and for the backing Deployment to become
// available, verifies that HTTP requests through the Gateway succeed, and
// then deletes the TCPRoute and verifies that requests start timing out.
//
// Polling closures never call require.* themselves: require.Eventually runs
// its condition on a separate goroutine, and FailNow must only be called from
// the goroutine running the test. Errors inside a poll are logged and retried
// until the Eventually timeout expires.
func TestTCPRouteBasics(t *testing.T) {
	tcpRouteBasicsCleanupKey := "tcproutebasics"
	defer func() {
		testutils.DumpDiagnosticsIfFailed(ctx, t, env.Cluster())
		runCleanup(tcpRouteBasicsCleanupKey)
	}()

	t.Log("deploying config/samples/tcproute kustomize")
	require.NoError(t, clusters.KustomizeDeployForCluster(ctx, env.Cluster(), tcprouteSampleKustomize))
	addCleanup(tcpRouteBasicsCleanupKey, func(ctx context.Context) error {
		cleanupLog("cleaning up config/samples/tcproute kustomize")
		return clusters.KustomizeDeleteForCluster(ctx, env.Cluster(), tcprouteSampleKustomize)
	})

	t.Log("waiting for Gateway to have an address")
	var gw *gatewayv1beta1.Gateway
	require.Eventually(t, func() bool {
		found, err := gwclient.GatewayV1beta1().Gateways(corev1.NamespaceDefault).Get(ctx, tcprouteSampleName, metav1.GetOptions{})
		if err != nil {
			t.Logf("received error retrieving Gateway: [%s], retrying...", err)
			return false
		}
		// Only publish the object once the lookup has succeeded.
		gw = found
		return len(gw.Status.Addresses) > 0
	}, time.Minute, time.Second)
	require.NotNil(t, gw.Status.Addresses[0].Type)
	require.Equal(t, gatewayv1beta1.IPAddressType, *gw.Status.Addresses[0].Type)
	gwaddr := fmt.Sprintf("%s:8080", gw.Status.Addresses[0].Value)

	t.Log("waiting for HTTP server to be available")
	require.Eventually(t, func() bool {
		server, err := env.Cluster().Client().AppsV1().Deployments(corev1.NamespaceDefault).Get(ctx, tcprouteSampleName, metav1.GetOptions{})
		if err != nil {
			t.Logf("received error retrieving Deployment: [%s], retrying...", err)
			return false
		}
		return server.Status.AvailableReplicas > 0
	}, time.Minute, time.Second)

	t.Log("verifying HTTP connectivity to the server")
	httpc := http.Client{Timeout: time.Second * 10}
	require.Eventually(t, func() bool {
		resp, err := httpc.Get(fmt.Sprintf("http://%s/status/%d", gwaddr, http.StatusTeapot))
		if err != nil {
			t.Logf("received error checking HTTP server: [%s], retrying...", err)
			return false
		}
		defer resp.Body.Close()
		return resp.StatusCode == http.StatusTeapot
	}, time.Minute, time.Second)

	t.Log("deleting the TCPRoute and verifying that HTTP traffic stops")
	require.NoError(t, gwclient.GatewayV1alpha2().TCPRoutes(corev1.NamespaceDefault).Delete(ctx, tcprouteSampleName, metav1.DeleteOptions{}))
	// Use a shorter client timeout so each probe fails quickly once the
	// route is gone.
	httpc = http.Client{Timeout: time.Second * 3}
	require.Eventually(t, func() bool {
		resp, err := httpc.Get(fmt.Sprintf("http://%s/status/%d", gwaddr, http.StatusTeapot))
		if err != nil {
			// A client timeout is the signal that traffic is no longer routed.
			if strings.Contains(err.Error(), "context deadline exceeded") {
				return true
			}
			t.Logf("received unexpected error waiting for TCPRoute to decomission: %s", err)
			return false
		}
		// Any response at all means the route is still serving traffic.
		defer resp.Body.Close()
		return false
	}, time.Minute, time.Second)
}

0 comments on commit cfde694

Please sign in to comment.