Support Spark read performance configuration #73

Open · wants to merge 1 commit into base: main
8 changes: 8 additions & 0 deletions README.md
@@ -78,6 +78,14 @@ Elasticsearch is used when `STORAGE=elasticsearch`.
If your Elasticsearch cluster's data nodes only listen on a loopback IP, set this to true.
Defaults to false.
* `ES_INDEX_PREFIX`: prefix of the Jaeger indices. Unset by default.
* `ES_HTTP_TIMEOUT`: Timeout for HTTP/REST connections to Elasticsearch.
* `ES_HTTP_RETRIES`: Number of retries for establishing a (broken) HTTP connection.
The retries are applied per conversation with an Elasticsearch node.
Once the retries are depleted, the connection is automatically re-routed
to the next available Elasticsearch node (based on the declaration of es.nodes,
followed by the discovered nodes, if enabled).
* `ES_SCROLL_KEEPALIVE`: The maximum duration of result scrolls between query requests.
* `ES_SCROLL_SIZE`: Number of results/items returned by each individual scroll request (the fallback behavior of all of these variables is sketched below).
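
As a rough, self-contained illustration of the fallback behavior (a hypothetical snippet, not the job's code: `envOrDefault` is an assumed helper, while the `ConfigurationOptions` constants come from elasticsearch-hadoop):

```java
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;

public class EsReadSettings {
  // Assumed helper: an environment variable, when set, overrides the
  // es-hadoop default for the corresponding option.
  static String envOrDefault(String name, String fallback) {
    String value = System.getenv(name);
    return (value == null || value.isEmpty()) ? fallback : value;
  }

  public static void main(String[] args) {
    System.out.println(ConfigurationOptions.ES_HTTP_RETRIES + " = "
        + envOrDefault("ES_HTTP_RETRIES", ConfigurationOptions.ES_HTTP_RETRIES_DEFAULT));
    System.out.println(ConfigurationOptions.ES_SCROLL_SIZE + " = "
        + envOrDefault("ES_SCROLL_SIZE", ConfigurationOptions.ES_SCROLL_SIZE_DEFAULT));
  }
}
```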

Example usage:

@@ -31,6 +31,7 @@
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +55,7 @@ public static final class Builder {
Boolean clientNodeOnly = Boolean.parseBoolean(Utils.getEnv("ES_CLIENT_NODE_ONLY", "false"));
String indexPrefix = Utils.getEnv("ES_INDEX_PREFIX", null);


final Map<String, String> sparkProperties = new LinkedHashMap<>();

Builder() {
@@ -69,6 +71,15 @@ public static final class Builder {
getSystemPropertyAsFileResource("javax.net.ssl.trustStore"));
sparkProperties.put("es.net.ssl.truststore.pass",
System.getProperty("javax.net.ssl.trustStorePassword", ""));

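// Read-performance tuning for Elasticsearch: each env var, when set,
// overrides the corresponding es-hadoop default.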
sparkProperties.put(ConfigurationOptions.ES_HTTP_TIMEOUT,
Utils.getEnv("ES_HTTP_TIMEOUT", ConfigurationOptions.ES_HTTP_TIMEOUT_DEFAULT));
sparkProperties.put(ConfigurationOptions.ES_HTTP_RETRIES,
Utils.getEnv("ES_HTTP_RETRIES", ConfigurationOptions.ES_HTTP_RETRIES_DEFAULT));
sparkProperties.put(ConfigurationOptions.ES_SCROLL_KEEPALIVE,
Utils.getEnv("ES_SCROLL_KEEPALIVE", ConfigurationOptions.ES_SCROLL_KEEPALIVE_DEFAULT));
sparkProperties.put(ConfigurationOptions.ES_SCROLL_SIZE,
Utils.getEnv("ES_SCROLL_SIZE", ConfigurationOptions.ES_SCROLL_SIZE_DEFAULT));
}

// local[*] master lets us run & test the job locally without setting up a Spark cluster
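
For context, a minimal sketch of how the accumulated properties typically reach Spark, assuming (this is not shown in the diff) that the builder later copies `sparkProperties` into the `SparkConf` before creating the context; the master and app name here are assumptions:

```java
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Sketch only: copy the builder's properties onto the SparkConf so that
// the elasticsearch-hadoop connector picks them up at read time.
static JavaSparkContext newContext(Map<String, String> sparkProperties) {
  SparkConf conf = new SparkConf(true)
      .setMaster("local[*]")                    // assumed master
      .setAppName("jaeger-spark-dependencies"); // assumed app name
  for (Map.Entry<String, String> entry : sparkProperties.entrySet()) {
    conf.set(entry.getKey(), entry.getValue());
  }
  return new JavaSparkContext(conf);
}
```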