summaryrefslogtreecommitdiff
path: root/MPE/flink/bin
diff options
context:
space:
mode:
authorwangchengcheng <[email protected]>2023-07-27 15:43:51 +0800
committerwangchengcheng <[email protected]>2023-07-27 15:43:51 +0800
commit124f687daace8b85e5c74abac04bcd0a92744a8d (patch)
tree4f563326b1be67cfb51bf6a04f1ca4d953536e76 /MPE/flink/bin
parent08686ae87f9efe7a590f48db74ed133b481c85b1 (diff)
P19 23.07 online-configP19
Diffstat (limited to 'MPE/flink/bin')
-rw-r--r--MPE/flink/bin/bash-java-utils.jarbin0 -> 2010313 bytes
-rw-r--r--MPE/flink/bin/config.sh560
-rw-r--r--MPE/flink/bin/find-flink-home.sh28
-rw-r--r--MPE/flink/bin/flink55
-rw-r--r--MPE/flink/bin/flink-console.sh111
-rw-r--r--MPE/flink/bin/flink-daemon.sh194
-rw-r--r--MPE/flink/bin/historyserver.sh39
-rw-r--r--MPE/flink/bin/jobmanager.sh64
-rw-r--r--MPE/flink/bin/kubernetes-jobmanager.sh42
-rw-r--r--MPE/flink/bin/kubernetes-session.sh39
-rw-r--r--MPE/flink/bin/kubernetes-taskmanager.sh45
-rw-r--r--MPE/flink/bin/mesos-appmaster-job.sh23
-rw-r--r--MPE/flink/bin/mesos-appmaster.sh23
-rw-r--r--MPE/flink/bin/mesos-jobmanager.sh52
-rw-r--r--MPE/flink/bin/mesos-taskmanager.sh43
-rw-r--r--MPE/flink/bin/pyflink-shell.sh84
-rw-r--r--MPE/flink/bin/set_flink_yarn_env.sh7
-rw-r--r--MPE/flink/bin/sql-client.sh88
-rw-r--r--MPE/flink/bin/standalone-job.sh55
-rw-r--r--MPE/flink/bin/start-cluster.sh53
-rw-r--r--MPE/flink/bin/start-zookeeper-quorum.sh46
-rw-r--r--MPE/flink/bin/stop-cluster.sh47
-rw-r--r--MPE/flink/bin/stop-zookeeper-quorum.sh46
-rw-r--r--MPE/flink/bin/taskmanager.sh80
-rw-r--r--MPE/flink/bin/yarn-session.sh38
-rw-r--r--MPE/flink/bin/zookeeper.sh68
26 files changed, 1930 insertions, 0 deletions
diff --git a/MPE/flink/bin/bash-java-utils.jar b/MPE/flink/bin/bash-java-utils.jar
new file mode 100644
index 0000000..5b2e369
--- /dev/null
+++ b/MPE/flink/bin/bash-java-utils.jar
Binary files differ
diff --git a/MPE/flink/bin/config.sh b/MPE/flink/bin/config.sh
new file mode 100644
index 0000000..208b2d1
--- /dev/null
+++ b/MPE/flink/bin/config.sh
@@ -0,0 +1,560 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+constructFlinkClassPath() {
+ local FLINK_DIST
+ local FLINK_CLASSPATH
+
+ while read -d '' -r jarfile ; do
+ if [[ "$jarfile" =~ .*/flink-dist[^/]*.jar$ ]]; then
+ FLINK_DIST="$FLINK_DIST":"$jarfile"
+ elif [[ "$FLINK_CLASSPATH" == "" ]]; then
+ FLINK_CLASSPATH="$jarfile";
+ else
+ FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile"
+ fi
+ done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0 | sort -z)
+
+ if [[ "$FLINK_DIST" == "" ]]; then
+ # write error message to stderr since stdout is stored as the classpath
+ (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")
+
+ # exit function with empty classpath to force process failure
+ exit 1
+ fi
+
+ echo "$FLINK_CLASSPATH""$FLINK_DIST"
+}
+
+findFlinkDistJar() {
+ local FLINK_DIST="`find "$FLINK_LIB_DIR" -name 'flink-dist*.jar'`"
+
+ if [[ "$FLINK_DIST" == "" ]]; then
+ # write error message to stderr since stdout is stored as the classpath
+ (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")
+
+ # exit function with empty classpath to force process failure
+ exit 1
+ fi
+
+ echo "$FLINK_DIST"
+}
+
+# These are used to mangle paths that are passed to java when using
+# cygwin. Cygwin paths are like linux paths, i.e. /path/to/somewhere
+# but the windows java version expects them in Windows Format, i.e. C:\bla\blub.
+# "cygpath" can do the conversion.
+manglePath() {
+ UNAME=$(uname -s)
+ if [ "${UNAME:0:6}" == "CYGWIN" ]; then
+ echo `cygpath -w "$1"`
+ else
+ echo $1
+ fi
+}
+
+manglePathList() {
+ UNAME=$(uname -s)
+ # a path list, for example a java classpath
+ if [ "${UNAME:0:6}" == "CYGWIN" ]; then
+ echo `cygpath -wp "$1"`
+ else
+ echo $1
+ fi
+}
+
+# Looks up a config value by key from a simple YAML-style key-value map.
+# $1: key to look up
+# $2: default value to return if key does not exist
+# $3: config file to read from
+readFromConfig() {
+ local key=$1
+ local defaultValue=$2
+ local configFile=$3
+
+ # first extract the value with the given key (1st sed), then trim the result (2nd sed)
+ # if a key exists multiple times, take the "last" one (tail)
+ local value=`sed -n "s/^[ ]*${key}[ ]*: \([^#]*\).*$/\1/p" "${configFile}" | sed "s/^ *//;s/ *$//" | tail -n 1`
+
+ [ -z "$value" ] && echo "$defaultValue" || echo "$value"
+}
+
+########################################################################################################################
+# DEFAULT CONFIG VALUES: These values will be used when nothing has been specified in conf/flink-conf.yaml
+# -or- the respective environment variables are not set.
+########################################################################################################################
+
+
+# WARNING !!! , these values are only used if there is nothing else is specified in
+# conf/flink-conf.yaml
+
+DEFAULT_ENV_PID_DIR="$(cd "`dirname "$0"`"/..; pwd)/tmp" # Directory to store *.pid files to
+DEFAULT_ENV_LOG_MAX=10 # Maximum number of old log files to keep
+DEFAULT_ENV_JAVA_OPTS="" # Optional JVM args
+DEFAULT_ENV_JAVA_OPTS_JM="" # Optional JVM args (JobManager)
+DEFAULT_ENV_JAVA_OPTS_TM="" # Optional JVM args (TaskManager)
+DEFAULT_ENV_JAVA_OPTS_HS="" # Optional JVM args (HistoryServer)
+DEFAULT_ENV_JAVA_OPTS_CLI="" # Optional JVM args (Client)
+DEFAULT_ENV_SSH_OPTS="" # Optional SSH parameters running in cluster mode
+DEFAULT_YARN_CONF_DIR="" # YARN Configuration Directory, if necessary
+DEFAULT_HADOOP_CONF_DIR="" # Hadoop Configuration Directory, if necessary
+DEFAULT_HBASE_CONF_DIR="" # HBase Configuration Directory, if necessary
+
+########################################################################################################################
+# CONFIG KEYS: The default values can be overwritten by the following keys in conf/flink-conf.yaml
+########################################################################################################################
+
+KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa"
+
+KEY_ENV_PID_DIR="env.pid.dir"
+KEY_ENV_LOG_DIR="env.log.dir"
+KEY_ENV_LOG_MAX="env.log.max"
+KEY_ENV_YARN_CONF_DIR="env.yarn.conf.dir"
+KEY_ENV_HADOOP_CONF_DIR="env.hadoop.conf.dir"
+KEY_ENV_HBASE_CONF_DIR="env.hbase.conf.dir"
+KEY_ENV_JAVA_HOME="env.java.home"
+KEY_ENV_JAVA_OPTS="env.java.opts"
+KEY_ENV_JAVA_OPTS_JM="env.java.opts.jobmanager"
+KEY_ENV_JAVA_OPTS_TM="env.java.opts.taskmanager"
+KEY_ENV_JAVA_OPTS_HS="env.java.opts.historyserver"
+KEY_ENV_JAVA_OPTS_CLI="env.java.opts.client"
+KEY_ENV_SSH_OPTS="env.ssh.opts"
+KEY_HIGH_AVAILABILITY="high-availability"
+KEY_ZK_HEAP_MB="zookeeper.heap.mb"
+
+########################################################################################################################
+# PATHS AND CONFIG
+########################################################################################################################
+
+target="$0"
+# For the case, the executable has been directly symlinked, figure out
+# the correct bin path by following its symlink up to an upper bound.
+# Note: we can't use the readlink utility here if we want to be POSIX
+# compatible.
+iteration=0
+while [ -L "$target" ]; do
+ if [ "$iteration" -gt 100 ]; then
+ echo "Cannot resolve path: You have a cyclic symlink in $target."
+ break
+ fi
+ ls=`ls -ld -- "$target"`
+ target=`expr "$ls" : '.* -> \(.*\)$'`
+ iteration=$((iteration + 1))
+done
+
+# Convert relative path to absolute path and resolve directory symlinks
+bin=`dirname "$target"`
+SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P`
+
+# Define the main directory of the flink installation
+# If config.sh is called by pyflink-shell.sh in python bin directory(pip installed), then do not need to set the FLINK_HOME here.
+if [ -z "$_FLINK_HOME_DETERMINED" ]; then
+ FLINK_HOME=`dirname "$SYMLINK_RESOLVED_BIN"`
+fi
+if [ -z "$FLINK_LIB_DIR" ]; then FLINK_LIB_DIR=$FLINK_HOME/lib; fi
+if [ -z "$FLINK_PLUGINS_DIR" ]; then FLINK_PLUGINS_DIR=$FLINK_HOME/plugins; fi
+if [ -z "$FLINK_OPT_DIR" ]; then FLINK_OPT_DIR=$FLINK_HOME/opt; fi
+
+
+# These need to be mangled because they are directly passed to java.
+# The above lib path is used by the shell script to retrieve jars in a
+# directory, so it needs to be unmangled.
+FLINK_HOME_DIR_MANGLED=`manglePath "$FLINK_HOME"`
+if [ -z "$FLINK_CONF_DIR" ]; then FLINK_CONF_DIR=$FLINK_HOME_DIR_MANGLED/conf; fi
+FLINK_BIN_DIR=$FLINK_HOME_DIR_MANGLED/bin
+DEFAULT_FLINK_LOG_DIR=$FLINK_HOME_DIR_MANGLED/log
+FLINK_CONF_FILE="flink-conf.yaml"
+YAML_CONF=${FLINK_CONF_DIR}/${FLINK_CONF_FILE}
+
+### Exported environment variables ###
+export FLINK_CONF_DIR
+export FLINK_BIN_DIR
+export FLINK_PLUGINS_DIR
+# export /lib dir to access it during deployment of the Yarn staging files
+export FLINK_LIB_DIR
+# export /opt dir to access it for the SQL client
+export FLINK_OPT_DIR
+
+########################################################################################################################
+# ENVIRONMENT VARIABLES
+########################################################################################################################
+
+# read JAVA_HOME from config with no default value
+MY_JAVA_HOME=$(readFromConfig ${KEY_ENV_JAVA_HOME} "" "${YAML_CONF}")
+# check if config specified JAVA_HOME
+if [ -z "${MY_JAVA_HOME}" ]; then
+ # config did not specify JAVA_HOME. Use system JAVA_HOME
+ MY_JAVA_HOME="${JAVA_HOME}"
+fi
+# check if we have a valid JAVA_HOME and if java is not available
+if [ -z "${MY_JAVA_HOME}" ] && ! type java > /dev/null 2> /dev/null; then
+ echo "Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME."
+ exit 1
+else
+ JAVA_HOME="${MY_JAVA_HOME}"
+fi
+
+UNAME=$(uname -s)
+if [ "${UNAME:0:6}" == "CYGWIN" ]; then
+ JAVA_RUN=java
+else
+ if [[ -d "$JAVA_HOME" ]]; then
+ JAVA_RUN="$JAVA_HOME"/bin/java
+ else
+ JAVA_RUN=java
+ fi
+fi
+
+# Define HOSTNAME if it is not already set
+if [ -z "${HOSTNAME}" ]; then
+ HOSTNAME=`hostname`
+fi
+
+IS_NUMBER="^[0-9]+$"
+
+# Verify that NUMA tooling is available
+command -v numactl >/dev/null 2>&1
+if [[ $? -ne 0 ]]; then
+ FLINK_TM_COMPUTE_NUMA="false"
+else
+ # Define FLINK_TM_COMPUTE_NUMA if it is not already set
+ if [ -z "${FLINK_TM_COMPUTE_NUMA}" ]; then
+ FLINK_TM_COMPUTE_NUMA=$(readFromConfig ${KEY_TASKM_COMPUTE_NUMA} "false" "${YAML_CONF}")
+ fi
+fi
+
+if [ -z "${MAX_LOG_FILE_NUMBER}" ]; then
+ MAX_LOG_FILE_NUMBER=$(readFromConfig ${KEY_ENV_LOG_MAX} ${DEFAULT_ENV_LOG_MAX} "${YAML_CONF}")
+ export MAX_LOG_FILE_NUMBER
+fi
+
+if [ -z "${FLINK_LOG_DIR}" ]; then
+ FLINK_LOG_DIR=$(readFromConfig ${KEY_ENV_LOG_DIR} "${DEFAULT_FLINK_LOG_DIR}" "${YAML_CONF}")
+fi
+
+if [ -z "${YARN_CONF_DIR}" ]; then
+ YARN_CONF_DIR=$(readFromConfig ${KEY_ENV_YARN_CONF_DIR} "${DEFAULT_YARN_CONF_DIR}" "${YAML_CONF}")
+fi
+
+if [ -z "${HADOOP_CONF_DIR}" ]; then
+ HADOOP_CONF_DIR=$(readFromConfig ${KEY_ENV_HADOOP_CONF_DIR} "${DEFAULT_HADOOP_CONF_DIR}" "${YAML_CONF}")
+fi
+
+if [ -z "${HBASE_CONF_DIR}" ]; then
+ HBASE_CONF_DIR=$(readFromConfig ${KEY_ENV_HBASE_CONF_DIR} "${DEFAULT_HBASE_CONF_DIR}" "${YAML_CONF}")
+fi
+
+if [ -z "${FLINK_PID_DIR}" ]; then
+ FLINK_PID_DIR=$(readFromConfig ${KEY_ENV_PID_DIR} "${DEFAULT_ENV_PID_DIR}" "${YAML_CONF}")
+fi
+
+if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
+ FLINK_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS} "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}")
+
+ # Remove leading and ending double quotes (if present) of value
+ FLINK_ENV_JAVA_OPTS="$( echo "${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//' -e 's/"$//' )"
+fi
+
+if [ -z "${FLINK_ENV_JAVA_OPTS_JM}" ]; then
+ FLINK_ENV_JAVA_OPTS_JM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_JM} "${DEFAULT_ENV_JAVA_OPTS_JM}" "${YAML_CONF}")
+ # Remove leading and ending double quotes (if present) of value
+ FLINK_ENV_JAVA_OPTS_JM="$( echo "${FLINK_ENV_JAVA_OPTS_JM}" | sed -e 's/^"//' -e 's/"$//' )"
+fi
+
+if [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
+ FLINK_ENV_JAVA_OPTS_TM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_TM} "${DEFAULT_ENV_JAVA_OPTS_TM}" "${YAML_CONF}")
+ # Remove leading and ending double quotes (if present) of value
+ FLINK_ENV_JAVA_OPTS_TM="$( echo "${FLINK_ENV_JAVA_OPTS_TM}" | sed -e 's/^"//' -e 's/"$//' )"
+fi
+
+if [ -z "${FLINK_ENV_JAVA_OPTS_HS}" ]; then
+ FLINK_ENV_JAVA_OPTS_HS=$(readFromConfig ${KEY_ENV_JAVA_OPTS_HS} "${DEFAULT_ENV_JAVA_OPTS_HS}" "${YAML_CONF}")
+ # Remove leading and ending double quotes (if present) of value
+ FLINK_ENV_JAVA_OPTS_HS="$( echo "${FLINK_ENV_JAVA_OPTS_HS}" | sed -e 's/^"//' -e 's/"$//' )"
+fi
+
+if [ -z "${FLINK_ENV_JAVA_OPTS_CLI}" ]; then
+ FLINK_ENV_JAVA_OPTS_CLI=$(readFromConfig ${KEY_ENV_JAVA_OPTS_CLI} "${DEFAULT_ENV_JAVA_OPTS_CLI}" "${YAML_CONF}")
+ # Remove leading and ending double quotes (if present) of value
+ FLINK_ENV_JAVA_OPTS_CLI="$( echo "${FLINK_ENV_JAVA_OPTS_CLI}" | sed -e 's/^"//' -e 's/"$//' )"
+fi
+
+if [ -z "${FLINK_SSH_OPTS}" ]; then
+ FLINK_SSH_OPTS=$(readFromConfig ${KEY_ENV_SSH_OPTS} "${DEFAULT_ENV_SSH_OPTS}" "${YAML_CONF}")
+fi
+
+# Define ZK_HEAP if it is not already set
+if [ -z "${ZK_HEAP}" ]; then
+ ZK_HEAP=$(readFromConfig ${KEY_ZK_HEAP_MB} 0 "${YAML_CONF}")
+fi
+
+# High availability
+if [ -z "${HIGH_AVAILABILITY}" ]; then
+ HIGH_AVAILABILITY=$(readFromConfig ${KEY_HIGH_AVAILABILITY} "" "${YAML_CONF}")
+ if [ -z "${HIGH_AVAILABILITY}" ]; then
+ # Try deprecated value
+ DEPRECATED_HA=$(readFromConfig "recovery.mode" "" "${YAML_CONF}")
+ if [ -z "${DEPRECATED_HA}" ]; then
+ HIGH_AVAILABILITY="none"
+ elif [ ${DEPRECATED_HA} == "standalone" ]; then
+ # Standalone is now 'none'
+ HIGH_AVAILABILITY="none"
+ else
+ HIGH_AVAILABILITY=${DEPRECATED_HA}
+ fi
+ fi
+fi
+
+# Arguments for the JVM. Used for job and task manager JVMs.
+# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
+# JobManagerOptions#TOTAL_PROCESS_MEMORY and TaskManagerOptions#TOTAL_PROCESS_MEMORY for that!
+if [ -z "${JVM_ARGS}" ]; then
+ JVM_ARGS=""
+fi
+
+# Check if deprecated HADOOP_HOME is set, and specify config path to HADOOP_CONF_DIR if it's empty.
+if [ -z "$HADOOP_CONF_DIR" ]; then
+ if [ -n "$HADOOP_HOME" ]; then
+ # HADOOP_HOME is set. Check if its a Hadoop 1.x or 2.x HADOOP_HOME path
+ if [ -d "$HADOOP_HOME/conf" ]; then
+ # It's Hadoop 1.x
+ HADOOP_CONF_DIR="$HADOOP_HOME/conf"
+ fi
+ if [ -d "$HADOOP_HOME/etc/hadoop" ]; then
+ # It's Hadoop 2.2+
+ HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
+ fi
+ fi
+fi
+
+# if neither HADOOP_CONF_DIR nor HADOOP_CLASSPATH are set, use some common default (if available)
+if [ -z "$HADOOP_CONF_DIR" ] && [ -z "$HADOOP_CLASSPATH" ]; then
+ if [ -d "/etc/hadoop/conf" ]; then
+ echo "Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set."
+ HADOOP_CONF_DIR="/etc/hadoop/conf"
+ fi
+fi
+
+# Check if deprecated HBASE_HOME is set, and specify config path to HBASE_CONF_DIR if it's empty.
+if [ -z "$HBASE_CONF_DIR" ]; then
+ if [ -n "$HBASE_HOME" ]; then
+ # HBASE_HOME is set.
+ if [ -d "$HBASE_HOME/conf" ]; then
+ HBASE_CONF_DIR="$HBASE_HOME/conf"
+ fi
+ fi
+fi
+
+# try and set HBASE_CONF_DIR to some common default if it's not set
+if [ -z "$HBASE_CONF_DIR" ]; then
+ if [ -d "/etc/hbase/conf" ]; then
+ echo "Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set."
+ HBASE_CONF_DIR="/etc/hbase/conf"
+ fi
+fi
+
+INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"
+
+if [ -n "${HBASE_CONF_DIR}" ]; then
+ INTERNAL_HADOOP_CLASSPATHS="${INTERNAL_HADOOP_CLASSPATHS}:${HBASE_CONF_DIR}"
+fi
+
+# Auxilliary function which extracts the name of host from a line which
+# also potentially includes topology information and the taskManager type
+extractHostName() {
+ # handle comments: extract first part of string (before first # character)
+ WORKER=`echo $1 | cut -d'#' -f 1`
+
+ # Extract the hostname from the network hierarchy
+ if [[ "$WORKER" =~ ^.*/([0-9a-zA-Z.-]+)$ ]]; then
+ WORKER=${BASH_REMATCH[1]}
+ fi
+
+ echo $WORKER
+}
+
+readMasters() {
+ MASTERS_FILE="${FLINK_CONF_DIR}/masters"
+
+ if [[ ! -f "${MASTERS_FILE}" ]]; then
+ echo "No masters file. Please specify masters in 'conf/masters'."
+ exit 1
+ fi
+
+ MASTERS=()
+ WEBUIPORTS=()
+
+ MASTERS_ALL_LOCALHOST=true
+ GOON=true
+ while $GOON; do
+ read line || GOON=false
+ HOSTWEBUIPORT=$( extractHostName $line)
+
+ if [ -n "$HOSTWEBUIPORT" ]; then
+ HOST=$(echo $HOSTWEBUIPORT | cut -f1 -d:)
+ WEBUIPORT=$(echo $HOSTWEBUIPORT | cut -s -f2 -d:)
+ MASTERS+=(${HOST})
+
+ if [ -z "$WEBUIPORT" ]; then
+ WEBUIPORTS+=(0)
+ else
+ WEBUIPORTS+=(${WEBUIPORT})
+ fi
+
+ if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
+ MASTERS_ALL_LOCALHOST=false
+ fi
+ fi
+ done < "$MASTERS_FILE"
+}
+
+readWorkers() {
+ WORKERS_FILE="${FLINK_CONF_DIR}/workers"
+
+ if [[ ! -f "$WORKERS_FILE" ]]; then
+ echo "No workers file. Please specify workers in 'conf/workers'."
+ exit 1
+ fi
+
+ WORKERS=()
+
+ WORKERS_ALL_LOCALHOST=true
+ GOON=true
+ while $GOON; do
+ read line || GOON=false
+ HOST=$( extractHostName $line)
+ if [ -n "$HOST" ] ; then
+ WORKERS+=(${HOST})
+ if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
+ WORKERS_ALL_LOCALHOST=false
+ fi
+ fi
+ done < "$WORKERS_FILE"
+}
+
+# starts or stops TMs on all workers
+# TMWorkers start|stop
+TMWorkers() {
+ CMD=$1
+
+ readWorkers
+
+ if [ ${WORKERS_ALL_LOCALHOST} = true ] ; then
+ # all-local setup
+ for worker in ${WORKERS[@]}; do
+ "${FLINK_BIN_DIR}"/taskmanager.sh "${CMD}"
+ done
+ else
+ # non-local setup
+ # start/stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available
+ command -v pdsh >/dev/null 2>&1
+ if [[ $? -ne 0 ]]; then
+ for worker in ${WORKERS[@]}; do
+ ssh -n $FLINK_SSH_OPTS $worker -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &"
+ done
+ else
+ PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${WORKERS[*]}") \
+ "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\""
+ fi
+ fi
+}
+
+runBashJavaUtilsCmd() {
+ local cmd=$1
+ local conf_dir=$2
+ local class_path=$3
+ local dynamic_args=${@:4}
+ class_path=`manglePathList "${class_path}"`
+
+ local output=`"${JAVA_RUN}" -classpath "${class_path}" org.apache.flink.runtime.util.bash.BashJavaUtils ${cmd} --configDir "${conf_dir}" $dynamic_args 2>&1 | tail -n 1000`
+ if [[ $? -ne 0 ]]; then
+ echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}." 1>&2
+ # Print the output in case the user redirect the log to console.
+ echo "$output" 1>&2
+ exit 1
+ fi
+
+ echo "$output"
+}
+
+extractExecutionResults() {
+ local output="$1"
+ local expected_lines="$2"
+ local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"
+ local execution_results
+ local num_lines
+
+ execution_results=$(echo "${output}" | grep ${EXECUTION_PREFIX})
+ num_lines=$(echo "${execution_results}" | wc -l)
+ # explicit check for empty result, becuase if execution_results is empty, then wc returns 1
+ if [[ -z ${execution_results} ]]; then
+ echo "[ERROR] The execution result is empty." 1>&2
+ exit 1
+ fi
+ if [[ ${num_lines} -ne ${expected_lines} ]]; then
+ echo "[ERROR] The execution results has unexpected number of lines, expected: ${expected_lines}, actual: ${num_lines}." 1>&2
+ echo "[ERROR] An execution result line is expected following the prefix '${EXECUTION_PREFIX}'" 1>&2
+ echo "$output" 1>&2
+ exit 1
+ fi
+
+ echo "${execution_results//${EXECUTION_PREFIX}/}"
+}
+
+extractLoggingOutputs() {
+ local output="$1"
+ local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"
+
+ echo "${output}" | grep -v ${EXECUTION_PREFIX}
+}
+
+parseResourceParamsAndExportLogs() {
+ local cmd=$1
+ java_utils_output=$(runBashJavaUtilsCmd ${cmd} "${FLINK_CONF_DIR}" "${FLINK_BIN_DIR}/bash-java-utils.jar:$(findFlinkDistJar)" "$@")
+ logging_output=$(extractLoggingOutputs "${java_utils_output}")
+ params_output=$(extractExecutionResults "${java_utils_output}" 2)
+
+ if [[ $? -ne 0 ]]; then
+ echo "[ERROR] Could not get JVM parameters and dynamic configurations properly."
+ echo "[ERROR] Raw output from BashJavaUtils:"
+ echo "$java_utils_output"
+ exit 1
+ fi
+
+ jvm_params=$(echo "${params_output}" | head -n1)
+ export JVM_ARGS="${JVM_ARGS} ${jvm_params}"
+ export DYNAMIC_PARAMETERS=$(IFS=" " echo "${params_output}" | tail -n1)
+
+ export FLINK_INHERITED_LOGS="
+$FLINK_INHERITED_LOGS
+
+RESOURCE_PARAMS extraction logs:
+jvm_params: $jvm_params
+dynamic_configs: $DYNAMIC_PARAMETERS
+logs: $logging_output
+"
+}
+
+parseJmArgsAndExportLogs() {
+ parseResourceParamsAndExportLogs GET_JM_RESOURCE_PARAMS
+}
+
+parseTmArgsAndExportLogs() {
+ parseResourceParamsAndExportLogs GET_TM_RESOURCE_PARAMS
+}
diff --git a/MPE/flink/bin/find-flink-home.sh b/MPE/flink/bin/find-flink-home.sh
new file mode 100644
index 0000000..e0fe95f
--- /dev/null
+++ b/MPE/flink/bin/find-flink-home.sh
@@ -0,0 +1,28 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+CURRENT_DIR="$( cd "$(dirname "$0")" ; pwd -P )"
+FIND_FLINK_HOME_PYTHON_SCRIPT="$CURRENT_DIR/find_flink_home.py"
+
+if [ ! -f "$FIND_FLINK_HOME_PYTHON_SCRIPT" ]; then
+ export FLINK_HOME="$( cd "$CURRENT_DIR"/.. ; pwd -P )"
+else
+ PYFLINK_PYTHON="${PYFLINK_PYTHON:-"python"}"
+ export FLINK_HOME=$("$FIND_FLINK_HOME_PYTHON_SCRIPT")
+fi
diff --git a/MPE/flink/bin/flink b/MPE/flink/bin/flink
new file mode 100644
index 0000000..3413463
--- /dev/null
+++ b/MPE/flink/bin/flink
@@ -0,0 +1,55 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+target="$0"
+# For the case, the executable has been directly symlinked, figure out
+# the correct bin path by following its symlink up to an upper bound.
+# Note: we can't use the readlink utility here if we want to be POSIX
+# compatible.
+iteration=0
+while [ -L "$target" ]; do
+ if [ "$iteration" -gt 100 ]; then
+ echo "Cannot resolve path: You have a cyclic symlink in $target."
+ break
+ fi
+ ls=`ls -ld -- "$target"`
+ target=`expr "$ls" : '.* -> \(.*\)$'`
+ iteration=$((iteration + 1))
+done
+
+# Convert relative path to absolute path
+bin=`dirname "$target"`
+
+# get flink config
+. "$bin"/config.sh
+
+if [ "$FLINK_IDENT_STRING" = "" ]; then
+ FLINK_IDENT_STRING="$USER"
+fi
+
+CC_CLASSPATH=`constructFlinkClassPath`
+
+log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-client-$HOSTNAME.log
+log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)
+
+# Add Client-specific JVM options
+FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_CLI}"
+
+# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
+exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
diff --git a/MPE/flink/bin/flink-console.sh b/MPE/flink/bin/flink-console.sh
new file mode 100644
index 0000000..6ebe2ac
--- /dev/null
+++ b/MPE/flink/bin/flink-console.sh
@@ -0,0 +1,111 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Start a Flink service as a console application. Must be stopped with Ctrl-C
+# or with SIGTERM by kill or the controlling process.
+USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|kubernetes-session|kubernetes-application|kubernetes-taskmanager) [args]"
+
+SERVICE=$1
+ARGS=("${@:2}") # get remaining arguments as array
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+case $SERVICE in
+ (taskexecutor)
+ CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
+ ;;
+
+ (historyserver)
+ CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
+ ;;
+
+ (zookeeper)
+ CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
+ ;;
+
+ (standalonesession)
+ CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
+ ;;
+
+ (standalonejob)
+ CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
+ ;;
+
+ (kubernetes-session)
+ CLASS_TO_RUN=org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
+ ;;
+
+ (kubernetes-application)
+ CLASS_TO_RUN=org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
+ ;;
+
+ (kubernetes-taskmanager)
+ CLASS_TO_RUN=org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner
+ ;;
+
+ (*)
+ echo "Unknown service '${SERVICE}'. $USAGE."
+ exit 1
+ ;;
+esac
+
+FLINK_TM_CLASSPATH=`constructFlinkClassPath`
+
+if [ "$FLINK_IDENT_STRING" = "" ]; then
+ FLINK_IDENT_STRING="$USER"
+fi
+
+pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-$SERVICE.pid
+mkdir -p "$FLINK_PID_DIR"
+# The lock needs to be released after use because this script is started foreground
+command -v flock >/dev/null 2>&1
+flock_exist=$?
+if [[ ${flock_exist} -eq 0 ]]; then
+ exec 200<"$FLINK_PID_DIR"
+ flock 200
+fi
+# Remove the pid file when all the processes are dead
+if [ -f "$pid" ]; then
+ all_dead=0
+ while read each_pid; do
+ # Check whether the process is still running
+ kill -0 $each_pid > /dev/null 2>&1
+ [[ $? -eq 0 ]] && all_dead=1
+ done < "$pid"
+ [ ${all_dead} -eq 0 ] && rm $pid
+fi
+id=$([ -f "$pid" ] && echo $(wc -l < "$pid") || echo "0")
+
+FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-${SERVICE}-${id}-${HOSTNAME}"
+log="${FLINK_LOG_PREFIX}.log"
+
+log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")
+
+echo "Starting $SERVICE as a console application on host $HOSTNAME."
+
+# Add the current process id to pid file
+echo $$ >> "$pid" 2>/dev/null
+
+# Release the lock because the java process runs in the foreground and would block other processes from modifying the pid file
+[[ ${flock_exist} -eq 0 ]] && flock -u 200
+
+exec "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}"
diff --git a/MPE/flink/bin/flink-daemon.sh b/MPE/flink/bin/flink-daemon.sh
new file mode 100644
index 0000000..67fe698
--- /dev/null
+++ b/MPE/flink/bin/flink-daemon.sh
@@ -0,0 +1,194 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Start/stop a Flink daemon.
+USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]"
+
+STARTSTOP=$1
+DAEMON=$2
+ARGS=("${@:3}") # get remaining arguments as array
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+case $DAEMON in
+ (taskexecutor)
+ CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
+ ;;
+
+ (zookeeper)
+ CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
+ ;;
+
+ (historyserver)
+ CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
+ ;;
+
+ (standalonesession)
+ CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
+ ;;
+
+ (standalonejob)
+ CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
+ ;;
+
+ (*)
+ echo "Unknown daemon '${DAEMON}'. $USAGE."
+ exit 1
+ ;;
+esac
+
+if [ "$FLINK_IDENT_STRING" = "" ]; then
+ FLINK_IDENT_STRING="$USER"
+fi
+
+FLINK_TM_CLASSPATH=`constructFlinkClassPath`
+
+pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-$DAEMON.pid
+
+mkdir -p "$FLINK_PID_DIR"
+
+# Log files for daemons are indexed from the process ID's position in the PID
+# file. The following lock prevents a race condition during daemon startup
+# when multiple daemons read, index, and write to the PID file concurrently.
+# The lock is created on the PID directory since a lock file cannot be safely
+# removed. The daemon is started with the lock closed and the lock remains
+# active in this script until the script exits.
+command -v flock >/dev/null 2>&1
+if [[ $? -eq 0 ]]; then
+ exec 200<"$FLINK_PID_DIR"
+ flock 200
+fi
+
+# Ascending ID depending on number of lines in pid file.
+# This allows us to start multiple daemon of each type.
+id=$([ -f "$pid" ] && echo $(wc -l < "$pid") || echo "0")
+
+FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-${DAEMON}-${id}-${HOSTNAME}"
+log="${FLINK_LOG_PREFIX}.log"
+out="${FLINK_LOG_PREFIX}.out"
+
+log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml")
+
+function guaranteed_kill {
+ to_stop_pid=$1
+ daemon=$2
+
+ # send sigterm for graceful shutdown
+ kill $to_stop_pid
+ # if timeout exists, use it
+ if command -v timeout &> /dev/null ; then
+ # wait 10 seconds for process to stop. By default, Flink kills the JVM 5 seconds after sigterm.
+ timeout 10 tail --pid=$to_stop_pid -f /dev/null
+ if [ "$?" -eq 124 ]; then
+ echo "Daemon $daemon didn't stop within 10 seconds. Killing it."
+ # send sigkill
+ kill -9 $to_stop_pid
+ fi
+ fi
+}
+
+case $STARTSTOP in
+
+ (start)
+
+ # Print a warning if daemons are already running on host
+ if [ -f "$pid" ]; then
+ active=()
+ while IFS='' read -r p || [[ -n "$p" ]]; do
+ kill -0 $p >/dev/null 2>&1
+ if [ $? -eq 0 ]; then
+ active+=($p)
+ fi
+ done < "${pid}"
+
+ count="${#active[@]}"
+
+ if [ ${count} -gt 0 ]; then
+ echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
+ fi
+ fi
+
+ # Evaluate user options for local variable expansion
+ FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})
+
+ echo "Starting $DAEMON daemon on host $HOSTNAME."
+ "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null &
+
+ mypid=$!
+
+ # Add to pid file if successful start
+ if [[ ${mypid} =~ ${IS_NUMBER} ]] && kill -0 $mypid > /dev/null 2>&1 ; then
+ echo $mypid >> "$pid"
+ else
+ echo "Error starting $DAEMON daemon."
+ exit 1
+ fi
+ ;;
+
+ (stop)
+ if [ -f "$pid" ]; then
+ # Remove last in pid file
+ to_stop=$(tail -n 1 "$pid")
+
+ if [ -z $to_stop ]; then
+ rm "$pid" # If all stopped, clean up pid file
+ echo "No $DAEMON daemon to stop on host $HOSTNAME."
+ else
+ sed \$d "$pid" > "$pid.tmp" # all but last line
+
+ # If all stopped, clean up pid file
+ [ $(wc -l < "$pid.tmp") -eq 0 ] && rm "$pid" "$pid.tmp" || mv "$pid.tmp" "$pid"
+
+ if kill -0 $to_stop > /dev/null 2>&1; then
+ echo "Stopping $DAEMON daemon (pid: $to_stop) on host $HOSTNAME."
+ guaranteed_kill $to_stop $DAEMON
+ else
+ echo "No $DAEMON daemon (pid: $to_stop) is running anymore on $HOSTNAME."
+ fi
+ fi
+ else
+ echo "No $DAEMON daemon to stop on host $HOSTNAME."
+ fi
+ ;;
+
+ (stop-all)
+ if [ -f "$pid" ]; then
+ mv "$pid" "${pid}.tmp"
+
+ while read to_stop; do
+ if kill -0 $to_stop > /dev/null 2>&1; then
+ echo "Stopping $DAEMON daemon (pid: $to_stop) on host $HOSTNAME."
+ guaranteed_kill $to_stop $DAEMON
+ else
+ echo "Skipping $DAEMON daemon (pid: $to_stop), because it is not running anymore on $HOSTNAME."
+ fi
+ done < "${pid}.tmp"
+ rm "${pid}.tmp"
+ fi
+ ;;
+
+ (*)
+ echo "Unexpected argument '$STARTSTOP'. $USAGE."
+ exit 1
+ ;;
+
+esac
diff --git a/MPE/flink/bin/historyserver.sh b/MPE/flink/bin/historyserver.sh
new file mode 100644
index 0000000..3bc3049
--- /dev/null
+++ b/MPE/flink/bin/historyserver.sh
@@ -0,0 +1,39 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Start/stop a Flink HistoryServer
+USAGE="Usage: historyserver.sh (start|start-foreground|stop)"
+
+STARTSTOP=$1
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
+ export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_HS}"
+ args=("--configDir" "${FLINK_CONF_DIR}")
+fi
+
+if [[ $STARTSTOP == "start-foreground" ]]; then
+ exec "${FLINK_BIN_DIR}"/flink-console.sh historyserver "${args[@]}"
+else
+ "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP historyserver "${args[@]}"
+fi
diff --git a/MPE/flink/bin/jobmanager.sh b/MPE/flink/bin/jobmanager.sh
new file mode 100644
index 0000000..35fbe2c
--- /dev/null
+++ b/MPE/flink/bin/jobmanager.sh
@@ -0,0 +1,64 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Start/stop a Flink JobManager.
+USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all"
+
+STARTSTOP=$1
+HOST=$2 # optional when starting multiple instances
+WEBUIPORT=$3 # optional when starting multiple instances
+
+if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
+ echo $USAGE
+ exit 1
+fi
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+ENTRYPOINT=standalonesession
+
+if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
+ # Add JobManager-specific JVM options
+ export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
+ parseJmArgsAndExportLogs "${ARGS[@]}"
+
+ args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster")
+ if [ ! -z $HOST ]; then
+ args+=("--host")
+ args+=("${HOST}")
+ fi
+
+ if [ ! -z $WEBUIPORT ]; then
+ args+=("--webui-port")
+ args+=("${WEBUIPORT}")
+ fi
+
+ if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
+ args+=(${DYNAMIC_PARAMETERS[@]})
+ fi
+fi
+
+if [[ $STARTSTOP == "start-foreground" ]]; then
+ exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}"
+else
+ "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
+fi
diff --git a/MPE/flink/bin/kubernetes-jobmanager.sh b/MPE/flink/bin/kubernetes-jobmanager.sh
new file mode 100644
index 0000000..0513123
--- /dev/null
+++ b/MPE/flink/bin/kubernetes-jobmanager.sh
@@ -0,0 +1,42 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Start a Flink JobManager for native Kubernetes.
+# NOTE: This script is not meant to be started manually. It will be used by native Kubernetes integration.
+
+USAGE="Usage: kubernetes-jobmanager.sh kubernetes-session|kubernetes-application [args]"
+
+ENTRY_POINT_NAME=$1
+
+ARGS=("${@:2}")
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+# Add JobManager specific JVM options
+export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
+parseJmArgsAndExportLogs "${ARGS[@]}"
+
+if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
+ ARGS=(${DYNAMIC_PARAMETERS[@]} "${ARGS[@]}")
+fi
+
+exec "${FLINK_BIN_DIR}"/flink-console.sh ${ENTRY_POINT_NAME} "${ARGS[@]}"
diff --git a/MPE/flink/bin/kubernetes-session.sh b/MPE/flink/bin/kubernetes-session.sh
new file mode 100644
index 0000000..559a776
--- /dev/null
+++ b/MPE/flink/bin/kubernetes-session.sh
@@ -0,0 +1,39 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+# get Flink config
+. "$bin"/config.sh
+
+if [ "$FLINK_IDENT_STRING" = "" ]; then
+ FLINK_IDENT_STRING="$USER"
+fi
+
+JVM_ARGS="$JVM_ARGS -Xmx512m"
+
+CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`
+
+log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-k8s-session-$HOSTNAME.log
+log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-session.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-session.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-session.xml"
+
+export FLINK_CONF_DIR
+
+"$JAVA_RUN" $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.kubernetes.cli.KubernetesSessionCli "$@"
diff --git a/MPE/flink/bin/kubernetes-taskmanager.sh b/MPE/flink/bin/kubernetes-taskmanager.sh
new file mode 100644
index 0000000..b11fb89
--- /dev/null
+++ b/MPE/flink/bin/kubernetes-taskmanager.sh
@@ -0,0 +1,45 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Start a Flink TaskManager for native Kubernetes.
+# NOTE: This script is not meant to be started manually. It will be used by native Kubernetes integration.
+
+USAGE="Usage: kubernetes-taskmanager.sh [args]"
+
+ENTRYPOINT=kubernetes-taskmanager
+
+ARGS=("${@:1}")
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+# if no other JVM options are set, set the GC to G1
+if [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
+ export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
+fi
+
+# Add TaskManager specific JVM options
+export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
+export JVM_ARGS="$JVM_ARGS $FLINK_TM_JVM_MEM_OPTS"
+
+ARGS=("--configDir" "${FLINK_CONF_DIR}" "${ARGS[@]}")
+
+exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
diff --git a/MPE/flink/bin/mesos-appmaster-job.sh b/MPE/flink/bin/mesos-appmaster-job.sh
new file mode 100644
index 0000000..5ae7396
--- /dev/null
+++ b/MPE/flink/bin/mesos-appmaster-job.sh
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=$(dirname "$0")
+bin=$(cd "${bin}" || exit; pwd)
+
+exec "${bin}"/mesos-jobmanager.sh "org.apache.flink.mesos.entrypoint.MesosJobClusterEntrypoint" "$@"
diff --git a/MPE/flink/bin/mesos-appmaster.sh b/MPE/flink/bin/mesos-appmaster.sh
new file mode 100644
index 0000000..2939e31
--- /dev/null
+++ b/MPE/flink/bin/mesos-appmaster.sh
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=$(dirname "$0")
+bin=$(cd "${bin}" || exit; pwd)
+
+exec "${bin}"/mesos-jobmanager.sh "org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint" "$@"
diff --git a/MPE/flink/bin/mesos-jobmanager.sh b/MPE/flink/bin/mesos-jobmanager.sh
new file mode 100644
index 0000000..b786e18
--- /dev/null
+++ b/MPE/flink/bin/mesos-jobmanager.sh
@@ -0,0 +1,52 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+ENTRY_POINT=$1
+ARGS=("${@:2}")
+
+bin=$(dirname "$0")
+bin=$(cd "${bin}" || exit; pwd)
+
+# get Flink config
+. "${bin}"/config.sh
+
+parseJmArgsAndExportLogs "${ARGS[@]}"
+
+if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
+ ARGS=(${DYNAMIC_PARAMETERS[@]} "${ARGS[@]}")
+fi
+
+if [ "$FLINK_IDENT_STRING" = "" ]; then
+ FLINK_IDENT_STRING="$USER"
+fi
+
+CC_CLASSPATH=$(manglePathList "$(constructFlinkClassPath):${INTERNAL_HADOOP_CLASSPATHS}")
+
+log="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-mesos-appmaster-${HOSTNAME}.log"
+log_setting="-Dlog.file=${log} -Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties -Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j.properties -Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml"
+
+"${JAVA_RUN}" ${JVM_ARGS} -classpath ${CC_CLASSPATH} ${log_setting} ${ENTRY_POINT} "${ARGS[@]}"
+
+rc=$?
+
+if [[ ${rc} -ne 0 ]]; then
+ echo "Error while starting the mesos application master. Please check ${log} for more details."
+fi
+
+exit ${rc}
diff --git a/MPE/flink/bin/mesos-taskmanager.sh b/MPE/flink/bin/mesos-taskmanager.sh
new file mode 100644
index 0000000..6c65c2a
--- /dev/null
+++ b/MPE/flink/bin/mesos-taskmanager.sh
@@ -0,0 +1,43 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+# get Flink config
+. "$bin"/config.sh
+
+CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`
+
+log=flink-taskmanager.log
+log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"
+
+# Add precomputed memory JVM options
+if [ -z "${FLINK_ENV_JAVA_OPTS_MEM}" ]; then
+ FLINK_ENV_JAVA_OPTS_MEM=""
+fi
+export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_MEM}"
+
+# Add TaskManager-specific JVM options
+export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
+
+ENTRY_POINT=org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner
+
+exec "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} -classpath "$CC_CLASSPATH" $log_setting ${ENTRY_POINT} "$@"
+
diff --git a/MPE/flink/bin/pyflink-shell.sh b/MPE/flink/bin/pyflink-shell.sh
new file mode 100644
index 0000000..a616abb
--- /dev/null
+++ b/MPE/flink/bin/pyflink-shell.sh
@@ -0,0 +1,84 @@
+#!/bin/bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+. "$bin"/find-flink-home.sh
+
+_FLINK_HOME_DETERMINED=1
+
+. "$FLINK_HOME"/bin/config.sh
+
+FLINK_CLASSPATH=`constructFlinkClassPath`
+PYTHON_JAR_PATH=`echo "$FLINK_OPT_DIR"/flink-python*.jar`
+
+
+PYFLINK_PYTHON="${PYFLINK_PYTHON:-"python"}"
+
+# So that python can find out Flink's Jars
+export FLINK_BIN_DIR=$FLINK_BIN_DIR
+export FLINK_HOME
+
+# Add pyflink & py4j & cloudpickle to PYTHONPATH
+export PYTHONPATH="$FLINK_OPT_DIR/python/pyflink.zip:$PYTHONPATH"
+PY4J_ZIP=`echo "$FLINK_OPT_DIR"/python/py4j-*-src.zip`
+CLOUDPICKLE_ZIP=`echo "$FLINK_OPT_DIR"/python/cloudpickle-*-src.zip`
+export PYTHONPATH="$PY4J_ZIP:$CLOUDPICKLE_ZIP:$PYTHONPATH"
+
+PARSER="org.apache.flink.client.python.PythonShellParser"
+function parse_options() {
+ "${JAVA_RUN}" ${JVM_ARGS} -cp ${FLINK_CLASSPATH}:${PYTHON_JAR_PATH} ${PARSER} "$@"
+ printf "%d\0" $?
+}
+
+# Turn off posix mode since it does not allow process substitution
+set +o posix
+# If the command has option --help | -h, the script will directly
+# run the PythonShellParser program to stdout the help message.
+if [[ "$@" =~ '--help' ]] || [[ "$@" =~ '-h' ]]; then
+ "${JAVA_RUN}" ${JVM_ARGS} -cp ${FLINK_CLASSPATH}:${PYTHON_JAR_PATH} ${PARSER} "$@"
+ exit 0
+fi
+OPTIONS=()
+while IFS= read -d '' -r ARG; do
+ OPTIONS+=("$ARG")
+done < <(parse_options "$@")
+
+COUNT=${#OPTIONS[@]}
+LAST=$((COUNT - 1))
+LAUNCHER_EXIT_CODE=${OPTIONS[$LAST]}
+
+# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
+# the code that parses the output of the launcher to get confused. In those cases, check if the
+# exit code is an integer, and if it's not, handle it as a special error case.
+if ! [[ ${LAUNCHER_EXIT_CODE} =~ ^[0-9]+$ ]]; then
+ echo "${OPTIONS[@]}" | head -n-1 1>&2
+ exit 1
+fi
+
+if [[ ${LAUNCHER_EXIT_CODE} != 0 ]]; then
+ exit ${LAUNCHER_EXIT_CODE}
+fi
+
+OPTIONS=("${OPTIONS[@]:0:$LAST}")
+
+export SUBMIT_ARGS=${OPTIONS[@]}
+
+# -i: interactive
+# -m: execute shell.py in the zip package
+${PYFLINK_PYTHON} -i -m pyflink.shell
diff --git a/MPE/flink/bin/set_flink_yarn_env.sh b/MPE/flink/bin/set_flink_yarn_env.sh
new file mode 100644
index 0000000..cb0e63e
--- /dev/null
+++ b/MPE/flink/bin/set_flink_yarn_env.sh
@@ -0,0 +1,7 @@
+#!/bin/bash
+
+echo -e "\n#flink\nexport FLINK_HOME=/data/tsg/olap/flink-1.13.1\nexport PATH=\$FLINK_HOME/bin:\$PATH" >> /etc/profile.d/flink.sh
+chmod +x /etc/profile.d/flink.sh
+source /etc/profile
+
+
diff --git a/MPE/flink/bin/sql-client.sh b/MPE/flink/bin/sql-client.sh
new file mode 100644
index 0000000..759f0c6
--- /dev/null
+++ b/MPE/flink/bin/sql-client.sh
@@ -0,0 +1,88 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+################################################################################
+# Adopted from "flink" bash script
+################################################################################
+
+target="$0"
+# For the case, the executable has been directly symlinked, figure out
+# the correct bin path by following its symlink up to an upper bound.
+# Note: we can't use the readlink utility here if we want to be POSIX
+# compatible.
+iteration=0
+while [ -L "$target" ]; do
+ if [ "$iteration" -gt 100 ]; then
+ echo "Cannot resolve path: You have a cyclic symlink in $target."
+ break
+ fi
+ ls=`ls -ld -- "$target"`
+ target=`expr "$ls" : '.* -> \(.*\)$'`
+ iteration=$((iteration + 1))
+done
+
+# Convert relative path to absolute path
+bin=`dirname "$target"`
+
+# get flink config
+. "$bin"/config.sh
+
+if [ "$FLINK_IDENT_STRING" = "" ]; then
+ FLINK_IDENT_STRING="$USER"
+fi
+
+CC_CLASSPATH=`constructFlinkClassPath`
+
+################################################################################
+# SQL client specific logic
+################################################################################
+
+log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-sql-client-$HOSTNAME.log
+log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)
+
+# get path of jar in /opt if it exist
+FLINK_SQL_CLIENT_JAR=$(find "$FLINK_OPT_DIR" -regex ".*flink-sql-client.*.jar")
+
+# add flink-python jar to the classpath
+if [[ ! "$CC_CLASSPATH" =~ .*flink-python.*.jar ]]; then
+ FLINK_PYTHON_JAR=$(find "$FLINK_OPT_DIR" -regex ".*flink-python.*.jar")
+ if [ -n "$FLINK_PYTHON_JAR" ]; then
+ CC_CLASSPATH="$CC_CLASSPATH:$FLINK_PYTHON_JAR"
+ fi
+fi
+
+# check if SQL client is already in classpath and must not be shipped manually
+if [[ "$CC_CLASSPATH" =~ .*flink-sql-client.*.jar ]]; then
+
+ # start client without jar
+ exec "$JAVA_RUN" $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.table.client.SqlClient "$@"
+
+# check if SQL client jar is in /opt
+elif [ -n "$FLINK_SQL_CLIENT_JAR" ]; then
+
+ # start client with jar
+ exec "$JAVA_RUN" $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS:$FLINK_SQL_CLIENT_JAR"`" org.apache.flink.table.client.SqlClient "$@" --jar "`manglePath $FLINK_SQL_CLIENT_JAR`"
+
+# write error message to stderr
+else
+ (>&2 echo "[ERROR] Flink SQL Client JAR file 'flink-sql-client*.jar' neither found in classpath nor /opt directory should be located in $FLINK_OPT_DIR.")
+
+ # exit to force process failure
+ exit 1
+fi
diff --git a/MPE/flink/bin/standalone-job.sh b/MPE/flink/bin/standalone-job.sh
new file mode 100644
index 0000000..b4cfa20
--- /dev/null
+++ b/MPE/flink/bin/standalone-job.sh
@@ -0,0 +1,55 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Start/stop a Flink JobManager.
+USAGE="Usage: standalone-job.sh ((start|start-foreground))|stop [args]"
+
+STARTSTOP=$1
+ENTRY_POINT_NAME="standalonejob"
+
+if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]]; then
+ echo $USAGE
+ exit 1
+fi
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+# Startup parameters
+ARGS=("${@:2}")
+
+if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
+ # Add cluster entry point specific JVM options
+ export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
+ parseJmArgsAndExportLogs "${ARGS[@]}"
+
+ if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
+ ARGS=(${DYNAMIC_PARAMETERS[@]} "${ARGS[@]}")
+ fi
+fi
+
+ARGS=("--configDir" "${FLINK_CONF_DIR}" "${ARGS[@]}")
+
+if [[ $STARTSTOP == "start-foreground" ]]; then
+ exec "${FLINK_BIN_DIR}"/flink-console.sh ${ENTRY_POINT_NAME} "${ARGS[@]}"
+else
+ "${FLINK_BIN_DIR}"/flink-daemon.sh ${STARTSTOP} ${ENTRY_POINT_NAME} "${ARGS[@]}"
+fi
diff --git a/MPE/flink/bin/start-cluster.sh b/MPE/flink/bin/start-cluster.sh
new file mode 100644
index 0000000..720b33c
--- /dev/null
+++ b/MPE/flink/bin/start-cluster.sh
@@ -0,0 +1,53 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+# Start the JobManager instance(s)
+shopt -s nocasematch
+if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
+ # HA Mode
+ readMasters
+
+ echo "Starting HA cluster with ${#MASTERS[@]} masters."
+
+ for ((i=0;i<${#MASTERS[@]};++i)); do
+ master=${MASTERS[i]}
+ webuiport=${WEBUIPORTS[i]}
+
+ if [ ${MASTERS_ALL_LOCALHOST} = true ] ; then
+ "${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}"
+ else
+ ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start ${master} ${webuiport} &"
+ fi
+ done
+
+else
+ echo "Starting cluster."
+
+ # Start single JobManager on this machine
+ "$FLINK_BIN_DIR"/jobmanager.sh start
+fi
+shopt -u nocasematch
+
+# Start TaskManager instance(s)
+TMWorkers start
diff --git a/MPE/flink/bin/start-zookeeper-quorum.sh b/MPE/flink/bin/start-zookeeper-quorum.sh
new file mode 100644
index 0000000..d5a7593
--- /dev/null
+++ b/MPE/flink/bin/start-zookeeper-quorum.sh
@@ -0,0 +1,46 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+# Starts a ZooKeeper quorum as configured in $FLINK_CONF/zoo.cfg
+
+ZK_CONF="$FLINK_CONF_DIR/zoo.cfg"
+if [ ! -f "$ZK_CONF" ]; then
+ echo "[ERROR] No ZooKeeper configuration file found in '$ZK_CONF'."
+ exit 1
+fi
+
+# Extract server.X from ZooKeeper config and start instances
+while read server ; do
+ server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim
+
+ # match server.id=address[:port[:port]]
+ if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=[[:space:]]*([^: \#]+) ]]; then
+ id=${BASH_REMATCH[1]}
+ address=${BASH_REMATCH[2]}
+
+ ssh -n $FLINK_SSH_OPTS $address -- "nohup /bin/bash -l $FLINK_BIN_DIR/zookeeper.sh start $id &"
+ else
+ echo "[WARN] Parse error. Skipping config entry '$server'."
+ fi
+done < <(grep "^server\." "$ZK_CONF")
diff --git a/MPE/flink/bin/stop-cluster.sh b/MPE/flink/bin/stop-cluster.sh
new file mode 100644
index 0000000..d29b4f3
--- /dev/null
+++ b/MPE/flink/bin/stop-cluster.sh
@@ -0,0 +1,47 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+# Stop TaskManager instance(s)
+TMWorkers stop
+
+# Stop JobManager instance(s)
+shopt -s nocasematch
+if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
+ # HA Mode
+ readMasters
+
+ if [ ${MASTERS_ALL_LOCALHOST} = true ] ; then
+ for master in ${MASTERS[@]}; do
+ "$FLINK_BIN_DIR"/jobmanager.sh stop
+ done
+ else
+ for master in ${MASTERS[@]}; do
+ ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" stop &"
+ done
+ fi
+
+else
+ "$FLINK_BIN_DIR"/jobmanager.sh stop
+fi
+shopt -u nocasematch
diff --git a/MPE/flink/bin/stop-zookeeper-quorum.sh b/MPE/flink/bin/stop-zookeeper-quorum.sh
new file mode 100644
index 0000000..ad79de8
--- /dev/null
+++ b/MPE/flink/bin/stop-zookeeper-quorum.sh
@@ -0,0 +1,46 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+# Stops a ZooKeeper quorum as configured in $FLINK_CONF/zoo.cfg
+
+ZK_CONF="$FLINK_CONF_DIR/zoo.cfg"
+if [ ! -f "$ZK_CONF" ]; then
+ echo "[ERROR] No ZooKeeper configuration file found in '$ZK_CONF'."
+ exit 1
+fi
+
+# Extract server.X from ZooKeeper config and stop instances
+while read server ; do
+ server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim
+
+ # match server.id=address[:port[:port]]
+ if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=[[:space:]]*([^: \#]+) ]]; then
+ id=${BASH_REMATCH[1]}
+ server=${BASH_REMATCH[2]}
+
+ ssh -n $FLINK_SSH_OPTS $server -- "nohup /bin/bash -l $FLINK_BIN_DIR/zookeeper.sh stop &"
+ else
+ echo "[WARN] Parse error. Skipping config entry '$server'."
+ fi
+done < <(grep "^server\." "$ZK_CONF")
diff --git a/MPE/flink/bin/taskmanager.sh b/MPE/flink/bin/taskmanager.sh
new file mode 100644
index 0000000..fdc6514
--- /dev/null
+++ b/MPE/flink/bin/taskmanager.sh
@@ -0,0 +1,80 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Start/stop a Flink TaskManager.
+USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"
+
+STARTSTOP=$1
+
+ARGS=("${@:2}")
+
+if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
+ echo $USAGE
+ exit 1
+fi
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+ENTRYPOINT=taskexecutor
+
+if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
+
+ # if no other JVM options are set, set the GC to G1
+ if [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
+ export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
+ fi
+
+ # Add TaskManager-specific JVM options
+ export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
+
+ # Startup parameters
+
+ parseTmArgsAndExportLogs "${ARGS[@]}"
+
+ if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
+ ARGS=(${DYNAMIC_PARAMETERS[@]} "${ARGS[@]}")
+ fi
+
+ ARGS=("--configDir" "${FLINK_CONF_DIR}" "${ARGS[@]}")
+fi
+
+if [[ $STARTSTOP == "start-foreground" ]]; then
+ exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
+else
+ if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
+ # Start a single TaskManager
+ "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
+ else
+ # Example output from `numactl --show` on an AWS c4.8xlarge:
+ # policy: default
+ # preferred node: current
+ # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
+ # cpubind: 0 1
+ # nodebind: 0 1
+ # membind: 0 1
+ read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
+ for NODE_ID in "${NODE_LIST[@]:1}"; do
+ # Start a TaskManager for each NUMA node
+ numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
+ done
+ fi
+fi
diff --git a/MPE/flink/bin/yarn-session.sh b/MPE/flink/bin/yarn-session.sh
new file mode 100644
index 0000000..f36ca34
--- /dev/null
+++ b/MPE/flink/bin/yarn-session.sh
@@ -0,0 +1,38 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+# get Flink config
+. "$bin"/config.sh
+
+if [ "$FLINK_IDENT_STRING" = "" ]; then
+ FLINK_IDENT_STRING="$USER"
+fi
+
+JVM_ARGS="$JVM_ARGS -Xmx512m"
+
+CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`
+
+log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-yarn-session-$HOSTNAME.log
+log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-session.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-session.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-session.xml"
+
+"$JAVA_RUN" $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@"
+
diff --git a/MPE/flink/bin/zookeeper.sh b/MPE/flink/bin/zookeeper.sh
new file mode 100644
index 0000000..9297401
--- /dev/null
+++ b/MPE/flink/bin/zookeeper.sh
@@ -0,0 +1,68 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Start/stop a ZooKeeper quorum peer.
+USAGE="Usage: zookeeper.sh ((start|start-foreground) peer-id)|stop|stop-all"
+
+STARTSTOP=$1
+PEER_ID=$2
+
+if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
+ echo $USAGE
+ exit 1
+fi
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+ZK_CONF="$FLINK_CONF_DIR/zoo.cfg"
+if [ ! -f "$ZK_CONF" ]; then
+ echo "[ERROR] No ZooKeeper configuration file found in '$ZK_CONF'."
+ exit 1
+fi
+
+if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
+ if [ -z $PEER_ID ]; then
+ echo "[ERROR] Missing peer id argument. $USAGE."
+ exit 1
+ fi
+
+ if [[ ! ${ZK_HEAP} =~ ${IS_NUMBER} ]]; then
+ echo "[ERROR] Configured ZooKeeper JVM heap size is not a number. Please set '$KEY_ZK_HEAP_MB' in $FLINK_CONF_FILE."
+ exit 1
+ fi
+
+ if [ "$ZK_HEAP" -gt 0 ]; then
+ export JVM_ARGS="$JVM_ARGS -Xms"$ZK_HEAP"m -Xmx"$ZK_HEAP"m"
+ fi
+
+ # Startup parameters
+ args=("--zkConfigFile" "${ZK_CONF}" "--peerId" "${PEER_ID}")
+fi
+
+# the JMX log4j integration in ZK 3.4 does not work log4j 2
+export JVM_ARGS="$JVM_ARGS -Dzookeeper.jmx.log4j.disable=true"
+
+if [[ $STARTSTOP == "start-foreground" ]]; then
+ "${FLINK_BIN_DIR}"/flink-console.sh zookeeper "${args[@]}"
+else
+ "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP zookeeper "${args[@]}"
+fi