| field | value | date |
|---|---|---|
| author | wangchengcheng <[email protected]> | 2023-07-27 15:43:51 +0800 |
| committer | wangchengcheng <[email protected]> | 2023-07-27 15:43:51 +0800 |
| commit | 124f687daace8b85e5c74abac04bcd0a92744a8d | |
| tree | 4f563326b1be67cfb51bf6a04f1ca4d953536e76 /MPE/druid/bin/post-index-task-main | |
| parent | 08686ae87f9efe7a590f48db74ed133b481c85b1 | |
P19 23.07 online-config
Diffstat (limited to 'MPE/druid/bin/post-index-task-main')
| mode | path | lines |
|---|---|---|
| -rw-r--r-- | MPE/druid/bin/post-index-task-main | 176 |

1 file changed, 176 insertions, 0 deletions
diff --git a/MPE/druid/bin/post-index-task-main b/MPE/druid/bin/post-index-task-main
new file mode 100644
index 0000000..03436bc
--- /dev/null
+++ b/MPE/druid/bin/post-index-task-main
@@ -0,0 +1,176 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import base64
+import json
+import re
+import sys
+import time
+import urllib2
+import urlparse
+
+def read_task_file(args):
+  with open(args.file, 'r') as f:
+    contents = f.read()
+    # We don't use the parsed data, but we want to throw early if it's invalid
+    try:
+      json.loads(contents)
+    except Exception, e:
+      sys.stderr.write('Invalid JSON in task file "{0}": {1}\n'.format(args.file, repr(e)))
+      sys.exit(1)
+    return contents
+
+def add_basic_auth_header(args, req):
+  if (args.user is not None):
+    basic_auth_encoded = base64.b64encode('%s:%s' % (args.user, args.password))
+    req.add_header("Authorization", "Basic %s" % basic_auth_encoded)
+
+# Keep trying until timeout_at, maybe die then
+def post_task(args, task_json, timeout_at):
+  try:
+    url = args.url.rstrip("/") + "/druid/indexer/v1/task"
+    req = urllib2.Request(url, task_json, {'Content-Type' : 'application/json'})
+    add_basic_auth_header(args, req)
+    timeleft = timeout_at - time.time()
+    response_timeout = min(max(timeleft, 5), 10)
+    response = urllib2.urlopen(req, None, response_timeout)
+    return response.read().rstrip()
+  except urllib2.URLError as e:
+    if isinstance(e, urllib2.HTTPError) and e.code >= 400 and e.code <= 500:
+      # 4xx (problem with the request) or 500 (something wrong on the server)
+      raise_friendly_error(e)
+    elif time.time() >= timeout_at:
+      # No further retries
+      raise_friendly_error(e)
+    elif isinstance(e, urllib2.HTTPError) and e.code in [301, 302, 303, 305, 307] and \
+        e.info().getheader("Location") is not None:
+      # Set the new location in args.url so it can be used by await_task_completion and re-issue the request
+      location = urlparse.urlparse(e.info().getheader("Location"))
+      args.url = "{0}://{1}".format(location.scheme, location.netloc)
+      sys.stderr.write("Redirect response received, setting url to [{0}]\n".format(args.url))
+      return post_task(args, task_json, timeout_at)
+    else:
+      # If at first you don't succeed, try, try again!
+      sleep_time = 5
+      if not args.quiet:
+        extra = ''
+        if hasattr(e, 'read'):
+          extra = e.read().rstrip()
+        sys.stderr.write("Waiting up to {0}s for indexing service [{1}] to become available. [Got: {2} {3}]".format(max(sleep_time, int(timeout_at - time.time())), args.url, str(e), extra).rstrip())
+        sys.stderr.write("\n")
+      time.sleep(sleep_time)
+      return post_task(args, task_json, timeout_at)
+
+# Keep trying until timeout_at, maybe die then
+def await_task_completion(args, task_id, timeout_at):
+  while True:
+    url = args.url.rstrip("/") + "/druid/indexer/v1/task/{0}/status".format(task_id)
+    req = urllib2.Request(url)
+    add_basic_auth_header(args, req)
+    timeleft = timeout_at - time.time()
+    response_timeout = min(max(timeleft, 5), 10)
+    response = urllib2.urlopen(req, None, response_timeout)
+    response_obj = json.loads(response.read())
+    response_status_code = response_obj["status"]["statusCode"]
+    if response_status_code in ['SUCCESS', 'FAILED']:
+      return response_status_code
+    else:
+      if time.time() < timeout_at:
+        if not args.quiet:
+          sys.stderr.write("Task {0} still running...\n".format(task_id))
+        timeleft = timeout_at - time.time()
+        time.sleep(min(5, timeleft))
+      else:
+        raise Exception("Task {0} did not finish in time!".format(task_id))
+
+def raise_friendly_error(e):
+  if isinstance(e, urllib2.HTTPError):
+    text = e.read().strip()
+    reresult = re.search(r'<pre>(.*?)</pre>', text, re.DOTALL)
+    if reresult:
+      text = reresult.group(1).strip()
+    raise Exception("HTTP Error {0}: {1}, check overlord log for more details.\n{2}".format(e.code, e.reason, text))
+  raise e
+
+def await_load_completion(args, datasource, timeout_at):
+  while True:
+    url = args.coordinator_url.rstrip("/") + "/druid/coordinator/v1/loadstatus"
+    req = urllib2.Request(url)
+    add_basic_auth_header(args, req)
+    timeleft = timeout_at - time.time()
+    response_timeout = min(max(timeleft, 5), 10)
+    response = urllib2.urlopen(req, None, response_timeout)
+    response_obj = json.loads(response.read())
+    load_status = response_obj.get(datasource, 0.0)
+    if load_status >= 100.0:
+      sys.stderr.write("{0} loading complete! You may now query your data\n".format(datasource))
+      return
+    else:
+      if time.time() < timeout_at:
+        if not args.quiet:
+          sys.stderr.write("{0} is {1}% finished loading...\n".format(datasource, load_status))
+        timeleft = timeout_at - time.time()
+        time.sleep(min(5, timeleft))
+      else:
+        raise Exception("{0} was not loaded in time!".format(datasource))
+
+def main():
+  parser = argparse.ArgumentParser(description='Post Druid indexing tasks.')
+  parser.add_argument('--url', '-u', metavar='url', type=str, default='http://localhost:8090/', help='Druid Overlord url')
+  parser.add_argument('--coordinator-url', type=str, default='http://localhost:8081/', help='Druid Coordinator url')
+  parser.add_argument('--file', '-f', type=str, required=True, help='Query JSON file')
+  parser.add_argument('--submit-timeout', type=int, default=120, help='Timeout (in seconds) for submitting tasks')
+  parser.add_argument('--complete-timeout', type=int, default=14400, help='Timeout (in seconds) for completing tasks')
+  parser.add_argument('--load-timeout', type=int, default=14400, help='Timeout (in seconds) for waiting for tasks to load')
+  parser.add_argument('--quiet', '-q', action='store_true', help='Suppress retryable errors')
+  parser.add_argument('--user', type=str, default=None, help='Basic auth username')
+  parser.add_argument('--password', type=str, default=None, help='Basic auth password')
+  args = parser.parse_args()
+
+  submit_timeout_at = time.time() + args.submit_timeout
+  complete_timeout_at = time.time() + args.complete_timeout
+
+  task_contents = read_task_file(args)
+  task_json = json.loads(task_contents)
+  if task_json['type'] == "compact":
+    datasource = task_json['dataSource']
+  else:
+    datasource = json.loads(task_contents)["spec"]["dataSchema"]["dataSource"]
+  sys.stderr.write("Beginning indexing data for {0}\n".format(datasource))
+
+  task_id = json.loads(post_task(args, task_contents, submit_timeout_at))["task"]
+
+  sys.stderr.write('\033[1m' + "Task started: " + '\033[0m' + "{0}\n".format(task_id))
+  sys.stderr.write('\033[1m' + "Task log:     " + '\033[0m' + "{0}/druid/indexer/v1/task/{1}/log\n".format(args.url.rstrip("/"), task_id))
+  sys.stderr.write('\033[1m' + "Task status:  " + '\033[0m' + "{0}/druid/indexer/v1/task/{1}/status\n".format(args.url.rstrip("/"), task_id))
+
+  task_status = await_task_completion(args, task_id, complete_timeout_at)
+  sys.stderr.write("Task finished with status: {0}\n".format(task_status))
+  if task_status != 'SUCCESS':
+    sys.exit(1)
+
+  sys.stderr.write("Completed indexing data for {0}. Now loading indexed data onto the cluster...\n".format(datasource))
+  load_timeout_at = time.time() + args.load_timeout
+  await_load_completion(args, datasource, load_timeout_at)
+
+try:
+  main()
+except KeyboardInterrupt:
+  sys.exit(1)
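For reference, a minimal sketch of the task-file shape the script actually inspects. The key paths are taken straight from the code above (`type`, then `spec.dataSchema.dataSource` for ordinary tasks or top-level `dataSource` for `compact` tasks); the concrete task type and datasource names are illustrative placeholders, not values the script requires.

```python
# Hypothetical task file contents; only the fields this script reads are shown.
# The Overlord validates the full ingestion spec -- the script itself only
# checks that the JSON parses and extracts the datasource name for progress
# messages.
example_task = {
    "type": "index_parallel",          # any type other than "compact"
    "spec": {
        "dataSchema": {
            "dataSource": "wikipedia"  # placeholder datasource name
        }
        # ... a real spec also carries ioConfig/tuningConfig, unused here
    }
}

# Compaction tasks keep the datasource at the top level instead:
example_compact_task = {
    "type": "compact",
    "dataSource": "wikipedia"
}
```

Given the flags defined in `main()`, a typical run would look something like `post-index-task-main --file task.json --url http://localhost:8090/ --coordinator-url http://localhost:8081/` (file name hypothetical), executed with a Python 2 interpreter since the script imports `urllib2`. It streams progress to stderr and exits non-zero if the task ends in any status other than `SUCCESS`.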
