Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Let cluster config support overwritten by ENV #4120

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions trunk/conf/full.conf
Original file line number Diff line number Diff line change
Expand Up @@ -999,26 +999,31 @@ vhost cluster.srs.com {
# The cluster mode, local or remote.
# local: It's an origin server, serve streams itself.
# remote: It's an edge server, fetch or push stream to origin server.
# Overwrite by env SRS_VHOST_CLUSTER_MODE for all vhosts.
# default: local
mode remote;

# For edge(mode remote), user must specifies the origin server
# format as: <server_name|ip>[:port]
# @remark user can specifies multiple origin for error backup, by space,
# for example, 192.168.1.100:1935 192.168.1.101:1935 192.168.1.102:1935
# Overwrite by env SRS_VHOST_CLUSTER_ORIGIN for all vhosts.
# no default value
origin 127.0.0.1:1935 localhost:1935;

# For edge(mode remote), whether open the token traverse mode,
# if token traverse on, all connections of edge will forward to origin to check(auth),
# it's very important for the edge to do the token auth.
# the better way is use http callback to do the token auth by the edge,
# but if user prefer origin check(auth), the token_traverse if better solution.
# Overwrite by env SRS_VHOST_CLUSTER_TOKEN_TRAVERSE for all vhosts.
# default: off
token_traverse off;

# For edge(mode remote), the vhost to transform for edge,
# to fetch from the specified vhost at origin,
# if not specified, use the current vhost of edge in origin, the variable [vhost].
# Overwrite by env SRS_VHOST_CLUSTER_EDGE_TRANSFORM_VHOST for all vhosts.
# default: [vhost]
vhost same.edge.srs.com;

Expand All @@ -1027,30 +1032,35 @@ vhost cluster.srs.com {
# when connect to upnode, it will take the debug info,
# for example, the id, source id, pid.
# please see https://ossrs.net/lts/zh-cn/docs/v4/doc/log
# Overwrite by env SRS_VHOST_CLUSTER_DEBUG_SRS_UPNODE for all vhosts.
# default: on
debug_srs_upnode on;

# For origin(mode local) cluster, turn on the cluster.
# @remark Origin cluster only supports RTMP, use Edge to transmux RTMP to FLV.
# Overwrite by env SRS_VHOST_CLUSTER_ORIGIN_CLUSTER for all vhosts.
# default: off
# TODO: FIXME: Support reload.
origin_cluster off;

# For origin (mode local) cluster, the co-worker's HTTP APIs.
# This origin will connect to co-workers and communicate with them.
# please see https://ossrs.io/lts/en-us/docs/v4/doc/origin-cluster
# Overwrite by env SRS_VHOST_CLUSTER_COWORKERS for all vhosts.
# TODO: FIXME: Support reload.
coworkers 127.0.0.1:9091 127.0.0.1:9092;

# The protocol to connect to origin.
# rtmp, Connect origin by RTMP
# flv, Connect origin by HTTP-FLV
# flvs, Connect origin by HTTPS-FLV
# Overwrite by env SRS_VHOST_CLUSTER_PROTOCOL for all vhosts.
# Default: rtmp
protocol rtmp;

# Whether follow client protocol to connect to origin.
# @remark The FLV might use different signature(in query string) to RTMP.
# Overwrite by env SRS_VHOST_CLUSTER_FOLLOW_CLIENT for all vhosts.
# Default: off
follow_client off;
}
Expand Down
52 changes: 38 additions & 14 deletions trunk/src/app/srs_app_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,19 @@ const char* _srs_version = "XCORE-" RTMP_SIG_SRS_SERVER;
if (dir) return dir; \
}

#define SRS_OVERWRITE_BY_ENV_DIRECTIVE_MULTI_VALUES(key) { \
static SrsConfDirective* dir = NULL; \
if (!dir && !srs_getenv(key).empty()) { \
std::vector<string> vec = srs_string_split(srs_getenv(key), " "); \
dir = new SrsConfDirective(); \
dir->name = key; \
for (size_t i = 0; i < vec.size(); ++i) { \
dir->args.push_back(vec[i]); \
} \
} \
if (dir) return dir; \
}

/**
* dumps the ingest/transcode-engine in @param dir to amf0 object @param engine.
* @param dir the transcode or ingest config directive.
Expand Down Expand Up @@ -4907,6 +4920,8 @@ int SrsConfig::get_gop_cache_max_frames(string vhost)

bool SrsConfig::get_debug_srs_upnode(string vhost)
{
SRS_OVERWRITE_BY_ENV_BOOL2("SRS_VHOST_CLUSTER_DEBUG_SRS_UPNODE"); // SRS_VHOST_CLUSTER_DEBUG_SRS_UPNODE

static bool DEFAULT = true;

SrsConfDirective* conf = get_vhost(vhost);
Expand Down Expand Up @@ -5773,6 +5788,11 @@ bool SrsConfig::get_vhost_is_edge(string vhost)

bool SrsConfig::get_vhost_is_edge(SrsConfDirective* vhost)
{
// override cluster mode by env for all vhost.
if (!srs_getenv("SRS_VHOST_CLUSTER_MODE").empty()) { // SRS_VHOST_CLUSTER_MODE
return "remote" == srs_getenv("SRS_VHOST_CLUSTER_MODE");
}

static bool DEFAULT = false;

SrsConfDirective* conf = vhost;
Expand All @@ -5795,6 +5815,8 @@ bool SrsConfig::get_vhost_is_edge(SrsConfDirective* vhost)

SrsConfDirective* SrsConfig::get_vhost_edge_origin(string vhost)
{
SRS_OVERWRITE_BY_ENV_DIRECTIVE_MULTI_VALUES("SRS_VHOST_CLUSTER_ORIGIN"); // SRS_VHOST_CLUSTER_ORIGIN

SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return NULL;
Expand All @@ -5810,6 +5832,8 @@ SrsConfDirective* SrsConfig::get_vhost_edge_origin(string vhost)

string SrsConfig::get_vhost_edge_protocol(string vhost)
{
SRS_OVERWRITE_BY_ENV_STRING("SRS_VHOST_CLUSTER_PROTOCOL"); // SRS_VHOST_CLUSTER_PROTOCOL

static string DEFAULT = "rtmp";

SrsConfDirective* conf = get_vhost(vhost);
Expand All @@ -5832,6 +5856,8 @@ string SrsConfig::get_vhost_edge_protocol(string vhost)

bool SrsConfig::get_vhost_edge_follow_client(string vhost)
{
SRS_OVERWRITE_BY_ENV_BOOL("SRS_VHOST_CLUSTER_FOLLOW_CLIENT"); // SRS_VHOST_CLUSTER_FOLLOW_CLIENT

static bool DEFAULT = false;

SrsConfDirective* conf = get_vhost(vhost);
Expand All @@ -5854,6 +5880,8 @@ bool SrsConfig::get_vhost_edge_follow_client(string vhost)

bool SrsConfig::get_vhost_edge_token_traverse(string vhost)
{
SRS_OVERWRITE_BY_ENV_BOOL("SRS_VHOST_CLUSTER_TOKEN_TRAVERSE"); // SRS_VHOST_CLUSTER_TOKEN_TRAVERSE

static bool DEFAULT = false;

SrsConfDirective* conf = get_vhost(vhost);
Expand All @@ -5876,6 +5904,8 @@ bool SrsConfig::get_vhost_edge_token_traverse(string vhost)

string SrsConfig::get_vhost_edge_transform_vhost(string vhost)
{
SRS_OVERWRITE_BY_ENV_STRING("SRS_VHOST_CLUSTER_EDGE_TRANSFORM_VHOST"); // SRS_VHOST_CLUSTER_EDGE_TRANSFORM_VHOST

static string DEFAULT = "[vhost]";

SrsConfDirective* conf = get_vhost(vhost);
Expand Down Expand Up @@ -5904,6 +5934,8 @@ bool SrsConfig::get_vhost_origin_cluster(string vhost)

bool SrsConfig::get_vhost_origin_cluster(SrsConfDirective* vhost)
{
SRS_OVERWRITE_BY_ENV_BOOL("SRS_VHOST_CLUSTER_ORIGIN_CLUSTER"); // SRS_VHOST_CLUSTER_ORIGIN_CLUSTER

static bool DEFAULT = false;

SrsConfDirective* conf = vhost;
Expand All @@ -5924,29 +5956,21 @@ bool SrsConfig::get_vhost_origin_cluster(SrsConfDirective* vhost)
return SRS_CONF_PREFER_FALSE(conf->arg0());
}

vector<string> SrsConfig::get_vhost_coworkers(string vhost)
SrsConfDirective* SrsConfig::get_vhost_coworkers(string vhost)
{
vector<string> coworkers;

SRS_OVERWRITE_BY_ENV_DIRECTIVE_MULTI_VALUES("SRS_VHOST_CLUSTER_COWORKERS"); // SRS_VHOST_CLUSTER_COWORKERS
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return coworkers;
return NULL;
}

conf = conf->get("cluster");
if (!conf) {
return coworkers;
}

conf = conf->get("coworkers");
if (!conf) {
return coworkers;
}
for (int i = 0; i < (int)conf->args.size(); i++) {
coworkers.push_back(conf->args.at(i));
return NULL;
}

return coworkers;
return conf->get("coworkers");
}

bool SrsConfig::get_security_enabled(string vhost)
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,7 @@ class SrsConfig
virtual bool get_vhost_origin_cluster(SrsConfDirective* conf);
// Get the co-workers of origin cluster.
// @see https://ossrs.net/lts/zh-cn/docs/v4/doc/origin-cluster
virtual std::vector<std::string> get_vhost_coworkers(std::string vhost);
virtual SrsConfDirective* get_vhost_coworkers(std::string vhost);
// vhost security section
public:
// Whether the secrity of vhost enabled.
Expand Down
4 changes: 3 additions & 1 deletion trunk/src/app/srs_app_rtmp_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,9 @@ srs_error_t SrsRtmpConn::playing(SrsSharedPtr<SrsLiveSource> source)
// When origin cluster enabled, try to redirect to the origin which is active.
// A active origin is a server which is delivering stream.
if (!info->edge && _srs_config->get_vhost_origin_cluster(req->vhost) && source->inactive()) {
vector<string> coworkers = _srs_config->get_vhost_coworkers(req->vhost);
SrsConfDirective* conf = _srs_config->get_vhost_coworkers(req->vhost);

vector<string> coworkers = conf ? conf->args : vector<string>();
for (int i = 0; i < (int)coworkers.size(); i++) {
// TODO: FIXME: User may config the server itself as coworker, we must identify and ignore it.
string host; int port = 0; string coworker = coworkers.at(i);
Expand Down
113 changes: 111 additions & 2 deletions trunk/src/utest/srs_utest_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3265,7 +3265,7 @@ VOID TEST(ConfigMainTest, CheckVhostConfig3)
EXPECT_FALSE(conf.get_vhost_edge_token_traverse("ossrs.net"));
EXPECT_STREQ("[vhost]", conf.get_vhost_edge_transform_vhost("ossrs.net").c_str());
EXPECT_FALSE(conf.get_vhost_origin_cluster("ossrs.net"));
EXPECT_EQ(0, (int)conf.get_vhost_coworkers("ossrs.net").size());
EXPECT_TRUE(conf.get_vhost_coworkers("ossrs.net") == NULL);
EXPECT_FALSE(conf.get_security_enabled("ossrs.net"));
EXPECT_TRUE(conf.get_security_rules("ossrs.net") == NULL);
}
Expand All @@ -3285,7 +3285,7 @@ VOID TEST(ConfigMainTest, CheckVhostConfig3)
if (true) {
MockSrsConfig conf;
HELPER_ASSERT_SUCCESS(conf.parse(_MIN_OK_CONF "vhost ossrs.net{cluster{coworkers xxx;}}"));
EXPECT_EQ(1, (int)conf.get_vhost_coworkers("ossrs.net").size());
EXPECT_EQ(1, (int)conf.get_vhost_coworkers("ossrs.net")->args.size());
}

if (true) {
Expand Down Expand Up @@ -5102,3 +5102,112 @@ VOID TEST(ConfigEnvTest, CheckEnvValuesHooks)
}
}

VOID TEST(ConfigEnvTest, CheckEnvValuesCluster)
{
MockSrsConfig conf;

if (true) {
SrsSetEnvConfig(hooks, "SRS_VHOST_CLUSTER_MODE", "remote");
EXPECT_TRUE(conf.get_vhost_is_edge("__defaultVhost__"));
}

if (true) {
SrsSetEnvConfig(hooks, "SRS_VHOST_CLUSTER_MODE", "local");
EXPECT_FALSE(conf.get_vhost_is_edge("__defaultVhost__"));
}

if (true) {
SrsSetEnvConfig(hooks, "SRS_VHOST_CLUSTER_MODE", "xxx");
EXPECT_FALSE(conf.get_vhost_is_edge("__defaultVhost__"));
}

if (true) {
SrsSetEnvConfig(hooks, "SRS_VHOST_CLUSTER_ORIGIN", "127.0.0.1:1935 localhost:1935");
SrsConfDirective* origins = conf.get_vhost_edge_origin("__defaultVhost__");
EXPECT_TRUE(origins != NULL);
EXPECT_EQ(2, origins->args.size());
EXPECT_EQ("127.0.0.1:1935", origins->args[0]);
EXPECT_EQ("localhost:1935", origins->args[1]);
}

if (true) {
SrsSetEnvConfig(hooks, "SRS_VHOST_CLUSTER_TOKEN_TRAVERSE", "on");
EXPECT_TRUE(conf.get_vhost_edge_token_traverse("__defaultVhost__"));
}

if (true) {
SrsSetEnvConfig(hooks, "SRS_VHOST_CLUSTER_TOKEN_TRAVERSE", "off");
EXPECT_FALSE(conf.get_vhost_edge_token_traverse("__defaultVhost__"));
}

if (true) {
SrsSetEnvConfig(hooks, "SRS_VHOST_CLUSTER_TOKEN_TRAVERSE", "xxx");
EXPECT_FALSE(conf.get_vhost_edge_token_traverse("__defaultVhost__"));
}

if (true) {
SrsSetEnvConfig(hooks, "SRS_VHOST_CLUSTER_EDGE_TRANSFORM_VHOST", "xxx");
EXPECT_EQ("xxx", conf.get_vhost_edge_transform_vhost("__defaultVhost__"));
EXPECT_EQ("xxx", conf.get_vhost_edge_transform_vhost("anyHost"));
}

if (true) {
SrsSetEnvConfig(hooks, "SRS_VHOST_CLUSTER_DEBUG_SRS_UPNODE", "off");
EXPECT_FALSE(conf.get_debug_srs_upnode("anyHost"));
}

if (true) {
SrsSetEnvConfig(hooks, "SRS_VHOST_CLUSTER_DEBUG_SRS_UPNODE", "on");
EXPECT_TRUE(conf.get_debug_srs_upnode("anyHost"));
}

if (true) {
SrsSetEnvConfig(hooks, "SRS_VHOST_CLUSTER_DEBUG_SRS_UPNODE", "xx");
EXPECT_TRUE(conf.get_debug_srs_upnode("anyHost"));
}

if (true) {
SrsSetEnvConfig(hooks, "SRS_VHOST_CLUSTER_ORIGIN_CLUSTER", "on");
EXPECT_TRUE(conf.get_vhost_origin_cluster("anyHost"));
}

if (true) {
SrsSetEnvConfig(hooks, "SRS_VHOST_CLUSTER_ORIGIN_CLUSTER", "off");
EXPECT_FALSE(conf.get_vhost_origin_cluster("anyHost"));
}

if (true) {
SrsSetEnvConfig(hooks, "SRS_VHOST_CLUSTER_ORIGIN_CLUSTER", "xxx");
EXPECT_FALSE(conf.get_vhost_origin_cluster("anyHost"));
}

if (true) {
SrsSetEnvConfig(hooks, "SRS_VHOST_CLUSTER_COWORKERS", "127.0.0.1:9091 127.0.0.1:9092");
SrsConfDirective* coworkers = conf.get_vhost_coworkers("anyHost");
EXPECT_TRUE(coworkers != NULL);
EXPECT_EQ(2, coworkers->args.size());
EXPECT_EQ("127.0.0.1:9091", coworkers->args[0]);
EXPECT_EQ("127.0.0.1:9092", coworkers->args[1]);
}

if (true) {
SrsSetEnvConfig(hooks, "SRS_VHOST_CLUSTER_PROTOCOL", "srt");
EXPECT_EQ("srt", conf.get_vhost_edge_protocol("anyHost"));
}

if (true) {
SrsSetEnvConfig(hooks, "SRS_VHOST_CLUSTER_FOLLOW_CLIENT", "on");
EXPECT_TRUE(conf.get_vhost_edge_follow_client("anyHost"));
}

if (true) {
SrsSetEnvConfig(hooks, "SRS_VHOST_CLUSTER_FOLLOW_CLIENT", "off");
EXPECT_FALSE(conf.get_vhost_edge_follow_client("anyHost"));
}

if (true) {
SrsSetEnvConfig(hooks, "SRS_VHOST_CLUSTER_FOLLOW_CLIENT", "xxx");
EXPECT_FALSE(conf.get_vhost_edge_follow_client("anyHost"));
}
}

Loading