summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--dynamic_complement/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeSubscriberApp.java19
1 files changed, 16 insertions, 3 deletions
diff --git a/dynamic_complement/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeSubscriberApp.java b/dynamic_complement/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeSubscriberApp.java
index 98e95dd..9800a86 100644
--- a/dynamic_complement/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeSubscriberApp.java
+++ b/dynamic_complement/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeSubscriberApp.java
@@ -32,10 +32,12 @@ public class FlumeSubscriberApp implements Interceptor {
private static Connection connection;
private String hbaseZookeeperIp;
private String hbaseTableName;
+ private int updateHBaseTime;
- public FlumeSubscriberApp(String hbaseZookeeperIp, String hbaseTableName) {
+ public FlumeSubscriberApp(String hbaseZookeeperIp, String hbaseTableName, int updateHBaseTime) {
this.hbaseZookeeperIp = hbaseZookeeperIp;
this.hbaseTableName = hbaseTableName;
+ this.updateHBaseTime = updateHBaseTime;
}
@@ -70,7 +72,7 @@ public class FlumeSubscriberApp implements Interceptor {
e.printStackTrace();
}
}
- }, 0, 50000);
+ }, 0, updateHBaseTime * 1000);
}
@@ -211,11 +213,12 @@ public class FlumeSubscriberApp implements Interceptor {
public static class FlumeDynamicAppBuilder implements Interceptor.Builder {
private String hbaseZookeeperIp;
private String hbaseTableName;
+ private int updateHBaseTime;
@Override
public Interceptor build() {
- return new FlumeSubscriberApp(hbaseZookeeperIp, hbaseTableName);
+ return new FlumeSubscriberApp(hbaseZookeeperIp, hbaseTableName, updateHBaseTime);
}
@Override
@@ -240,6 +243,16 @@ public class FlumeSubscriberApp implements Interceptor {
logger.error("FlumeSubscriberApp Get hbaseTableName is error : " + e);
}
+ try {
+ this.updateHBaseTime = context.getInteger("updateHBaseTime", 30);
+ Preconditions.checkNotNull("".equals(updateHBaseTime), "updateHBaseTime must be set!!");
+ logger.info("FlumeSubscriberApp Read updateHBaseTime from configuration : " + updateHBaseTime);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("FlumeSubscriberApp updateHBaseTime invalid", e);
+ } catch (Exception e) {
+ logger.error("FlumeSubscriberApp Get updateHBaseTime is error : " + e);
+ }
+
}
}