Diffstat (limited to 'TWA-PIC/flink')
80 files changed, 5058 insertions, 0 deletions
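Most of the scripts in this diff begin by sourcing bin/config.sh, which resolves settings through the readFromConfig helper: a sed pipeline that extracts "key: value" pairs from conf/flink-conf.yaml, strips trailing comments, trims whitespace, and falls back to a supplied default when the key is absent. A minimal standalone sketch of that lookup (the key and file path below are only examples):

    #!/usr/bin/env bash
    # Sketch of the readFromConfig lookup used throughout these scripts.
    lookup() {
        local key=$1 defaultValue=$2 configFile=$3
        # take "key: value", drop trailing comments, trim, last occurrence wins
        local value=$(sed -n "s/^[ ]*${key}[ ]*: \([^#]*\).*$/\1/p" "${configFile}" | sed "s/^ *//;s/ *$//" | tail -n 1)
        [ -z "$value" ] && echo "$defaultValue" || echo "$value"
    }
    lookup "env.log.max" "10" "./conf/flink-conf.yaml"   # prints the configured value, or "10"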
diff --git a/TWA-PIC/flink/bin/bash-java-utils.jar b/TWA-PIC/flink/bin/bash-java-utils.jar
new file mode 100644
index 0000000..5b2e369
--- /dev/null
+++ b/TWA-PIC/flink/bin/bash-java-utils.jar
Binary files differ
diff --git a/TWA-PIC/flink/bin/config.sh b/TWA-PIC/flink/bin/config.sh
new file mode 100644
index 0000000..208b2d1
--- /dev/null
+++ b/TWA-PIC/flink/bin/config.sh
@@ -0,0 +1,560 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+constructFlinkClassPath() {
+    local FLINK_DIST
+    local FLINK_CLASSPATH
+
+    while read -d '' -r jarfile ; do
+        if [[ "$jarfile" =~ .*/flink-dist[^/]*.jar$ ]]; then
+            FLINK_DIST="$FLINK_DIST":"$jarfile"
+        elif [[ "$FLINK_CLASSPATH" == "" ]]; then
+            FLINK_CLASSPATH="$jarfile";
+        else
+            FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile"
+        fi
+    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0 | sort -z)
+
+    if [[ "$FLINK_DIST" == "" ]]; then
+        # write error message to stderr since stdout is stored as the classpath
+        (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")
+
+        # exit function with empty classpath to force process failure
+        exit 1
+    fi
+
+    echo "$FLINK_CLASSPATH""$FLINK_DIST"
+}
+
+findFlinkDistJar() {
+    local FLINK_DIST="`find "$FLINK_LIB_DIR" -name 'flink-dist*.jar'`"
+
+    if [[ "$FLINK_DIST" == "" ]]; then
+        # write error message to stderr since stdout is stored as the classpath
+        (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")
+
+        # exit function with empty classpath to force process failure
+        exit 1
+    fi
+
+    echo "$FLINK_DIST"
+}
+
+# These are used to mangle paths that are passed to java when using
+# cygwin. Cygwin paths are like linux paths, i.e. /path/to/somewhere
+# but the windows java version expects them in Windows Format, i.e. C:\bla\blub.
+# "cygpath" can do the conversion.
+manglePath() {
+    UNAME=$(uname -s)
+    if [ "${UNAME:0:6}" == "CYGWIN" ]; then
+        echo `cygpath -w "$1"`
+    else
+        echo $1
+    fi
+}
+
+manglePathList() {
+    UNAME=$(uname -s)
+    # a path list, for example a java classpath
+    if [ "${UNAME:0:6}" == "CYGWIN" ]; then
+        echo `cygpath -wp "$1"`
+    else
+        echo $1
+    fi
+}
+
+# Looks up a config value by key from a simple YAML-style key-value map.
+# $1: key to look up +# $2: default value to return if key does not exist +# $3: config file to read from +readFromConfig() { + local key=$1 + local defaultValue=$2 + local configFile=$3 + + # first extract the value with the given key (1st sed), then trim the result (2nd sed) + # if a key exists multiple times, take the "last" one (tail) + local value=`sed -n "s/^[ ]*${key}[ ]*: \([^#]*\).*$/\1/p" "${configFile}" | sed "s/^ *//;s/ *$//" | tail -n 1` + + [ -z "$value" ] && echo "$defaultValue" || echo "$value" +} + +######################################################################################################################## +# DEFAULT CONFIG VALUES: These values will be used when nothing has been specified in conf/flink-conf.yaml +# -or- the respective environment variables are not set. +######################################################################################################################## + + +# WARNING !!! , these values are only used if there is nothing else is specified in +# conf/flink-conf.yaml + +DEFAULT_ENV_PID_DIR="$(cd "`dirname "$0"`"/..; pwd)/tmp" # Directory to store *.pid files to +DEFAULT_ENV_LOG_MAX=10 # Maximum number of old log files to keep +DEFAULT_ENV_JAVA_OPTS="" # Optional JVM args +DEFAULT_ENV_JAVA_OPTS_JM="" # Optional JVM args (JobManager) +DEFAULT_ENV_JAVA_OPTS_TM="" # Optional JVM args (TaskManager) +DEFAULT_ENV_JAVA_OPTS_HS="" # Optional JVM args (HistoryServer) +DEFAULT_ENV_JAVA_OPTS_CLI="" # Optional JVM args (Client) +DEFAULT_ENV_SSH_OPTS="" # Optional SSH parameters running in cluster mode +DEFAULT_YARN_CONF_DIR="" # YARN Configuration Directory, if necessary +DEFAULT_HADOOP_CONF_DIR="" # Hadoop Configuration Directory, if necessary +DEFAULT_HBASE_CONF_DIR="" # HBase Configuration Directory, if necessary + +######################################################################################################################## +# CONFIG KEYS: The default values can be overwritten by the following keys in conf/flink-conf.yaml +######################################################################################################################## + +KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa" + +KEY_ENV_PID_DIR="env.pid.dir" +KEY_ENV_LOG_DIR="env.log.dir" +KEY_ENV_LOG_MAX="env.log.max" +KEY_ENV_YARN_CONF_DIR="env.yarn.conf.dir" +KEY_ENV_HADOOP_CONF_DIR="env.hadoop.conf.dir" +KEY_ENV_HBASE_CONF_DIR="env.hbase.conf.dir" +KEY_ENV_JAVA_HOME="env.java.home" +KEY_ENV_JAVA_OPTS="env.java.opts" +KEY_ENV_JAVA_OPTS_JM="env.java.opts.jobmanager" +KEY_ENV_JAVA_OPTS_TM="env.java.opts.taskmanager" +KEY_ENV_JAVA_OPTS_HS="env.java.opts.historyserver" +KEY_ENV_JAVA_OPTS_CLI="env.java.opts.client" +KEY_ENV_SSH_OPTS="env.ssh.opts" +KEY_HIGH_AVAILABILITY="high-availability" +KEY_ZK_HEAP_MB="zookeeper.heap.mb" + +######################################################################################################################## +# PATHS AND CONFIG +######################################################################################################################## + +target="$0" +# For the case, the executable has been directly symlinked, figure out +# the correct bin path by following its symlink up to an upper bound. +# Note: we can't use the readlink utility here if we want to be POSIX +# compatible. +iteration=0 +while [ -L "$target" ]; do + if [ "$iteration" -gt 100 ]; then + echo "Cannot resolve path: You have a cyclic symlink in $target." 
+ break + fi + ls=`ls -ld -- "$target"` + target=`expr "$ls" : '.* -> \(.*\)$'` + iteration=$((iteration + 1)) +done + +# Convert relative path to absolute path and resolve directory symlinks +bin=`dirname "$target"` +SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P` + +# Define the main directory of the flink installation +# If config.sh is called by pyflink-shell.sh in python bin directory(pip installed), then do not need to set the FLINK_HOME here. +if [ -z "$_FLINK_HOME_DETERMINED" ]; then + FLINK_HOME=`dirname "$SYMLINK_RESOLVED_BIN"` +fi +if [ -z "$FLINK_LIB_DIR" ]; then FLINK_LIB_DIR=$FLINK_HOME/lib; fi +if [ -z "$FLINK_PLUGINS_DIR" ]; then FLINK_PLUGINS_DIR=$FLINK_HOME/plugins; fi +if [ -z "$FLINK_OPT_DIR" ]; then FLINK_OPT_DIR=$FLINK_HOME/opt; fi + + +# These need to be mangled because they are directly passed to java. +# The above lib path is used by the shell script to retrieve jars in a +# directory, so it needs to be unmangled. +FLINK_HOME_DIR_MANGLED=`manglePath "$FLINK_HOME"` +if [ -z "$FLINK_CONF_DIR" ]; then FLINK_CONF_DIR=$FLINK_HOME_DIR_MANGLED/conf; fi +FLINK_BIN_DIR=$FLINK_HOME_DIR_MANGLED/bin +DEFAULT_FLINK_LOG_DIR=$FLINK_HOME_DIR_MANGLED/log +FLINK_CONF_FILE="flink-conf.yaml" +YAML_CONF=${FLINK_CONF_DIR}/${FLINK_CONF_FILE} + +### Exported environment variables ### +export FLINK_CONF_DIR +export FLINK_BIN_DIR +export FLINK_PLUGINS_DIR +# export /lib dir to access it during deployment of the Yarn staging files +export FLINK_LIB_DIR +# export /opt dir to access it for the SQL client +export FLINK_OPT_DIR + +######################################################################################################################## +# ENVIRONMENT VARIABLES +######################################################################################################################## + +# read JAVA_HOME from config with no default value +MY_JAVA_HOME=$(readFromConfig ${KEY_ENV_JAVA_HOME} "" "${YAML_CONF}") +# check if config specified JAVA_HOME +if [ -z "${MY_JAVA_HOME}" ]; then + # config did not specify JAVA_HOME. Use system JAVA_HOME + MY_JAVA_HOME="${JAVA_HOME}" +fi +# check if we have a valid JAVA_HOME and if java is not available +if [ -z "${MY_JAVA_HOME}" ] && ! type java > /dev/null 2> /dev/null; then + echo "Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME." + exit 1 +else + JAVA_HOME="${MY_JAVA_HOME}" +fi + +UNAME=$(uname -s) +if [ "${UNAME:0:6}" == "CYGWIN" ]; then + JAVA_RUN=java +else + if [[ -d "$JAVA_HOME" ]]; then + JAVA_RUN="$JAVA_HOME"/bin/java + else + JAVA_RUN=java + fi +fi + +# Define HOSTNAME if it is not already set +if [ -z "${HOSTNAME}" ]; then + HOSTNAME=`hostname` +fi + +IS_NUMBER="^[0-9]+$" + +# Verify that NUMA tooling is available +command -v numactl >/dev/null 2>&1 +if [[ $? 
-ne 0 ]]; then + FLINK_TM_COMPUTE_NUMA="false" +else + # Define FLINK_TM_COMPUTE_NUMA if it is not already set + if [ -z "${FLINK_TM_COMPUTE_NUMA}" ]; then + FLINK_TM_COMPUTE_NUMA=$(readFromConfig ${KEY_TASKM_COMPUTE_NUMA} "false" "${YAML_CONF}") + fi +fi + +if [ -z "${MAX_LOG_FILE_NUMBER}" ]; then + MAX_LOG_FILE_NUMBER=$(readFromConfig ${KEY_ENV_LOG_MAX} ${DEFAULT_ENV_LOG_MAX} "${YAML_CONF}") + export MAX_LOG_FILE_NUMBER +fi + +if [ -z "${FLINK_LOG_DIR}" ]; then + FLINK_LOG_DIR=$(readFromConfig ${KEY_ENV_LOG_DIR} "${DEFAULT_FLINK_LOG_DIR}" "${YAML_CONF}") +fi + +if [ -z "${YARN_CONF_DIR}" ]; then + YARN_CONF_DIR=$(readFromConfig ${KEY_ENV_YARN_CONF_DIR} "${DEFAULT_YARN_CONF_DIR}" "${YAML_CONF}") +fi + +if [ -z "${HADOOP_CONF_DIR}" ]; then + HADOOP_CONF_DIR=$(readFromConfig ${KEY_ENV_HADOOP_CONF_DIR} "${DEFAULT_HADOOP_CONF_DIR}" "${YAML_CONF}") +fi + +if [ -z "${HBASE_CONF_DIR}" ]; then + HBASE_CONF_DIR=$(readFromConfig ${KEY_ENV_HBASE_CONF_DIR} "${DEFAULT_HBASE_CONF_DIR}" "${YAML_CONF}") +fi + +if [ -z "${FLINK_PID_DIR}" ]; then + FLINK_PID_DIR=$(readFromConfig ${KEY_ENV_PID_DIR} "${DEFAULT_ENV_PID_DIR}" "${YAML_CONF}") +fi + +if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then + FLINK_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS} "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}") + + # Remove leading and ending double quotes (if present) of value + FLINK_ENV_JAVA_OPTS="$( echo "${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//' -e 's/"$//' )" +fi + +if [ -z "${FLINK_ENV_JAVA_OPTS_JM}" ]; then + FLINK_ENV_JAVA_OPTS_JM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_JM} "${DEFAULT_ENV_JAVA_OPTS_JM}" "${YAML_CONF}") + # Remove leading and ending double quotes (if present) of value + FLINK_ENV_JAVA_OPTS_JM="$( echo "${FLINK_ENV_JAVA_OPTS_JM}" | sed -e 's/^"//' -e 's/"$//' )" +fi + +if [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then + FLINK_ENV_JAVA_OPTS_TM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_TM} "${DEFAULT_ENV_JAVA_OPTS_TM}" "${YAML_CONF}") + # Remove leading and ending double quotes (if present) of value + FLINK_ENV_JAVA_OPTS_TM="$( echo "${FLINK_ENV_JAVA_OPTS_TM}" | sed -e 's/^"//' -e 's/"$//' )" +fi + +if [ -z "${FLINK_ENV_JAVA_OPTS_HS}" ]; then + FLINK_ENV_JAVA_OPTS_HS=$(readFromConfig ${KEY_ENV_JAVA_OPTS_HS} "${DEFAULT_ENV_JAVA_OPTS_HS}" "${YAML_CONF}") + # Remove leading and ending double quotes (if present) of value + FLINK_ENV_JAVA_OPTS_HS="$( echo "${FLINK_ENV_JAVA_OPTS_HS}" | sed -e 's/^"//' -e 's/"$//' )" +fi + +if [ -z "${FLINK_ENV_JAVA_OPTS_CLI}" ]; then + FLINK_ENV_JAVA_OPTS_CLI=$(readFromConfig ${KEY_ENV_JAVA_OPTS_CLI} "${DEFAULT_ENV_JAVA_OPTS_CLI}" "${YAML_CONF}") + # Remove leading and ending double quotes (if present) of value + FLINK_ENV_JAVA_OPTS_CLI="$( echo "${FLINK_ENV_JAVA_OPTS_CLI}" | sed -e 's/^"//' -e 's/"$//' )" +fi + +if [ -z "${FLINK_SSH_OPTS}" ]; then + FLINK_SSH_OPTS=$(readFromConfig ${KEY_ENV_SSH_OPTS} "${DEFAULT_ENV_SSH_OPTS}" "${YAML_CONF}") +fi + +# Define ZK_HEAP if it is not already set +if [ -z "${ZK_HEAP}" ]; then + ZK_HEAP=$(readFromConfig ${KEY_ZK_HEAP_MB} 0 "${YAML_CONF}") +fi + +# High availability +if [ -z "${HIGH_AVAILABILITY}" ]; then + HIGH_AVAILABILITY=$(readFromConfig ${KEY_HIGH_AVAILABILITY} "" "${YAML_CONF}") + if [ -z "${HIGH_AVAILABILITY}" ]; then + # Try deprecated value + DEPRECATED_HA=$(readFromConfig "recovery.mode" "" "${YAML_CONF}") + if [ -z "${DEPRECATED_HA}" ]; then + HIGH_AVAILABILITY="none" + elif [ ${DEPRECATED_HA} == "standalone" ]; then + # Standalone is now 'none' + HIGH_AVAILABILITY="none" + else + HIGH_AVAILABILITY=${DEPRECATED_HA} + fi + fi +fi + +# Arguments 
for the JVM. Used for job and task manager JVMs.
+# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
+# JobManagerOptions#TOTAL_PROCESS_MEMORY and TaskManagerOptions#TOTAL_PROCESS_MEMORY for that!
+if [ -z "${JVM_ARGS}" ]; then
+    JVM_ARGS=""
+fi
+
+# Check if deprecated HADOOP_HOME is set, and specify config path to HADOOP_CONF_DIR if it's empty.
+if [ -z "$HADOOP_CONF_DIR" ]; then
+    if [ -n "$HADOOP_HOME" ]; then
+        # HADOOP_HOME is set. Check if it's a Hadoop 1.x or 2.x HADOOP_HOME path
+        if [ -d "$HADOOP_HOME/conf" ]; then
+            # It's Hadoop 1.x
+            HADOOP_CONF_DIR="$HADOOP_HOME/conf"
+        fi
+        if [ -d "$HADOOP_HOME/etc/hadoop" ]; then
+            # It's Hadoop 2.2+
+            HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
+        fi
+    fi
+fi
+
+# if neither HADOOP_CONF_DIR nor HADOOP_CLASSPATH is set, use a common default (if available)
+if [ -z "$HADOOP_CONF_DIR" ] && [ -z "$HADOOP_CLASSPATH" ]; then
+    if [ -d "/etc/hadoop/conf" ]; then
+        echo "Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set."
+        HADOOP_CONF_DIR="/etc/hadoop/conf"
+    fi
+fi
+
+# Check if deprecated HBASE_HOME is set, and specify config path to HBASE_CONF_DIR if it's empty.
+if [ -z "$HBASE_CONF_DIR" ]; then
+    if [ -n "$HBASE_HOME" ]; then
+        # HBASE_HOME is set.
+        if [ -d "$HBASE_HOME/conf" ]; then
+            HBASE_CONF_DIR="$HBASE_HOME/conf"
+        fi
+    fi
+fi
+
+# try to set HBASE_CONF_DIR to a common default if it's not set
+if [ -z "$HBASE_CONF_DIR" ]; then
+    if [ -d "/etc/hbase/conf" ]; then
+        echo "Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set."
+        HBASE_CONF_DIR="/etc/hbase/conf"
+    fi
+fi
+
+INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"
+
+if [ -n "${HBASE_CONF_DIR}" ]; then
+    INTERNAL_HADOOP_CLASSPATHS="${INTERNAL_HADOOP_CLASSPATHS}:${HBASE_CONF_DIR}"
+fi
+
+# Auxiliary function which extracts the host name from a line that may
+# also include topology information and the taskManager type
+extractHostName() {
+    # handle comments: extract first part of string (before first # character)
+    WORKER=`echo $1 | cut -d'#' -f 1`
+
+    # Extract the hostname from the network hierarchy
+    if [[ "$WORKER" =~ ^.*/([0-9a-zA-Z.-]+)$ ]]; then
+        WORKER=${BASH_REMATCH[1]}
+    fi
+
+    echo $WORKER
+}
+
+readMasters() {
+    MASTERS_FILE="${FLINK_CONF_DIR}/masters"
+
+    if [[ ! -f "${MASTERS_FILE}" ]]; then
+        echo "No masters file. Please specify masters in 'conf/masters'."
+        exit 1
+    fi
+
+    MASTERS=()
+    WEBUIPORTS=()
+
+    MASTERS_ALL_LOCALHOST=true
+    GOON=true
+    while $GOON; do
+        read line || GOON=false
+        HOSTWEBUIPORT=$( extractHostName $line)
+
+        if [ -n "$HOSTWEBUIPORT" ]; then
+            HOST=$(echo $HOSTWEBUIPORT | cut -f1 -d:)
+            WEBUIPORT=$(echo $HOSTWEBUIPORT | cut -s -f2 -d:)
+            MASTERS+=(${HOST})
+
+            if [ -z "$WEBUIPORT" ]; then
+                WEBUIPORTS+=(0)
+            else
+                WEBUIPORTS+=(${WEBUIPORT})
+            fi
+
+            if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
+                MASTERS_ALL_LOCALHOST=false
+            fi
+        fi
+    done < "$MASTERS_FILE"
+}
+
+readWorkers() {
+    WORKERS_FILE="${FLINK_CONF_DIR}/workers"
+
+    if [[ ! -f "$WORKERS_FILE" ]]; then
+        echo "No workers file. Please specify workers in 'conf/workers'."
+        exit 1
+    fi
+
+    WORKERS=()
+
+    WORKERS_ALL_LOCALHOST=true
+    GOON=true
+    while $GOON; do
+        read line || GOON=false
+        HOST=$( extractHostName $line)
+        if [ -n "$HOST" ] ; then
+            WORKERS+=(${HOST})
+            if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
+                WORKERS_ALL_LOCALHOST=false
+            fi
+        fi
+    done < "$WORKERS_FILE"
+}
+
+# starts or stops TMs on all workers
+# TMWorkers start|stop
+TMWorkers() {
+    CMD=$1
+
+    readWorkers
+
+    if [ ${WORKERS_ALL_LOCALHOST} = true ] ; then
+        # all-local setup
+        for worker in ${WORKERS[@]}; do
+            "${FLINK_BIN_DIR}"/taskmanager.sh "${CMD}"
+        done
+    else
+        # non-local setup
+        # start/stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available
+        command -v pdsh >/dev/null 2>&1
+        if [[ $? -ne 0 ]]; then
+            for worker in ${WORKERS[@]}; do
+                ssh -n $FLINK_SSH_OPTS $worker -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &"
+            done
+        else
+            PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${WORKERS[*]}") \
+                "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\""
+        fi
+    fi
+}
+
+runBashJavaUtilsCmd() {
+    local cmd=$1
+    local conf_dir=$2
+    local class_path=$3
+    local dynamic_args=${@:4}
+    class_path=`manglePathList "${class_path}"`
+
+    local output=`"${JAVA_RUN}" -classpath "${class_path}" org.apache.flink.runtime.util.bash.BashJavaUtils ${cmd} --configDir "${conf_dir}" $dynamic_args 2>&1 | tail -n 1000`
+    if [[ $? -ne 0 ]]; then
+        echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}." 1>&2
+        # Print the output in case the user redirects the log to console.
+        echo "$output" 1>&2
+        exit 1
+    fi
+
+    echo "$output"
+}
+
+extractExecutionResults() {
+    local output="$1"
+    local expected_lines="$2"
+    local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"
+    local execution_results
+    local num_lines
+
+    execution_results=$(echo "${output}" | grep ${EXECUTION_PREFIX})
+    num_lines=$(echo "${execution_results}" | wc -l)
+    # explicit check for empty result, because if execution_results is empty, then wc returns 1
+    if [[ -z ${execution_results} ]]; then
+        echo "[ERROR] The execution result is empty." 1>&2
+        exit 1
+    fi
+    if [[ ${num_lines} -ne ${expected_lines} ]]; then
+        echo "[ERROR] The execution result has an unexpected number of lines, expected: ${expected_lines}, actual: ${num_lines}." 1>&2
+        echo "[ERROR] An execution result line is expected following the prefix '${EXECUTION_PREFIX}'" 1>&2
+        echo "$output" 1>&2
+        exit 1
+    fi
+
+    echo "${execution_results//${EXECUTION_PREFIX}/}"
+}
+
+extractLoggingOutputs() {
+    local output="$1"
+    local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"
+
+    echo "${output}" | grep -v ${EXECUTION_PREFIX}
+}
+
+parseResourceParamsAndExportLogs() {
+    local cmd=$1
+    java_utils_output=$(runBashJavaUtilsCmd ${cmd} "${FLINK_CONF_DIR}" "${FLINK_BIN_DIR}/bash-java-utils.jar:$(findFlinkDistJar)" "$@")
+    logging_output=$(extractLoggingOutputs "${java_utils_output}")
+    params_output=$(extractExecutionResults "${java_utils_output}" 2)
+
+    if [[ $? -ne 0 ]]; then
+        echo "[ERROR] Could not get JVM parameters and dynamic configurations properly."
+ echo "[ERROR] Raw output from BashJavaUtils:" + echo "$java_utils_output" + exit 1 + fi + + jvm_params=$(echo "${params_output}" | head -n1) + export JVM_ARGS="${JVM_ARGS} ${jvm_params}" + export DYNAMIC_PARAMETERS=$(IFS=" " echo "${params_output}" | tail -n1) + + export FLINK_INHERITED_LOGS=" +$FLINK_INHERITED_LOGS + +RESOURCE_PARAMS extraction logs: +jvm_params: $jvm_params +dynamic_configs: $DYNAMIC_PARAMETERS +logs: $logging_output +" +} + +parseJmArgsAndExportLogs() { + parseResourceParamsAndExportLogs GET_JM_RESOURCE_PARAMS +} + +parseTmArgsAndExportLogs() { + parseResourceParamsAndExportLogs GET_TM_RESOURCE_PARAMS +} diff --git a/TWA-PIC/flink/bin/find-flink-home.sh b/TWA-PIC/flink/bin/find-flink-home.sh new file mode 100644 index 0000000..e0fe95f --- /dev/null +++ b/TWA-PIC/flink/bin/find-flink-home.sh @@ -0,0 +1,28 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +CURRENT_DIR="$( cd "$(dirname "$0")" ; pwd -P )" +FIND_FLINK_HOME_PYTHON_SCRIPT="$CURRENT_DIR/find_flink_home.py" + +if [ ! -f "$FIND_FLINK_HOME_PYTHON_SCRIPT" ]; then + export FLINK_HOME="$( cd "$CURRENT_DIR"/.. ; pwd -P )" +else + PYFLINK_PYTHON="${PYFLINK_PYTHON:-"python"}" + export FLINK_HOME=$("$FIND_FLINK_HOME_PYTHON_SCRIPT") +fi diff --git a/TWA-PIC/flink/bin/flink b/TWA-PIC/flink/bin/flink new file mode 100644 index 0000000..3413463 --- /dev/null +++ b/TWA-PIC/flink/bin/flink @@ -0,0 +1,55 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +target="$0" +# For the case, the executable has been directly symlinked, figure out +# the correct bin path by following its symlink up to an upper bound. +# Note: we can't use the readlink utility here if we want to be POSIX +# compatible. 
+iteration=0 +while [ -L "$target" ]; do + if [ "$iteration" -gt 100 ]; then + echo "Cannot resolve path: You have a cyclic symlink in $target." + break + fi + ls=`ls -ld -- "$target"` + target=`expr "$ls" : '.* -> \(.*\)$'` + iteration=$((iteration + 1)) +done + +# Convert relative path to absolute path +bin=`dirname "$target"` + +# get flink config +. "$bin"/config.sh + +if [ "$FLINK_IDENT_STRING" = "" ]; then + FLINK_IDENT_STRING="$USER" +fi + +CC_CLASSPATH=`constructFlinkClassPath` + +log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-client-$HOSTNAME.log +log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml) + +# Add Client-specific JVM options +FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_CLI}" + +# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems +exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@" diff --git a/TWA-PIC/flink/bin/flink-console.sh b/TWA-PIC/flink/bin/flink-console.sh new file mode 100644 index 0000000..6ebe2ac --- /dev/null +++ b/TWA-PIC/flink/bin/flink-console.sh @@ -0,0 +1,111 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Start a Flink service as a console application. Must be stopped with Ctrl-C +# or with SIGTERM by kill or the controlling process. +USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|kubernetes-session|kubernetes-application|kubernetes-taskmanager) [args]" + +SERVICE=$1 +ARGS=("${@:2}") # get remaining arguments as array + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. 
"$bin"/config.sh + +case $SERVICE in + (taskexecutor) + CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner + ;; + + (historyserver) + CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer + ;; + + (zookeeper) + CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer + ;; + + (standalonesession) + CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint + ;; + + (standalonejob) + CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint + ;; + + (kubernetes-session) + CLASS_TO_RUN=org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint + ;; + + (kubernetes-application) + CLASS_TO_RUN=org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint + ;; + + (kubernetes-taskmanager) + CLASS_TO_RUN=org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner + ;; + + (*) + echo "Unknown service '${SERVICE}'. $USAGE." + exit 1 + ;; +esac + +FLINK_TM_CLASSPATH=`constructFlinkClassPath` + +if [ "$FLINK_IDENT_STRING" = "" ]; then + FLINK_IDENT_STRING="$USER" +fi + +pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-$SERVICE.pid +mkdir -p "$FLINK_PID_DIR" +# The lock needs to be released after use because this script is started foreground +command -v flock >/dev/null 2>&1 +flock_exist=$? +if [[ ${flock_exist} -eq 0 ]]; then + exec 200<"$FLINK_PID_DIR" + flock 200 +fi +# Remove the pid file when all the processes are dead +if [ -f "$pid" ]; then + all_dead=0 + while read each_pid; do + # Check whether the process is still running + kill -0 $each_pid > /dev/null 2>&1 + [[ $? -eq 0 ]] && all_dead=1 + done < "$pid" + [ ${all_dead} -eq 0 ] && rm $pid +fi +id=$([ -f "$pid" ] && echo $(wc -l < "$pid") || echo "0") + +FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-${SERVICE}-${id}-${HOSTNAME}" +log="${FLINK_LOG_PREFIX}.log" + +log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml") + +echo "Starting $SERVICE as a console application on host $HOSTNAME." + +# Add the current process id to pid file +echo $$ >> "$pid" 2>/dev/null + +# Release the lock because the java process runs in the foreground and would block other processes from modifying the pid file +[[ ${flock_exist} -eq 0 ]] && flock -u 200 + +exec "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" diff --git a/TWA-PIC/flink/bin/flink-daemon.sh b/TWA-PIC/flink/bin/flink-daemon.sh new file mode 100644 index 0000000..67fe698 --- /dev/null +++ b/TWA-PIC/flink/bin/flink-daemon.sh @@ -0,0 +1,194 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Start/stop a Flink daemon. +USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]" + +STARTSTOP=$1 +DAEMON=$2 +ARGS=("${@:3}") # get remaining arguments as array + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +case $DAEMON in + (taskexecutor) + CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner + ;; + + (zookeeper) + CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer + ;; + + (historyserver) + CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer + ;; + + (standalonesession) + CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint + ;; + + (standalonejob) + CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint + ;; + + (*) + echo "Unknown daemon '${DAEMON}'. $USAGE." + exit 1 + ;; +esac + +if [ "$FLINK_IDENT_STRING" = "" ]; then + FLINK_IDENT_STRING="$USER" +fi + +FLINK_TM_CLASSPATH=`constructFlinkClassPath` + +pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-$DAEMON.pid + +mkdir -p "$FLINK_PID_DIR" + +# Log files for daemons are indexed from the process ID's position in the PID +# file. The following lock prevents a race condition during daemon startup +# when multiple daemons read, index, and write to the PID file concurrently. +# The lock is created on the PID directory since a lock file cannot be safely +# removed. The daemon is started with the lock closed and the lock remains +# active in this script until the script exits. +command -v flock >/dev/null 2>&1 +if [[ $? -eq 0 ]]; then + exec 200<"$FLINK_PID_DIR" + flock 200 +fi + +# Ascending ID depending on number of lines in pid file. +# This allows us to start multiple daemon of each type. +id=$([ -f "$pid" ] && echo $(wc -l < "$pid") || echo "0") + +FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-${DAEMON}-${id}-${HOSTNAME}" +log="${FLINK_LOG_PREFIX}.log" +out="${FLINK_LOG_PREFIX}.out" + +log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml") + +function guaranteed_kill { + to_stop_pid=$1 + daemon=$2 + + # send sigterm for graceful shutdown + kill $to_stop_pid + # if timeout exists, use it + if command -v timeout &> /dev/null ; then + # wait 10 seconds for process to stop. By default, Flink kills the JVM 5 seconds after sigterm. + timeout 10 tail --pid=$to_stop_pid -f /dev/null + if [ "$?" -eq 124 ]; then + echo "Daemon $daemon didn't stop within 10 seconds. Killing it." + # send sigkill + kill -9 $to_stop_pid + fi + fi +} + +case $STARTSTOP in + + (start) + + # Print a warning if daemons are already running on host + if [ -f "$pid" ]; then + active=() + while IFS='' read -r p || [[ -n "$p" ]]; do + kill -0 $p >/dev/null 2>&1 + if [ $? 
-eq 0 ]; then + active+=($p) + fi + done < "${pid}" + + count="${#active[@]}" + + if [ ${count} -gt 0 ]; then + echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME." + fi + fi + + # Evaluate user options for local variable expansion + FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS}) + + echo "Starting $DAEMON daemon on host $HOSTNAME." + "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null & + + mypid=$! + + # Add to pid file if successful start + if [[ ${mypid} =~ ${IS_NUMBER} ]] && kill -0 $mypid > /dev/null 2>&1 ; then + echo $mypid >> "$pid" + else + echo "Error starting $DAEMON daemon." + exit 1 + fi + ;; + + (stop) + if [ -f "$pid" ]; then + # Remove last in pid file + to_stop=$(tail -n 1 "$pid") + + if [ -z $to_stop ]; then + rm "$pid" # If all stopped, clean up pid file + echo "No $DAEMON daemon to stop on host $HOSTNAME." + else + sed \$d "$pid" > "$pid.tmp" # all but last line + + # If all stopped, clean up pid file + [ $(wc -l < "$pid.tmp") -eq 0 ] && rm "$pid" "$pid.tmp" || mv "$pid.tmp" "$pid" + + if kill -0 $to_stop > /dev/null 2>&1; then + echo "Stopping $DAEMON daemon (pid: $to_stop) on host $HOSTNAME." + guaranteed_kill $to_stop $DAEMON + else + echo "No $DAEMON daemon (pid: $to_stop) is running anymore on $HOSTNAME." + fi + fi + else + echo "No $DAEMON daemon to stop on host $HOSTNAME." + fi + ;; + + (stop-all) + if [ -f "$pid" ]; then + mv "$pid" "${pid}.tmp" + + while read to_stop; do + if kill -0 $to_stop > /dev/null 2>&1; then + echo "Stopping $DAEMON daemon (pid: $to_stop) on host $HOSTNAME." + guaranteed_kill $to_stop $DAEMON + else + echo "Skipping $DAEMON daemon (pid: $to_stop), because it is not running anymore on $HOSTNAME." + fi + done < "${pid}.tmp" + rm "${pid}.tmp" + fi + ;; + + (*) + echo "Unexpected argument '$STARTSTOP'. $USAGE." + exit 1 + ;; + +esac diff --git a/TWA-PIC/flink/bin/historyserver.sh b/TWA-PIC/flink/bin/historyserver.sh new file mode 100644 index 0000000..3bc3049 --- /dev/null +++ b/TWA-PIC/flink/bin/historyserver.sh @@ -0,0 +1,39 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Start/stop a Flink HistoryServer +USAGE="Usage: historyserver.sh (start|start-foreground|stop)" + +STARTSTOP=$1 + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. 
"$bin"/config.sh + +if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then + export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_HS}" + args=("--configDir" "${FLINK_CONF_DIR}") +fi + +if [[ $STARTSTOP == "start-foreground" ]]; then + exec "${FLINK_BIN_DIR}"/flink-console.sh historyserver "${args[@]}" +else + "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP historyserver "${args[@]}" +fi diff --git a/TWA-PIC/flink/bin/jobmanager.sh b/TWA-PIC/flink/bin/jobmanager.sh new file mode 100644 index 0000000..35fbe2c --- /dev/null +++ b/TWA-PIC/flink/bin/jobmanager.sh @@ -0,0 +1,64 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Start/stop a Flink JobManager. +USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all" + +STARTSTOP=$1 +HOST=$2 # optional when starting multiple instances +WEBUIPORT=$3 # optional when starting multiple instances + +if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then + echo $USAGE + exit 1 +fi + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +ENTRYPOINT=standalonesession + +if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then + # Add JobManager-specific JVM options + export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}" + parseJmArgsAndExportLogs "${ARGS[@]}" + + args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster") + if [ ! -z $HOST ]; then + args+=("--host") + args+=("${HOST}") + fi + + if [ ! -z $WEBUIPORT ]; then + args+=("--webui-port") + args+=("${WEBUIPORT}") + fi + + if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then + args+=(${DYNAMIC_PARAMETERS[@]}) + fi +fi + +if [[ $STARTSTOP == "start-foreground" ]]; then + exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}" +else + "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}" +fi diff --git a/TWA-PIC/flink/bin/kubernetes-jobmanager.sh b/TWA-PIC/flink/bin/kubernetes-jobmanager.sh new file mode 100644 index 0000000..0513123 --- /dev/null +++ b/TWA-PIC/flink/bin/kubernetes-jobmanager.sh @@ -0,0 +1,42 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Start a Flink JobManager for native Kubernetes. +# NOTE: This script is not meant to be started manually. It will be used by native Kubernetes integration. + +USAGE="Usage: kubernetes-jobmanager.sh kubernetes-session|kubernetes-application [args]" + +ENTRY_POINT_NAME=$1 + +ARGS=("${@:2}") + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +# Add JobManager specific JVM options +export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}" +parseJmArgsAndExportLogs "${ARGS[@]}" + +if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then + ARGS=(${DYNAMIC_PARAMETERS[@]} "${ARGS[@]}") +fi + +exec "${FLINK_BIN_DIR}"/flink-console.sh ${ENTRY_POINT_NAME} "${ARGS[@]}" diff --git a/TWA-PIC/flink/bin/kubernetes-session.sh b/TWA-PIC/flink/bin/kubernetes-session.sh new file mode 100644 index 0000000..559a776 --- /dev/null +++ b/TWA-PIC/flink/bin/kubernetes-session.sh @@ -0,0 +1,39 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +# get Flink config +. 
"$bin"/config.sh + +if [ "$FLINK_IDENT_STRING" = "" ]; then + FLINK_IDENT_STRING="$USER" +fi + +JVM_ARGS="$JVM_ARGS -Xmx512m" + +CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS` + +log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-k8s-session-$HOSTNAME.log +log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-session.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-session.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-session.xml" + +export FLINK_CONF_DIR + +"$JAVA_RUN" $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.kubernetes.cli.KubernetesSessionCli "$@" diff --git a/TWA-PIC/flink/bin/kubernetes-taskmanager.sh b/TWA-PIC/flink/bin/kubernetes-taskmanager.sh new file mode 100644 index 0000000..b11fb89 --- /dev/null +++ b/TWA-PIC/flink/bin/kubernetes-taskmanager.sh @@ -0,0 +1,45 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Start a Flink TaskManager for native Kubernetes. +# NOTE: This script is not meant to be started manually. It will be used by native Kubernetes integration. + +USAGE="Usage: kubernetes-taskmanager.sh [args]" + +ENTRYPOINT=kubernetes-taskmanager + +ARGS=("${@:1}") + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +# if no other JVM options are set, set the GC to G1 +if [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then + export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC" +fi + +# Add TaskManager specific JVM options +export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}" +export JVM_ARGS="$JVM_ARGS $FLINK_TM_JVM_MEM_OPTS" + +ARGS=("--configDir" "${FLINK_CONF_DIR}" "${ARGS[@]}") + +exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}" diff --git a/TWA-PIC/flink/bin/mesos-appmaster-job.sh b/TWA-PIC/flink/bin/mesos-appmaster-job.sh new file mode 100644 index 0000000..5ae7396 --- /dev/null +++ b/TWA-PIC/flink/bin/mesos-appmaster-job.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +bin=$(dirname "$0") +bin=$(cd "${bin}" || exit; pwd) + +exec "${bin}"/mesos-jobmanager.sh "org.apache.flink.mesos.entrypoint.MesosJobClusterEntrypoint" "$@" diff --git a/TWA-PIC/flink/bin/mesos-appmaster.sh b/TWA-PIC/flink/bin/mesos-appmaster.sh new file mode 100644 index 0000000..2939e31 --- /dev/null +++ b/TWA-PIC/flink/bin/mesos-appmaster.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +bin=$(dirname "$0") +bin=$(cd "${bin}" || exit; pwd) + +exec "${bin}"/mesos-jobmanager.sh "org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint" "$@" diff --git a/TWA-PIC/flink/bin/mesos-jobmanager.sh b/TWA-PIC/flink/bin/mesos-jobmanager.sh new file mode 100644 index 0000000..b786e18 --- /dev/null +++ b/TWA-PIC/flink/bin/mesos-jobmanager.sh @@ -0,0 +1,52 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +ENTRY_POINT=$1 +ARGS=("${@:2}") + +bin=$(dirname "$0") +bin=$(cd "${bin}" || exit; pwd) + +# get Flink config +. "${bin}"/config.sh + +parseJmArgsAndExportLogs "${ARGS[@]}" + +if [ ! 
-z "${DYNAMIC_PARAMETERS}" ]; then + ARGS=(${DYNAMIC_PARAMETERS[@]} "${ARGS[@]}") +fi + +if [ "$FLINK_IDENT_STRING" = "" ]; then + FLINK_IDENT_STRING="$USER" +fi + +CC_CLASSPATH=$(manglePathList "$(constructFlinkClassPath):${INTERNAL_HADOOP_CLASSPATHS}") + +log="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-mesos-appmaster-${HOSTNAME}.log" +log_setting="-Dlog.file=${log} -Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties -Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j.properties -Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml" + +"${JAVA_RUN}" ${JVM_ARGS} -classpath ${CC_CLASSPATH} ${log_setting} ${ENTRY_POINT} "${ARGS[@]}" + +rc=$? + +if [[ ${rc} -ne 0 ]]; then + echo "Error while starting the mesos application master. Please check ${log} for more details." +fi + +exit ${rc} diff --git a/TWA-PIC/flink/bin/mesos-taskmanager.sh b/TWA-PIC/flink/bin/mesos-taskmanager.sh new file mode 100644 index 0000000..6c65c2a --- /dev/null +++ b/TWA-PIC/flink/bin/mesos-taskmanager.sh @@ -0,0 +1,43 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +# get Flink config +. "$bin"/config.sh + +CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS` + +log=flink-taskmanager.log +log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml" + +# Add precomputed memory JVM options +if [ -z "${FLINK_ENV_JAVA_OPTS_MEM}" ]; then + FLINK_ENV_JAVA_OPTS_MEM="" +fi +export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_MEM}" + +# Add TaskManager-specific JVM options +export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}" + +ENTRY_POINT=org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner + +exec "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} -classpath "$CC_CLASSPATH" $log_setting ${ENTRY_POINT} "$@" + diff --git a/TWA-PIC/flink/bin/pyflink-shell.sh b/TWA-PIC/flink/bin/pyflink-shell.sh new file mode 100644 index 0000000..a616abb --- /dev/null +++ b/TWA-PIC/flink/bin/pyflink-shell.sh @@ -0,0 +1,84 @@ +#!/bin/bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` +. "$bin"/find-flink-home.sh + +_FLINK_HOME_DETERMINED=1 + +. "$FLINK_HOME"/bin/config.sh + +FLINK_CLASSPATH=`constructFlinkClassPath` +PYTHON_JAR_PATH=`echo "$FLINK_OPT_DIR"/flink-python*.jar` + + +PYFLINK_PYTHON="${PYFLINK_PYTHON:-"python"}" + +# So that python can find out Flink's Jars +export FLINK_BIN_DIR=$FLINK_BIN_DIR +export FLINK_HOME + +# Add pyflink & py4j & cloudpickle to PYTHONPATH +export PYTHONPATH="$FLINK_OPT_DIR/python/pyflink.zip:$PYTHONPATH" +PY4J_ZIP=`echo "$FLINK_OPT_DIR"/python/py4j-*-src.zip` +CLOUDPICKLE_ZIP=`echo "$FLINK_OPT_DIR"/python/cloudpickle-*-src.zip` +export PYTHONPATH="$PY4J_ZIP:$CLOUDPICKLE_ZIP:$PYTHONPATH" + +PARSER="org.apache.flink.client.python.PythonShellParser" +function parse_options() { + "${JAVA_RUN}" ${JVM_ARGS} -cp ${FLINK_CLASSPATH}:${PYTHON_JAR_PATH} ${PARSER} "$@" + printf "%d\0" $? +} + +# Turn off posix mode since it does not allow process substitution +set +o posix +# If the command has option --help | -h, the script will directly +# run the PythonShellParser program to stdout the help message. +if [[ "$@" =~ '--help' ]] || [[ "$@" =~ '-h' ]]; then + "${JAVA_RUN}" ${JVM_ARGS} -cp ${FLINK_CLASSPATH}:${PYTHON_JAR_PATH} ${PARSER} "$@" + exit 0 +fi +OPTIONS=() +while IFS= read -d '' -r ARG; do + OPTIONS+=("$ARG") +done < <(parse_options "$@") + +COUNT=${#OPTIONS[@]} +LAST=$((COUNT - 1)) +LAUNCHER_EXIT_CODE=${OPTIONS[$LAST]} + +# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes +# the code that parses the output of the launcher to get confused. In those cases, check if the +# exit code is an integer, and if it's not, handle it as a special error case. +if ! [[ ${LAUNCHER_EXIT_CODE} =~ ^[0-9]+$ ]]; then + echo "${OPTIONS[@]}" | head -n-1 1>&2 + exit 1 +fi + +if [[ ${LAUNCHER_EXIT_CODE} != 0 ]]; then + exit ${LAUNCHER_EXIT_CODE} +fi + +OPTIONS=("${OPTIONS[@]:0:$LAST}") + +export SUBMIT_ARGS=${OPTIONS[@]} + +# -i: interactive +# -m: execute shell.py in the zip package +${PYFLINK_PYTHON} -i -m pyflink.shell diff --git a/TWA-PIC/flink/bin/set_flink_yarn_env.sh b/TWA-PIC/flink/bin/set_flink_yarn_env.sh new file mode 100644 index 0000000..70314f9 --- /dev/null +++ b/TWA-PIC/flink/bin/set_flink_yarn_env.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +echo -e "\n#flink\nexport FLINK_HOME=/home/tsg/olap/flink-1.13.1\nexport PATH=\$FLINK_HOME/bin:\$PATH" >> /etc/profile.d/flink.sh +chmod +x /etc/profile.d/flink.sh +source /etc/profile + + diff --git a/TWA-PIC/flink/bin/sql-client.sh b/TWA-PIC/flink/bin/sql-client.sh new file mode 100644 index 0000000..759f0c6 --- /dev/null +++ b/TWA-PIC/flink/bin/sql-client.sh @@ -0,0 +1,88 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +################################################################################ +# Adopted from "flink" bash script +################################################################################ + +target="$0" +# For the case, the executable has been directly symlinked, figure out +# the correct bin path by following its symlink up to an upper bound. +# Note: we can't use the readlink utility here if we want to be POSIX +# compatible. +iteration=0 +while [ -L "$target" ]; do + if [ "$iteration" -gt 100 ]; then + echo "Cannot resolve path: You have a cyclic symlink in $target." + break + fi + ls=`ls -ld -- "$target"` + target=`expr "$ls" : '.* -> \(.*\)$'` + iteration=$((iteration + 1)) +done + +# Convert relative path to absolute path +bin=`dirname "$target"` + +# get flink config +. "$bin"/config.sh + +if [ "$FLINK_IDENT_STRING" = "" ]; then + FLINK_IDENT_STRING="$USER" +fi + +CC_CLASSPATH=`constructFlinkClassPath` + +################################################################################ +# SQL client specific logic +################################################################################ + +log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-sql-client-$HOSTNAME.log +log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml) + +# get path of jar in /opt if it exist +FLINK_SQL_CLIENT_JAR=$(find "$FLINK_OPT_DIR" -regex ".*flink-sql-client.*.jar") + +# add flink-python jar to the classpath +if [[ ! 
"$CC_CLASSPATH" =~ .*flink-python.*.jar ]]; then + FLINK_PYTHON_JAR=$(find "$FLINK_OPT_DIR" -regex ".*flink-python.*.jar") + if [ -n "$FLINK_PYTHON_JAR" ]; then + CC_CLASSPATH="$CC_CLASSPATH:$FLINK_PYTHON_JAR" + fi +fi + +# check if SQL client is already in classpath and must not be shipped manually +if [[ "$CC_CLASSPATH" =~ .*flink-sql-client.*.jar ]]; then + + # start client without jar + exec "$JAVA_RUN" $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.table.client.SqlClient "$@" + +# check if SQL client jar is in /opt +elif [ -n "$FLINK_SQL_CLIENT_JAR" ]; then + + # start client with jar + exec "$JAVA_RUN" $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS:$FLINK_SQL_CLIENT_JAR"`" org.apache.flink.table.client.SqlClient "$@" --jar "`manglePath $FLINK_SQL_CLIENT_JAR`" + +# write error message to stderr +else + (>&2 echo "[ERROR] Flink SQL Client JAR file 'flink-sql-client*.jar' neither found in classpath nor /opt directory should be located in $FLINK_OPT_DIR.") + + # exit to force process failure + exit 1 +fi diff --git a/TWA-PIC/flink/bin/standalone-job.sh b/TWA-PIC/flink/bin/standalone-job.sh new file mode 100644 index 0000000..b4cfa20 --- /dev/null +++ b/TWA-PIC/flink/bin/standalone-job.sh @@ -0,0 +1,55 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Start/stop a Flink JobManager. +USAGE="Usage: standalone-job.sh ((start|start-foreground))|stop [args]" + +STARTSTOP=$1 +ENTRY_POINT_NAME="standalonejob" + +if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]]; then + echo $USAGE + exit 1 +fi + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +# Startup parameters +ARGS=("${@:2}") + +if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then + # Add cluster entry point specific JVM options + export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}" + parseJmArgsAndExportLogs "${ARGS[@]}" + + if [ ! 
-z "${DYNAMIC_PARAMETERS}" ]; then + ARGS=(${DYNAMIC_PARAMETERS[@]} "${ARGS[@]}") + fi +fi + +ARGS=("--configDir" "${FLINK_CONF_DIR}" "${ARGS[@]}") + +if [[ $STARTSTOP == "start-foreground" ]]; then + exec "${FLINK_BIN_DIR}"/flink-console.sh ${ENTRY_POINT_NAME} "${ARGS[@]}" +else + "${FLINK_BIN_DIR}"/flink-daemon.sh ${STARTSTOP} ${ENTRY_POINT_NAME} "${ARGS[@]}" +fi diff --git a/TWA-PIC/flink/bin/start-cluster.sh b/TWA-PIC/flink/bin/start-cluster.sh new file mode 100644 index 0000000..720b33c --- /dev/null +++ b/TWA-PIC/flink/bin/start-cluster.sh @@ -0,0 +1,53 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +# Start the JobManager instance(s) +shopt -s nocasematch +if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then + # HA Mode + readMasters + + echo "Starting HA cluster with ${#MASTERS[@]} masters." + + for ((i=0;i<${#MASTERS[@]};++i)); do + master=${MASTERS[i]} + webuiport=${WEBUIPORTS[i]} + + if [ ${MASTERS_ALL_LOCALHOST} = true ] ; then + "${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}" + else + ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start ${master} ${webuiport} &" + fi + done + +else + echo "Starting cluster." + + # Start single JobManager on this machine + "$FLINK_BIN_DIR"/jobmanager.sh start +fi +shopt -u nocasematch + +# Start TaskManager instance(s) +TMWorkers start diff --git a/TWA-PIC/flink/bin/start-zookeeper-quorum.sh b/TWA-PIC/flink/bin/start-zookeeper-quorum.sh new file mode 100644 index 0000000..d5a7593 --- /dev/null +++ b/TWA-PIC/flink/bin/start-zookeeper-quorum.sh @@ -0,0 +1,46 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+# Starts a ZooKeeper quorum as configured in $FLINK_CONF_DIR/zoo.cfg
+
+ZK_CONF="$FLINK_CONF_DIR/zoo.cfg"
+if [ ! -f "$ZK_CONF" ]; then
+    echo "[ERROR] No ZooKeeper configuration file found in '$ZK_CONF'."
+    exit 1
+fi
+
+# Extract server.X from ZooKeeper config and start instances
+while read server ; do
+    server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim
+
+    # match server.id=address[:port[:port]]
+    if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=[[:space:]]*([^: \#]+) ]]; then
+        id=${BASH_REMATCH[1]}
+        address=${BASH_REMATCH[2]}
+
+        ssh -n $FLINK_SSH_OPTS $address -- "nohup /bin/bash -l $FLINK_BIN_DIR/zookeeper.sh start $id &"
+    else
+        echo "[WARN] Parse error. Skipping config entry '$server'."
+    fi
+done < <(grep "^server\." "$ZK_CONF")
diff --git a/TWA-PIC/flink/bin/stop-cluster.sh b/TWA-PIC/flink/bin/stop-cluster.sh
new file mode 100644
index 0000000..d29b4f3
--- /dev/null
+++ b/TWA-PIC/flink/bin/stop-cluster.sh
@@ -0,0 +1,47 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+# Stop TaskManager instance(s)
+TMWorkers stop
+
+# Stop JobManager instance(s)
+shopt -s nocasematch
+if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
+    # HA Mode
+    readMasters
+
+    if [ ${MASTERS_ALL_LOCALHOST} = true ] ; then
+        for master in ${MASTERS[@]}; do
+            "$FLINK_BIN_DIR"/jobmanager.sh stop
+        done
+    else
+        for master in ${MASTERS[@]}; do
+            ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" stop &"
+        done
+    fi
+
+else
+    "$FLINK_BIN_DIR"/jobmanager.sh stop
+fi
+shopt -u nocasematch
diff --git a/TWA-PIC/flink/bin/stop-zookeeper-quorum.sh b/TWA-PIC/flink/bin/stop-zookeeper-quorum.sh
new file mode 100644
index 0000000..ad79de8
--- /dev/null
+++ b/TWA-PIC/flink/bin/stop-zookeeper-quorum.sh
@@ -0,0 +1,46 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+# Stops a ZooKeeper quorum as configured in $FLINK_CONF_DIR/zoo.cfg
+
+ZK_CONF="$FLINK_CONF_DIR/zoo.cfg"
+if [ ! -f "$ZK_CONF" ]; then
+    echo "[ERROR] No ZooKeeper configuration file found in '$ZK_CONF'."
+    exit 1
+fi
+
+# Extract server.X from ZooKeeper config and stop instances
+while read server ; do
+    server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim
+
+    # match server.id=address[:port[:port]]
+    if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=[[:space:]]*([^: \#]+) ]]; then
+        id=${BASH_REMATCH[1]}
+        server=${BASH_REMATCH[2]}
+
+        ssh -n $FLINK_SSH_OPTS $server -- "nohup /bin/bash -l $FLINK_BIN_DIR/zookeeper.sh stop &"
+    else
+        echo "[WARN] Parse error. Skipping config entry '$server'."
+    fi
+done < <(grep "^server\." "$ZK_CONF")
diff --git a/TWA-PIC/flink/bin/taskmanager.sh b/TWA-PIC/flink/bin/taskmanager.sh
new file mode 100644
index 0000000..fdc6514
--- /dev/null
+++ b/TWA-PIC/flink/bin/taskmanager.sh
@@ -0,0 +1,80 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Start/stop a Flink TaskManager.
+USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"
+
+STARTSTOP=$1
+
+ARGS=("${@:2}")
+
+if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
+    echo $USAGE
+    exit 1
+fi
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+ENTRYPOINT=taskexecutor
+
+if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
+
+    # if no other JVM options are set, set the GC to G1
+    if [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
+        export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
+    fi
+
+    # Add TaskManager-specific JVM options
+    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
+
+    # Startup parameters
+
+    parseTmArgsAndExportLogs "${ARGS[@]}"
+
+    if [ ! 
-z "${DYNAMIC_PARAMETERS}" ]; then + ARGS=(${DYNAMIC_PARAMETERS[@]} "${ARGS[@]}") + fi + + ARGS=("--configDir" "${FLINK_CONF_DIR}" "${ARGS[@]}") +fi + +if [[ $STARTSTOP == "start-foreground" ]]; then + exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}" +else + if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then + # Start a single TaskManager + "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}" + else + # Example output from `numactl --show` on an AWS c4.8xlarge: + # policy: default + # preferred node: current + # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 + # cpubind: 0 1 + # nodebind: 0 1 + # membind: 0 1 + read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ") + for NODE_ID in "${NODE_LIST[@]:1}"; do + # Start a TaskManager for each NUMA node + numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}" + done + fi +fi diff --git a/TWA-PIC/flink/bin/yarn-session.sh b/TWA-PIC/flink/bin/yarn-session.sh new file mode 100644 index 0000000..f36ca34 --- /dev/null +++ b/TWA-PIC/flink/bin/yarn-session.sh @@ -0,0 +1,38 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +# get Flink config +. "$bin"/config.sh + +if [ "$FLINK_IDENT_STRING" = "" ]; then + FLINK_IDENT_STRING="$USER" +fi + +JVM_ARGS="$JVM_ARGS -Xmx512m" + +CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS` + +log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-yarn-session-$HOSTNAME.log +log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-session.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-session.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-session.xml" + +"$JAVA_RUN" $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@" + diff --git a/TWA-PIC/flink/bin/zookeeper.sh b/TWA-PIC/flink/bin/zookeeper.sh new file mode 100644 index 0000000..9297401 --- /dev/null +++ b/TWA-PIC/flink/bin/zookeeper.sh @@ -0,0 +1,68 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Start/stop a ZooKeeper quorum peer.
+USAGE="Usage: zookeeper.sh ((start|start-foreground) peer-id)|stop|stop-all"
+
+STARTSTOP=$1
+PEER_ID=$2
+
+if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
+    echo $USAGE
+    exit 1
+fi
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+ZK_CONF="$FLINK_CONF_DIR/zoo.cfg"
+if [ ! -f "$ZK_CONF" ]; then
+    echo "[ERROR] No ZooKeeper configuration file found in '$ZK_CONF'."
+    exit 1
+fi
+
+if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
+    if [ -z $PEER_ID ]; then
+        echo "[ERROR] Missing peer id argument. $USAGE."
+        exit 1
+    fi
+
+    if [[ ! ${ZK_HEAP} =~ ${IS_NUMBER} ]]; then
+        echo "[ERROR] Configured ZooKeeper JVM heap size is not a number. Please set '$KEY_ZK_HEAP_MB' in $FLINK_CONF_FILE."
+        exit 1
+    fi
+
+    if [ "$ZK_HEAP" -gt 0 ]; then
+        export JVM_ARGS="$JVM_ARGS -Xms"$ZK_HEAP"m -Xmx"$ZK_HEAP"m"
+    fi
+
+    # Startup parameters
+    args=("--zkConfigFile" "${ZK_CONF}" "--peerId" "${PEER_ID}")
+fi
+
+# the JMX log4j integration in ZK 3.4 does not work with log4j 2
+export JVM_ARGS="$JVM_ARGS -Dzookeeper.jmx.log4j.disable=true"
+
+if [[ $STARTSTOP == "start-foreground" ]]; then
+    "${FLINK_BIN_DIR}"/flink-console.sh zookeeper "${args[@]}"
+else
+    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP zookeeper "${args[@]}"
+fi
diff --git a/TWA-PIC/flink/conf/core-site.xml b/TWA-PIC/flink/conf/core-site.xml
new file mode 100644
index 0000000..62c06ae
--- /dev/null
+++ b/TWA-PIC/flink/conf/core-site.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<!-- Put site-specific property overrides in this file. 
--> + +<configuration> + <property> + <name>fs.defaultFS</name> + <value>hdfs://ns1</value> + </property> + <property> + <name>hadoop.tmp.dir</name> + <value>file:/home/tsg/olap/hadoop/tmp</value> + </property> + <property> + <name>io.file.buffer.size</name> + <value>131702</value> + </property> + <property> + <name>hadoop.proxyuser.root.hosts</name> + <value>*</value> + </property> + <property> + <name>hadoop.proxyuser.root.groups</name> + <value>*</value> + </property> + <property> + <name>hadoop.logfile.size</name> + <value>10000000</value> + <description>The max size of each log file</description> + </property> + <property> + <name>hadoop.logfile.count</name> + <value>1</value> + <description>The max number of log files</description> + </property> + <property> + <name>ha.zookeeper.quorum</name> + <value>192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181</value> + </property> + <property> + <name>ipc.client.connect.timeout</name> + <value>90000</value> + </property> +</configuration> diff --git a/TWA-PIC/flink/conf/flink-conf.yaml b/TWA-PIC/flink/conf/flink-conf.yaml new file mode 100644 index 0000000..645422f --- /dev/null +++ b/TWA-PIC/flink/conf/flink-conf.yaml @@ -0,0 +1,207 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + + +#============================================================================== +# Common +#============================================================================== + +# The external address of the host on which the JobManager runs and can be +# reached by the TaskManagers and any clients which want to connect. This setting +# is only used in Standalone mode and may be overwritten on the JobManager side +# by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable. +# In high availability mode, if you use the bin/start-cluster.sh script and setup +# the conf/masters file, this will be taken care of automatically. Yarn/Mesos +# automatically configure the host name based on the hostname of the node where the +# JobManager runs. 
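+#
+# For example (hypothetical host and web UI port, following the way
+# start-cluster.sh invokes the script):
+#   bin/jobmanager.sh start jm-host-1 8081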
+
+jobmanager.rpc.address: 192.168.30.193
+
+# JVM-related options
+#env.java.opts: "-XX:+UseG1GC -XX:NewRatio=2 -XX:MaxGCPauseMillis=300 -XX:InitiatingHeapOccupancyPercent=35 -Xloggc:/home/tsg/olap/flink-1.13.1/log/gc.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=20M"
+
+# JobManager RPC port
+jobmanager.rpc.port: 6123
+
+# Allow tasks to be spread evenly across all TaskManagers
+cluster.evenly-spread-out-slots: true
+
+# Report a metaspace OOM instead of letting the Flink JVM process die
+classloader.fail-on-metaspace-oom-error: false
+
+# Work around classloader leaks in third-party libraries
+classloader.check-leaked-classloader: false
+
+# Keep the TaskManager alive when a task cannot be cancelled in time (0 disables the timeout)
+task.cancellation.timeout: 0
+
+# All Flink-related memory used by the JobManager process
+jobmanager.memory.process.size: 1024M
+
+# All Flink-related memory used by the TaskManager process
+taskmanager.memory.process.size: 1024M
+
+# Size of the TaskManager's managed memory
+taskmanager.memory.managed.size: 10M
+
+# taskmanager.memory.off-heap defaults to false; it controls whether Flink Managed Memory
+# lives on the heap or off-heap. The heap is used by default; switching to off-heap carves
+# out a further share of the available resources.
+taskmanager.memory.off-heap: false
+
+# Framework off-heap memory, allocated as direct memory
+taskmanager.memory.framework.off-heap.size: 128M
+
+# TaskManager JVM metaspace size; default 256M
+taskmanager.memory.jvm-metaspace.size: 384M
+
+# Minimum number of network buffers per sort-merge blocking result partition; default 64.
+# For production use, raising this to 2048 is recommended to improve the data compression
+# ratio and reduce small network packets; doing so also requires more total network memory.
+taskmanager.network.sort-shuffle.min-buffers: 64
+
+# Memory used for reading shuffle data (currently only for sort-merge shuffle). It is taken
+# out of framework.off-heap.size and defaults to 32M; when changing it, grow
+# framework.off-heap.size accordingly.
+taskmanager.memory.framework.off-heap.batch-shuffle.size: 8M
+
+# Maximum number of buffers each channel may use; default 10. It can speed up checkpoint
+# alignment by capping the growth of in-flight data under data skew or a high configured
+# number of floating buffers.
+taskmanager.network.memory.max-buffers-per-channel: 10
+
+# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
+taskmanager.numberOfTaskSlots: 1
+
+# The default parallelism used for programs that do not specify one.
+parallelism.default: 1
+
+# The default file system scheme and authority.
+#
+# By default file paths without scheme are interpreted relative to the local
+# root file system 'file:///'. Use this to override the default and interpret
+# relative paths relative to a different file system,
+# for example 'hdfs://mynamenode:12345'
+#
+# fs.default-scheme
+
+#==============================================================================
+# Network
+#==============================================================================
+
+# Number of floating network buffers per gate; default 8. Helps relieve backpressure
+# caused by uneven data distribution across subpartitions.
+taskmanager.network.memory.floating-buffers-per-gate: 8
+
+# Number of exclusive network buffers per input/output channel; configure at least 2.
+taskmanager.network.memory.buffers-per-channel: 2
+
+# Memory for data transfer between TaskManagers (shuffle, broadcast, etc.) and with
+# external components
+# Min
+taskmanager.memory.network.min: 64M
+# Max
+taskmanager.memory.network.max: 128M
+
+#==============================================================================
+# High Availability
+#==============================================================================
+
+# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
+#
+# high-availability: zookeeper
+
+# The path where metadata for master recovery is persisted. While ZooKeeper stores
+# the small ground truth for checkpoint and leader election, this location stores
+# the larger objects, like persisted dataflow graphs.
+#
+# Must be a durable file system that is accessible from all nodes
+# (like HDFS, S3, Ceph, nfs, ...)
+#
+# high-availability.storageDir: hdfs:///flink/ha/
+
+# The list of ZooKeeper quorum peers that coordinate the high-availability
+# setup. This must be a list of the form:
+# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
+
+#high-availability: zookeeper
+#high-availability.zookeeper.quorum: 192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+#high-availability.zookeeper.path.root: /flink
+#high-availability.zookeeper.client.connection-timeout: 150000
+#high-availability.zookeeper.client.max-retry-attempts: 10
+#high-availability.zookeeper.client.retry-wait: 10000
+#high-availability.zookeeper.client.session-timeout: 240000
+
+# Read the local Hadoop configuration files
+#fs.hdfs.hadoopconf: /home/tsg/olap/flink-1.13.1/conf/
+#high-availability.cluster-id: /flink_cluster
+#important: customize per cluster
+#high-availability.storageDir: hdfs:///flink/recover
+
+heartbeat.timeout: 180000
+heartbeat.interval: 20000
+akka.ask.timeout: 300 s
+
+# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
+# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
+# The default value is "open" and it can be changed to "creator" if ZK security is enabled
+#
+# high-availability.zookeeper.client.acl: open
+
+# The failover strategy, i.e., how the job computation recovers from task failures.
+# Only restart tasks that may have been affected by the task failure, which typically includes
+# downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption.
+jobmanager.execution.failover-strategy: region
+
+#rest.port: 8080
+
+restart-strategy: fixed-delay
+
+# Restart strategy: maximum number of restart attempts
+# 21.12 version: value was 9999
+# 22.01 version: value changed to INT_MAX
+restart-strategy.fixed-delay.attempts: 2147483647
+
+yarn.application-attempts: 10000
+
+restart-strategy.fixed-delay.delay: 5 s
+
+jobmanager.web.upload.dir: /home/tsg/olap/flink-1.13.1/flink-web
+
+#==============================================================================
+# Advanced
+#==============================================================================
+
+# Override the directories for temporary files. If not specified, the
+# system-specific Java temporary directory (java.io.tmpdir property) is taken.
+#
+# For framework setups on Yarn or Mesos, Flink will automatically pick up the
+# containers' temp directories without any need for configuration.
+#
+# Add a delimited list for multiple directories, using the system directory
+# delimiter (colon ':' on unix) or a comma, e.g.:
+# /data1/tmp:/data2/tmp:/data3/tmp
+#
+# Note: Each directory entry is read from and written to by a different I/O
+# thread. You can include the same directory multiple times in order to create
+# multiple I/O threads against that directory. This is for example relevant for
+# high-throughput RAIDs.
+#
+# io.tmp.dirs: /tmp
+
+# The classloading resolve order. Possible values are 'child-first' (Flink's default)
+# and 'parent-first' (Java's default).
+#
+# Child first classloading allows users to use different dependency/library
+# versions in their application than those in the classpath. Switching back
+# to 'parent-first' may help with debugging dependency issues.
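+#
+# A common symptom of the conflicts that 'child-first' can surface is a
+# ClassCastException of the form "X cannot be cast to X", i.e. the same class
+# loaded by two different classloaders; this deployment picks 'parent-first'.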
+#
+# classloader.resolve-order: child-first
+classloader.resolve-order: parent-first
+
diff --git a/TWA-PIC/flink/conf/hdfs-site.xml b/TWA-PIC/flink/conf/hdfs-site.xml
new file mode 100644
index 0000000..cf332eb
--- /dev/null
+++ b/TWA-PIC/flink/conf/hdfs-site.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+    <property>
+        <name>dfs.namenode.name.dir</name>
+        <value>file:/home/tsg/olap/hadoop/dfs/name</value>
+    </property>
+    <property>
+        <name>dfs.datanode.data.dir</name>
+        <value>file:/home/tsg/olap/hadoop/dfs/data</value>
+    </property>
+    <property>
+        <name>dfs.replication</name>
+        <value>2</value>
+    </property>
+    <property>
+        <name>dfs.webhdfs.enabled</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>dfs.permissions</name>
+        <value>false</value>
+    </property>
+    <property>
+        <name>dfs.permissions.enabled</name>
+        <value>false</value>
+    </property>
+    <property>
+        <name>dfs.nameservices</name>
+        <value>ns1</value>
+    </property>
+    <property>
+        <name>dfs.blocksize</name>
+        <value>134217728</value>
+    </property>
+    <property>
+        <name>dfs.ha.namenodes.ns1</name>
+        <value>nn1,nn2</value>
+    </property>
+    <!-- RPC address of nn1 (the host nn1 runs on) -->
+    <property>
+        <name>dfs.namenode.rpc-address.ns1.nn1</name>
+        <value>192.168.30.193:9000</value>
+    </property>
+    <!-- HTTP address of nn1, used for external access -->
+    <property>
+        <name>dfs.namenode.http-address.ns1.nn1</name>
+        <value>192.168.30.193:50070</value>
+    </property>
+    <!-- RPC address of nn2 (the host nn2 runs on) -->
+    <property>
+        <name>dfs.namenode.rpc-address.ns1.nn2</name>
+        <value>192.168.30.194:9000</value>
+    </property>
+    <!-- HTTP address of nn2, used for external access -->
+    <property>
+        <name>dfs.namenode.http-address.ns1.nn2</name>
+        <value>192.168.30.194:50070</value>
+    </property>
+    <!-- Where the NameNode's shared edit log is stored on the JournalNodes (usually co-located with ZooKeeper) -->
+    <property>
+        <name>dfs.namenode.shared.edits.dir</name>
+        <value>qjournal://192.168.30.193:8485;192.168.30.194:8485;192.168.30.195:8485/ns1</value>
+    </property>
+    <!-- Where the JournalNode keeps its data on local disk -->
+    <property>
+        <name>dfs.journalnode.edits.dir</name>
+        <value>/home/tsg/olap/hadoop/journal</value>
+    </property>
+    <!-- Java class the HDFS client uses to reach the NameNodes through a proxy and to
+         determine which NameNode is currently active -->
+    <property>
+        <name>dfs.client.failover.proxy.provider.ns1</name>
+        <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
+    </property>
+    <!-- Fencing methods for automatic failover; several methods exist (see the official docs).
+         sshfence logs in to the stale node remotely and kills the process. Methods are listed
+         one per line within a single value and tried in order. -->
+    <property>
+        <name>dfs.ha.fencing.methods</name>
+        <value>sshfence
+shell(true)</value>
+    </property>
+    <!-- Passwordless SSH key, needed only when the sshfence mechanism is used -->
+    <property>
+        <name>dfs.ha.fencing.ssh.private-key-files</name>
+        <value>/root/.ssh/id_rsa</value>
+    </property>
+    <!-- Timeout for the sshfence mechanism; like the property above, it can be left unset
+         when failover is performed via a script -->
+    <property>
+        <name>dfs.ha.fencing.ssh.connect-timeout</name>
+        <value>30000</value>
+    </property>
+    <!-- Enables automatic failover; it can be left unconfigured if automatic failover is not used -->
+    <property>
+        <name>dfs.ha.automatic-failover.enabled</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>dfs.datanode.max.transfer.threads</name>
+        <value>8192</value>
+    </property>
+    <!-- Number of NameNode threads handling RPC requests; raising it costs few resources -->
+    <property>
+        <name>dfs.namenode.handler.count</name>
+        <value>30</value>
+    </property>
+    <!-- Number of DataNode threads handling RPC requests; raising it uses more memory -->
+    <property>
+        <name>dfs.datanode.handler.count</name>
+        <value>40</value>
+    </property>
+    <!-- Bandwidth the balancer may use -->
+    <property>
+        <name>dfs.balance.bandwidthPerSec</name>
+        <value>104857600</value>
+    </property>
+    <!-- Reserved disk space that HDFS will not use, in bytes -->
+    <property>
+        <name>dfs.datanode.du.reserved</name>
+        <value>53687091200</value>
+    </property>
+    <!-- DataNode-to-NameNode connection timeout in milliseconds: 2 * heartbeat.recheck.interval + 30000 -->
+    <property>
+        <name>heartbeat.recheck.interval</name>
+        <value>100000</value>
+    </property>
+</configuration>
+
diff --git a/TWA-PIC/flink/conf/log4j-cli.properties b/TWA-PIC/flink/conf/log4j-cli.properties
new file mode 100644
index 0000000..e7add42
--- /dev/null
+++ b/TWA-PIC/flink/conf/log4j-cli.properties
@@ -0,0 +1,67 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
+monitorInterval=30
+
+rootLogger.level = INFO
+rootLogger.appenderRef.file.ref = FileAppender
+
+# Log all infos in the given file
+appender.file.name = FileAppender
+appender.file.type = FILE
+appender.file.append = false
+appender.file.fileName = ${sys:log.file}
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = [%d{yyyy-MM-dd HH:mm:ssZ}{UTC}] %-5p %-60c %x - %m%n
+
+# Log output from org.apache.flink.yarn to the console. This is used by the
+# CliFrontend class when using a per-job YARN cluster.
+logger.yarn.name = org.apache.flink.yarn
+logger.yarn.level = INFO
+logger.yarn.appenderRef.console.ref = ConsoleAppender
+logger.yarncli.name = org.apache.flink.yarn.cli.FlinkYarnSessionCli
+logger.yarncli.level = INFO
+logger.yarncli.appenderRef.console.ref = ConsoleAppender
+logger.hadoop.name = org.apache.hadoop
+logger.hadoop.level = INFO
+logger.hadoop.appenderRef.console.ref = ConsoleAppender
+
+# Make sure hive logs go to the file.
+logger.hive.name = org.apache.hadoop.hive
+logger.hive.level = INFO
+logger.hive.additivity = false
+logger.hive.appenderRef.file.ref = FileAppender
+
+# Log output from org.apache.flink.kubernetes to the console.
+logger.kubernetes.name = org.apache.flink.kubernetes +logger.kubernetes.level = INFO +logger.kubernetes.appenderRef.console.ref = ConsoleAppender + +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = [%d{yyyy-MM-dd HH:mm:ssZ}{UTC}] %-5p %-60c %x - %m%n + +# suppress the warning that hadoop native libraries are not loaded (irrelevant for the client) +logger.hadoopnative.name = org.apache.hadoop.util.NativeCodeLoader +logger.hadoopnative.level = OFF + +# Suppress the irrelevant (wrong) warnings from the Netty channel handler +logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline +logger.netty.level = OFF diff --git a/TWA-PIC/flink/conf/log4j-console.properties b/TWA-PIC/flink/conf/log4j-console.properties new file mode 100644 index 0000000..499839e --- /dev/null +++ b/TWA-PIC/flink/conf/log4j-console.properties @@ -0,0 +1,66 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds. +monitorInterval=30 + +# This affects logging for both user code and Flink +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender +rootLogger.appenderRef.rolling.ref = RollingFileAppender + +# Uncomment this if you want to _only_ change Flink's logging +#logger.flink.name = org.apache.flink +#logger.flink.level = INFO + +# The following lines keep the log level of common libraries/connectors on +# log level INFO. The root logger does not override this. You have to manually +# change the log levels here. 
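+# For example, to turn a single library up to DEBUG without touching the root
+# logger, a pair of lines in the same style would do (the logger name below is
+# a placeholder, not part of this deployment):
+#logger.mylib.name = com.example.connector
+#logger.mylib.level = DEBUG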
+logger.akka.name = akka +logger.akka.level = INFO +logger.kafka.name= org.apache.kafka +logger.kafka.level = INFO +logger.hadoop.name = org.apache.hadoop +logger.hadoop.level = INFO +logger.zookeeper.name = org.apache.zookeeper +logger.zookeeper.level = INFO + +# Log all infos to the console +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = [%d{yyyy-MM-dd HH:mm:ssZ}{UTC}] %-5p %-60c %x - %m%n + +# Log all infos in the given rolling file +appender.rolling.name = RollingFileAppender +appender.rolling.type = RollingFile +appender.rolling.append = true +appender.rolling.fileName = ${sys:log.file} +appender.rolling.filePattern = ${sys:log.file}.%i +appender.rolling.layout.type = PatternLayout +appender.rolling.layout.pattern = [%d{yyyy-MM-dd HH:mm:ssZ}{UTC}] %-5p %-60c %x - %m%n +appender.rolling.policies.type = Policies +appender.rolling.policies.size.type = SizeBasedTriggeringPolicy +appender.rolling.policies.size.size=100MB +appender.rolling.policies.startup.type = OnStartupTriggeringPolicy +appender.rolling.strategy.type = DefaultRolloverStrategy +appender.rolling.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10} + +# Suppress the irrelevant (wrong) warnings from the Netty channel handler +logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline +logger.netty.level = OFF diff --git a/TWA-PIC/flink/conf/log4j-session.properties b/TWA-PIC/flink/conf/log4j-session.properties new file mode 100644 index 0000000..9044140 --- /dev/null +++ b/TWA-PIC/flink/conf/log4j-session.properties @@ -0,0 +1,40 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds. 
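+# (monitorInterval is given in seconds; removing it, or setting it to 0,
+# disables the runtime rescan. This reflects Log4j 2 behavior, not a
+# Flink-specific switch.)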
+monitorInterval=30 + +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender + +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = [%d{yyyy-MM-dd HH:mm:ssZ}{UTC}] %-5p %-60c %x - %m%n + +# Suppress the irrelevant (wrong) warnings from the Netty channel handler +logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline +logger.netty.level = OFF +logger.zookeeper.name = org.apache.zookeeper +logger.zookeeper.level = WARN +logger.curator.name = org.apache.flink.shaded.org.apache.curator.framework +logger.curator.level = WARN +logger.runtimeutils.name= org.apache.flink.runtime.util.ZooKeeperUtils +logger.runtimeutils.level = WARN +logger.runtimeleader.name = org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver +logger.runtimeleader.level = WARN diff --git a/TWA-PIC/flink/conf/log4j.properties b/TWA-PIC/flink/conf/log4j.properties new file mode 100644 index 0000000..64293a9 --- /dev/null +++ b/TWA-PIC/flink/conf/log4j.properties @@ -0,0 +1,59 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds. +monitorInterval=30 + +# This affects logging for both user code and Flink +rootLogger.level = ERROR +rootLogger.appenderRef.file.ref = MainAppender + +# Uncomment this if you want to _only_ change Flink's logging +#logger.flink.name = org.apache.flink +#logger.flink.level = INFO + +# The following lines keep the log level of common libraries/connectors on +# log level INFO. The root logger does not override this. You have to manually +# change the log levels here. 
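+# Note that rootLogger.level is ERROR in this file, so the INFO entries below
+# are what keep these libraries visible at all; raising one of them (for
+# example logger.kafka.level = WARN) is a low-risk way to trim its log volume.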
+logger.akka.name = akka
+logger.akka.level = INFO
+logger.kafka.name = org.apache.kafka
+logger.kafka.level = INFO
+logger.hadoop.name = org.apache.hadoop
+logger.hadoop.level = INFO
+logger.zookeeper.name = org.apache.zookeeper
+logger.zookeeper.level = INFO
+
+# Log all infos in the given file
+appender.main.name = MainAppender
+appender.main.type = RollingFile
+appender.main.append = true
+appender.main.fileName = ${sys:log.file}
+appender.main.filePattern = ${sys:log.file}.%i
+appender.main.layout.type = PatternLayout
+appender.main.layout.pattern = [%d{yyyy-MM-dd HH:mm:ssZ}{UTC}] %-5p %-60c %x - %m%n
+appender.main.policies.type = Policies
+appender.main.policies.size.type = SizeBasedTriggeringPolicy
+appender.main.policies.size.size = 100MB
+appender.main.policies.startup.type = OnStartupTriggeringPolicy
+appender.main.strategy.type = DefaultRolloverStrategy
+appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
+
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler
+logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
+logger.netty.level = OFF
diff --git a/TWA-PIC/flink/conf/log4j2.component.properties b/TWA-PIC/flink/conf/log4j2.component.properties
new file mode 100644
index 0000000..2d5d906
--- /dev/null
+++ b/TWA-PIC/flink/conf/log4j2.component.properties
@@ -0,0 +1,2 @@
+# This file belongs in the conf/ directory of the Flink installation
+log4j2.formatMsgNoLookups=true
diff --git a/TWA-PIC/flink/conf/logback-console.xml b/TWA-PIC/flink/conf/logback-console.xml
new file mode 100644
index 0000000..62963f3
--- /dev/null
+++ b/TWA-PIC/flink/conf/logback-console.xml
@@ -0,0 +1,64 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ --> + +<configuration> + <appender name="console" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>[%d{yyyy-MM-dd HH:mm:ssZ,UTC}] [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <appender name="rolling" class="ch.qos.logback.core.rolling.RollingFileAppender"> + <file>${log.file}</file> + <append>false</append> + + <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy"> + <fileNamePattern>${log.file}.%i</fileNamePattern> + <minIndex>1</minIndex> + <maxIndex>10</maxIndex> + </rollingPolicy> + + <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + <maxFileSize>100MB</maxFileSize> + </triggeringPolicy> + + <encoder> + <pattern>[%d{yyyy-MM-dd HH:mm:ssZ,UTC}] [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <!-- This affects logging for both user code and Flink --> + <root level="INFO"> + <appender-ref ref="console"/> + <appender-ref ref="rolling"/> + </root> + + <!-- Uncomment this if you want to only change Flink's logging --> + <!--<logger name="org.apache.flink" level="INFO"/>--> + + <!-- The following lines keep the log level of common libraries/connectors on + log level INFO. The root logger does not override this. You have to manually + change the log levels here. --> + <logger name="akka" level="INFO"/> + <logger name="org.apache.kafka" level="INFO"/> + <logger name="org.apache.hadoop" level="INFO"/> + <logger name="org.apache.zookeeper" level="INFO"/> + + <!-- Suppress the irrelevant (wrong) warnings from the Netty channel handler --> + <logger name="org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" level="ERROR"/> +</configuration> diff --git a/TWA-PIC/flink/conf/logback-session.xml b/TWA-PIC/flink/conf/logback-session.xml new file mode 100644 index 0000000..7c07147 --- /dev/null +++ b/TWA-PIC/flink/conf/logback-session.xml @@ -0,0 +1,39 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<configuration> + <appender name="file" class="ch.qos.logback.core.FileAppender"> + <file>${log.file}</file> + <append>false</append> + <encoder> + <pattern>[%d{yyyy-MM-dd HH:mm:ssZ,UTC}] [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <appender name="console" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>[%d{yyyy-MM-dd HH:mm:ssZ,UTC}] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <logger name="ch.qos.logback" level="WARN" /> + <root level="INFO"> + <appender-ref ref="file"/> + <appender-ref ref="console"/> + </root> +</configuration> diff --git a/TWA-PIC/flink/conf/logback.xml b/TWA-PIC/flink/conf/logback.xml new file mode 100644 index 0000000..e1c0d7c --- /dev/null +++ b/TWA-PIC/flink/conf/logback.xml @@ -0,0 +1,58 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="file" class="ch.qos.logback.core.FileAppender"> + <file>${log.file}</file> + <append>false</append> + <encoder> + <pattern>[%d{yyyy-MM-dd HH:mm:ssZ,UTC}] [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <!-- This affects logging for both user code and Flink --> + <root level="INFO"> + <appender-ref ref="file"/> + </root> + + <!-- Uncomment this if you want to only change Flink's logging --> + <!--<logger name="org.apache.flink" level="INFO">--> + <!--<appender-ref ref="file"/>--> + <!--</logger>--> + + <!-- The following lines keep the log level of common libraries/connectors on + log level INFO. The root logger does not override this. You have to manually + change the log levels here. 
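+         A hypothetical extra entry would follow the same pattern: a
+         <logger name="com.example.lib" level="WARN"> element wrapping an
+         <appender-ref ref="file"/>, mirroring the loggers below.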
--> + <logger name="akka" level="INFO"> + <appender-ref ref="file"/> + </logger> + <logger name="org.apache.kafka" level="INFO"> + <appender-ref ref="file"/> + </logger> + <logger name="org.apache.hadoop" level="INFO"> + <appender-ref ref="file"/> + </logger> + <logger name="org.apache.zookeeper" level="INFO"> + <appender-ref ref="file"/> + </logger> + + <!-- Suppress the irrelevant (wrong) warnings from the Netty channel handler --> + <logger name="org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" level="ERROR"> + <appender-ref ref="file"/> + </logger> +</configuration> diff --git a/TWA-PIC/flink/conf/masters b/TWA-PIC/flink/conf/masters new file mode 100644 index 0000000..0664d83 --- /dev/null +++ b/TWA-PIC/flink/conf/masters @@ -0,0 +1,2 @@ +192.168.30.193:8080 +192.168.30.194:8080 diff --git a/TWA-PIC/flink/conf/workers b/TWA-PIC/flink/conf/workers new file mode 100644 index 0000000..a1c43aa --- /dev/null +++ b/TWA-PIC/flink/conf/workers @@ -0,0 +1 @@ +192.168.30.195 diff --git a/TWA-PIC/flink/conf/yarn-site.xml b/TWA-PIC/flink/conf/yarn-site.xml new file mode 100644 index 0000000..fd262c9 --- /dev/null +++ b/TWA-PIC/flink/conf/yarn-site.xml @@ -0,0 +1,224 @@ +<?xml version="1.0"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. 
+-->
+<configuration>
+
+    <property>
+        <name>yarn.nodemanager.aux-services</name>
+        <value>mapreduce_shuffle</value>
+    </property>
+
+    <property>
+        <name>yarn.resourcemanager.ha.enabled</name>
+        <value>true</value>
+    </property>
+
+    <!-- Declare the two ResourceManagers -->
+    <property>
+        <name>yarn.resourcemanager.cluster-id</name>
+        <value>rsmcluster</value>
+    </property>
+
+    <property>
+        <name>yarn.resourcemanager.ha.rm-ids</name>
+        <value>rsm1,rsm2</value>
+    </property>
+
+    <!-- rm1 -->
+    <!-- rm1 hostname -->
+    <property>
+        <name>yarn.resourcemanager.hostname.rsm1</name>
+        <value>192.168.30.193</value>
+    </property>
+
+    <!-- rm1 web application address -->
+    <property>
+        <name>yarn.resourcemanager.webapp.address.rsm1</name>
+        <value>192.168.30.193:8080</value>
+    </property>
+
+    <!-- rm1 scheduler port, default 8030 -->
+    <property>
+        <name>yarn.resourcemanager.scheduler.address.rsm1</name>
+        <value>192.168.30.193:8030</value>
+    </property>
+
+    <!-- Default port 8031 -->
+    <property>
+        <name>yarn.resourcemanager.resource-tracker.address.rsm1</name>
+        <value>192.168.30.193:8031</value>
+    </property>
+
+    <!-- rm1 applications-manager interface address and port, default 8032 -->
+    <property>
+        <name>yarn.resourcemanager.address.rsm1</name>
+        <value>192.168.30.193:8032</value>
+    </property>
+
+    <!-- rm1 admin port, default 8033 -->
+    <property>
+        <name>yarn.resourcemanager.admin.address.rsm1</name>
+        <value>192.168.30.193:8033</value>
+    </property>
+
+    <property>
+        <name>yarn.resourcemanager.ha.admin.address.rsm1</name>
+        <value>192.168.30.193:23142</value>
+    </property>
+
+    <!-- rm2 -->
+    <property>
+        <name>yarn.resourcemanager.hostname.rsm2</name>
+        <value>192.168.30.194</value>
+    </property>
+
+    <property>
+        <name>yarn.resourcemanager.webapp.address.rsm2</name>
+        <value>192.168.30.194:8080</value>
+    </property>
+
+    <property>
+        <name>yarn.resourcemanager.scheduler.address.rsm2</name>
+        <value>192.168.30.194:8030</value>
+    </property>
+
+    <property>
+        <name>yarn.resourcemanager.resource-tracker.address.rsm2</name>
+        <value>192.168.30.194:8031</value>
+    </property>
+
+    <property>
+        <name>yarn.resourcemanager.address.rsm2</name>
+        <value>192.168.30.194:8032</value>
+    </property>
+
+    <property>
+        <name>yarn.resourcemanager.admin.address.rsm2</name>
+        <value>192.168.30.194:8033</value>
+    </property>
+
+    <property>
+        <name>yarn.resourcemanager.ha.admin.address.rsm2</name>
+        <value>192.168.30.194:23142</value>
+    </property>
+
+    <!-- ZooKeeper quorum address -->
+    <property>
+        <name>yarn.resourcemanager.zk-address</name>
+        <value>192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181</value>
+    </property>
+
+    <!-- Enable RM recovery so that applications survive a ResourceManager failure mid-run; default false -->
+    <property>
+        <name>yarn.resourcemanager.recovery.enabled</name>
+        <value>true</value>
+    </property>
+
+    <!-- Enable NodeManager recovery; default false -->
+    <property>
+        <name>yarn.nodemanager.recovery.enabled</name>
+        <value>true</value>
+    </property>
+
+    <!-- Local filesystem directory where the NodeManager persists its running state -->
+    <property>
+        <name>yarn.nodemanager.recovery.dir</name>
+        <value>/home/tsg/olap/hadoop-2.7.1/yarn</value>
+    </property>
+
+    <property>
+        <name>yarn.resourcemanager.store.class</name>
+        <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
+    </property>
+
+    <!-- NodeManager RPC address; the default ${yarn.nodemanager.hostname}:0 picks an ephemeral
+         port that changes after a cluster restart. Pinning the port here keeps NM restart
+         working. -->
+    <property>
+        <name>yarn.nodemanager.address</name>
+        <value>${yarn.nodemanager.hostname}:9923</value>
+    </property>
+
+    <property>
+        <name>yarn.log-aggregation-enable</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds</name>
+        <value>3600</value>
+    </property>
+
+    <property>
+        <name>yarn.nodemanager.remote-app-log-dir</name>
+        <value>/home/tsg/olap/hadoop-2.7.1/logs/app-logs/</value>
+    </property>
+
+    <!-- Physical memory, in MB, that the NM may allocate to containers; default 8192 -->
+    <property>
+        <name>yarn.nodemanager.resource.memory-mb</name>
+        <value>61440</value>
+    </property>
+
+    <!-- Minimum allocation per container request at the RM, in MB; default 1024 -->
+    <property>
+        <name>yarn.scheduler.minimum-allocation-mb</name>
+        <value>1024</value>
+    </property>
+
+    <!-- Maximum allocation per container request at the RM, in MB; usually set equal to
+         yarn.nodemanager.resource.memory-mb. Default 8192 -->
+    <property>
+        <name>yarn.scheduler.maximum-allocation-mb</name>
+        <value>61440</value>
+    </property>
+
+    <!-- Number of vcores that may be allocated to containers; used by the RM scheduler when
+         allocating resources. This does not limit the physical cores YARN containers can use.
+         Default 8; usually set to the server's total CPU core count -->
+    <property>
+        <name>yarn.nodemanager.resource.cpu-vcores</name>
+        <value>48</value>
+    </property>
+
+    <!-- Minimum allocation per container request at the RM, in vcores; default 1 -->
+    <property>
+        <name>yarn.scheduler.minimum-allocation-vcores</name>
+        <value>1</value>
+    </property>
+
+    <!-- Maximum allocation per container request at the RM, in vcores; default 32. Usually set
+         slightly below yarn.nodemanager.resource.cpu-vcores, and a job's slot count should not
+         exceed this value -->
+    <property>
+        <name>yarn.scheduler.maximum-allocation-vcores</name>
+        <value>48</value>
+    </property>
+
+    <property>
+        <name>yarn.nodemanager.vmem-check-enabled</name>
+        <value>false</value>
+    </property>
+
+    <property>
+        <name>yarn.nodemanager.pmem-check-enabled</name>
+        <value>false</value>
+    </property>
+
+    <!-- Maximum number of ApplicationMaster attempts; defaults to 2 once HA is configured and
+         can be raised for production -->
+    <property>
+        <name>yarn.resourcemanager.am.max-attempts</name>
+        <value>10000</value>
+    </property>
+
+    <property>
+        <name>yarn.log.server.url</name>
+        <value>http://192.168.30.193:19888/jobhistory/logs</value>
+    </property>
+
+</configuration>
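With the HA pair declared above, exactly one of rsm1/rsm2 should be active at any time. A quick post-deployment sanity check, assuming the standard yarn CLI is on the PATH (the ids come from yarn.resourcemanager.ha.rm-ids):

    # Query the HA state of each ResourceManager declared in yarn-site.xml
    yarn rmadmin -getServiceState rsm1    # expect "active" or "standby"
    yarn rmadmin -getServiceState rsm2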
diff --git a/TWA-PIC/flink/conf/zoo.cfg b/TWA-PIC/flink/conf/zoo.cfg
new file mode 100644
index 0000000..f598997
--- /dev/null
+++ b/TWA-PIC/flink/conf/zoo.cfg
@@ -0,0 +1,36 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# The number of milliseconds of each tick
+tickTime=2000
+
+# The number of ticks that the initial synchronization phase can take
+initLimit=10
+
+# The number of ticks that can pass between sending a request and getting an acknowledgement
+syncLimit=5
+
+# The directory where the snapshot is stored.
+# dataDir=/tmp/zookeeper
+
+# The port at which the clients will connect
+clientPort=2181
+
+# ZooKeeper quorum peers
+server.1=localhost:2888:3888
+# server.2=host:peer-port:leader-port
diff --git a/TWA-PIC/flink/topology/completion/config/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED b/TWA-PIC/flink/topology/completion/config/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED
new file mode 100644
index 0000000..c5c221a
--- /dev/null
+++ b/TWA-PIC/flink/topology/completion/config/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Source Kafka broker addresses
+source.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# Sink (output) Kafka broker addresses
+sink.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# ZooKeeper addresses, used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+
+# HDFS addresses, used to fetch the location libraries
+hdfs.servers=192.168.30.193:9000,192.168.30.194:9000
+
+#-------------------------------- HTTP / location library ------------------------------#
+# File system type for location library storage: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Location library path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED/
+
+# List of file types to fetch from the knowledge base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# List of file names to fetch from the knowledge base metadata; empty disables name filtering.
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and similar resources.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace name
+nacos.schema.namespace=MSH
+
+# Schema data id name
+nacos.schema.data.id=active_defence_event.json
+
+# Knowledge base namespace name
+nacos.knowledgebase.namespace=
+
+# Knowledge base data id name
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consume/produce configuration ------------------------------#
+# Kafka topic to consume data from
+source.kafka.topic=ACTIVE-DEFENCE-EVENT
+
+# Output topic for the completed (enriched) data
+sink.kafka.topic=ACTIVE-DEFENCE-EVENT-COMPLETED
+
+# Consumer group id; stores this consumer's offsets (it can be named after the topology) so the next read does not re-consume data.
+group.id=active-defence-log-20220408-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data center id, valid range 0-31
+data.center.id.num=1
+
+# HBase cache refresh interval in seconds; 0 disables cache updates
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default value configuration ------------------------------#
+
+# 0 = output logs as-is without completion, 1 = complete (enrich) the logs
+log.need.complete=1
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
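Each completion job reads its location libraries from the per-job HDFS directory named in knowledgebase.file.storage.path. A spot-check that the expected .mmdb files were actually published there, assuming the hdfs CLI is configured against the NameNodes listed in hdfs.servers:

    # List the knowledge base directory for this topology on HDFS
    hdfs dfs -ls /knowledgebase/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED/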
diff --git a/TWA-PIC/flink/topology/completion/config/ETL-BGP-RECORD-COMPLETED b/TWA-PIC/flink/topology/completion/config/ETL-BGP-RECORD-COMPLETED
new file mode 100644
index 0000000..fefad20
--- /dev/null
+++ b/TWA-PIC/flink/topology/completion/config/ETL-BGP-RECORD-COMPLETED
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Source Kafka broker addresses
+source.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# Sink (output) Kafka broker addresses
+sink.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# ZooKeeper addresses, used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+
+# HDFS addresses, used to fetch the location libraries
+hdfs.servers=192.168.30.193:9000,192.168.30.194:9000
+
+#-------------------------------- HTTP / location library ------------------------------#
+# File system type for location library storage: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Location library path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-BGP-RECORD-COMPLETED/
+
+# List of file types to fetch from the knowledge base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# List of file names to fetch from the knowledge base metadata; empty disables name filtering.
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and similar resources.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace name
+nacos.schema.namespace=MSH
+
+# Schema data id name
+nacos.schema.data.id=bgp_record.json
+
+# Knowledge base namespace name
+nacos.knowledgebase.namespace=
+
+# Knowledge base data id name
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consume/produce configuration ------------------------------#
+# Kafka topic to consume data from
+source.kafka.topic=BGP-RECORD
+
+# Output topic for the completed (enriched) data
+sink.kafka.topic=BGP-RECORD-COMPLETED
+
+# Consumer group id; stores this consumer's offsets (it can be named after the topology) so the next read does not re-consume data.
+group.id=bgp-record-20220801-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data center id, valid range 0-31
+data.center.id.num=2
+
+# HBase cache refresh interval in seconds; 0 disables cache updates
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default value configuration ------------------------------#
+
+# 0 = output logs as-is without completion, 1 = complete (enrich) the logs
+log.need.complete=1
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/TWA-PIC/flink/topology/completion/config/ETL-GTPC-RECORD-COMPLETED b/TWA-PIC/flink/topology/completion/config/ETL-GTPC-RECORD-COMPLETED
new file mode 100644
index 0000000..e18cdec
--- /dev/null
+++ b/TWA-PIC/flink/topology/completion/config/ETL-GTPC-RECORD-COMPLETED
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Source Kafka broker addresses
+source.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# Sink (output) Kafka broker addresses
+sink.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# ZooKeeper addresses, used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+
+# HDFS addresses, used to fetch the location libraries
+hdfs.servers=192.168.30.193:9000,192.168.30.194:9000
+
+#-------------------------------- HTTP / location library ------------------------------#
+# File system type for location library storage: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Location library path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-GTPC-RECORD-COMPLETED/
+
+# List of file types to fetch from the knowledge base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# List of file names to fetch from the knowledge base metadata; empty disables name filtering.
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and similar resources.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace name
+nacos.schema.namespace=MSH
+
+# Schema data id name
+nacos.schema.data.id=gtpc_record.json
+
+# Knowledge base namespace name
+nacos.knowledgebase.namespace=
+
+# Knowledge base data id name
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consume/produce configuration ------------------------------#
+# Kafka topic to consume data from
+source.kafka.topic=GTPC-RECORD
+
+# Output topic for the completed (enriched) data
+sink.kafka.topic=GTPC-RECORD-COMPLETED
+
+# Consumer group id; stores this consumer's offsets (it can be named after the topology) so the next read does not re-consume data.
+group.id=gtpc-record-log-20220408-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data center id, valid range 0-31
+data.center.id.num=3
+
+# HBase cache refresh interval in seconds; 0 disables cache updates
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default value configuration ------------------------------#
+
+# 0 = output logs as-is without completion, 1 = complete (enrich) the logs
+log.need.complete=1
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/TWA-PIC/flink/topology/completion/config/ETL-INTERIM-SESSION-RECORD-COMPLETED b/TWA-PIC/flink/topology/completion/config/ETL-INTERIM-SESSION-RECORD-COMPLETED
new file mode 100644
index 0000000..cbe3f5d
--- /dev/null
+++ b/TWA-PIC/flink/topology/completion/config/ETL-INTERIM-SESSION-RECORD-COMPLETED
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Source Kafka broker addresses
+source.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# Sink (output) Kafka broker addresses
+sink.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# ZooKeeper addresses, used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+
+# HDFS addresses, used to fetch the location libraries
+hdfs.servers=192.168.30.193:9000,192.168.30.194:9000
+
+#-------------------------------- HTTP / location library ------------------------------#
+# File system type for location library storage: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Location library path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-INTERIM-SESSION-RECORD-COMPLETED/
+
+# List of file types to fetch from the knowledge base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# List of file names to fetch from the knowledge base metadata; empty disables name filtering.
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and similar resources.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace name
+nacos.schema.namespace=MSH
+
+# Schema data id name
+nacos.schema.data.id=interim_session_record.json
+
+# Knowledge base namespace name
+nacos.knowledgebase.namespace=
+
+# Knowledge base data id name
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consume/produce configuration ------------------------------#
+# Kafka topic to consume data from
+source.kafka.topic=INTERIM-SESSION-RECORD
+
+# Output topic for the completed (enriched) data
+sink.kafka.topic=INTERIM-SESSION-RECORD-COMPLETED
+
+# Consumer group id; stores this consumer's offsets (it can be named after the topology) so the next read does not re-consume data.
+group.id=linterim-session-record-20220408-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data center id, valid range 0-31
+data.center.id.num=4
+
+# HBase cache refresh interval in seconds; 0 disables cache updates
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default value configuration ------------------------------#
+
+# 0 = output logs as-is without completion, 1 = complete (enrich) the logs
+log.need.complete=1
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
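Each job's position in its source topic is tracked through group.id in Kafka, which is why the ids carry date suffixes: renaming the group effectively restarts consumption under the offset reset policy. A lag check with the stock Kafka tooling, assuming the broker installation's CLI scripts are available:

    # Show partition offsets and lag for one completion job's consumer group
    kafka-consumer-groups.sh --bootstrap-server 192.168.30.193:9094 \
      --describe --group linterim-session-record-20220408-1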
diff --git a/TWA-PIC/flink/topology/completion/config/ETL-PROXY-EVENT-COMPLETED b/TWA-PIC/flink/topology/completion/config/ETL-PROXY-EVENT-COMPLETED
new file mode 100644
index 0000000..7638c0d
--- /dev/null
+++ b/TWA-PIC/flink/topology/completion/config/ETL-PROXY-EVENT-COMPLETED
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Source Kafka broker addresses
+source.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# Sink (output) Kafka broker addresses
+sink.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# ZooKeeper addresses, used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+
+# HDFS addresses, used to fetch the location libraries
+hdfs.servers=192.168.30.193:9000,192.168.30.194:9000
+
+#-------------------------------- HTTP / location library ------------------------------#
+# File system type for location library storage: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Location library path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-PROXY-EVENT-COMPLETED/
+
+# List of file types to fetch from the knowledge base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# List of file names to fetch from the knowledge base metadata; empty disables name filtering.
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and similar resources.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace name
+nacos.schema.namespace=MSH
+
+# Schema data id name
+nacos.schema.data.id=proxy_event.json
+
+# Knowledge base namespace name
+nacos.knowledgebase.namespace=
+
+# Knowledge base data id name
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consume/produce configuration ------------------------------#
+# Kafka topic to consume data from
+source.kafka.topic=PROXY-EVENT
+
+# Output topic for the completed (enriched) data
+sink.kafka.topic=PROXY-EVENT-COMPLETED
+
+# Consumer group id; stores this consumer's offsets (it can be named after the topology) so the next read does not re-consume data.
+group.id=proxy-event-20220408-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=24
+
+# Transform function parallelism
+transform.parallelism=24
+
+# Kafka producer parallelism
+sink.parallelism=24
+
+# Data center id, valid range 0-31
+data.center.id.num=5
+
+# HBase cache refresh interval in seconds; 0 disables cache updates
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default value configuration ------------------------------#
+
+# 0 = output logs as-is without completion, 1 = complete (enrich) the logs
+log.need.complete=1
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/TWA-PIC/flink/topology/completion/config/ETL-RADIUS-RECORD-COMPLETED b/TWA-PIC/flink/topology/completion/config/ETL-RADIUS-RECORD-COMPLETED
new file mode 100644
index 0000000..cdddc51
--- /dev/null
+++ b/TWA-PIC/flink/topology/completion/config/ETL-RADIUS-RECORD-COMPLETED
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Source Kafka broker addresses
+source.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# Sink (output) Kafka broker addresses
+sink.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# ZooKeeper addresses, used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+
+# HDFS addresses, used to fetch the location libraries
+hdfs.servers=192.168.30.193:9000,192.168.30.194:9000
+
+#-------------------------------- HTTP / location library ------------------------------#
+# File system type for location library storage: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Location library path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-RADIUS-RECORD-COMPLETED/
+
+# List of file types to fetch from the knowledge base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# List of file names to fetch from the knowledge base metadata; empty disables name filtering.
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and similar resources.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace name
+nacos.schema.namespace=MSH
+
+# Schema data id name
+nacos.schema.data.id=radius_record.json
+
+# Knowledge base namespace name
+nacos.knowledgebase.namespace=
+
+# Knowledge base data id name
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consume/produce configuration ------------------------------#
+# Kafka topic to consume data from
+source.kafka.topic=RADIUS-RECORD
+
+# Output topic for the completed (enriched) data
+sink.kafka.topic=RADIUS-RECORD-COMPLETED
+
+# Consumer group id; stores this consumer's offsets (it can be named after the topology) so the next read does not re-consume data.
+group.id=radius-record-log-20220408-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data center id, valid range 0-31
+data.center.id.num=6
+
+# HBase cache refresh interval in seconds; 0 disables cache updates
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default value configuration ------------------------------#
+
+# 0 = output logs as-is without completion, 1 = complete (enrich) the logs
+log.need.complete=1
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/TWA-PIC/flink/topology/completion/config/ETL-SECURITY-EVENT-COMPLETED b/TWA-PIC/flink/topology/completion/config/ETL-SECURITY-EVENT-COMPLETED
new file mode 100644
index 0000000..2e32ae6
--- /dev/null
+++ b/TWA-PIC/flink/topology/completion/config/ETL-SECURITY-EVENT-COMPLETED
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Source Kafka broker addresses
+source.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# Sink (output) Kafka broker addresses
+sink.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# ZooKeeper addresses, used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+
+# HDFS addresses, used to fetch the location libraries
+hdfs.servers=192.168.30.193:9000,192.168.30.194:9000
+
+#-------------------------------- HTTP / location library ------------------------------#
+# File system type for location library storage: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Location library path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-SECURITY-EVENT-COMPLETED/
+
+# List of file types to fetch from the knowledge base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# List of file names to fetch from the knowledge base metadata; empty disables name filtering.
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and similar resources.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace name
+nacos.schema.namespace=MSH
+
+# Schema data id name
+nacos.schema.data.id=security_event.json
+
+# Knowledge base namespace name
+nacos.knowledgebase.namespace=
+
+# Knowledge base data id name
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consume/produce configuration ------------------------------#
+# Kafka topic to consume data from
+source.kafka.topic=SECURITY-EVENT
+
+# Output topic for the completed (enriched) data
+sink.kafka.topic=SECURITY-EVENT-COMPLETED
+
+# Consumer group id; stores this consumer's offsets (it can be named after the topology) so the next read does not re-consume data.
+group.id=security-event-log-20230718-A
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=24
+
+# Transform function parallelism
+transform.parallelism=24
+
+# Kafka producer parallelism
+sink.parallelism=24
+
+# Data center id, valid range 0-31
+data.center.id.num=7
+
+# HBase cache refresh interval in seconds; 0 disables cache updates
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default value configuration ------------------------------#
+
+# 0 = output logs as-is without completion, 1 = complete (enrich) the logs
+log.need.complete=1
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
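The proxy and security jobs run with parallelism 24, and Kafka source subtasks beyond the topic's partition count sit idle, so SECURITY-EVENT and PROXY-EVENT should carry at least 24 partitions. A check with the stock topic tool, under the same assumption about the Kafka CLI being available:

    # Confirm the partition count matches the configured source parallelism
    kafka-topics.sh --bootstrap-server 192.168.30.193:9094 --describe --topic SECURITY-EVENT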
diff --git a/TWA-PIC/flink/topology/completion/config/ETL-SESSION-RECORD-COMPLETED b/TWA-PIC/flink/topology/completion/config/ETL-SESSION-RECORD-COMPLETED
new file mode 100644
index 0000000..bce4e40
--- /dev/null
+++ b/TWA-PIC/flink/topology/completion/config/ETL-SESSION-RECORD-COMPLETED
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Source Kafka broker addresses
+source.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# Sink (output) Kafka broker addresses
+sink.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# ZooKeeper addresses, used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+
+# HDFS addresses, used to fetch the location libraries
+hdfs.servers=192.168.30.193:9000,192.168.30.194:9000
+
+#-------------------------------- HTTP / location library ------------------------------#
+# File system type for location library storage: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Location library path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-SESSION-RECORD-COMPLETED/
+
+# List of file types to fetch from the knowledge base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# List of file names to fetch from the knowledge base metadata; empty disables name filtering.
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and similar resources.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace name
+nacos.schema.namespace=MSH
+
+# Schema data id name
+nacos.schema.data.id=session_record.json
+
+# Knowledge base namespace name
+nacos.knowledgebase.namespace=
+
+# Knowledge base data id name
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consume/produce configuration ------------------------------#
+# Kafka topic to consume data from
+source.kafka.topic=SESSION-RECORD
+
+# Output topic for the completed (enriched) data
+sink.kafka.topic=SESSION-RECORD-COMPLETED
+
+# Consumer group id; stores this consumer's offsets (it can be named after the topology) so the next read does not re-consume data.
+group.id=session-record-log-20220408-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data center id, valid range 0-31
+data.center.id.num=8
+
+# HBase cache refresh interval in seconds; 0 disables cache updates
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default value configuration ------------------------------#
+
+# 0 = output logs as-is without completion, 1 = complete (enrich) the logs
+log.need.complete=1
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/TWA-PIC/flink/topology/completion/config/ETL-TRANSACTION-RECORD-COMPLETED b/TWA-PIC/flink/topology/completion/config/ETL-TRANSACTION-RECORD-COMPLETED
new file mode 100644
index 0000000..8f60344
--- /dev/null
+++ b/TWA-PIC/flink/topology/completion/config/ETL-TRANSACTION-RECORD-COMPLETED
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Source Kafka broker addresses
+source.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# Sink (output) Kafka broker addresses
+sink.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# ZooKeeper addresses, used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+
+# HDFS addresses, used to fetch the location libraries
+hdfs.servers=192.168.30.193:9000,192.168.30.194:9000
+
+#-------------------------------- HTTP / location library ------------------------------#
+# File system type for location library storage: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Location library path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-TRANSACTION-RECORD-COMPLETED/
+
+# List of file types to fetch from the knowledge base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# List of file names to fetch from the knowledge base metadata; empty disables name filtering.
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and similar resources.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace name
+nacos.schema.namespace=MSH
+
+# Schema data id name
+nacos.schema.data.id=transaction_record.json
+
+# Knowledge base namespace name
+nacos.knowledgebase.namespace=
+
+# Knowledge base data id name
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consume/produce configuration ------------------------------#
+# Kafka topic to consume data from
+source.kafka.topic=TRANSACTION-RECORD
+
+# Output topic for the completed (enriched) data
+sink.kafka.topic=TRANSACTION-RECORD-COMPLETED
+
+# Consumer group id; stores this consumer's offsets (it can be named after the topology) so the next read does not re-consume data.
+group.id=transaction-record-20220408-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data center id, valid range 0-31
+data.center.id.num=10
+
+# HBase cache refresh interval in seconds; 0 disables cache updates
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default value configuration ------------------------------#
+
+# 0 = output logs as-is without completion, 1 = complete (enrich) the logs
+log.need.complete=1
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/TWA-PIC/flink/topology/completion/config/MIRROR-DOS-SKETCH-RECORD b/TWA-PIC/flink/topology/completion/config/MIRROR-DOS-SKETCH-RECORD
new file mode 100644
index 0000000..eb944d7
--- /dev/null
+++ b/TWA-PIC/flink/topology/completion/config/MIRROR-DOS-SKETCH-RECORD
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Source Kafka broker addresses
+source.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# Sink (output) Kafka broker addresses
+sink.kafka.servers=192.168.20.223:9094,192.168.20.224:9094,192.168.20.225:9094
+
+# ZooKeeper addresses, used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+
+# HDFS addresses, used to fetch the location libraries
+hdfs.servers=192.168.30.193:9000,192.168.30.194:9000
+
+#-------------------------------- HTTP / location library ------------------------------#
+# File system type for location library storage: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Location library path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED/
+
+# List of file types to fetch from the knowledge base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# List of file names to fetch from the knowledge base metadata; empty disables name filtering.
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and similar resources.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace name
+nacos.schema.namespace=MSH
+
+# Schema data id name
+nacos.schema.data.id=active_defence_event.json
+
+# Knowledge base namespace name
+nacos.knowledgebase.namespace=
+
+# Knowledge base data id name
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consume/produce configuration ------------------------------#
+# Kafka topic to consume data from
+source.kafka.topic=DOS-SKETCH-RECORD
+
+# Output topic for the completed (enriched) data
+sink.kafka.topic=DOS-SKETCH-RECORD
+
+# Consumer group id; stores this consumer's offsets (it can be named after the topology) so the next read does not re-consume data.
+group.id=dos-sketch-record-20230629-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data center id, valid range 0-31
+data.center.id.num=1
+
+# HBase cache refresh interval in seconds; 0 disables cache updates
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default value configuration ------------------------------#
+
+# 0 = output logs as-is without completion, 1 = complete (enrich) the logs
+log.need.complete=0
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/TWA-PIC/flink/topology/completion/config/MIRROR-GTPC-RECORD-COMPLETED b/TWA-PIC/flink/topology/completion/config/MIRROR-GTPC-RECORD-COMPLETED
new file mode 100644
index 0000000..cf6d12a
--- /dev/null
+++ b/TWA-PIC/flink/topology/completion/config/MIRROR-GTPC-RECORD-COMPLETED
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Source Kafka broker addresses
+source.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# Sink (output) Kafka broker addresses
+sink.kafka.servers=192.168.20.223:9094,192.168.20.224:9094,192.168.20.225:9094
+
+# ZooKeeper addresses, used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+
+# HDFS addresses, used to fetch the location libraries
+hdfs.servers=192.168.30.193:9000,192.168.30.194:9000
+
+#-------------------------------- HTTP / location library ------------------------------#
+# File system type for location library storage: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Location library path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED/
+
+# List of file types to fetch from the knowledge base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# List of file names to fetch from the knowledge base metadata; empty disables name filtering.
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and similar resources.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace name
+nacos.schema.namespace=MSH
+
+# Schema data id name
+nacos.schema.data.id=active_defence_event.json
+
+# Knowledge base namespace name
+nacos.knowledgebase.namespace=
+
+# Knowledge base data id name
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consume/produce configuration ------------------------------#
+# Kafka topic to consume data from
+source.kafka.topic=GTPC-RECORD-COMPLETED
+
+# Output topic for the completed (enriched) data
+sink.kafka.topic=GTPC-RECORD-COMPLETED
+
+# Consumer group id; stores this consumer's offsets (it can be named after the topology) so the next read does not re-consume data.
+group.id=gtpc-record-completed-20230629-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data center id, valid range 0-31
+data.center.id.num=1
+
+# HBase cache refresh interval in seconds; 0 disables cache updates
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default value configuration ------------------------------#
+
+# 0 = output logs as-is without completion, 1 = complete (enrich) the logs
+log.need.complete=0
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
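Unlike the ETL-* jobs, the MIRROR-* jobs set log.need.complete=0 and simply forward topics from the local cluster (192.168.30.x) to a remote one (192.168.20.22x). A way to confirm data is landing on the remote side, again assuming the stock Kafka console tools:

    # Read a few mirrored records from the remote sink cluster
    kafka-console-consumer.sh --bootstrap-server 192.168.20.223:9094 \
      --topic GTPC-RECORD-COMPLETED --max-messages 5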
diff --git a/TWA-PIC/flink/topology/completion/config/MIRROR-INTERNAL-PACKET-CAPTURE-EVENT b/TWA-PIC/flink/topology/completion/config/MIRROR-INTERNAL-PACKET-CAPTURE-EVENT
new file mode 100644
index 0000000..ca9cfcf
--- /dev/null
+++ b/TWA-PIC/flink/topology/completion/config/MIRROR-INTERNAL-PACKET-CAPTURE-EVENT
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Source Kafka broker addresses
+source.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# Sink (output) Kafka broker addresses
+sink.kafka.servers=192.168.20.223:9094,192.168.20.224:9094,192.168.20.225:9094
+
+# ZooKeeper addresses, used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+
+# HDFS addresses, used to fetch the location libraries
+hdfs.servers=192.168.30.193:9000,192.168.30.194:9000
+
+#-------------------------------- HTTP / location library ------------------------------#
+# File system type for location library storage: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Location library path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED/
+
+# List of file types to fetch from the knowledge base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# List of file names to fetch from the knowledge base metadata; empty disables name filtering.
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and similar resources.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace name
+nacos.schema.namespace=MSH
+
+# Schema data id name
+nacos.schema.data.id=active_defence_event.json
+
+# Knowledge base namespace name
+nacos.knowledgebase.namespace=
+
+# Knowledge base data id name
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consume/produce configuration ------------------------------#
+# Kafka topic to consume data from
+source.kafka.topic=INTERNAL-PACKET-CAPTURE-EVENT
+
+# Output topic for the completed (enriched) data
+sink.kafka.topic=INTERNAL-PACKET-CAPTURE-EVENT
+
+# Consumer group id; stores this consumer's offsets (it can be named after the topology) so the next read does not re-consume data.
+group.id=internal-packet-capture-event-20230629-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data center id, valid range 0-31
+data.center.id.num=1
+
+# HBase cache refresh interval in seconds; 0 disables cache updates
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default value configuration ------------------------------#
+
+# 0 = output logs as-is without completion, 1 = complete (enrich) the logs
+log.need.complete=0
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/TWA-PIC/flink/topology/completion/config/MIRROR-NETWORK-TRAFFIC-METRICS b/TWA-PIC/flink/topology/completion/config/MIRROR-NETWORK-TRAFFIC-METRICS
new file mode 100644
index 0000000..e368848
--- /dev/null
+++ b/TWA-PIC/flink/topology/completion/config/MIRROR-NETWORK-TRAFFIC-METRICS
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Source Kafka broker addresses
+source.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# Sink (output) Kafka broker addresses
+sink.kafka.servers=192.168.20.223:9094,192.168.20.224:9094,192.168.20.225:9094
+
+# ZooKeeper addresses, used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+
+# HDFS addresses, used to fetch the location libraries
+hdfs.servers=192.168.30.193:9000,192.168.30.194:9000
+
+#-------------------------------- HTTP / location library ------------------------------#
+# File system type for location library storage: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Location library path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED/
+
+# List of file types to fetch from the knowledge base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# List of file names to fetch from the knowledge base metadata; empty disables name filtering.
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and similar resources.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace name
+nacos.schema.namespace=MSH
+
+# Schema data id name
+nacos.schema.data.id=active_defence_event.json
+
+# Knowledge base namespace name
+nacos.knowledgebase.namespace=
+
+# Knowledge base data id name
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consume/produce configuration ------------------------------#
+# Kafka topic to consume data from
+source.kafka.topic=NETWORK-TRAFFIC-METRICS
+
+# Output topic for the completed (enriched) data
+sink.kafka.topic=NETWORK-TRAFFIC-METRICS
+
+# Consumer group id; stores this consumer's offsets (it can be named after the topology) so the next read does not re-consume data.
+group.id=network-traffic-metrics-20230629-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data center id, valid range 0-31
+data.center.id.num=1
+
+# HBase cache refresh interval in seconds; 0 disables cache updates
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default value configuration ------------------------------#
+
+# 0 = output logs as-is without completion, 1 = complete (enrich) the logs
+log.need.complete=0
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/TWA-PIC/flink/topology/completion/config/MIRROR-POLICY-RULE-METRICS b/TWA-PIC/flink/topology/completion/config/MIRROR-POLICY-RULE-METRICS
new file mode 100644
index 0000000..f6ab683
--- /dev/null
+++ b/TWA-PIC/flink/topology/completion/config/MIRROR-POLICY-RULE-METRICS
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Source Kafka broker addresses
+source.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# Sink (output) Kafka broker addresses
+sink.kafka.servers=192.168.20.223:9094,192.168.20.224:9094,192.168.20.225:9094
+
+# ZooKeeper addresses, used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+
+# HDFS addresses, used to fetch the location libraries
+hdfs.servers=192.168.30.193:9000,192.168.30.194:9000
+
+#-------------------------------- HTTP / location library ------------------------------#
+# File system type for location library storage: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Location library path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED/
+
+# List of file types to fetch from the knowledge base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# List of file names to fetch from the knowledge base metadata; empty disables name filtering.
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and similar resources.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace name
+nacos.schema.namespace=MSH
+
+# Schema data id name
+nacos.schema.data.id=active_defence_event.json
+
+# Knowledge base namespace name
+nacos.knowledgebase.namespace=
+
+# Knowledge base data id name
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consume/produce configuration ------------------------------#
+# Kafka topic to consume data from
+source.kafka.topic=POLICY-RULE-METRICS
+
+# Output topic for the completed (enriched) data
+sink.kafka.topic=POLICY-RULE-METRICS
+
+# Consumer group id; stores this consumer's offsets (it can be named after the topology) so the next read does not re-consume data.
+group.id=policy-rule-metrics-20230629-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data center id, valid range 0-31
+data.center.id.num=1
+
+# HBase cache refresh interval in seconds; 0 disables cache updates
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default value configuration ------------------------------#
+
+# 0 = output logs as-is without completion, 1 = complete (enrich) the logs
+log.need.complete=0
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/TWA-PIC/flink/topology/completion/config/MIRROR-PXY-EXCH-INTERMEDIA-CERT b/TWA-PIC/flink/topology/completion/config/MIRROR-PXY-EXCH-INTERMEDIA-CERT
new file mode 100644
index 0000000..77b0196
--- /dev/null
+++ b/TWA-PIC/flink/topology/completion/config/MIRROR-PXY-EXCH-INTERMEDIA-CERT
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Source Kafka broker addresses
+source.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# Sink (output) Kafka broker addresses
+sink.kafka.servers=192.168.20.223:9094,192.168.20.224:9094,192.168.20.225:9094
+
+# ZooKeeper addresses, used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+
+# HDFS addresses, used to fetch the location libraries
+hdfs.servers=192.168.30.193:9000,192.168.30.194:9000
+
+#-------------------------------- HTTP / location library ------------------------------#
+# File system type for location library storage: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Location library path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED/
+
+# List of file types to fetch from the knowledge base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# List of file names to fetch from the knowledge base metadata; empty disables name filtering.
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and similar resources.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace name
+nacos.schema.namespace=MSH
+
+# Schema data id name
+nacos.schema.data.id=active_defence_event.json
+
+# Knowledge base namespace name
+nacos.knowledgebase.namespace=
+
+# Knowledge base data id name
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consume/produce configuration ------------------------------#
+# Kafka topic to consume data from
+source.kafka.topic=PXY-EXCH-INTERMEDIA-CERT
+
+# Output topic for the completed (enriched) data
+sink.kafka.topic=PXY-EXCH-INTERMEDIA-CERT
+
+# Consumer group id; stores this consumer's offsets (it can be named after the topology) so the next read does not re-consume data.
+group.id=pxy-exch-intermedia-cert-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data center id, valid range 0-31
+data.center.id.num=1
+
+# HBase cache refresh interval in seconds; 0 disables cache updates
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default value configuration ------------------------------#
+
+# 0 = output logs as-is without completion, 1 = complete (enrich) the logs
+log.need.complete=0
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/TWA-PIC/flink/topology/completion/config/MIRROR-SYS-PACKET-CAPTURE-EVENT b/TWA-PIC/flink/topology/completion/config/MIRROR-SYS-PACKET-CAPTURE-EVENT
new file mode 100644
index 0000000..1533eea
--- /dev/null
+++ b/TWA-PIC/flink/topology/completion/config/MIRROR-SYS-PACKET-CAPTURE-EVENT
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Source Kafka broker addresses
+source.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# Sink (output) Kafka broker addresses
+sink.kafka.servers=192.168.20.223:9094,192.168.20.224:9094,192.168.20.225:9094
+
+# ZooKeeper addresses, used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+
+# HDFS addresses, used to fetch the location libraries
+hdfs.servers=192.168.30.193:9000,192.168.30.194:9000
+
+#-------------------------------- HTTP / location library ------------------------------#
+# File system type for location library storage: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Location library path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED/
+
+# List of file types to fetch from the knowledge base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# List of file names to fetch from the knowledge base metadata; empty disables name filtering.
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and similar resources.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace name
+nacos.schema.namespace=MSH
+
+# Schema data id name
+nacos.schema.data.id=active_defence_event.json
+
+# Knowledge base namespace name
+nacos.knowledgebase.namespace=
+
+# Knowledge base data id name
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consume/produce configuration ------------------------------#
+# Kafka topic to consume data from
+source.kafka.topic=SYS-PACKET-CAPTURE-EVENT
+
+# Output topic for the completed (enriched) data
+sink.kafka.topic=SYS-PACKET-CAPTURE-EVENT
+
+# Consumer group id; stores this consumer's offsets (it can be named after the topology) so the next read does not re-consume data.
+group.id=sys-packet-capture-event-20230629-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data center id, valid range 0-31
+data.center.id.num=1
+
+# HBase cache refresh interval in seconds; 0 disables cache updates
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default value configuration ------------------------------#
+
+# 0 = output logs as-is without completion, 1 = complete (enrich) the logs
+log.need.complete=0
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/TWA-PIC/flink/topology/completion/config/MIRROR-TRAFFIC-TOP-METRICS b/TWA-PIC/flink/topology/completion/config/MIRROR-TRAFFIC-TOP-METRICS
new file mode 100644
index 0000000..8e203cd
--- /dev/null
+++ b/TWA-PIC/flink/topology/completion/config/MIRROR-TRAFFIC-TOP-METRICS
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Source Kafka broker addresses
+source.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# Sink (output) Kafka broker addresses
+sink.kafka.servers=192.168.20.223:9094,192.168.20.224:9094,192.168.20.225:9094
+
+# ZooKeeper addresses, used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+
+# HDFS addresses, used to fetch the location libraries
+hdfs.servers=192.168.30.193:9000,192.168.30.194:9000
+
+#-------------------------------- HTTP / location library ------------------------------#
+# File system type for location library storage: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Location library path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED/
+
+# List of file types to fetch from the knowledge base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# List of file names to fetch from the knowledge base metadata; empty disables name filtering.
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and similar resources.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace name
+nacos.schema.namespace=MSH
+
+# Schema data id name
+nacos.schema.data.id=active_defence_event.json
+
+# Knowledge base namespace name
+nacos.knowledgebase.namespace=
+
+# Knowledge base data id name
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consume/produce configuration ------------------------------#
+# Kafka topic to consume data from
+source.kafka.topic=TRAFFIC-TOP-METRICS
+
+# Output topic for the completed (enriched) data
+sink.kafka.topic=TRAFFIC-TOP-METRICS
+
+# Consumer group id; stores this consumer's offsets (it can be named after the topology) so the next read does not re-consume data.
+group.id=traffic-top-metrics-20230629-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data center id, valid range 0-31
+data.center.id.num=1
+
+# HBase cache refresh interval in seconds; 0 disables cache updates
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default value configuration ------------------------------#
+
+# 0 = output logs as-is without completion, 1 = complete (enrich) the logs
+log.need.complete=0
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
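All of these jobs pull their JSON schema from Nacos at nacos.server, namespace MSH, under the per-job data id. A spot-check sketch against the Nacos v1 config HTTP API; note that the group is assumed to be DEFAULT_GROUP and that the tenant parameter expects the namespace ID, which may differ from the display name MSH:

    # Fetch the schema a job would load from Nacos (assumptions noted above)
    curl "http://192.168.20.252:8848/nacos/v1/cs/configs?dataId=active_defence_event.json&group=DEFAULT_GROUP&tenant=MSH"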
diff --git a/TWA-PIC/flink/topology/completion/config/MIRROR-VOIP-RECORD b/TWA-PIC/flink/topology/completion/config/MIRROR-VOIP-RECORD
new file mode 100644
index 0000000..7318622
--- /dev/null
+++ b/TWA-PIC/flink/topology/completion/config/MIRROR-VOIP-RECORD
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Source Kafka broker addresses
+source.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# Sink (output) Kafka broker addresses
+sink.kafka.servers=192.168.20.223:9094,192.168.20.224:9094,192.168.20.225:9094
+
+# ZooKeeper addresses, used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+
+# HDFS addresses, used to fetch the location libraries
+hdfs.servers=192.168.30.193:9000,192.168.30.194:9000
+
+#-------------------------------- HTTP / location library ------------------------------#
+# File system type for location library storage: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Location library path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED/
+
+# List of file types to fetch from the knowledge base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# List of file names to fetch from the knowledge base metadata; empty disables name filtering.
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and similar resources.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace name
+nacos.schema.namespace=MSH
+
+# Schema data id name
+nacos.schema.data.id=active_defence_event.json
+
+# Knowledge base namespace name
+nacos.knowledgebase.namespace=
+
+# Knowledge base data id name
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consume/produce configuration ------------------------------#
+# Kafka topic to consume data from
+source.kafka.topic=VOIP-RECORD
+
+# Output topic for the completed (enriched) data
+sink.kafka.topic=VOIP-RECORD
+
+# Consumer group id; stores this consumer's offsets (it can be named after the topology) so the next read does not re-consume data.
+group.id=voip-record-20230629-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data center id, valid range 0-31
+data.center.id.num=1
+
+# HBase cache refresh interval in seconds; 0 disables cache updates
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default value configuration ------------------------------#
+
+# 0 = output logs as-is without completion, 1 = complete (enrich) the logs
+log.need.complete=0
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/TWA-PIC/flink/topology/completion/service_flow_config.properties b/TWA-PIC/flink/topology/completion/service_flow_config.properties
new file mode 100644
index 0000000..3c9b3b1
--- /dev/null
+++ b/TWA-PIC/flink/topology/completion/service_flow_config.properties
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Source Kafka broker addresses
+source.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# Sink (output) Kafka broker addresses
+sink.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# ZooKeeper addresses, used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+
+# HDFS addresses, used to fetch the location libraries
+hdfs.servers=192.168.30.193:9000,192.168.30.194:9000
+
+#-------------------------------- HTTP / location library ------------------------------#
+# File system type for location library storage: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Location library path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-SECURITY-EVENT-COMPLETED/
+
+# List of file types to fetch from the knowledge base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# List of file names to fetch from the knowledge base metadata; empty disables name filtering.
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and similar resources.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace name
+nacos.schema.namespace=MSH
+
+# Schema data id name
+nacos.schema.data.id=security_event.json
+
+# Knowledge base namespace name
+nacos.knowledgebase.namespace=
+
+# Knowledge base data id name
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consume/produce configuration ------------------------------#
+# Kafka topic to consume data from
+source.kafka.topic=SECURITY-EVENT
+
+# Output topic for the completed (enriched) data
+sink.kafka.topic=SECURITY-EVENT-COMPLETED
+
+# Consumer group id; stores this consumer's offsets (it can be named after the topology) so the next read does not re-consume data.
+group.id=security-event-log-20230718-A
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=24
+
+# Transform function parallelism
+transform.parallelism=24
+
+# Kafka producer parallelism
+sink.parallelism=24
+
+# Data center id, valid range 0-31
+data.center.id.num=7
+
+# HBase cache refresh interval in seconds; 0 disables cache updates
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default value configuration ------------------------------#
+
+# 0 = output logs as-is without completion, 1 = complete (enrich) the logs
+log.need.complete=1
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
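service_flow_config.properties is only the placeholder packed inside the job jar; start.sh below re-packs the jar with each file from the config/ directory before submitting it, one YARN application per file. A usage sketch, with a hypothetical deployment directory:

    cd /home/tsg/olap/topology/completion    # hypothetical deploy dir
    ./start.sh config/                       # submit one Flink-on-YARN job per config file
    ./stop.sh config/                        # kill the matching YARN applications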
diff --git a/TWA-PIC/flink/topology/completion/start.sh b/TWA-PIC/flink/topology/completion/start.sh
new file mode 100644
index 0000000..609a52b
--- /dev/null
+++ b/TWA-PIC/flink/topology/completion/start.sh
@@ -0,0 +1,67 @@
+#!/bin/bash
+# Script to start the Flink jobs
+source /etc/profile
+# Directory containing the job jar
+BASE_DIR=$(pwd)
+
+####################### Parameter configuration ####################################
+# YARN execution mode: per-job or session
+TASK_MODE="per-job"
+# Name of the jar to update
+PRIMORDIAL='log-completion-schema-230607-FastJson2.jar'
+# jar name
+JAR_NAME='log-completion-schema_tmp.jar'
+
+MAIN_CLASS="com.zdjizhi.topology.LogFlowWriteTopology"
+SESSION_CLUSTER="Flink session cluster"
+CONFIG_NAME="service_flow_config.properties"
+JOBMANAGER_MEMORY="1024m"
+TASKMANAGER_MEMORY="4096m"
+TASK_SLOTS=3
+####################### Parameter configuration ####################################
+
+APPLICATION_ID=$(yarn application -list | grep "$SESSION_CLUSTER" | awk '{print $1}')
+
+yes | cp -r $PRIMORDIAL $JAR_NAME
+
+#cd $BASE_DIR
+jar -xvf $BASE_DIR/$JAR_NAME $CONFIG_NAME
+function read_dir() {
+    for file in $(ls $1); do
+        if [ -d $1"/"$file ]; then
+            read_dir $1"/"$file
+        else
+            if [[ -z $TASK_MODE || $TASK_MODE == "per-job" ]]; then
+                num=$(yarn application -list | grep $file | wc -l)
+                if [ $num -eq "0" ]; then
+                    cat $1"/"$file >$BASE_DIR/$CONFIG_NAME
+                    jar -uvf $BASE_DIR/$JAR_NAME $CONFIG_NAME
+                    if [[ $file == "ETL-PROXY-EVENT-COMPLETED" || $file == "ETL-SECURITY-EVENT-COMPLETED" ]]; then
+                        flink run -t yarn-per-job -Djobmanager.memory.process.size=2048m -Dtaskmanager.memory.process.size=5120m -Dtaskmanager.memory.framework.off-heap.size=256m -Dyarn.application.name=$file -Dtaskmanager.numberOfTaskSlots=8 -d -c $MAIN_CLASS $BASE_DIR/$JAR_NAME $file
+                    else
+                        flink run -t yarn-per-job -Djobmanager.memory.process.size=$JOBMANAGER_MEMORY -Dtaskmanager.memory.process.size=$TASKMANAGER_MEMORY -Dyarn.application.name=$file -Dtaskmanager.numberOfTaskSlots=$TASK_SLOTS -d -c $MAIN_CLASS $BASE_DIR/$JAR_NAME $file
+                        sleep 10
+                    fi
+                fi
+            fi
+            if [[ -n $APPLICATION_ID && (-z $TASK_MODE || $TASK_MODE == "session") ]]; then
+                num=$(flink list | grep "$file" | grep -v flink | wc -l)
+                if [ $num -eq "0" ]; then
+                    cat $1"/"$file >$BASE_DIR/$CONFIG_NAME
+                    jar -uvf $BASE_DIR/$JAR_NAME $CONFIG_NAME
+                    # session mode
+                    flink run -t yarn-session -Dyarn.application.id=$APPLICATION_ID -d -c $MAIN_CLASS $BASE_DIR/$JAR_NAME $file
+                    sleep 10
+                fi
+            fi
+        fi
+    done
+}
+if [ $# != 1 ]; then
+    echo "usage: ./start.sh [Configuration path]"
+    exit 1
+fi
+# The first argument is the configuration directory name
+read_dir $1
+rm -rf $JAR_NAME
+
diff --git a/TWA-PIC/flink/topology/completion/stop.sh b/TWA-PIC/flink/topology/completion/stop.sh
new file mode 100644
index 0000000..24e1a83
--- /dev/null
+++ b/TWA-PIC/flink/topology/completion/stop.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+
+# Script to stop the Flink jobs
+source /etc/profile
+# Mode parameter: per-job or session
+TASK_MODE="per-job"
+SESSION_CLUSTER="Flink session cluster"
+
+APPLICATION_ID=$(yarn application -list | grep "$SESSION_CLUSTER" | awk '{print $1}')
+
+function read_dir() {
+    for file in $(ls $1); do
+        if [ -d $1"/"$file ]; then
+            read_dir $1"/"$file
+        else
+            if [[ $TASK_MODE == "per-job" ]]; then
+                appid=$(yarn application -list | grep "$file" | awk '{print $1}')
+                yarn application -kill $appid
+                echo -e "\033[32mcancel $file\033[0m"
+
+            elif [[ -n $APPLICATION_ID && $TASK_MODE == "session" ]]; then
+                jobid=$(flink list | grep -v flink | grep "$file" | awk '{print $4}')
+                flink cancel $jobid
+                echo -e "\033[32mcancel $file\033[0m"
+            fi
+
+        fi
+
+    done
+}
+
+# The first argument is the configuration directory name
+read_dir $1
+
diff --git a/TWA-PIC/flink/topology/completion/tmp/ETL-PROXY-EVENT-COMPLETED b/TWA-PIC/flink/topology/completion/tmp/ETL-PROXY-EVENT-COMPLETED
new file mode 100644
index 0000000..7638c0d
--- /dev/null
+++ b/TWA-PIC/flink/topology/completion/tmp/ETL-PROXY-EVENT-COMPLETED
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Source Kafka broker addresses
+source.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# Sink (output) Kafka broker addresses
+sink.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# ZooKeeper addresses, used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+
+# HDFS addresses, used to fetch the location libraries
+hdfs.servers=192.168.30.193:9000,192.168.30.194:9000
+
+#-------------------------------- HTTP / location library ------------------------------#
+# File system type for location library storage: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Location library path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-PROXY-EVENT-COMPLETED/
+
+# List of file types to fetch from the knowledge base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# List of file names to fetch from the knowledge base metadata; empty disables name filtering.
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and similar resources.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace name
+nacos.schema.namespace=MSH
+
+# Schema data id name
+nacos.schema.data.id=proxy_event.json
+
+# Knowledge base namespace name
+nacos.knowledgebase.namespace=
+
+# Knowledge base data id name
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consume/produce configuration ------------------------------#
+# Kafka topic to consume data from
+source.kafka.topic=PROXY-EVENT
+
+# Output topic for the completed (enriched) data
+sink.kafka.topic=PROXY-EVENT-COMPLETED
+
+# Consumer group id; stores this consumer's offsets (it can be named after the topology) so the next read does not re-consume data.
+group.id=proxy-event-20220408-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=24
+
+# Transform function parallelism
+transform.parallelism=24
+
+# Kafka producer parallelism
+sink.parallelism=24
+
+# Data center id, valid range 0-31
+data.center.id.num=5
+
+# HBase cache refresh interval in seconds; 0 disables cache updates
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default value configuration ------------------------------#
+
+# 0 = output logs as-is without completion, 1 = complete (enrich) the logs
+log.need.complete=1
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/TWA-PIC/flink/topology/completion/tmp/ETL-SECURITY-EVENT-COMPLETED b/TWA-PIC/flink/topology/completion/tmp/ETL-SECURITY-EVENT-COMPLETED
new file mode 100644
index 0000000..2e32ae6
--- /dev/null
+++ b/TWA-PIC/flink/topology/completion/tmp/ETL-SECURITY-EVENT-COMPLETED
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Source Kafka broker addresses
+source.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# Sink (output) Kafka broker addresses
+sink.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+# ZooKeeper addresses, used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+
+# HDFS addresses, used to fetch the location libraries
+hdfs.servers=192.168.30.193:9000,192.168.30.194:9000
+
+#-------------------------------- HTTP / location library ------------------------------#
+# File system type for location library storage: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Location library path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-SECURITY-EVENT-COMPLETED/
+
+# List of file types to fetch from the knowledge base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# List of file names to fetch from the knowledge base metadata; empty disables name filtering.
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and similar resources.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace name
+nacos.schema.namespace=MSH
+
+# Schema data id name
+nacos.schema.data.id=security_event.json
+
+# Knowledge base namespace name
+nacos.knowledgebase.namespace=
+
+# Knowledge base data id name
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consume/produce configuration ------------------------------#
+# Kafka topic to consume data from
+source.kafka.topic=SECURITY-EVENT
+
+# Output topic for the completed (enriched) data
+sink.kafka.topic=SECURITY-EVENT-COMPLETED
+
+# Consumer group id; stores this consumer's offsets (it can be named after the topology) so the next read does not re-consume data.
+group.id=security-event-log-20230718-A
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=24
+
+# Transform function parallelism
+transform.parallelism=24
+
+# Kafka producer parallelism
+sink.parallelism=24
+
+# Data center id, valid range 0-31
+data.center.id.num=7
+
+# HBase cache refresh interval in seconds; 0 disables cache updates
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default value configuration ------------------------------#
+
+# 0 = output logs as-is without completion, 1 = complete (enrich) the logs
+log.need.complete=1
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
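The .mmdb files added below are MaxMind-format databases backing the ip_location and asn knowledge base entries above. A local lookup sketch with the libmaxminddb CLI, assuming mmdblookup is installed (the record layout depends on how the databases were built):

    # Look up one address in the bundled IPv4 location database
    mmdblookup --file ip_v4_built_in.mmdb --ip 1.2.3.4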
diff --git a/TWA-PIC/flink/topology/data/ip_v6_built_in.mmdb b/TWA-PIC/flink/topology/data/ip_v6_built_in.mmdb
new file mode 100644
index 0000000..35d1d32
--- /dev/null
+++ b/TWA-PIC/flink/topology/data/ip_v6_built_in.mmdb
Binary files differ
diff --git a/TWA-PIC/flink/topology/data/ip_v6_user_defined.mmdb b/TWA-PIC/flink/topology/data/ip_v6_user_defined.mmdb
new file mode 100644
index 0000000..5047903
--- /dev/null
+++ b/TWA-PIC/flink/topology/data/ip_v6_user_defined.mmdb
Binary files differ
diff --git a/TWA-PIC/flink/topology/data/keystore.jks b/TWA-PIC/flink/topology/data/keystore.jks
new file mode 100644
index 0000000..2e2328b
--- /dev/null
+++ b/TWA-PIC/flink/topology/data/keystore.jks
Binary files differ
diff --git a/TWA-PIC/flink/topology/data/truststore.jks b/TWA-PIC/flink/topology/data/truststore.jks
new file mode 100644
index 0000000..b435e09
--- /dev/null
+++ b/TWA-PIC/flink/topology/data/truststore.jks
Binary files differ
diff --git a/TWA-PIC/flink/topology/relationship-gtpc-user/config/RELATIONSHIP-GTPC-USER b/TWA-PIC/flink/topology/relationship-gtpc-user/config/RELATIONSHIP-GTPC-USER
new file mode 100644
index 0000000..79c0baf
--- /dev/null
+++ b/TWA-PIC/flink/topology/relationship-gtpc-user/config/RELATIONSHIP-GTPC-USER
@@ -0,0 +1,33 @@
+#-------------------------------- Address configuration ------------------------------#
+#Input Kafka brokers
+input.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+#HBase ZooKeeper quorum, used to connect to HBase
+hbase.zookeeper.servers=192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+
+#Maximum number of rows per HBase scan
+hbase.scan.limit=100000
+
+#HBase RPC timeout in milliseconds
+hbase.rpc.timeout=60000
+
+#Cache entry expiry in seconds
+cache.expire.seconds=86400
+
+#Maximum number of cache entries
+cache.max.size=100000
+
+#Cache refresh interval in seconds
+cache.update.seconds=3600
+#-------------------------------- Kafka consumer group ------------------------------#
+
+#Kafka topic to consume from
+input.kafka.topic=GTPC-RECORD-COMPLETED
+
+#Consumer group id under which this topology's offsets are stored; name it after the topology. The stored offsets determine where the next read resumes so data is not consumed twice.
+group.id=relationship-gtpc-user-20220830-1
+
+#-------------------------------- Topology configuration ------------------------------#
+#user-to-TEID relation table
+relation.user.teid.table.name=tsg_galaxy:relation_user_teid
+
+#Tools library path
+tools.library=/home/tsg/olap/topology/data/
+
+#GTP-C knowledge base table
+gtpc.knowledge.base.table.name=tsg_galaxy:gtpc_knowledge_base
diff --git a/TWA-PIC/flink/topology/relationship-gtpc-user/service_flow_config.properties b/TWA-PIC/flink/topology/relationship-gtpc-user/service_flow_config.properties
new file mode 100644
index 0000000..2278c06
--- /dev/null
+++ b/TWA-PIC/flink/topology/relationship-gtpc-user/service_flow_config.properties
@@ -0,0 +1,33 @@
+#-------------------------------- Address configuration ------------------------------#
+#Input Kafka brokers
+input.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+#HBase ZooKeeper quorum, used to connect to HBase
+hbase.zookeeper.servers=192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+
+#Maximum number of rows per HBase scan
+hbase.scan.limit=100000
+
+#HBase RPC timeout in milliseconds
+hbase.rpc.timeout=60000
+
+#Cache entry expiry in seconds
+cache.expire.seconds=86400
+
+#Maximum number of cache entries
+cache.max.size=100000
+
+#Cache refresh interval in seconds
+cache.update.seconds=3600
+#-------------------------------- Kafka consumer group ------------------------------#
+
+#Kafka topic to consume from
+input.kafka.topic=GTPC-RECORD-COMPLETED
+
+#Consumer group id under which this topology's offsets are stored; name it after the topology. The stored offsets determine where the next read resumes so data is not consumed twice.
+group.id=relationship-gtpc-user-20220830-1
+
+#-------------------------------- Topology configuration ------------------------------#
+#user-to-TEID relation table
+relation.user.teid.table.name=tsg_galaxy:relation_user_teid
+
+#Tools library path
+tools.library=/home/tsg/olap/topology/data/
+
+#GTP-C knowledge base table
+gtpc.knowledge.base.table.name=tsg_galaxy:gtpc_knowledge_base
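The topology reads and writes the two HBase tables named above, so they should exist before the job starts. A minimal sketch, assuming the standard hbase CLI is installed and pointed at the same ZooKeeper quorum:

  # Non-interactive existence check for the tables from the config above
  echo "exists 'tsg_galaxy:relation_user_teid'" | hbase shell -n
  echo "exists 'tsg_galaxy:gtpc_knowledge_base'" | hbase shell -n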
diff --git a/TWA-PIC/flink/topology/relationship-gtpc-user/start.sh b/TWA-PIC/flink/topology/relationship-gtpc-user/start.sh
new file mode 100644
index 0000000..9ea8fd5
--- /dev/null
+++ b/TWA-PIC/flink/topology/relationship-gtpc-user/start.sh
@@ -0,0 +1,67 @@
+#!/bin/bash
+#Flink job start script
+source /etc/profile
+####################### Parameter configuration ####################################
+#YARN run mode: per-job or session
+TASK_MODE="per-job"
+#Original jar to copy and update
+#PRIMORDIAL
+PRIMORDIAL='relationship-gtpc-user-23-06-02.jar'
+#Working copy of the jar that the config is patched into
+JAR_NAME='relationship-gtpc-user_tmp.jar'
+
+SESSION_CLUSTER="Flink per-job cluster"
+MAIN_CLASS=""
+CONFIG_NAME="service_flow_config.properties"
+JOBMANAGER_MEMORY="1024m"
+TASKMANAGER_MEMORY="3072m"
+TASK_SLOTS=3
+CLASS_LOADER='child-first'
+####################### Parameter configuration ####################################
+
+#Directory containing the job jar
+BASE_DIR=$(pwd)
+APPLICATION_ID=$(yarn application -list | grep "$SESSION_CLUSTER" | awk '{print $1}')
+
+yes | cp -r "$PRIMORDIAL" "$JAR_NAME"
+
+#cd $BASE_DIR
+jar -xvf "$BASE_DIR/$JAR_NAME" "$CONFIG_NAME"
+function read_dir() {
+    for file in $(ls "$1"); do
+        if [ -d "$1/$file" ]; then
+            read_dir "$1/$file"
+        else
+            #per-job mode: one YARN application per config file
+            if [[ $TASK_MODE == "per-job" ]]; then
+                num=$(yarn application -list | grep "$file" | wc -l)
+                if [ "$num" -eq "0" ]; then
+                    cat "$1/$file" >"$BASE_DIR/$CONFIG_NAME"
+                    jar -uvf "$BASE_DIR/$JAR_NAME" "$CONFIG_NAME"
+
+                    flink run -t yarn-per-job -Djobmanager.memory.process.size=$JOBMANAGER_MEMORY -Dtaskmanager.memory.process.size=$TASKMANAGER_MEMORY -Dyarn.application.name="$file" -Dtaskmanager.numberOfTaskSlots=$TASK_SLOTS -p 3 -d "$BASE_DIR/$JAR_NAME" "$file"
+                    sleep 10
+                fi
+            #session mode: submit into the already-running session cluster
+            elif [[ -n $APPLICATION_ID && $TASK_MODE == "session" ]]; then
+                num=$(flink list | grep "$file" | grep -v flink | wc -l)
+                if [ "$num" -eq "0" ]; then
+                    cat "$1/$file" >"$BASE_DIR/$CONFIG_NAME"
+                    jar -uvf "$BASE_DIR/$JAR_NAME" "$CONFIG_NAME"
+                    flink run -t yarn-session -Dyarn.application.id=$APPLICATION_ID -Dclassloader.resolve-order=$CLASS_LOADER -d "$BASE_DIR/$JAR_NAME" "$file"
+                    sleep 10
+                fi
+            fi
+
+        fi
+    done
+}
+if [ $# != 1 ]; then
+    echo "usage: ./start.sh [configuration directory]"
+    exit 1
+fi
+#The first argument is the configuration directory
+read_dir "$1"
+
+rm -rf "$JAR_NAME"
+
diff --git a/TWA-PIC/flink/topology/relationship-gtpc-user/stop.sh b/TWA-PIC/flink/topology/relationship-gtpc-user/stop.sh
new file mode 100644
index 0000000..3657871
--- /dev/null
+++ b/TWA-PIC/flink/topology/relationship-gtpc-user/stop.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+
+#Flink job stop script
+source /etc/profile
+#Run mode: per-job or session
+TASK_MODE="per-job"
+SESSION_CLUSTER="Flink per-job cluster"
+
+APPLICATION_ID=$(yarn application -list | grep "$SESSION_CLUSTER" | awk '{print $1}')
+
+function read_dir() {
+    for file in $(ls "$1"); do
+        if [ -d "$1/$file" ]; then
+            read_dir "$1/$file"
+        else
+            #per-job mode: kill the matching YARN application
+            if [[ $TASK_MODE == "per-job" ]]; then
+                appid=$(yarn application -list | grep "$file" | awk '{print $1}')
+                yarn application -kill "$appid"
+                echo -e "\033[32mcancel $file\033[0m"
+
+            #session mode: cancel the matching job inside the session cluster
+            elif [[ -n $APPLICATION_ID && $TASK_MODE == "session" ]]; then
+                jobid=$(flink list | grep -v flink | grep "$file" | awk '{print $4}')
+                flink cancel "$jobid"
+                echo -e "\033[32mcancel $file\033[0m"
+            fi
+
+        fi
+
+    done
+}
+
+#The first argument is the configuration directory
+read_dir "$1"
+
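Both scripts take a single argument: a directory of per-topology config files. start.sh uses each filename as the YARN application name and repacks the file's content into the jar as service_flow_config.properties before calling flink run; stop.sh resolves the running application by the same filename. A usage sketch, assuming the config/ directory shipped next to the scripts:

  ./start.sh config/   # submits one job per file, e.g. RELATIONSHIP-GTPC-USER
  ./stop.sh config/    # kills the YARN application named after each file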
diff --git a/TWA-PIC/flink/topology/relationship-radius-account/config/RELATIONSHIP-RADIUS-ACCOUNT b/TWA-PIC/flink/topology/relationship-radius-account/config/RELATIONSHIP-RADIUS-ACCOUNT
new file mode 100644
index 0000000..7ccb244
--- /dev/null
+++ b/TWA-PIC/flink/topology/relationship-radius-account/config/RELATIONSHIP-RADIUS-ACCOUNT
@@ -0,0 +1,28 @@
+#Input Kafka brokers
+input.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+#HBase ZooKeeper quorum, used to connect to HBase
+hbase.zookeeper.servers=192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+
+#-------------------------------- Kafka consumer group ------------------------------#
+
+#Kafka topic to consume from
+input.kafka.topic=RADIUS-RECORD
+
+#Consumer group id under which this topology's offsets are stored; name it after the topology. The stored offsets determine where the next read resumes so data is not consumed twice.
+group.id=account-framedip-hbase-20211113-1
+
+#-------------------------------- Topology configuration ------------------------------#
+#framed-IP-to-account mapping table
+hbase.framedip.table.name=tsg_galaxy:relation_framedip_account
+
+#Tools library path
+tools.library=/home/tsg/olap/topology/data/
+
+#account-to-framed-IP mapping table
+hbase.account.table.name=tsg_galaxy:relation_account_framedip
+
+#HBase RPC timeout in milliseconds
+hbase.rpc.timeout=60000
+
+#Maximum number of rows per HBase scan
+hbase.scan.limit=100000
+
diff --git a/TWA-PIC/flink/topology/relationship-radius-account/service_flow_config.properties b/TWA-PIC/flink/topology/relationship-radius-account/service_flow_config.properties
new file mode 100644
index 0000000..b231854
--- /dev/null
+++ b/TWA-PIC/flink/topology/relationship-radius-account/service_flow_config.properties
@@ -0,0 +1,28 @@
+#Input Kafka brokers
+input.kafka.servers=192.168.30.193:9094,192.168.30.194:9094,192.168.30.195:9094
+
+#HBase ZooKeeper quorum, used to connect to HBase
+hbase.zookeeper.servers=192.168.30.193:2181,192.168.30.194:2181,192.168.30.195:2181
+
+#-------------------------------- Kafka consumer group ------------------------------#
+
+#Kafka topic to consume from
+input.kafka.topic=RADIUS-RECORD
+
+#Consumer group id under which this topology's offsets are stored; name it after the topology. The stored offsets determine where the next read resumes so data is not consumed twice.
+group.id=account-framedip-hbase-20211113-1
+
+#-------------------------------- Topology configuration ------------------------------#
+#framed-IP-to-account mapping table
+hbase.framedip.table.name=tsg_galaxy:relation_framedip_account
+
+#Tools library path
+tools.library=/home/tsg/olap/topology/data/
+
+#account-to-framed-IP mapping table
+hbase.account.table.name=tsg_galaxy:relation_account_framedip
+
+#HBase RPC timeout in milliseconds
+hbase.rpc.timeout=60000
+
+#Maximum number of rows per HBase scan
+hbase.scan.limit=100000
+
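Because the consumer group id is pinned in both files, consumption progress and lag can be inspected with the stock Kafka CLI. A minimal sketch, assuming kafka-consumer-groups.sh is available on a host that can reach the brokers:

  # Show per-partition offsets and lag for the group configured above
  kafka-consumer-groups.sh --bootstrap-server 192.168.30.193:9094 \
    --describe --group account-framedip-hbase-20211113-1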
diff --git a/TWA-PIC/flink/topology/relationship-radius-account/start.sh b/TWA-PIC/flink/topology/relationship-radius-account/start.sh
new file mode 100644
index 0000000..00eee48
--- /dev/null
+++ b/TWA-PIC/flink/topology/relationship-radius-account/start.sh
@@ -0,0 +1,67 @@
+#!/bin/bash
+#Flink job start script
+source /etc/profile
+####################### Parameter configuration ####################################
+#YARN run mode: per-job or session
+TASK_MODE="per-job"
+#Original jar to copy and update
+#PRIMORDIAL
+PRIMORDIAL='radius-relation-23-06-02.jar'
+#Working copy of the jar that the config is patched into
+JAR_NAME='radius-relation_tmp.jar'
+
+SESSION_CLUSTER="Flink per-job cluster"
+MAIN_CLASS=""
+CONFIG_NAME="service_flow_config.properties"
+JOBMANAGER_MEMORY="1024m"
+TASKMANAGER_MEMORY="3072m"
+TASK_SLOTS=3
+CLASS_LOADER='child-first'
+####################### Parameter configuration ####################################
+
+#Directory containing the job jar
+BASE_DIR=$(pwd)
+APPLICATION_ID=$(yarn application -list | grep "$SESSION_CLUSTER" | awk '{print $1}')
+
+yes | cp -r "$PRIMORDIAL" "$JAR_NAME"
+
+#cd $BASE_DIR
+jar -xvf "$BASE_DIR/$JAR_NAME" "$CONFIG_NAME"
+function read_dir() {
+    for file in $(ls "$1"); do
+        if [ -d "$1/$file" ]; then
+            read_dir "$1/$file"
+        else
+            #per-job mode: one YARN application per config file
+            if [[ $TASK_MODE == "per-job" ]]; then
+                num=$(yarn application -list | grep "$file" | wc -l)
+                if [ "$num" -eq "0" ]; then
+                    cat "$1/$file" >"$BASE_DIR/$CONFIG_NAME"
+                    jar -uvf "$BASE_DIR/$JAR_NAME" "$CONFIG_NAME"
+
+                    flink run -t yarn-per-job -Djobmanager.memory.process.size=$JOBMANAGER_MEMORY -Dtaskmanager.memory.process.size=$TASKMANAGER_MEMORY -Dyarn.application.name="$file" -Dtaskmanager.numberOfTaskSlots=$TASK_SLOTS -p 3 -d "$BASE_DIR/$JAR_NAME" "$file"
+                    sleep 10
+                fi
+            #session mode: submit into the already-running session cluster
+            elif [[ -n $APPLICATION_ID && $TASK_MODE == "session" ]]; then
+                num=$(flink list | grep "$file" | grep -v flink | wc -l)
+                if [ "$num" -eq "0" ]; then
+                    cat "$1/$file" >"$BASE_DIR/$CONFIG_NAME"
+                    jar -uvf "$BASE_DIR/$JAR_NAME" "$CONFIG_NAME"
+                    flink run -t yarn-session -Dyarn.application.id=$APPLICATION_ID -Dclassloader.resolve-order=$CLASS_LOADER -d "$BASE_DIR/$JAR_NAME" "$file"
+                    sleep 10
+                fi
+            fi
+
+        fi
+    done
+}
+if [ $# != 1 ]; then
+    echo "usage: ./start.sh [configuration directory]"
+    exit 1
+fi
+#The first argument is the configuration directory
+read_dir "$1"
+
+rm -rf "$JAR_NAME"
+
diff --git a/TWA-PIC/flink/topology/relationship-radius-account/stop.sh b/TWA-PIC/flink/topology/relationship-radius-account/stop.sh
new file mode 100644
index 0000000..3657871
--- /dev/null
+++ b/TWA-PIC/flink/topology/relationship-radius-account/stop.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+
+#Flink job stop script
+source /etc/profile
+#Run mode: per-job or session
+TASK_MODE="per-job"
+SESSION_CLUSTER="Flink per-job cluster"
+
+APPLICATION_ID=$(yarn application -list | grep "$SESSION_CLUSTER" | awk '{print $1}')
+
+function read_dir() {
+    for file in $(ls "$1"); do
+        if [ -d "$1/$file" ]; then
+            read_dir "$1/$file"
+        else
+            #per-job mode: kill the matching YARN application
+            if [[ $TASK_MODE == "per-job" ]]; then
+                appid=$(yarn application -list | grep "$file" | awk '{print $1}')
+                yarn application -kill "$appid"
+                echo -e "\033[32mcancel $file\033[0m"
+
+            #session mode: cancel the matching job inside the session cluster
+            elif [[ -n $APPLICATION_ID && $TASK_MODE == "session" ]]; then
+                jobid=$(flink list | grep -v flink | grep "$file" | awk '{print $4}')
+                flink cancel "$jobid"
+                echo -e "\033[32mcancel $file\033[0m"
+            fi
+
+        fi
+
+    done
+}
+
+#The first argument is the configuration directory
+read_dir "$1"
+
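stop.sh kills the YARN application outright, which discards any state that has not yet been checkpointed. Where a clean shutdown matters, Flink's own stop command drains the job through a savepoint first. A hedged sketch, where <appId> and <jobId> are placeholders taken from yarn application -list and flink list:

  # Illustrative graceful alternative to `yarn application -kill`
  flink stop -t yarn-per-job -Dyarn.application.id=<appId> \
    --savepointPath hdfs:///flink/savepoints <jobId>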
