diff options
Diffstat (limited to 'TWA-PIC/flink/bin')
26 files changed, 1930 insertions, 0 deletions
# diff --git a/TWA-PIC/flink/bin/bash-java-utils.jar b/TWA-PIC/flink/bin/bash-java-utils.jar
# Binary files differ
# new file mode 100644
# index 0000000..5b2e369
# --- /dev/null
# +++ b/TWA-PIC/flink/bin/bash-java-utils.jar
#
# diff --git a/TWA-PIC/flink/bin/config.sh b/TWA-PIC/flink/bin/config.sh
# new file mode 100644
# index 0000000..208b2d1
# --- /dev/null
# +++ b/TWA-PIC/flink/bin/config.sh
# @@ -0,0 +1,560 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Prints a ':'-separated classpath of every *.jar found below $FLINK_LIB_DIR,
# sorted by path, with the flink-dist jar(s) moved to the end.
# Globals: FLINK_LIB_DIR (read)
# Outputs: the classpath on stdout; an error message on stderr when no
#          flink-dist jar exists.
# Exits:   with status 1 when no flink-dist jar exists. Callers capture the
#          output with a command substitution, so only that subshell exits
#          and the caller receives an empty classpath.
constructFlinkClassPath() {
    local flink_dist=""
    local flink_classpath=""
    local jarfile
    # Kept in a variable so the escaped dot is handled uniformly by [[ =~ ]].
    local dist_jar_regex='.*/flink-dist[^/]*\.jar$'

    while IFS= read -r -d '' jarfile; do
        if [[ "$jarfile" =~ $dist_jar_regex ]]; then
            flink_dist="${flink_dist:+${flink_dist}:}${jarfile}"
        else
            flink_classpath="${flink_classpath:+${flink_classpath}:}${jarfile}"
        fi
    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0 | sort -z)

    if [[ -z "$flink_dist" ]]; then
        # write error message to stderr since stdout is stored as the classpath
        (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")

        # exit function with empty classpath to force process failure
        exit 1
    fi

    # Join without a leading ':' when flink-dist is the only jar; an empty
    # classpath entry would put the current directory on the classpath.
    echo "${flink_classpath:+${flink_classpath}:}${flink_dist}"
}

# Prints the path(s) of the flink-dist*.jar found below $FLINK_LIB_DIR.
# Globals: FLINK_LIB_DIR (read)
# Outputs: find's result on stdout (one path per line if several match);
#          an error message on stderr when nothing matches.
# Exits:   with status 1 when nothing matches (see constructFlinkClassPath).
findFlinkDistJar() {
    local flink_dist
    flink_dist=$(find "$FLINK_LIB_DIR" -name 'flink-dist*.jar')

    if [[ -z "$flink_dist" ]]; then
        # write error message to stderr since stdout is stored as the classpath
        (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")

        # exit function with empty classpath to force process failure
        exit 1
    fi

    echo "$flink_dist"
}

# These are used to mangle paths that are passed to java when using
# cygwin. Cygwin paths are like linux paths, i.e. /path/to/somewhere
# but the windows java version expects them in Windows Format, i.e. C:\bla\blub.
# "cygpath" can do the conversion.
#
# $1: a single path. Printed unchanged (whitespace and glob characters
#     preserved) unless running under Cygwin.
# Globals: UNAME (written; left global because the original set it too)
manglePath() {
    UNAME=$(uname -s)
    if [ "${UNAME:0:6}" == "CYGWIN" ]; then
        cygpath -w "$1"
    else
        printf '%s\n' "$1"
    fi
}

# $1: a path list, for example a java classpath. Printed unchanged unless
#     running under Cygwin, where "cygpath -wp" converts the whole list.
# Globals: UNAME (written; left global because the original set it too)
manglePathList() {
    UNAME=$(uname -s)
    # a path list, for example a java classpath
    if [ "${UNAME:0:6}" == "CYGWIN" ]; then
        cygpath -wp "$1"
    else
        printf '%s\n' "$1"
    fi
}

# Looks up a config value by key from a simple YAML-style key-value map.
# $1: key to look up
# $2: default value to return if key does not exist
# $3: config file to read from
#
# Matches lines of the form "<key>: <value>", ignoring leading blanks and
# anything from the first '#' on. The key is matched literally. Prints the
# trimmed value of the last matching line, or the default when no line matches
# or the value is empty.
readFromConfig() {
    local key=$1
    local defaultValue=$2
    local configFile=$3
    local escapedKey
    local value

    # The key is spliced into a sed pattern below; escape regex metacharacters
    # and the '/' delimiter so that e.g. the dots in "env.java.home" only match
    # literal dots.
    escapedKey=$(printf '%s' "$key" | sed 's/[][\.*^$/]/\\&/g')

    # first extract the value with the given key (1st sed), then trim the result (2nd sed)
    # if a key exists multiple times, take the "last" one (tail)
    value=$(sed -n "s/^[ ]*${escapedKey}[ ]*: \([^#]*\).*$/\1/p" "${configFile}" | sed "s/^ *//;s/ *$//" | tail -n 1)

    if [ -z "$value" ]; then
        printf '%s\n' "$defaultValue"
    else
        printf '%s\n' "$value"
    fi
}

########################################################################################################################
# DEFAULT CONFIG VALUES: These values will be used when nothing has been specified in conf/flink-conf.yaml
# -or- the respective environment variables are not set.
########################################################################################################################


# WARNING: these values are only used if nothing else is specified in
# conf/flink-conf.yaml

DEFAULT_ENV_PID_DIR="$(cd "$(dirname "$0")"/..; pwd)/tmp"  # Directory to store *.pid files to
DEFAULT_ENV_LOG_MAX=10                                      # Maximum number of old log files to keep
DEFAULT_ENV_JAVA_OPTS=""                                    # Optional JVM args
DEFAULT_ENV_JAVA_OPTS_JM=""                                 # Optional JVM args (JobManager)
DEFAULT_ENV_JAVA_OPTS_TM=""                                 # Optional JVM args (TaskManager)
DEFAULT_ENV_JAVA_OPTS_HS=""                                 # Optional JVM args (HistoryServer)
DEFAULT_ENV_JAVA_OPTS_CLI=""                                # Optional JVM args (Client)
DEFAULT_ENV_SSH_OPTS=""                                     # Optional SSH parameters running in cluster mode
DEFAULT_YARN_CONF_DIR=""                                    # YARN Configuration Directory, if necessary
DEFAULT_HADOOP_CONF_DIR=""                                  # Hadoop Configuration Directory, if necessary
DEFAULT_HBASE_CONF_DIR=""                                   # HBase Configuration Directory, if necessary

########################################################################################################################
# CONFIG KEYS: The default values can be overwritten by the following keys in
# conf/flink-conf.yaml
########################################################################################################################

KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa"

KEY_ENV_PID_DIR="env.pid.dir"
KEY_ENV_LOG_DIR="env.log.dir"
KEY_ENV_LOG_MAX="env.log.max"
KEY_ENV_YARN_CONF_DIR="env.yarn.conf.dir"
KEY_ENV_HADOOP_CONF_DIR="env.hadoop.conf.dir"
KEY_ENV_HBASE_CONF_DIR="env.hbase.conf.dir"
KEY_ENV_JAVA_HOME="env.java.home"
KEY_ENV_JAVA_OPTS="env.java.opts"
KEY_ENV_JAVA_OPTS_JM="env.java.opts.jobmanager"
KEY_ENV_JAVA_OPTS_TM="env.java.opts.taskmanager"
KEY_ENV_JAVA_OPTS_HS="env.java.opts.historyserver"
KEY_ENV_JAVA_OPTS_CLI="env.java.opts.client"
KEY_ENV_SSH_OPTS="env.ssh.opts"
KEY_HIGH_AVAILABILITY="high-availability"
KEY_ZK_HEAP_MB="zookeeper.heap.mb"

########################################################################################################################
# PATHS AND CONFIG
########################################################################################################################

target="$0"
# For the case, the executable has been directly symlinked, figure out
# the correct bin path by following its symlink up to an upper bound.
# Note: we can't use the readlink utility here if we want to be POSIX
# compatible.
iteration=0
while [ -L "$target" ]; do
    if [ "$iteration" -gt 100 ]; then
        echo "Cannot resolve path: You have a cyclic symlink in $target." >&2
        break
    fi
    ls=$(ls -ld -- "$target")
    link=$(expr "$ls" : '.* -> \(.*\)$')
    # A relative link target is relative to the directory that holds the
    # link, not to the current working directory.
    case "$link" in
        /*) target="$link" ;;
        *)  target="$(dirname "$target")/$link" ;;
    esac
    iteration=$((iteration + 1))
done

# Convert relative path to absolute path and resolve directory symlinks
bin=$(dirname "$target")
SYMLINK_RESOLVED_BIN=$(cd "$bin"; pwd -P)

# Define the main directory of the flink installation
# If config.sh is called by pyflink-shell.sh in python bin directory(pip installed), then do not need to set the FLINK_HOME here.
# FLINK_HOME is derived from the resolved bin directory unless a caller has
# already determined it (signalled through _FLINK_HOME_DETERMINED).
if [ -z "$_FLINK_HOME_DETERMINED" ]; then
    FLINK_HOME=$(dirname "$SYMLINK_RESOLVED_BIN")
fi
if [ -z "$FLINK_LIB_DIR" ]; then FLINK_LIB_DIR=$FLINK_HOME/lib; fi
if [ -z "$FLINK_PLUGINS_DIR" ]; then FLINK_PLUGINS_DIR=$FLINK_HOME/plugins; fi
if [ -z "$FLINK_OPT_DIR" ]; then FLINK_OPT_DIR=$FLINK_HOME/opt; fi


# These need to be mangled because they are directly passed to java.
# The above lib path is used by the shell script to retrieve jars in a
# directory, so it needs to be unmangled.
FLINK_HOME_DIR_MANGLED=$(manglePath "$FLINK_HOME")
if [ -z "$FLINK_CONF_DIR" ]; then FLINK_CONF_DIR=$FLINK_HOME_DIR_MANGLED/conf; fi
FLINK_BIN_DIR=$FLINK_HOME_DIR_MANGLED/bin
DEFAULT_FLINK_LOG_DIR=$FLINK_HOME_DIR_MANGLED/log
FLINK_CONF_FILE="flink-conf.yaml"
YAML_CONF=${FLINK_CONF_DIR}/${FLINK_CONF_FILE}

### Exported environment variables ###
export FLINK_CONF_DIR
export FLINK_BIN_DIR
export FLINK_PLUGINS_DIR
# export /lib dir to access it during deployment of the Yarn staging files
export FLINK_LIB_DIR
# export /opt dir to access it for the SQL client
export FLINK_OPT_DIR

########################################################################################################################
# ENVIRONMENT VARIABLES
########################################################################################################################

# read JAVA_HOME from config with no default value
MY_JAVA_HOME=$(readFromConfig "${KEY_ENV_JAVA_HOME}" "" "${YAML_CONF}")
# check if config specified JAVA_HOME
if [ -z "${MY_JAVA_HOME}" ]; then
    # config did not specify JAVA_HOME. Use system JAVA_HOME
    MY_JAVA_HOME="${JAVA_HOME}"
fi
# check if we have a valid JAVA_HOME and if java is not available
if [ -z "${MY_JAVA_HOME}" ] && ! type java > /dev/null 2> /dev/null; then
    echo "Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME." >&2
    exit 1
else
    JAVA_HOME="${MY_JAVA_HOME}"
fi

UNAME=$(uname -s)
if [ "${UNAME:0:6}" == "CYGWIN" ]; then
    JAVA_RUN=java
else
    if [[ -d "$JAVA_HOME" ]]; then
        JAVA_RUN="$JAVA_HOME"/bin/java
    else
        JAVA_RUN=java
    fi
fi

# Define HOSTNAME if it is not already set
if [ -z "${HOSTNAME}" ]; then
    HOSTNAME=$(hostname)
fi

IS_NUMBER="^[0-9]+$"

# Verify that NUMA tooling is available
if ! command -v numactl >/dev/null 2>&1; then
    FLINK_TM_COMPUTE_NUMA="false"
else
    # Define FLINK_TM_COMPUTE_NUMA if it is not already set
    if [ -z "${FLINK_TM_COMPUTE_NUMA}" ]; then
        FLINK_TM_COMPUTE_NUMA=$(readFromConfig "${KEY_TASKM_COMPUTE_NUMA}" "false" "${YAML_CONF}")
    fi
fi

if [ -z "${MAX_LOG_FILE_NUMBER}" ]; then
    MAX_LOG_FILE_NUMBER=$(readFromConfig "${KEY_ENV_LOG_MAX}" "${DEFAULT_ENV_LOG_MAX}" "${YAML_CONF}")
    export MAX_LOG_FILE_NUMBER
fi

if [ -z "${FLINK_LOG_DIR}" ]; then
    FLINK_LOG_DIR=$(readFromConfig "${KEY_ENV_LOG_DIR}" "${DEFAULT_FLINK_LOG_DIR}" "${YAML_CONF}")
fi

if [ -z "${YARN_CONF_DIR}" ]; then
    YARN_CONF_DIR=$(readFromConfig "${KEY_ENV_YARN_CONF_DIR}" "${DEFAULT_YARN_CONF_DIR}" "${YAML_CONF}")
fi

if [ -z "${HADOOP_CONF_DIR}" ]; then
    HADOOP_CONF_DIR=$(readFromConfig "${KEY_ENV_HADOOP_CONF_DIR}" "${DEFAULT_HADOOP_CONF_DIR}" "${YAML_CONF}")
fi

if [ -z "${HBASE_CONF_DIR}" ]; then
    HBASE_CONF_DIR=$(readFromConfig "${KEY_ENV_HBASE_CONF_DIR}" "${DEFAULT_HBASE_CONF_DIR}" "${YAML_CONF}")
fi

if [ -z "${FLINK_PID_DIR}" ]; then
    FLINK_PID_DIR=$(readFromConfig "${KEY_ENV_PID_DIR}" "${DEFAULT_ENV_PID_DIR}" "${YAML_CONF}")
fi

# For each of the JVM option variables below: one leading and one trailing
# double quote (if present) are removed from the configured value. Parameter
# expansion is used instead of "echo | sed" so values such as "-n" survive.
if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
    FLINK_ENV_JAVA_OPTS=$(readFromConfig "${KEY_ENV_JAVA_OPTS}" "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}")

    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS#\"}"
    FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS%\"}"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_JM}" ]; then
    FLINK_ENV_JAVA_OPTS_JM=$(readFromConfig "${KEY_ENV_JAVA_OPTS_JM}" "${DEFAULT_ENV_JAVA_OPTS_JM}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS_JM="${FLINK_ENV_JAVA_OPTS_JM#\"}"
    FLINK_ENV_JAVA_OPTS_JM="${FLINK_ENV_JAVA_OPTS_JM%\"}"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
    FLINK_ENV_JAVA_OPTS_TM=$(readFromConfig "${KEY_ENV_JAVA_OPTS_TM}" "${DEFAULT_ENV_JAVA_OPTS_TM}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS_TM="${FLINK_ENV_JAVA_OPTS_TM#\"}"
    FLINK_ENV_JAVA_OPTS_TM="${FLINK_ENV_JAVA_OPTS_TM%\"}"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_HS}" ]; then
    FLINK_ENV_JAVA_OPTS_HS=$(readFromConfig "${KEY_ENV_JAVA_OPTS_HS}" "${DEFAULT_ENV_JAVA_OPTS_HS}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS_HS="${FLINK_ENV_JAVA_OPTS_HS#\"}"
    FLINK_ENV_JAVA_OPTS_HS="${FLINK_ENV_JAVA_OPTS_HS%\"}"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_CLI}" ]; then
    FLINK_ENV_JAVA_OPTS_CLI=$(readFromConfig "${KEY_ENV_JAVA_OPTS_CLI}" "${DEFAULT_ENV_JAVA_OPTS_CLI}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS_CLI="${FLINK_ENV_JAVA_OPTS_CLI#\"}"
    FLINK_ENV_JAVA_OPTS_CLI="${FLINK_ENV_JAVA_OPTS_CLI%\"}"
fi

if [ -z "${FLINK_SSH_OPTS}" ]; then
    FLINK_SSH_OPTS=$(readFromConfig "${KEY_ENV_SSH_OPTS}" "${DEFAULT_ENV_SSH_OPTS}" "${YAML_CONF}")
fi

# Define ZK_HEAP if it is not already set
if [ -z "${ZK_HEAP}" ]; then
    ZK_HEAP=$(readFromConfig "${KEY_ZK_HEAP_MB}" 0 "${YAML_CONF}")
fi

# High availability
if [ -z "${HIGH_AVAILABILITY}" ]; then
    HIGH_AVAILABILITY=$(readFromConfig "${KEY_HIGH_AVAILABILITY}" "" "${YAML_CONF}")
    if [ -z "${HIGH_AVAILABILITY}" ]; then
        # Try deprecated value
        DEPRECATED_HA=$(readFromConfig "recovery.mode" "" "${YAML_CONF}")
        if [ -z "${DEPRECATED_HA}" ]; then
            HIGH_AVAILABILITY="none"
        elif [ "${DEPRECATED_HA}" == "standalone" ]; then
            # Standalone is now 'none'
            HIGH_AVAILABILITY="none"
        else
            HIGH_AVAILABILITY=${DEPRECATED_HA}
        fi
    fi
fi

# Arguments for the JVM. Used for job and task manager JVMs.
# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
# JobManagerOptions#TOTAL_PROCESS_MEMORY and TaskManagerOptions#TOTAL_PROCESS_MEMORY for that!
if [ -z "${JVM_ARGS}" ]; then
    JVM_ARGS=""
fi

# Check if deprecated HADOOP_HOME is set, and specify config path to HADOOP_CONF_DIR if it's empty.
if [ -z "$HADOOP_CONF_DIR" ]; then
    if [ -n "$HADOOP_HOME" ]; then
        # HADOOP_HOME is set. Check if its a Hadoop 1.x or 2.x HADOOP_HOME path
        if [ -d "$HADOOP_HOME/conf" ]; then
            # It's Hadoop 1.x
            HADOOP_CONF_DIR="$HADOOP_HOME/conf"
        fi
        if [ -d "$HADOOP_HOME/etc/hadoop" ]; then
            # It's Hadoop 2.2+
            HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
        fi
    fi
fi

# if neither HADOOP_CONF_DIR nor HADOOP_CLASSPATH are set, use some common default (if available)
if [ -z "$HADOOP_CONF_DIR" ] && [ -z "$HADOOP_CLASSPATH" ]; then
    if [ -d "/etc/hadoop/conf" ]; then
        echo "Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set."
        HADOOP_CONF_DIR="/etc/hadoop/conf"
    fi
fi

# Check if deprecated HBASE_HOME is set, and specify config path to HBASE_CONF_DIR if it's empty.
if [ -z "$HBASE_CONF_DIR" ]; then
    if [ -n "$HBASE_HOME" ]; then
        # HBASE_HOME is set.
        if [ -d "$HBASE_HOME/conf" ]; then
            HBASE_CONF_DIR="$HBASE_HOME/conf"
        fi
    fi
fi

# try and set HBASE_CONF_DIR to some common default if it's not set
if [ -z "$HBASE_CONF_DIR" ]; then
    if [ -d "/etc/hbase/conf" ]; then
        echo "Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set."
        HBASE_CONF_DIR="/etc/hbase/conf"
    fi
fi

INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"

if [ -n "${HBASE_CONF_DIR}" ]; then
    INTERNAL_HADOOP_CLASSPATHS="${INTERNAL_HADOOP_CLASSPATHS}:${HBASE_CONF_DIR}"
fi

# Auxiliary function which extracts the name of host from a line which
# also potentially includes topology information and the taskManager type
#
# $1: the entry to parse. Everything from the first '#' on is dropped and
#     surrounding whitespace is trimmed; if the remainder ends in
#     "/<hostname>", only <hostname> is kept.
# Outputs: the extracted host on stdout (empty for blank/comment-only input).
extractHostName() {
    # handle comments: extract first part of string (before first # character)
    local worker="${1%%#*}"

    # trim leading and trailing whitespace
    worker="${worker#"${worker%%[![:space:]]*}"}"
    worker="${worker%"${worker##*[![:space:]]}"}"

    # Extract the hostname from the network hierarchy
    if [[ "$worker" =~ ^.*/([0-9a-zA-Z.-]+)$ ]]; then
        worker=${BASH_REMATCH[1]}
    fi

    printf '%s\n' "$worker"
}

# Reads ${FLINK_CONF_DIR}/masters. Only the first whitespace-separated field
# of each line is used; it has the form "host[:webui-port]".
# Globals written: MASTERS_FILE, MASTERS (hosts), WEBUIPORTS (port per host,
#                  0 when none is given), MASTERS_ALL_LOCALHOST (true|false).
# Exits with status 1 when the masters file does not exist.
readMasters() {
    MASTERS_FILE="${FLINK_CONF_DIR}/masters"

    if [[ ! -f "${MASTERS_FILE}" ]]; then
        echo "No masters file. Please specify masters in 'conf/masters'." >&2
        exit 1
    fi

    MASTERS=()
    WEBUIPORTS=()

    MASTERS_ALL_LOCALHOST=true

    local first_field rest host_and_port host webui_port
    local more=true
    while $more; do
        # A final line without trailing newline makes read fail but still
        # fills the variables, so it is processed before the loop ends.
        read -r first_field rest || more=false
        host_and_port=$(extractHostName "$first_field")

        if [ -n "$host_and_port" ]; then
            host=${host_and_port%%:*}
            if [[ "$host_and_port" == *:* ]]; then
                webui_port=${host_and_port#*:}
                webui_port=${webui_port%%:*}
            else
                webui_port=""
            fi
            MASTERS+=("${host}")

            if [ -z "$webui_port" ]; then
                WEBUIPORTS+=(0)
            else
                WEBUIPORTS+=("${webui_port}")
            fi

            if [ "${host}" != "localhost" ] && [ "${host}" != "127.0.0.1" ] ; then
                MASTERS_ALL_LOCALHOST=false
            fi
        fi
    done < "$MASTERS_FILE"
}

# Reads ${FLINK_CONF_DIR}/workers. Only the first whitespace-separated field
# of each line is used.
# Globals written: WORKERS_FILE, WORKERS (hosts), WORKERS_ALL_LOCALHOST (true|false).
# Exits with status 1 when the workers file does not exist.
readWorkers() {
    WORKERS_FILE="${FLINK_CONF_DIR}/workers"

    if [[ ! -f "$WORKERS_FILE" ]]; then
        echo "No workers file. Please specify workers in 'conf/workers'." >&2
        exit 1
    fi

    WORKERS=()

    WORKERS_ALL_LOCALHOST=true

    local first_field rest host
    local more=true
    while $more; do
        read -r first_field rest || more=false
        host=$(extractHostName "$first_field")
        if [ -n "$host" ] ; then
            WORKERS+=("${host}")
            if [ "${host}" != "localhost" ] && [ "${host}" != "127.0.0.1" ] ; then
                WORKERS_ALL_LOCALHOST=false
            fi
        fi
    done < "$WORKERS_FILE"
}

# starts or stops TMs on all workers
# TMWorkers start|stop
TMWorkers() {
    local cmd=$1
    local worker

    readWorkers

    if [ "${WORKERS_ALL_LOCALHOST}" = true ] ; then
        # all-local setup
        for worker in "${WORKERS[@]}"; do
            "${FLINK_BIN_DIR}"/taskmanager.sh "${cmd}"
        done
    else
        # non-local setup
        # start/stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available
        if ! command -v pdsh >/dev/null 2>&1; then
            for worker in "${WORKERS[@]}"; do
                # FLINK_SSH_OPTS is intentionally unquoted: it holds several ssh options.
                ssh -n $FLINK_SSH_OPTS "$worker" -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${cmd}\" &"
            done
        else
            PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w "$(IFS=, ; echo "${WORKERS[*]}")" \
                "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${cmd}\""
        fi
    fi
}

# Runs org.apache.flink.runtime.util.bash.BashJavaUtils.
# $1: command passed to BashJavaUtils
# $2: configuration directory (passed as --configDir)
# $3: classpath (mangled with manglePathList before use)
# $4...: additional arguments, forwarded one-to-one
# Outputs: the last 1000 lines of the combined stdout/stderr of the JVM.
# Exits with status 1 (after printing that output to stderr) when the JVM
# returns a non-zero status.
runBashJavaUtilsCmd() {
    local cmd=$1
    local conf_dir=$2
    local class_path=$3
    local dynamic_args=("${@:4}")
    local output
    local status

    class_path=$(manglePathList "${class_path}")

    # Capture the JVM's own exit status. Checking "$?" after
    # "local output=$(java ... | tail)" would only ever see the status of
    # "local"/"tail" and never detect a failed JVM.
    output=$("${JAVA_RUN}" -classpath "${class_path}" org.apache.flink.runtime.util.bash.BashJavaUtils "${cmd}" --configDir "${conf_dir}" "${dynamic_args[@]}" 2>&1)
    status=$?
    output=$(printf '%s\n' "${output}" | tail -n 1000)

    if [[ ${status} -ne 0 ]]; then
        echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}." 1>&2
        # Print the output in case the user redirect the log to console.
        echo "$output" 1>&2
        exit 1
    fi

    echo "$output"
}

# Extracts the result lines (marked with BASH_JAVA_UTILS_EXEC_RESULT:) from
# BashJavaUtils output and prints them with the marker removed.
# $1: raw output
# $2: expected number of result lines
# Exits with status 1 when there is no result line or the count differs.
extractExecutionResults() {
    local output="$1"
    local expected_lines="$2"
    local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"
    local execution_results
    local num_lines

    execution_results=$(echo "${output}" | grep "${EXECUTION_PREFIX}")
    num_lines=$(echo "${execution_results}" | wc -l)
    # explicit check for empty result, because if execution_results is empty, then wc returns 1
    if [[ -z ${execution_results} ]]; then
        echo "[ERROR] The execution result is empty." 1>&2
        exit 1
    fi
    if [[ ${num_lines} -ne ${expected_lines} ]]; then
        echo "[ERROR] The execution results has unexpected number of lines, expected: ${expected_lines}, actual: ${num_lines}." 1>&2
        echo "[ERROR] An execution result line is expected following the prefix '${EXECUTION_PREFIX}'" 1>&2
        echo "$output" 1>&2
        exit 1
    fi

    echo "${execution_results//${EXECUTION_PREFIX}/}"
}

# Prints every line of $1 that does not carry the execution-result marker.
extractLoggingOutputs() {
    local output="$1"
    local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"

    echo "${output}" | grep -v "${EXECUTION_PREFIX}"
}

# Runs BashJavaUtils with command $1 and exports the outcome.
# Globals written: java_utils_output, logging_output, params_output, jvm_params;
# exported: JVM_ARGS (extended), DYNAMIC_PARAMETERS, FLINK_INHERITED_LOGS.
# Exits with status 1 when the two expected result lines cannot be extracted.
parseResourceParamsAndExportLogs() {
    local cmd=$1
    # NOTE(review): "$@" still contains $1, so the command name is also
    # forwarded as a dynamic argument; kept as in the original - confirm
    # whether "${@:2}" was intended.
    java_utils_output=$(runBashJavaUtilsCmd "${cmd}" "${FLINK_CONF_DIR}" "${FLINK_BIN_DIR}/bash-java-utils.jar:$(findFlinkDistJar)" "$@")
    logging_output=$(extractLoggingOutputs "${java_utils_output}")
    params_output=$(extractExecutionResults "${java_utils_output}" 2)

    if [[ $? -ne 0 ]]; then
        echo "[ERROR] Could not get JVM parameters and dynamic configurations properly." >&2
        echo "[ERROR] Raw output from BashJavaUtils:" >&2
        echo "$java_utils_output" >&2
        exit 1
    fi

    jvm_params=$(echo "${params_output}" | head -n1)
    export JVM_ARGS="${JVM_ARGS} ${jvm_params}"
    export DYNAMIC_PARAMETERS=$(IFS=" " echo "${params_output}" | tail -n1)

    export FLINK_INHERITED_LOGS="
$FLINK_INHERITED_LOGS

RESOURCE_PARAMS extraction logs:
jvm_params: $jvm_params
dynamic_configs: $DYNAMIC_PARAMETERS
logs: $logging_output
"
}

parseJmArgsAndExportLogs() {
    parseResourceParamsAndExportLogs GET_JM_RESOURCE_PARAMS
}

parseTmArgsAndExportLogs() {
    parseResourceParamsAndExportLogs GET_TM_RESOURCE_PARAMS
}

# diff --git a/TWA-PIC/flink/bin/find-flink-home.sh b/TWA-PIC/flink/bin/find-flink-home.sh
# new file mode 100644
# index 0000000..e0fe95f
# --- /dev/null
# +++ b/TWA-PIC/flink/bin/find-flink-home.sh
# @@ -0,0 +1,28 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Exports FLINK_HOME: the parent of this script's directory, unless a
# find_flink_home.py sits next to this script, in which case its output is used.
CURRENT_DIR="$( cd "$(dirname "$0")" ; pwd -P )"
FIND_FLINK_HOME_PYTHON_SCRIPT="$CURRENT_DIR/find_flink_home.py"

if [ ! -f "$FIND_FLINK_HOME_PYTHON_SCRIPT" ]; then
    export FLINK_HOME="$( cd "$CURRENT_DIR"/.. ; pwd -P )"
else
    if [ -n "$PYFLINK_PYTHON" ]; then
        # An interpreter was chosen explicitly: honour it. The original
        # assigned PYFLINK_PYTHON but never used it.
        export FLINK_HOME=$("$PYFLINK_PYTHON" "$FIND_FLINK_HOME_PYTHON_SCRIPT")
    else
        # No interpreter configured: run the script directly, as before.
        export FLINK_HOME=$("$FIND_FLINK_HOME_PYTHON_SCRIPT")
    fi
    PYFLINK_PYTHON="${PYFLINK_PYTHON:-"python"}"
fi

# diff --git a/TWA-PIC/flink/bin/flink b/TWA-PIC/flink/bin/flink
# new file mode 100644
# index 0000000..3413463
# --- /dev/null
# +++ b/TWA-PIC/flink/bin/flink
# @@ -0,0 +1,55 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

target="$0"
# For the case, the executable has been directly symlinked, figure out
# the correct bin path by following its symlink up to an upper bound.
# Note: we can't use the readlink utility here if we want to be POSIX
# compatible.
iteration=0
while [ -L "$target" ]; do
    if [ "$iteration" -gt 100 ]; then
        echo "Cannot resolve path: You have a cyclic symlink in $target." >&2
        break
    fi
    ls=$(ls -ld -- "$target")
    link=$(expr "$ls" : '.* -> \(.*\)$')
    # A relative link target is relative to the directory that holds the
    # link, not to the current working directory.
    case "$link" in
        /*) target="$link" ;;
        *)  target="$(dirname "$target")/$link" ;;
    esac
    iteration=$((iteration + 1))
done

# Convert relative path to absolute path
bin=$(dirname "$target")

# get flink config
. "$bin"/config.sh

if [ "$FLINK_IDENT_STRING" = "" ]; then
    FLINK_IDENT_STRING="$USER"
fi

CC_CLASSPATH=$(constructFlinkClassPath)

log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-client-$HOSTNAME.log
log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)

# Add Client-specific JVM options
FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_CLI}"

# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
# JVM_ARGS and FLINK_ENV_JAVA_OPTS are intentionally unquoted: each holds several options.
exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "$(manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS")" org.apache.flink.client.cli.CliFrontend "$@"

# diff --git a/TWA-PIC/flink/bin/flink-console.sh b/TWA-PIC/flink/bin/flink-console.sh
# new file mode 100644
# index 0000000..6ebe2ac
# --- /dev/null
# +++ b/TWA-PIC/flink/bin/flink-console.sh
# @@ -0,0 +1,111 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Start a Flink service as a console application. Must be stopped with Ctrl-C
# or with SIGTERM by kill or the controlling process.
USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|kubernetes-session|kubernetes-application|kubernetes-taskmanager) [args]"

SERVICE=$1
ARGS=("${@:2}") # get remaining arguments as array

bin=$(dirname "$0")
bin=$(cd "$bin"; pwd)

. "$bin"/config.sh

# Map the service name to the Java entry point class.
case "$SERVICE" in
    (taskexecutor)
        CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
    ;;

    (historyserver)
        CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
    ;;

    (zookeeper)
        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
    ;;

    (standalonesession)
        CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
    ;;

    (standalonejob)
        CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
    ;;

    (kubernetes-session)
        CLASS_TO_RUN=org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
    ;;

    (kubernetes-application)
        CLASS_TO_RUN=org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
    ;;

    (kubernetes-taskmanager)
        CLASS_TO_RUN=org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner
    ;;

    (*)
        echo "Unknown service '${SERVICE}'. $USAGE." >&2
        exit 1
    ;;
esac

FLINK_TM_CLASSPATH=$(constructFlinkClassPath)

if [ "$FLINK_IDENT_STRING" = "" ]; then
    FLINK_IDENT_STRING="$USER"
fi

pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-$SERVICE.pid
mkdir -p "$FLINK_PID_DIR"
# The lock needs to be released after use because this script is started foreground
# flock_exist keeps the exit status of the lookup: 0 means flock is available.
command -v flock >/dev/null 2>&1
flock_exist=$?
# Take an exclusive lock on the PID directory (file descriptor 200) while the
# pid file is inspected and updated.
if [[ ${flock_exist} -eq 0 ]]; then
    exec 200<"$FLINK_PID_DIR"
    flock 200
fi
# Remove the pid file when all the processes are dead
if [ -f "$pid" ]; then
    # Despite its name, all_dead stays 0 while no live process has been seen
    # and becomes 1 as soon as one recorded pid is still running.
    all_dead=0
    while read -r each_pid; do
        # Check whether the process is still running
        if kill -0 "$each_pid" > /dev/null 2>&1; then
            all_dead=1
        fi
    done < "$pid"
    if [ "${all_dead}" -eq 0 ]; then
        rm -- "$pid"
    fi
fi
# The log index is the number of lines (= recorded processes) in the pid file.
if [ -f "$pid" ]; then
    id=$(( $(wc -l < "$pid") ))
else
    id=0
fi

FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-${SERVICE}-${id}-${HOSTNAME}"
log="${FLINK_LOG_PREFIX}.log"

log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")

echo "Starting $SERVICE as a console application on host $HOSTNAME."

# Add the current process id to pid file
echo $$ >> "$pid" 2>/dev/null

# Release the lock because the java process runs in the foreground and would block other processes from modifying the pid file
if [[ ${flock_exist} -eq 0 ]]; then
    flock -u 200
fi

# JVM_ARGS and FLINK_ENV_JAVA_OPTS are intentionally unquoted: each holds several options.
exec "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "$(manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS")" ${CLASS_TO_RUN} "${ARGS[@]}"

# diff --git a/TWA-PIC/flink/bin/flink-daemon.sh b/TWA-PIC/flink/bin/flink-daemon.sh
# new file mode 100644
# index 0000000..67fe698
# --- /dev/null
# +++ b/TWA-PIC/flink/bin/flink-daemon.sh
# @@ -0,0 +1,194 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Start/stop a Flink daemon.
USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]"

STARTSTOP=$1
DAEMON=$2
ARGS=("${@:3}") # get remaining arguments as array

bin=$(dirname "$0")
bin=$(cd "$bin"; pwd)

. "$bin"/config.sh

# Map the daemon name to the Java entry point class.
case "$DAEMON" in
    (taskexecutor)
        CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
    ;;

    (zookeeper)
        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
    ;;

    (historyserver)
        CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
    ;;

    (standalonesession)
        CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
    ;;

    (standalonejob)
        CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
    ;;

    (*)
        echo "Unknown daemon '${DAEMON}'. $USAGE." >&2
        exit 1
    ;;
esac

if [ "$FLINK_IDENT_STRING" = "" ]; then
    FLINK_IDENT_STRING="$USER"
fi

FLINK_TM_CLASSPATH=$(constructFlinkClassPath)

pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-$DAEMON.pid

mkdir -p "$FLINK_PID_DIR"

# Log files for daemons are indexed from the process ID's position in the PID
# file. The following lock prevents a race condition during daemon startup
# when multiple daemons read, index, and write to the PID file concurrently.
# The lock is created on the PID directory since a lock file cannot be safely
# removed.
# The daemon is started with the lock closed and the lock remains
# active in this script until the script exits.
if command -v flock >/dev/null 2>&1; then
    exec 200<"$FLINK_PID_DIR"
    flock 200
fi

# Ascending ID depending on number of lines in pid file.
# This allows us to start multiple daemon of each type.
if [ -f "$pid" ]; then
    id=$(( $(wc -l < "$pid") ))
else
    id=0
fi

FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-${DAEMON}-${id}-${HOSTNAME}"
log="${FLINK_LOG_PREFIX}.log"
out="${FLINK_LOG_PREFIX}.out"

log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml")

# Sends SIGTERM to a process and, when the "timeout" utility is available,
# escalates to SIGKILL if the process has not exited after 10 seconds.
# $1: pid to stop
# $2: daemon name (used in the message only)
guaranteed_kill() {
    local to_stop_pid=$1
    local daemon=$2

    # send sigterm for graceful shutdown
    kill "$to_stop_pid"
    # if timeout exists, use it
    if command -v timeout &> /dev/null ; then
        # wait 10 seconds for process to stop. By default, Flink kills the JVM 5 seconds after sigterm.
        timeout 10 tail --pid="$to_stop_pid" -f /dev/null
        # timeout exits with 124 when the time limit was hit
        if [ "$?" -eq 124 ]; then
            echo "Daemon $daemon didn't stop within 10 seconds. Killing it."
            # send sigkill
            kill -9 "$to_stop_pid"
        fi
    fi
}

case "$STARTSTOP" in

    (start)

        # Print a warning if daemons are already running on host
        if [ -f "$pid" ]; then
            active=()
            while IFS='' read -r p || [[ -n "$p" ]]; do
                if kill -0 "$p" >/dev/null 2>&1; then
                    active+=("$p")
                fi
            done < "${pid}"

            count="${#active[@]}"

            if [ "${count}" -gt 0 ]; then
                echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
            fi
        fi

        # Evaluate user options for local variable expansion
        # NOTE(review): eval executes whatever the configured options contain;
        # kept because the expansion is the documented purpose of this line.
        FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})

        echo "Starting $DAEMON daemon on host $HOSTNAME."
        # "200<&-" closes the lock descriptor for the JVM so that the lock is
        # held by this script only. JVM_ARGS and FLINK_ENV_JAVA_OPTS are
        # intentionally unquoted: each holds several options.
        "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "$(manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS")" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null &

        mypid=$!

        # Add to pid file if successful start
        if [[ ${mypid} =~ ${IS_NUMBER} ]] && kill -0 "$mypid" > /dev/null 2>&1 ; then
            echo "$mypid" >> "$pid"
        else
            echo "Error starting $DAEMON daemon."
            exit 1
        fi
    ;;

    (stop)
        if [ -f "$pid" ]; then
            # Remove last in pid file
            to_stop=$(tail -n 1 "$pid")

            if [ -z "$to_stop" ]; then
                rm "$pid" # If all stopped, clean up pid file
                echo "No $DAEMON daemon to stop on host $HOSTNAME."
            else
                sed \$d "$pid" > "$pid.tmp" # all but last line

                # If all stopped, clean up pid file
                if (( $(wc -l < "$pid.tmp") == 0 )); then
                    rm "$pid" "$pid.tmp"
                else
                    mv "$pid.tmp" "$pid"
                fi

                if kill -0 "$to_stop" > /dev/null 2>&1; then
                    echo "Stopping $DAEMON daemon (pid: $to_stop) on host $HOSTNAME."
                    guaranteed_kill "$to_stop" "$DAEMON"
                else
                    echo "No $DAEMON daemon (pid: $to_stop) is running anymore on $HOSTNAME."
                fi
            fi
        else
            echo "No $DAEMON daemon to stop on host $HOSTNAME."
        fi
    ;;

    (stop-all)
        if [ -f "$pid" ]; then
            mv "$pid" "${pid}.tmp"

            while read -r to_stop; do
                if kill -0 "$to_stop" > /dev/null 2>&1; then
                    echo "Stopping $DAEMON daemon (pid: $to_stop) on host $HOSTNAME."
                    guaranteed_kill "$to_stop" "$DAEMON"
                else
                    echo "Skipping $DAEMON daemon (pid: $to_stop), because it is not running anymore on $HOSTNAME."
                fi
            done < "${pid}.tmp"
            rm "${pid}.tmp"
        fi
    ;;

    (*)
        echo "Unexpected argument '$STARTSTOP'. $USAGE." >&2
        exit 1
    ;;

esac

# diff --git a/TWA-PIC/flink/bin/historyserver.sh b/TWA-PIC/flink/bin/historyserver.sh
# new file mode 100644
# index 0000000..3bc3049
# --- /dev/null
# +++ b/TWA-PIC/flink/bin/historyserver.sh
# @@ -0,0 +1,39 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Start/stop a Flink HistoryServer
USAGE="Usage: historyserver.sh (start|start-foreground|stop)"

STARTSTOP=$1

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

.
"$bin"/config.sh + +if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then + export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_HS}" + args=("--configDir" "${FLINK_CONF_DIR}") +fi + +if [[ $STARTSTOP == "start-foreground" ]]; then + exec "${FLINK_BIN_DIR}"/flink-console.sh historyserver "${args[@]}" +else + "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP historyserver "${args[@]}" +fi diff --git a/TWA-PIC/flink/bin/jobmanager.sh b/TWA-PIC/flink/bin/jobmanager.sh new file mode 100644 index 0000000..35fbe2c --- /dev/null +++ b/TWA-PIC/flink/bin/jobmanager.sh @@ -0,0 +1,64 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Start/stop a Flink JobManager. 
+USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all" + +STARTSTOP=$1 +HOST=$2 # optional when starting multiple instances +WEBUIPORT=$3 # optional when starting multiple instances + +if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then + echo $USAGE + exit 1 +fi + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +ENTRYPOINT=standalonesession + +if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then + # Add JobManager-specific JVM options + export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}" + parseJmArgsAndExportLogs "${ARGS[@]}" + + args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster") + if [ ! -z $HOST ]; then + args+=("--host") + args+=("${HOST}") + fi + + if [ ! -z $WEBUIPORT ]; then + args+=("--webui-port") + args+=("${WEBUIPORT}") + fi + + if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then + args+=(${DYNAMIC_PARAMETERS[@]}) + fi +fi + +if [[ $STARTSTOP == "start-foreground" ]]; then + exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}" +else + "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}" +fi diff --git a/TWA-PIC/flink/bin/kubernetes-jobmanager.sh b/TWA-PIC/flink/bin/kubernetes-jobmanager.sh new file mode 100644 index 0000000..0513123 --- /dev/null +++ b/TWA-PIC/flink/bin/kubernetes-jobmanager.sh @@ -0,0 +1,42 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Start a Flink JobManager for native Kubernetes. +# NOTE: This script is not meant to be started manually. It will be used by native Kubernetes integration. + +USAGE="Usage: kubernetes-jobmanager.sh kubernetes-session|kubernetes-application [args]" + +ENTRY_POINT_NAME=$1 + +ARGS=("${@:2}") + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +# Add JobManager specific JVM options +export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}" +parseJmArgsAndExportLogs "${ARGS[@]}" + +if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then + ARGS=(${DYNAMIC_PARAMETERS[@]} "${ARGS[@]}") +fi + +exec "${FLINK_BIN_DIR}"/flink-console.sh ${ENTRY_POINT_NAME} "${ARGS[@]}" diff --git a/TWA-PIC/flink/bin/kubernetes-session.sh b/TWA-PIC/flink/bin/kubernetes-session.sh new file mode 100644 index 0000000..559a776 --- /dev/null +++ b/TWA-PIC/flink/bin/kubernetes-session.sh @@ -0,0 +1,39 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +# get Flink config +. "$bin"/config.sh + +if [ "$FLINK_IDENT_STRING" = "" ]; then + FLINK_IDENT_STRING="$USER" +fi + +JVM_ARGS="$JVM_ARGS -Xmx512m" + +CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS` + +log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-k8s-session-$HOSTNAME.log +log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-session.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-session.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-session.xml" + +export FLINK_CONF_DIR + +"$JAVA_RUN" $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.kubernetes.cli.KubernetesSessionCli "$@" diff --git a/TWA-PIC/flink/bin/kubernetes-taskmanager.sh b/TWA-PIC/flink/bin/kubernetes-taskmanager.sh new file mode 100644 index 0000000..b11fb89 --- /dev/null +++ b/TWA-PIC/flink/bin/kubernetes-taskmanager.sh @@ -0,0 +1,45 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Start a Flink TaskManager for native Kubernetes. +# NOTE: This script is not meant to be started manually. It will be used by native Kubernetes integration. + +USAGE="Usage: kubernetes-taskmanager.sh [args]" + +ENTRYPOINT=kubernetes-taskmanager + +ARGS=("${@:1}") + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +# if no other JVM options are set, set the GC to G1 +if [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then + export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC" +fi + +# Add TaskManager specific JVM options +export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}" +export JVM_ARGS="$JVM_ARGS $FLINK_TM_JVM_MEM_OPTS" + +ARGS=("--configDir" "${FLINK_CONF_DIR}" "${ARGS[@]}") + +exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}" diff --git a/TWA-PIC/flink/bin/mesos-appmaster-job.sh b/TWA-PIC/flink/bin/mesos-appmaster-job.sh new file mode 100644 index 0000000..5ae7396 --- /dev/null +++ b/TWA-PIC/flink/bin/mesos-appmaster-job.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +bin=$(dirname "$0") +bin=$(cd "${bin}" || exit; pwd) + +exec "${bin}"/mesos-jobmanager.sh "org.apache.flink.mesos.entrypoint.MesosJobClusterEntrypoint" "$@" diff --git a/TWA-PIC/flink/bin/mesos-appmaster.sh b/TWA-PIC/flink/bin/mesos-appmaster.sh new file mode 100644 index 0000000..2939e31 --- /dev/null +++ b/TWA-PIC/flink/bin/mesos-appmaster.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +bin=$(dirname "$0") +bin=$(cd "${bin}" || exit; pwd) + +exec "${bin}"/mesos-jobmanager.sh "org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint" "$@" diff --git a/TWA-PIC/flink/bin/mesos-jobmanager.sh b/TWA-PIC/flink/bin/mesos-jobmanager.sh new file mode 100644 index 0000000..b786e18 --- /dev/null +++ b/TWA-PIC/flink/bin/mesos-jobmanager.sh @@ -0,0 +1,52 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +ENTRY_POINT=$1 +ARGS=("${@:2}") + +bin=$(dirname "$0") +bin=$(cd "${bin}" || exit; pwd) + +# get Flink config +. "${bin}"/config.sh + +parseJmArgsAndExportLogs "${ARGS[@]}" + +if [ ! 
-z "${DYNAMIC_PARAMETERS}" ]; then + ARGS=(${DYNAMIC_PARAMETERS[@]} "${ARGS[@]}") +fi + +if [ "$FLINK_IDENT_STRING" = "" ]; then + FLINK_IDENT_STRING="$USER" +fi + +CC_CLASSPATH=$(manglePathList "$(constructFlinkClassPath):${INTERNAL_HADOOP_CLASSPATHS}") + +log="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-mesos-appmaster-${HOSTNAME}.log" +log_setting="-Dlog.file=${log} -Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties -Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j.properties -Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml" + +"${JAVA_RUN}" ${JVM_ARGS} -classpath ${CC_CLASSPATH} ${log_setting} ${ENTRY_POINT} "${ARGS[@]}" + +rc=$? + +if [[ ${rc} -ne 0 ]]; then + echo "Error while starting the mesos application master. Please check ${log} for more details." +fi + +exit ${rc} diff --git a/TWA-PIC/flink/bin/mesos-taskmanager.sh b/TWA-PIC/flink/bin/mesos-taskmanager.sh new file mode 100644 index 0000000..6c65c2a --- /dev/null +++ b/TWA-PIC/flink/bin/mesos-taskmanager.sh @@ -0,0 +1,43 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +# get Flink config +. "$bin"/config.sh + +CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS` + +log=flink-taskmanager.log +log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml" + +# Add precomputed memory JVM options +if [ -z "${FLINK_ENV_JAVA_OPTS_MEM}" ]; then + FLINK_ENV_JAVA_OPTS_MEM="" +fi +export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_MEM}" + +# Add TaskManager-specific JVM options +export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}" + +ENTRY_POINT=org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner + +exec "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} -classpath "$CC_CLASSPATH" $log_setting ${ENTRY_POINT} "$@" + diff --git a/TWA-PIC/flink/bin/pyflink-shell.sh b/TWA-PIC/flink/bin/pyflink-shell.sh new file mode 100644 index 0000000..a616abb --- /dev/null +++ b/TWA-PIC/flink/bin/pyflink-shell.sh @@ -0,0 +1,84 @@ +#!/bin/bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` +. "$bin"/find-flink-home.sh + +_FLINK_HOME_DETERMINED=1 + +. "$FLINK_HOME"/bin/config.sh + +FLINK_CLASSPATH=`constructFlinkClassPath` +PYTHON_JAR_PATH=`echo "$FLINK_OPT_DIR"/flink-python*.jar` + + +PYFLINK_PYTHON="${PYFLINK_PYTHON:-"python"}" + +# So that python can find out Flink's Jars +export FLINK_BIN_DIR=$FLINK_BIN_DIR +export FLINK_HOME + +# Add pyflink & py4j & cloudpickle to PYTHONPATH +export PYTHONPATH="$FLINK_OPT_DIR/python/pyflink.zip:$PYTHONPATH" +PY4J_ZIP=`echo "$FLINK_OPT_DIR"/python/py4j-*-src.zip` +CLOUDPICKLE_ZIP=`echo "$FLINK_OPT_DIR"/python/cloudpickle-*-src.zip` +export PYTHONPATH="$PY4J_ZIP:$CLOUDPICKLE_ZIP:$PYTHONPATH" + +PARSER="org.apache.flink.client.python.PythonShellParser" +function parse_options() { + "${JAVA_RUN}" ${JVM_ARGS} -cp ${FLINK_CLASSPATH}:${PYTHON_JAR_PATH} ${PARSER} "$@" + printf "%d\0" $? +} + +# Turn off posix mode since it does not allow process substitution +set +o posix +# If the command has option --help | -h, the script will directly +# run the PythonShellParser program to stdout the help message. +if [[ "$@" =~ '--help' ]] || [[ "$@" =~ '-h' ]]; then + "${JAVA_RUN}" ${JVM_ARGS} -cp ${FLINK_CLASSPATH}:${PYTHON_JAR_PATH} ${PARSER} "$@" + exit 0 +fi +OPTIONS=() +while IFS= read -d '' -r ARG; do + OPTIONS+=("$ARG") +done < <(parse_options "$@") + +COUNT=${#OPTIONS[@]} +LAST=$((COUNT - 1)) +LAUNCHER_EXIT_CODE=${OPTIONS[$LAST]} + +# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes +# the code that parses the output of the launcher to get confused. In those cases, check if the +# exit code is an integer, and if it's not, handle it as a special error case. +if ! 
[[ ${LAUNCHER_EXIT_CODE} =~ ^[0-9]+$ ]]; then + echo "${OPTIONS[@]}" | head -n-1 1>&2 + exit 1 +fi + +if [[ ${LAUNCHER_EXIT_CODE} != 0 ]]; then + exit ${LAUNCHER_EXIT_CODE} +fi + +OPTIONS=("${OPTIONS[@]:0:$LAST}") + +export SUBMIT_ARGS=${OPTIONS[@]} + +# -i: interactive +# -m: execute shell.py in the zip package +${PYFLINK_PYTHON} -i -m pyflink.shell diff --git a/TWA-PIC/flink/bin/set_flink_yarn_env.sh b/TWA-PIC/flink/bin/set_flink_yarn_env.sh new file mode 100644 index 0000000..70314f9 --- /dev/null +++ b/TWA-PIC/flink/bin/set_flink_yarn_env.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +echo -e "\n#flink\nexport FLINK_HOME=/home/tsg/olap/flink-1.13.1\nexport PATH=\$FLINK_HOME/bin:\$PATH" >> /etc/profile.d/flink.sh +chmod +x /etc/profile.d/flink.sh +source /etc/profile + + diff --git a/TWA-PIC/flink/bin/sql-client.sh b/TWA-PIC/flink/bin/sql-client.sh new file mode 100644 index 0000000..759f0c6 --- /dev/null +++ b/TWA-PIC/flink/bin/sql-client.sh @@ -0,0 +1,88 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +################################################################################ +# Adopted from "flink" bash script +################################################################################ + +target="$0" +# For the case, the executable has been directly symlinked, figure out +# the correct bin path by following its symlink up to an upper bound. +# Note: we can't use the readlink utility here if we want to be POSIX +# compatible. +iteration=0 +while [ -L "$target" ]; do + if [ "$iteration" -gt 100 ]; then + echo "Cannot resolve path: You have a cyclic symlink in $target." + break + fi + ls=`ls -ld -- "$target"` + target=`expr "$ls" : '.* -> \(.*\)$'` + iteration=$((iteration + 1)) +done + +# Convert relative path to absolute path +bin=`dirname "$target"` + +# get flink config +. "$bin"/config.sh + +if [ "$FLINK_IDENT_STRING" = "" ]; then + FLINK_IDENT_STRING="$USER" +fi + +CC_CLASSPATH=`constructFlinkClassPath` + +################################################################################ +# SQL client specific logic +################################################################################ + +log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-sql-client-$HOSTNAME.log +log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml) + +# get path of jar in /opt if it exist +FLINK_SQL_CLIENT_JAR=$(find "$FLINK_OPT_DIR" -regex ".*flink-sql-client.*.jar") + +# add flink-python jar to the classpath +if [[ ! 
"$CC_CLASSPATH" =~ .*flink-python.*.jar ]]; then + FLINK_PYTHON_JAR=$(find "$FLINK_OPT_DIR" -regex ".*flink-python.*.jar") + if [ -n "$FLINK_PYTHON_JAR" ]; then + CC_CLASSPATH="$CC_CLASSPATH:$FLINK_PYTHON_JAR" + fi +fi + +# check if SQL client is already in classpath and must not be shipped manually +if [[ "$CC_CLASSPATH" =~ .*flink-sql-client.*.jar ]]; then + + # start client without jar + exec "$JAVA_RUN" $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.table.client.SqlClient "$@" + +# check if SQL client jar is in /opt +elif [ -n "$FLINK_SQL_CLIENT_JAR" ]; then + + # start client with jar + exec "$JAVA_RUN" $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS:$FLINK_SQL_CLIENT_JAR"`" org.apache.flink.table.client.SqlClient "$@" --jar "`manglePath $FLINK_SQL_CLIENT_JAR`" + +# write error message to stderr +else + (>&2 echo "[ERROR] Flink SQL Client JAR file 'flink-sql-client*.jar' neither found in classpath nor /opt directory should be located in $FLINK_OPT_DIR.") + + # exit to force process failure + exit 1 +fi diff --git a/TWA-PIC/flink/bin/standalone-job.sh b/TWA-PIC/flink/bin/standalone-job.sh new file mode 100644 index 0000000..b4cfa20 --- /dev/null +++ b/TWA-PIC/flink/bin/standalone-job.sh @@ -0,0 +1,55 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Start/stop a Flink JobManager. +USAGE="Usage: standalone-job.sh ((start|start-foreground))|stop [args]" + +STARTSTOP=$1 +ENTRY_POINT_NAME="standalonejob" + +if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]]; then + echo $USAGE + exit 1 +fi + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +# Startup parameters +ARGS=("${@:2}") + +if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then + # Add cluster entry point specific JVM options + export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}" + parseJmArgsAndExportLogs "${ARGS[@]}" + + if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then + ARGS=(${DYNAMIC_PARAMETERS[@]} "${ARGS[@]}") + fi +fi + +ARGS=("--configDir" "${FLINK_CONF_DIR}" "${ARGS[@]}") + +if [[ $STARTSTOP == "start-foreground" ]]; then + exec "${FLINK_BIN_DIR}"/flink-console.sh ${ENTRY_POINT_NAME} "${ARGS[@]}" +else + "${FLINK_BIN_DIR}"/flink-daemon.sh ${STARTSTOP} ${ENTRY_POINT_NAME} "${ARGS[@]}" +fi diff --git a/TWA-PIC/flink/bin/start-cluster.sh b/TWA-PIC/flink/bin/start-cluster.sh new file mode 100644 index 0000000..720b33c --- /dev/null +++ b/TWA-PIC/flink/bin/start-cluster.sh @@ -0,0 +1,53 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +# Start the JobManager instance(s) +shopt -s nocasematch +if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then + # HA Mode + readMasters + + echo "Starting HA cluster with ${#MASTERS[@]} masters." + + for ((i=0;i<${#MASTERS[@]};++i)); do + master=${MASTERS[i]} + webuiport=${WEBUIPORTS[i]} + + if [ ${MASTERS_ALL_LOCALHOST} = true ] ; then + "${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}" + else + ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start ${master} ${webuiport} &" + fi + done + +else + echo "Starting cluster." 
+ + # Start single JobManager on this machine + "$FLINK_BIN_DIR"/jobmanager.sh start +fi +shopt -u nocasematch + +# Start TaskManager instance(s) +TMWorkers start diff --git a/TWA-PIC/flink/bin/start-zookeeper-quorum.sh b/TWA-PIC/flink/bin/start-zookeeper-quorum.sh new file mode 100644 index 0000000..d5a7593 --- /dev/null +++ b/TWA-PIC/flink/bin/start-zookeeper-quorum.sh @@ -0,0 +1,46 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +# Starts a ZooKeeper quorum as configured in $FLINK_CONF/zoo.cfg + +ZK_CONF="$FLINK_CONF_DIR/zoo.cfg" +if [ ! -f "$ZK_CONF" ]; then + echo "[ERROR] No ZooKeeper configuration file found in '$ZK_CONF'." 
+ exit 1 +fi + +# Extract server.X from ZooKeeper config and start instances +while read server ; do + server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim + + # match server.id=address[:port[:port]] + if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=[[:space:]]*([^: \#]+) ]]; then + id=${BASH_REMATCH[1]} + address=${BASH_REMATCH[2]} + + ssh -n $FLINK_SSH_OPTS $address -- "nohup /bin/bash -l $FLINK_BIN_DIR/zookeeper.sh start $id &" + else + echo "[WARN] Parse error. Skipping config entry '$server'." + fi +done < <(grep "^server\." "$ZK_CONF") diff --git a/TWA-PIC/flink/bin/stop-cluster.sh b/TWA-PIC/flink/bin/stop-cluster.sh new file mode 100644 index 0000000..d29b4f3 --- /dev/null +++ b/TWA-PIC/flink/bin/stop-cluster.sh @@ -0,0 +1,47 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. 
"$bin"/config.sh + +# Stop TaskManager instance(s) +TMWorkers stop + +# Stop JobManager instance(s) +shopt -s nocasematch +if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then + # HA Mode + readMasters + + if [ ${MASTERS_ALL_LOCALHOST} = true ] ; then + for master in ${MASTERS[@]}; do + "$FLINK_BIN_DIR"/jobmanager.sh stop + done + else + for master in ${MASTERS[@]}; do + ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" stop &" + done + fi + +else + "$FLINK_BIN_DIR"/jobmanager.sh stop +fi +shopt -u nocasematch diff --git a/TWA-PIC/flink/bin/stop-zookeeper-quorum.sh b/TWA-PIC/flink/bin/stop-zookeeper-quorum.sh new file mode 100644 index 0000000..ad79de8 --- /dev/null +++ b/TWA-PIC/flink/bin/stop-zookeeper-quorum.sh @@ -0,0 +1,46 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +# Stops a ZooKeeper quorum as configured in $FLINK_CONF/zoo.cfg + +ZK_CONF="$FLINK_CONF_DIR/zoo.cfg" +if [ ! 
-f "$ZK_CONF" ]; then + echo "[ERROR] No ZooKeeper configuration file found in '$ZK_CONF'." + exit 1 +fi + +# Extract server.X from ZooKeeper config and stop instances +while read server ; do + server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim + + # match server.id=address[:port[:port]] + if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=[[:space:]]*([^: \#]+) ]]; then + id=${BASH_REMATCH[1]} + server=${BASH_REMATCH[2]} + + ssh -n $FLINK_SSH_OPTS $server -- "nohup /bin/bash -l $FLINK_BIN_DIR/zookeeper.sh stop &" + else + echo "[WARN] Parse error. Skipping config entry '$server'." + fi +done < <(grep "^server\." "$ZK_CONF") diff --git a/TWA-PIC/flink/bin/taskmanager.sh b/TWA-PIC/flink/bin/taskmanager.sh new file mode 100644 index 0000000..fdc6514 --- /dev/null +++ b/TWA-PIC/flink/bin/taskmanager.sh @@ -0,0 +1,80 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Start/stop a Flink TaskManager. 
USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"

# First argument is the action; everything after it is kept for the entrypoint.
STARTSTOP=$1

ARGS=("${@:2}")

# Only the four actions named in the usage string are accepted.
case "$STARTSTOP" in
    start|start-foreground|stop|stop-all)
        ;;
    *)
        echo "$USAGE"
        exit 1
        ;;
esac

# Absolute directory of this script, used to locate config.sh next to it.
bin=$(cd "$(dirname "$0")"; pwd)

. "$bin"/config.sh

ENTRYPOINT=taskexecutor

if [[ $STARTSTOP == "start" || $STARTSTOP == "start-foreground" ]]; then

    # Fall back to the G1 collector only when neither the generic nor the
    # TaskManager-specific JVM option variable has been set.
    if [[ -z "${FLINK_ENV_JAVA_OPTS}" && -z "${FLINK_ENV_JAVA_OPTS_TM}" ]]; then
        export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
    fi

    # Append the TaskManager-specific JVM options to the generic ones.
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"

    # Startup parameters
    # parseTmArgsAndExportLogs is not defined in this script; DYNAMIC_PARAMETERS
    # is presumably set by it -- confirm in config.sh.
    parseTmArgsAndExportLogs "${ARGS[@]}"

    # Dynamic parameters, when present, go in front of the caller's arguments.
    # The expansion is deliberately unquoted so the value is split into words.
    if [[ -n "${DYNAMIC_PARAMETERS}" ]]; then
        ARGS=(${DYNAMIC_PARAMETERS[@]} "${ARGS[@]}")
    fi

    ARGS=("--configDir" "${FLINK_CONF_DIR}" "${ARGS[@]}")
fi

if [[ $STARTSTOP == "start-foreground" ]]; then
    # Foreground mode replaces this shell with the console launcher.
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
elif [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
    # Start a single TaskManager
    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
else
    # Example output from `numactl --show` on an AWS c4.8xlarge:
    # policy: default
    # preferred node: current
    # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
    # cpubind: 0 1
    # nodebind: 0 1
    # membind: 0 1
    #
    # The "nodebind:" line is split into words; the first word is the label,
    # so iteration starts at index 1.
    read -ra numa_fields <<< $(numactl --show | grep "^nodebind: ")
    for numa_node in "${numa_fields[@]:1}"; do
        # Start a TaskManager for each NUMA node
        numactl --membind=$numa_node --cpunodebind=$numa_node -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
    done
fi

# diff --git a/TWA-PIC/flink/bin/yarn-session.sh b/TWA-PIC/flink/bin/yarn-session.sh
# new file mode 100644
# index 0000000..f36ca34
# --- /dev/null
# +++
# b/TWA-PIC/flink/bin/yarn-session.sh
# @@ -0,0 +1,38 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Runs org.apache.flink.yarn.cli.FlinkYarnSessionCli with the Flink classpath,
# session-specific logging configuration, and all command-line arguments
# passed through unchanged.

bin=$(dirname "$0")
bin=$(cd "$bin"; pwd)

# get Flink config
. "$bin"/config.sh

# Default the identity used in the log file name to the current user.
if [ "$FLINK_IDENT_STRING" = "" ]; then
    FLINK_IDENT_STRING="$USER"
fi

JVM_ARGS="$JVM_ARGS -Xmx512m"

# The whole classpath is passed as ONE quoted argument: manglePathList only
# looks at its first parameter, so an unquoted path containing a space would
# be cut off at that space.
CC_CLASSPATH=$(manglePathList "$(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS")

log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-yarn-session-$HOSTNAME.log

# One array element per -D option, so that directories containing spaces
# reach the JVM as single arguments instead of being split.
log_setting=(
    "-Dlog.file=$log"
    "-Dlog4j.configuration=file:$FLINK_CONF_DIR/log4j-session.properties"
    "-Dlog4j.configurationFile=file:$FLINK_CONF_DIR/log4j-session.properties"
    "-Dlogback.configurationFile=file:$FLINK_CONF_DIR/logback-session.xml"
)

# JVM_ARGS is expanded unquoted on purpose: it holds several space-separated
# JVM options. The flink-dist glob is expanded by the shell.
"$JAVA_RUN" $JVM_ARGS -classpath "$CC_CLASSPATH" "${log_setting[@]}" org.apache.flink.yarn.cli.FlinkYarnSessionCli -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@"

# diff --git a/TWA-PIC/flink/bin/zookeeper.sh b/TWA-PIC/flink/bin/zookeeper.sh
# new file mode 100644
# index 0000000..9297401
# --- /dev/null
# +++ b/TWA-PIC/flink/bin/zookeeper.sh
# @@ -0,0 +1,68 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Start/stop a ZooKeeper quorum peer.
USAGE="Usage: zookeeper.sh ((start|start-foreground) peer-id)|stop|stop-all"

# $1 is the action, $2 the peer id (only required by the start actions).
STARTSTOP=$1
PEER_ID=$2

if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
    echo "$USAGE"
    exit 1
fi

bin=$(dirname "$0")
bin=$(cd "$bin"; pwd)

. "$bin"/config.sh

ZK_CONF="$FLINK_CONF_DIR/zoo.cfg"
if [ ! -f "$ZK_CONF" ]; then
    echo "[ERROR] No ZooKeeper configuration file found in '$ZK_CONF'."
    exit 1
fi

# Extra arguments for the launcher; stays empty for stop and stop-all.
args=()

if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
    # Quoted so that a peer id containing whitespace cannot break the test.
    if [ -z "$PEER_ID" ]; then
        echo "[ERROR] Missing peer id argument. $USAGE."
        exit 1
    fi

    # ZK_HEAP, IS_NUMBER, KEY_ZK_HEAP_MB and FLINK_CONF_FILE are not set in
    # this script; presumably they come from config.sh -- confirm there.
    if [[ ! ${ZK_HEAP} =~ ${IS_NUMBER} ]]; then
        echo "[ERROR] Configured ZooKeeper JVM heap size is not a number. Please set '$KEY_ZK_HEAP_MB' in $FLINK_CONF_FILE."
        exit 1
    fi

    # A heap size of 0 leaves the JVM's default heap settings untouched.
    if [ "$ZK_HEAP" -gt 0 ]; then
        export JVM_ARGS="$JVM_ARGS -Xms${ZK_HEAP}m -Xmx${ZK_HEAP}m"
    fi

    # Startup parameters
    args=("--zkConfigFile" "${ZK_CONF}" "--peerId" "${PEER_ID}")
fi

# the JMX log4j integration in ZK 3.4 does not work with log4j 2
export JVM_ARGS="$JVM_ARGS -Dzookeeper.jmx.log4j.disable=true"

if [[ $STARTSTOP == "start-foreground" ]]; then
    "${FLINK_BIN_DIR}"/flink-console.sh zookeeper "${args[@]}"
else
    # STARTSTOP was validated above, so it is one of start, stop or stop-all here.
    "${FLINK_BIN_DIR}"/flink-daemon.sh "$STARTSTOP" zookeeper "${args[@]}"
fi
