summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--dependency-reduced-pom.xml2
-rw-r--r--pom.xml80
-rw-r--r--src/main/java/cn/ac/iie/flume/bean/NewConn.java2
-rw-r--r--src/main/java/cn/ac/iie/flume/interceptor/Csv2Json.java105
-rw-r--r--src/main/java/cn/ac/iie/flume/interceptor/File2Json.java98
-rw-r--r--src/main/java/cn/ac/iie/flume/source/MySource.java42
-rw-r--r--src/test/java/EventTest.java21
7 files changed, 287 insertions, 63 deletions
diff --git a/dependency-reduced-pom.xml b/dependency-reduced-pom.xml
index a7d4f5b..dc74c1b 100644
--- a/dependency-reduced-pom.xml
+++ b/dependency-reduced-pom.xml
@@ -32,7 +32,7 @@
<configuration>
<transformers>
<transformer>
- <mainClass>cn.ac.iie.flume.interceptor.Json2Csv</mainClass>
+ <mainClass>cn.ac.iie.flume.interceptor.File2Json</mainClass>
</transformer>
<transformer>
<resource>META-INF/spring.handlers</resource>
diff --git a/pom.xml b/pom.xml
index 7f41270..46ff823 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,6 +7,72 @@
<groupId>cn.ac.iie.flume</groupId>
<artifactId>flume-interceptor</artifactId>
<version>1.0-SNAPSHOT</version>
+ <packaging>jar</packaging>
+
+
+ <build>
+ <plugins>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.4.2</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>cn.ac.iie.flume.interceptor.File2Json</mainClass>
+ </transformer>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>META-INF/spring.handlers</resource>
+ </transformer>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>META-INF/spring.schemas</resource>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ <resources>
+ <resource>
+ <directory>properties</directory>
+ <includes>
+ <include>**/*.properties</include>
+ </includes>
+ <filtering>false</filtering>
+ </resource>
+ <resource>
+ <directory>properties</directory>
+ <includes>
+ <include>log4j.properties</include>
+ </includes>
+ <filtering>false</filtering>
+ </resource>
+ </resources>
+ </build>
+
+
<dependencies>
<dependency>
@@ -40,18 +106,4 @@
</dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- <encoding>UTF-8</encoding>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
</project> \ No newline at end of file
diff --git a/src/main/java/cn/ac/iie/flume/bean/NewConn.java b/src/main/java/cn/ac/iie/flume/bean/NewConn.java
index 5408f51..cc80932 100644
--- a/src/main/java/cn/ac/iie/flume/bean/NewConn.java
+++ b/src/main/java/cn/ac/iie/flume/bean/NewConn.java
@@ -133,8 +133,6 @@ public class NewConn {
private String voip_called_number;
private String streaming_media_url;
private String streaming_media_protocol;
-
-
public long getCommon_log_id() {
return common_log_id;
}
diff --git a/src/main/java/cn/ac/iie/flume/interceptor/Csv2Json.java b/src/main/java/cn/ac/iie/flume/interceptor/Csv2Json.java
index 48f66ee..eb11cac 100644
--- a/src/main/java/cn/ac/iie/flume/interceptor/Csv2Json.java
+++ b/src/main/java/cn/ac/iie/flume/interceptor/Csv2Json.java
@@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
public class Csv2Json implements Interceptor {
@@ -34,52 +35,64 @@ public class Csv2Json implements Interceptor {
//TODO 处理单个event的代码
//取出一条csv格式的数据
- body = event.getBody();
- json = new String(body);
-
- String[] strings = json.replaceAll("\t"," " ).replaceAll("\"","" ).split(",");
-
- newConn = new NewConn();
-
- newConn.setCommon_log_id(Long.parseLong("0"));
- newConn.setCommon_policy_id(Integer.parseInt(strings[1]));
- newConn.setCommon_start_time(Integer.parseInt(strings[2]));
- newConn.setCommon_recv_time(Integer.parseInt(strings[3]));
- newConn.setCommon_l4_protocol(strings[4]);
- newConn.setCommon_address_type(Integer.parseInt(strings[5]));
- newConn.setCommon_client_ip(strings[6]);
- newConn.setCommon_server_ip(strings[7]);
- newConn.setCommon_client_port(Integer.parseInt(strings[8]));
- newConn.setCommon_server_port(Integer.parseInt(strings[9]));
- newConn.setCommon_service(Integer.parseInt(strings[10]));
- newConn.setCommon_entrance_id(Integer.parseInt(strings[11]));
- newConn.setCommon_device_id(strings[12]);
- newConn.setCommon_link_id(Integer.parseInt(strings[13]));
- newConn.setCommon_encapsulation(Integer.parseInt(strings[14]));
- newConn.setCommon_direction(Integer.parseInt(strings[15]));
- newConn.setCommon_stream_dir(Integer.parseInt(strings[18]));
- newConn.setCommon_sled_ip(strings[19]);
- newConn.setCommon_address_list(strings[20]);
- newConn.setCommon_server_location(strings[21]);
- newConn.setCommon_client_location(strings[22]);
- newConn.setCommon_server_asn(strings[23]);
- newConn.setCommon_client_asn(strings[24]);
- newConn.setCommon_subscriber_id(strings[26]);
- newConn.setCommon_user_region(strings[27]);
- newConn.setCommon_c2s_pkt_num(Integer.parseInt(strings[30]));
- newConn.setCommon_s2c_pkt_num(Integer.parseInt(strings[31]));
- newConn.setCommon_c2s_byte_num(Integer.parseInt(strings[32]));
- newConn.setCommon_s2c_byte_num(Integer.parseInt(strings[33]));
- newConn.setCommon_protocol_label(strings[34]);
- newConn.setCommon_app_label(strings[35]);
- newConn.setCommon_action(Integer.parseInt(strings[39]));
-
-
- json = JSONObject.toJSONString(newConn);
-
- body = json.getBytes();
- event.setBody(body);
- return event;
+
+ try{
+ body = event.getBody();
+
+ Map<String, String> headers = event.getHeaders();
+
+ json = new String(body);
+
+ String[] strings = json.replaceAll("\t"," " ).replaceAll("\"","" ).split(",");
+
+ newConn = new NewConn();
+
+ newConn.setCommon_log_id(Long.parseLong("0"));
+ newConn.setCommon_policy_id(Integer.parseInt(strings[1]));
+ newConn.setCommon_start_time(Integer.parseInt(strings[2]));
+ newConn.setCommon_recv_time(Integer.parseInt(strings[3]));
+ newConn.setCommon_l4_protocol(strings[4]);
+ newConn.setCommon_address_type(Integer.parseInt(strings[5]));
+ newConn.setCommon_client_ip(strings[6]);
+ newConn.setCommon_server_ip(strings[7]);
+ newConn.setCommon_client_port(Integer.parseInt(strings[8]));
+ newConn.setCommon_server_port(Integer.parseInt(strings[9]));
+ newConn.setCommon_service(Integer.parseInt(strings[10]));
+ newConn.setCommon_entrance_id(Integer.parseInt(strings[11]));
+ newConn.setCommon_device_id(strings[12]);
+ newConn.setCommon_link_id(Integer.parseInt(strings[13]));
+ newConn.setCommon_encapsulation(Integer.parseInt(strings[14]));
+ newConn.setCommon_direction(Integer.parseInt(strings[15]));
+ newConn.setCommon_stream_dir(Integer.parseInt(strings[18]));
+ newConn.setCommon_sled_ip(strings[19]);
+ newConn.setCommon_address_list(strings[20]);
+ newConn.setCommon_server_location(strings[21]);
+ newConn.setCommon_client_location(strings[22]);
+ newConn.setCommon_server_asn(strings[23]);
+ newConn.setCommon_client_asn(strings[24]);
+ newConn.setCommon_subscriber_id(strings[26]);
+ newConn.setCommon_user_region(strings[27]);
+ newConn.setCommon_c2s_pkt_num(Integer.parseInt(strings[30]));
+ newConn.setCommon_s2c_pkt_num(Integer.parseInt(strings[31]));
+ newConn.setCommon_c2s_byte_num(Integer.parseInt(strings[32]));
+ newConn.setCommon_s2c_byte_num(Integer.parseInt(strings[33]));
+ newConn.setCommon_protocol_label(strings[34]);
+ newConn.setCommon_app_label(strings[35]);
+ newConn.setCommon_action(Integer.parseInt(strings[39]));
+
+
+ json = JSONObject.toJSONString(newConn);
+
+ body = json.getBytes();
+ event.setBody(body);
+ return event;
+ }
+ catch (Exception e){
+ logger.error("消息转换异常========");
+ e.printStackTrace();
+ return null;
+ }
+
}
//TODO 处理多个event的函数
diff --git a/src/main/java/cn/ac/iie/flume/interceptor/File2Json.java b/src/main/java/cn/ac/iie/flume/interceptor/File2Json.java
new file mode 100644
index 0000000..6f5fec8
--- /dev/null
+++ b/src/main/java/cn/ac/iie/flume/interceptor/File2Json.java
@@ -0,0 +1,98 @@
+package cn.ac.iie.flume.interceptor;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.interceptor.Interceptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class File2Json implements Interceptor {
+
+ //打印日志,便于测试方法的执行顺序
+ private static final Logger logger = LoggerFactory.getLogger(File2Json.class);
+
+ private static byte[] body;
+
+ private static String json;
+
+
+
+ @Override
+ public void initialize() {
+ logger.info("----------自定义拦截器的initialize方法执行");
+ }
+
+ @Override
+ public Event intercept(Event event) {
+ logger.info("----------intercept(Event event)方法执行,处理单个event");
+
+ //TODO 处理单个event的代码
+ //取出一条csv格式的数据
+
+ try{
+ body = event.getBody();
+
+ Map<String, String> headers = event.getHeaders();
+
+
+ logger.info( "" + headers.keySet().size());
+ for (String key : headers.keySet()) {
+
+ logger.info("[ key: " + key +"," + "value: " + headers.get(key) + "]");
+ }
+
+ event.setBody(body);
+ return event;
+ }
+ catch (Exception e){
+ logger.error("消息转换异常========");
+ e.printStackTrace();
+ return null;
+ }
+
+ }
+
+ //TODO 处理多个event的函数
+ @Override
+ public List<Event> intercept(List<Event> events) {
+
+ List<Event> results = new ArrayList<>();
+ Event event;
+ for (Event e : events) {
+
+ event = intercept(e);
+ if (event != null) {
+ results.add(event);
+ }
+ }
+ return results;
+ }
+
+ @Override
+ public void close() {
+ logger.info("----------自定义拦截器close方法执行");
+ }
+
+ public static class Builder implements Interceptor.Builder {
+
+ @Override
+ public Interceptor build() {
+ logger.info("----------build方法执行");
+ return new File2Json();
+ }
+
+ @Override
+ public void configure(Context context) {
+ logger.info("----------configure方法执行");
+ }
+ }
+
+
+ public static void main(String[] args) {
+
+ }
+}
diff --git a/src/main/java/cn/ac/iie/flume/source/MySource.java b/src/main/java/cn/ac/iie/flume/source/MySource.java
new file mode 100644
index 0000000..2ac7561
--- /dev/null
+++ b/src/main/java/cn/ac/iie/flume/source/MySource.java
@@ -0,0 +1,42 @@
+package cn.ac.iie.flume.source;
+
+import cn.ac.iie.flume.bean.NewConn;
+import org.apache.flume.Context;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.PollableSource;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.source.AbstractSource;
+
+/**
+ * @ClassNameMySource
+ * @Author [email protected]
+ * @Date2020/7/28 18:29
+ * @Version V1.0
+ **/
+public class MySource extends AbstractSource implements Configurable, PollableSource {
+
+
+ @Override
+ public Status process() throws EventDeliveryException {
+
+ NewConn newConn = new NewConn();
+
+
+ return null;
+ }
+
+ @Override
+ public long getBackOffSleepIncrement() {
+ return 0;
+ }
+
+ @Override
+ public long getMaxBackOffSleepInterval() {
+ return 0;
+ }
+
+ @Override
+ public void configure(Context context) {
+
+ }
+}
diff --git a/src/test/java/EventTest.java b/src/test/java/EventTest.java
new file mode 100644
index 0000000..6d4e065
--- /dev/null
+++ b/src/test/java/EventTest.java
@@ -0,0 +1,21 @@
+/**
+ * @ClassNameEventTest
+ * @Author [email protected]
+ * @Date2020/7/29 15:38
+ * @Version V1.0
+ **/
+public class EventTest {
+
+ public static void main(String[] args) {
+
+ String json = "\"0e1178ab-986f-4bbb-91f9-94b1f70ce3a2\",0,1546624800,1546624800,\"IPv4_UDP\",0,\"37.150.25.154\",\"46.20.187.219\",32265,44985,160,2,0,0,0,0,\"\",\"\",1,\"10.3.4.10\",\"48975-14441-46.20.187.219-37.150.25.154\",\"\",\"\",\"42322\",\"9198\",\"\",\"\",\"\",\"\",\"PROTO_ID=0;APP_ID=0;OS_ID=0;BS_ID=0;WEB_ID=0;BEHAV_ID=0;\",3,0,60,0,0,0,0,0,0,0";
+
+ String[] strings = json.replaceAll("\t"," " ).replaceAll("\"","" ).split(",");
+
+ System.out.println(strings[29]);
+ System.out.println(strings[28]);
+ System.out.println(strings[30]);
+
+
+ }
+}