summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/zdjizhi/pojo/Radius.java51
-rw-r--r--src/main/java/com/zdjizhi/pojo/RadiusMassage.java52
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java24
-rw-r--r--src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkAccount.java206
-rw-r--r--src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkFramedip.java206
5 files changed, 539 insertions, 0 deletions
diff --git a/src/main/java/com/zdjizhi/pojo/Radius.java b/src/main/java/com/zdjizhi/pojo/Radius.java
new file mode 100644
index 0000000..0981ded
--- /dev/null
+++ b/src/main/java/com/zdjizhi/pojo/Radius.java
@@ -0,0 +1,51 @@
+package com.zdjizhi.pojo;
+
+public class Radius {
+
+ private String framed_ip;
+ private String account;
+ private Long last_update_time;
+ private Long first_found_time;
+ private Integer acct_status_type;
+
+
+ public String getFramed_ip() {
+ return framed_ip;
+ }
+
+ public void setFramed_ip(String framed_ip) {
+ this.framed_ip = framed_ip;
+ }
+
+ public String getAccount() {
+ return account;
+ }
+
+ public void setAccount(String account) {
+ this.account = account;
+ }
+
+ public Long getLast_update_time() {
+ return last_update_time;
+ }
+
+ public void setLast_update_time(Long last_update_time) {
+ this.last_update_time = last_update_time;
+ }
+
+ public Long getFirst_found_time() {
+ return first_found_time;
+ }
+
+ public void setFirst_found_time(Long first_found_time) {
+ this.first_found_time = first_found_time;
+ }
+
+ public Integer getAcct_status_type() {
+ return acct_status_type;
+ }
+
+ public void setAcct_status_type(Integer acct_status_type) {
+ this.acct_status_type = acct_status_type;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/pojo/RadiusMassage.java b/src/main/java/com/zdjizhi/pojo/RadiusMassage.java
new file mode 100644
index 0000000..102211d
--- /dev/null
+++ b/src/main/java/com/zdjizhi/pojo/RadiusMassage.java
@@ -0,0 +1,52 @@
+package com.zdjizhi.pojo;
+
+public class RadiusMassage {
+
+ private String radius_framed_ip;
+ private String radius_account;
+ private Long radius_event_timestamp;
+ private int radius_acct_status_type;
+ private int radius_packet_type;
+
+ public int getRadius_acct_status_type() {
+ return radius_acct_status_type;
+ }
+
+ public void setRadius_acct_status_type(int radius_acct_status_type) {
+ this.radius_acct_status_type = radius_acct_status_type;
+ }
+
+ public int getRadius_packet_type() {
+ return radius_packet_type;
+ }
+
+ public void setRadius_packet_type(int radius_packet_type) {
+ this.radius_packet_type = radius_packet_type;
+ }
+
+ public String getRadius_framed_ip() {
+ return radius_framed_ip;
+ }
+
+ public void setRadius_framed_ip(String radius_framed_ip) {
+ this.radius_framed_ip = radius_framed_ip;
+ }
+
+ public String getRadius_account() {
+ return radius_account;
+ }
+
+ public void setRadius_account(String radius_account) {
+ this.radius_account = radius_account;
+ }
+
+ public Long getRadius_event_timestamp() {
+ return radius_event_timestamp;
+ }
+
+ public void setRadius_event_timestamp(Long radius_event_timestamp) {
+ this.radius_event_timestamp = radius_event_timestamp;
+ }
+
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
new file mode 100644
index 0000000..9f98986
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
@@ -0,0 +1,24 @@
+package com.zdjizhi.utils.functions;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.tuple.Tuple6;
+
+public class FilterNullFunction implements FilterFunction<Tuple6<String,String,String,String,Long,Integer>> {
+ private static final Log logger = LogFactory.get();
+
+ @Override
+ public boolean filter(Tuple6<String,String,String,String,Long,Integer> message) {
+ boolean isFilter = false;
+ try {
+ if (!"".equals(message.f0)) {
+ isFilter = true;
+ }
+ } catch (RuntimeException re) {
+ logger.error("Radius日志条件过滤异常,异常信息为:" + re);
+ }
+ return isFilter;
+ }
+}
+
diff --git a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkAccount.java b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkAccount.java
new file mode 100644
index 0000000..44fd694
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkAccount.java
@@ -0,0 +1,206 @@
+package com.zdjizhi.utils.hbasepackage;
+
+import com.zdjizhi.common.RadiusRelationshipConfig;
+import com.zdjizhi.pojo.Radius;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,String,String,Long,Integer>> implements Serializable, SinkFunction<Tuple6<String,String,String,String,Long,Integer>> {
+ private Logger log;
+
+ private String hbase_zookeeper_host;
+ public Map<String, Radius> AccountWithIpMap = new ConcurrentHashMap<>(80000);
+
+ private Connection connection;
+ private Admin admin;
+
+ public HbaseSinkAccount(String hbase_zookeeper_host) {
+ this.hbase_zookeeper_host = hbase_zookeeper_host;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ log = Logger.getLogger(HbaseSinkAccount.class);
+
+ org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
+ configuration.set("hbase.zookeeper.quorum", hbase_zookeeper_host);
+
+ connection = ConnectionFactory.createConnection(configuration);
+ admin = connection.getAdmin();
+
+ try {
+
+ Table table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME));
+ Scan scan = new Scan();
+ scan.addColumn("radius".getBytes(), "framed_ip".getBytes());
+ scan.addColumn("radius".getBytes(), "account".getBytes());
+ scan.addColumn("radius".getBytes(), "first_found_time".getBytes());
+ scan.addColumn("radius".getBytes(), "last_update_time".getBytes());
+ scan.addColumn("radius".getBytes(), "acct_status_type".getBytes());
+
+ ResultScanner scanner = table.getScanner(scan);
+ for (Result result : scanner) {
+ if( result.containsColumn("radius".getBytes(), "framed_ip".getBytes()) && result.containsColumn("radius".getBytes(), "account".getBytes()) && result.containsColumn("radius".getBytes(), "last_update_time".getBytes()) && result.containsColumn("radius".getBytes(), "first_found_time".getBytes()) && result.containsColumn("radius".getBytes(), "acct_status_type".getBytes()) ) {
+
+ Radius radius = new Radius();
+ String key = Bytes.toString(result.getRow());
+ String framedip = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))));
+ String account = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("account"))));
+
+ Long first_found_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("first_found_time"))));
+ Long last_update_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("last_update_time"))));
+ int acct_status_type = Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"))));
+
+ radius.setAccount(account);
+ radius.setFramed_ip(framedip);
+ radius.setFirst_found_time(first_found_time);
+ radius.setLast_update_time(last_update_time);
+ radius.setAcct_status_type(acct_status_type);
+ AccountWithIpMap.put(key, radius);
+ }
+ }
+ scanner.close();
+ } catch (IOException ioe) {
+ log.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<===");
+ } catch (RuntimeException e) {
+ log.error("HBaseUtils getAll() is Exception===>{" + e + "}<===");
+ }
+ }
+
+ public void invoke(Tuple6<String,String,String,String,Long,Integer> value, Context context) throws Exception {
+ // 按 project:table 归纳
+
+ String key = value.f1;
+ String framedIp = value.f2;
+ String account = value.f3;
+ Long event_time = value.f4;
+ int acct_status_type = value.f5;
+
+ if (AccountWithIpMap.containsKey(key)) {
+ Radius radius = AccountWithIpMap.get(key);
+
+ if (radius.getLast_update_time() < event_time) {
+ if(acct_status_type==radius.getAcct_status_type()) {
+
+ if (!radius.getFramed_ip().equals(framedIp)) {
+ radius.setLast_update_time(event_time);
+ radius.setFramed_ip(framedIp);
+ radius.setAcct_status_type(acct_status_type);
+
+ Table table = null;
+ try {
+ table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME));
+ Put put = new Put(key.getBytes());
+ put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes());
+ put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes());
+ put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time));
+ put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time()));
+ put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type));
+ table.put(put);
+ // IpWithAccountMap.put(radius.getFramed_ip(),radius);
+ AccountWithIpMap.put(key, radius);
+ } catch (Exception e) {
+ log.error(e.toString());
+ } finally {
+ table.close();
+ }
+ }
+ }
+ else{
+
+
+ radius.setLast_update_time(event_time);
+ radius.setFramed_ip(framedIp);
+ radius.setAcct_status_type(acct_status_type);
+
+ Table table = null;
+ try {
+ table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME));
+ Put put = new Put(key.getBytes());
+ put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes());
+ put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes());
+ put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time));
+ put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time()));
+ put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type));
+ table.put(put);
+ // IpWithAccountMap.put(radius.getFramed_ip(),radius);
+ AccountWithIpMap.put(key, radius);
+ } catch (Exception e) {
+ log.error(e.toString());
+ } finally {
+ table.close();
+ }
+
+ }
+ }
+ } else {
+ Radius radius = new Radius();
+ radius.setFramed_ip(framedIp);
+ radius.setLast_update_time(event_time);
+ radius.setAccount(account);
+ radius.setFirst_found_time(event_time);
+ radius.setAcct_status_type(acct_status_type);
+
+ Table table = null;
+ try {
+ table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME));
+ Put put = new Put(key.getBytes());
+ put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes());
+ put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes());
+ put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time));
+ put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(event_time));
+ put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type));
+ table.put(put);
+ AccountWithIpMap.put(key, radius);
+ } catch (Exception e) {
+ log.error(e.toString());
+ } finally {
+ table.close();
+ }
+ }
+ }
+ @Override
+ public void close() throws Exception {
+ super.close();
+ }
+
+
+ /**
+ * 创建 hbase 表
+ */
+ /* private void createTable(String tableName) throws Exception {
+ createNamespace(tableName.split(":")[0]);
+ TableName table = TableName.valueOf(tableName);
+ if (! admin.tableExists(table)) {
+ HTableDescriptor hTableDescriptor = new HTableDescriptor(table);
+ // 固定只有 data 列簇
+ hTableDescriptor.addFamily(new HColumnDescriptor("data"));
+ admin.createTable(hTableDescriptor);
+ }
+ }*/
+
+ /**
+ * 创建命名空间
+ */
+ /*private void createNamespace(String namespace) throws Exception {
+ try {
+ admin.getNamespaceDescriptor(namespace);
+ } catch (NamespaceNotFoundException e) {
+ admin.createNamespace(NamespaceDescriptor.create(namespace).build());
+ }
+ }*/
+}
diff --git a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkFramedip.java b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkFramedip.java
new file mode 100644
index 0000000..bcb608a
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkFramedip.java
@@ -0,0 +1,206 @@
+package com.zdjizhi.utils.hbasepackage;
+
+import com.zdjizhi.common.RadiusRelationshipConfig;
+import com.zdjizhi.pojo.Radius;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,String,String,Long,Integer>> implements Serializable, SinkFunction<Tuple6<String,String,String,String,Long,Integer>> {
+ private Logger log;
+
+ private String hbase_zookeeper_host;
+ public Map<String, Radius> IpWithAccountMap = new ConcurrentHashMap<>(80000);
+
+ private Connection connection;
+ private Admin admin;
+
+ public HbaseSinkFramedip(String hbase_zookeeper_host) {
+ this.hbase_zookeeper_host = hbase_zookeeper_host;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ log = Logger.getLogger(HbaseSinkFramedip.class);
+
+ org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
+ configuration.set("hbase.zookeeper.quorum", hbase_zookeeper_host);
+
+ connection = ConnectionFactory.createConnection(configuration);
+ admin = connection.getAdmin();
+
+ try {
+
+ Table table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_FRAMEDIP_TABLE_NAME));
+ Scan scan = new Scan();
+ scan.addColumn("radius".getBytes(), "framed_ip".getBytes());
+ scan.addColumn("radius".getBytes(), "account".getBytes());
+ scan.addColumn("radius".getBytes(), "first_found_time".getBytes());
+ scan.addColumn("radius".getBytes(), "last_update_time".getBytes());
+ scan.addColumn("radius".getBytes(), "acct_status_type".getBytes());
+
+ ResultScanner scanner = table.getScanner(scan);
+ for (Result result : scanner) {
+ if( result.containsColumn("radius".getBytes(), "framed_ip".getBytes()) && result.containsColumn("radius".getBytes(), "account".getBytes()) && result.containsColumn("radius".getBytes(), "last_update_time".getBytes()) && result.containsColumn("radius".getBytes(), "first_found_time".getBytes()) && result.containsColumn("radius".getBytes(), "acct_status_type".getBytes()) ) {
+
+ Radius radius = new Radius();
+ String key = Bytes.toString(result.getRow());
+ String framedip = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))));
+ String account = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("account"))));
+ int acct_status_type = Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"))));
+
+ Long first_found_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("first_found_time"))));
+ Long last_update_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("last_update_time"))));
+ radius.setAccount(account);
+ radius.setFramed_ip(framedip);
+ radius.setFirst_found_time(first_found_time);
+ radius.setLast_update_time(last_update_time);
+ radius.setAcct_status_type(acct_status_type);
+ IpWithAccountMap.put(key, radius);
+ }
+ }
+
+ log.warn("HBaseUtils Get fullAmount List size->AccountWithIpMap.size(): " + IpWithAccountMap.size());
+ scanner.close();
+ } catch (IOException ioe) {
+ log.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<===");
+ } catch (RuntimeException e) {
+ log.error("HBaseUtils getAll() is Exception===>{" + e + "}<===");
+ }
+ }
+
+ public void invoke(Tuple6<String,String,String ,String,Long,Integer> value, Context context) throws Exception {
+ // 按 project:table 归纳
+
+ String key = value.f0;
+ String framedIp = value.f2;
+ String account = value.f3;
+ Long event_time = value.f4;
+ int acct_status_type = value.f5;
+
+ if (IpWithAccountMap.containsKey(key)) {
+ Radius radius = IpWithAccountMap.get(key);
+
+ if (radius.getLast_update_time() < event_time) {
+
+ if(acct_status_type==radius.getAcct_status_type()) {
+
+ if (!radius.getAccount().equals(account)) {
+ radius.setLast_update_time(event_time);
+ radius.setAccount(account);
+ radius.setAcct_status_type(acct_status_type);
+
+ Table table = null;
+ try {
+ table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_FRAMEDIP_TABLE_NAME));
+ Put put = new Put(key.getBytes());
+ put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes());
+ put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes());
+ put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time));
+ put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time()));
+ put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type));
+ table.put(put);
+ IpWithAccountMap.put(key, radius);
+ } catch (Exception e) {
+ log.error(e.toString());
+ } finally {
+ table.close();
+ }
+ }
+ }
+ else{
+ radius.setLast_update_time(event_time);
+ radius.setFramed_ip(framedIp);
+ radius.setAcct_status_type(acct_status_type);
+ Table table = null;
+ try {
+ table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_FRAMEDIP_TABLE_NAME));
+ Put put = new Put(key.getBytes());
+ put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes());
+ put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes());
+ put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time));
+ put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time()));
+ put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type));
+ table.put(put);
+ IpWithAccountMap.put(key, radius);
+ } catch (Exception e) {
+ log.error(e.toString());
+ } finally {
+ table.close();
+ }
+
+ }
+
+
+ }
+ }
+ else {
+ Radius radius = new Radius();
+ radius.setFramed_ip(framedIp);
+ radius.setLast_update_time(event_time);
+ radius.setAccount(account);
+ radius.setFirst_found_time(event_time);
+ radius.setAcct_status_type(acct_status_type);
+ Table table = null;
+ try {
+ table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_FRAMEDIP_TABLE_NAME));
+ Put put = new Put(key.getBytes());
+ put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes());
+ put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes());
+ put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time));
+ put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(event_time));
+ put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type));
+ table.put(put);
+ IpWithAccountMap.put(key, radius);
+ } catch (Exception e) {
+ log.error(e.toString());
+ } finally {
+ table.close();
+ }
+ }
+
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ }
+
+ /**
+ * 创建 hbase 表
+ */
+ /* private void createTable(String tableName) throws Exception {
+ createNamespace(tableName.split(":")[0]);
+ TableName table = TableName.valueOf(tableName);
+ if (! admin.tableExists(table)) {
+ HTableDescriptor hTableDescriptor = new HTableDescriptor(table);
+ // 固定只有 data 列簇
+ hTableDescriptor.addFamily(new HColumnDescriptor("data"));
+ admin.createTable(hTableDescriptor);
+ }
+ }*/
+
+ /**
+ * 创建命名空间
+ */
+ /*private void createNamespace(String namespace) throws Exception {
+ try {
+ admin.getNamespaceDescriptor(namespace);
+ } catch (NamespaceNotFoundException e) {
+ admin.createNamespace(NamespaceDescriptor.create(namespace).build());
+ }
+ }*/
+}