import os
import string

import arcpy
# import multiprocessing


# Test Function (used for testing connection to class file)
# ----------------------------------------------------------
def connection_test(text_message):
    revised_text_message = text_message + "Connection successful"
    return revised_text_message


# Logger Function
# ----------------
def configure_logging(log_file):
    import logging
    _logger = logging.getLogger(__name__)
    _logger.setLevel(logging.DEBUG)
    # Create file handler and send messages to file
    fh = logging.FileHandler(log_file)
    fh.setLevel(logging.DEBUG)
    # Create console handler and send messages to console
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    # Create formatter and add it to the handlers
    formatter = logging.Formatter('Line:%(lineno)d|%(asctime)s|%(levelname)s|%(message)s')
    fh.setFormatter(formatter)
    ch.setFormatter(formatter)
    # Add the handlers to the logger
    _logger.addHandler(fh)
    _logger.addHandler(ch)
    return _logger


# Function to Set Common System Environment Parameters
# -----------------------------------------------------
def set_environment(logger, env):
    logger.info("Setting environment parameters... ")
    _connection_file_folder = "C:\\Users\\arcgissvc\\Documents\\connection_files\\"
    if env == "alpha":
        _portal_url = "https://win2008r2.com/portal/"
        _arcgis_server_url = "https://win2008r2.com/server/"
        _enterprise_db_conn = "PostgreSQL_945_arcgis_SDE.sde"
    elif env == "dev":
        _portal_url = "https://www-proxy-dev.nccs.nasa.gov/arcgis/"
        _arcgis_server_url = "https://www-proxy-dev.nccs.nasa.gov/server/"
        _enterprise_db_conn = "arcdb02_arcgis_sde.sde"
    elif env == "prod":
        _portal_url = "https://maps.nccs.nasa.gov/arcgis/"
        _arcgis_server_url = "https://maps.nccs.nasa.gov/server/"
        _enterprise_db_conn = "arcdb04_arcgis_sde.sde"
    elif env == "egis":
        _portal_url = "https://gis.adapt/arcgis/"
        _arcgis_server_url = "https://gis03.atusrvm.adapt.nccs.nasa.gov/server/"
        _enterprise_db_conn = "arcdb02_arcgis_sde.sde"
    elif env == "drg":
        _portal_url = "https://maps.disasters.nasa.gov/arcgis/"
        _arcgis_server_url = "https://maps.disasters.nasa.gov/ags03/"
        _enterprise_db_conn = "arcdb04_arcgis_sde.sde"
    elif env == "wingis01":
        _portal_url = "https://www-proxy-dev.nccs.nasa.gov/arcgis/"
        _arcgis_server_url = "https://www-proxy-dev.nccs.nasa.gov/server/"
        _enterprise_db_conn = "gs6062wingis01_arcgis_sde.sde"
    else:  # Unknown
        _portal_url = ""
        _arcgis_server_url = ""
        _enterprise_db_conn = ""
    _result_array = [_connection_file_folder, _portal_url, _arcgis_server_url, _enterprise_db_conn]
    return _result_array


# Create Directory Function
# --------------------------
def create_directory(target_path):
    import errno
    try:
        os.makedirs(target_path)
    except OSError as exception:
        # Ignore the error if the directory already exists
        if exception.errno != errno.EEXIST:
            raise


# Import CSV to List Function
# ----------------------------
def import_from_csv(logger, csv_file):
    import csv
    logger.info("Importing the CSV list into memory")
    try:
        with open(csv_file, 'rb') as f:
            _reader = csv.reader(f)
            _csv_list_raw = list(_reader)
        logger.info("_csv_list_raw ==> " + str(_csv_list_raw))
        # Keep only the first column of each row
        _csv_list = [l[0] for l in _csv_list_raw]
        logger.info("_csv_list ==> " + str(_csv_list))
        return _csv_list
    except Exception as e:
        logger.error("Unable to import the CSV list - " + str(e))


# Export Table to CSV Function
# -----------------------------
def export_to_csv(logger, input_table, csv_file):
    import unicodecsv as csv
    logger.info("Exporting " + input_table + " to a CSV...")
    _input_table = input_table
    _csv_file = csv_file
    logger.info("_input_table ==> " + _input_table)
    logger.info("_csv_file ==> " + _csv_file)
    try:
        _field_list = arcpy.ListFields(_input_table)
        _field_names = [_field.name for _field in _field_list]
        with open(_csv_file, "wb") as _target_csv:
            writer = csv.writer(_target_csv)
            # Write the header row, then every row from the table
            writer.writerow(_field_names)
            with arcpy.da.SearchCursor(_input_table, _field_names) as cursor:
                for row in cursor:
                    writer.writerow(row)
        logger.info(_csv_file + " created.")
    except Exception as e:
        logger.error("Unable to export the CSV file - " + str(e))
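

# Example Usage: Export a Table to CSV (hedged sketch)
# -----------------------------------------------------
# A minimal sketch of chaining the helpers above. The log path, environment
# name, table name, and output CSV are hypothetical placeholders, not values
# defined by this module.
def example_export_table():
    logger = configure_logging("C:\\temp\\utility.log")  # hypothetical log path
    _env_params = set_environment(logger, "dev")
    # _env_params is [connection_file_folder, portal_url, arcgis_server_url, enterprise_db_conn]
    _sde_table = _env_params[0] + _env_params[3] + "\\owner.sample_table"  # hypothetical table
    export_to_csv(logger, _sde_table, "C:\\temp\\sample_table.csv")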
logger.info("_csv_file ==> " + _csv_file) try: _field_list = arcpy.ListFields(_input_table) _field_names = [_field.name for _field in _field_list] with open(_csv_file, "wb") as _target_csv: writer = csv.writer(_target_csv) writer.writerow(_field_names) with arcpy.da.SearchCursor(_input_table, _field_names) as cursor: for row in cursor: writer.writerow(row) logger.info(_csv_file + " created.") # _csv_file.close() except Exception as e: logger.error("Unable to export the CSV file - " + str(e)) # Get Extents Function # --------------------- def get_extents(spatial_object): _desc = arcpy.Describe(spatial_object) _x_min = _desc.extent.XMin _y_min = _desc.extent.YMin _x_max = _desc.extent.XMax _y_max = _desc.extent.YMax _result_list = [_x_min, _y_min, _x_max, _y_max] return _result_list # Generate Random ID Function # ---------------------------- def id_generator(size, chars=string.ascii_letters + string.digits): import random return ''.join(random.choice(chars) for _ in range(size)) # Check String for Unicode Function # ---------------------------------- def unicode_replace(logger, check_string, action): # Options for action are 'clean' and 'info' if isinstance(check_string, str): logger.info("Basic string not changed ==> " + check_string) elif isinstance(check_string, unicode): logger.info("Unicode string DETECTED ==> " + check_string) if action == "clean": check_string = check_string.encode("ascii", "replace") logger.info("Unicode string REPLACED ==> " + check_string) else: logger.info("Unicode String not changed ==> " + check_string) else: logger.info("Not a string ==>" + check_string) return check_string # Replace Character Function # --------------------------- def replace_string(logger, original_string, char_from, char_to): logger.info("Replacing characters for string ==> " + original_string) logger.info("Original characters ==> " + char_from) logger.info("New characters ==> " + char_to) _new_string = original_string.replace(char_from, char_to) return _new_string # Synchronize Replica Function # ----------------------------- def synchronize_replica(logger, geodatabase_1_admin_conn_full_path, replica_name, geodatabase_2_admin_conn_full_path, direction, conflict_policy, conflict_definition): _geodatabase_1_admin_conn_full_path = geodatabase_1_admin_conn_full_path _replica_name = replica_name _geodatabase_2_admin_conn_full_path = geodatabase_2_admin_conn_full_path _direction = direction _conflict_policy = conflict_policy _conflict_definition = conflict_definition logger.info("_geodatabase_1_admin_conn_full_path ==> " + _geodatabase_1_admin_conn_full_path) logger.info("_replica_name ==> " + _replica_name) logger.info("_geodatabase_2_admin_conn_full_path ==> " + _geodatabase_2_admin_conn_full_path) logger.info("_direction ==> " + _direction) logger.info("_conflict_policy ==> " + _conflict_policy) logger.info("_conflict_definition ==> " + _conflict_definition) try: # Process: Synchronize replica changes logger.info("Synchronizing replicas...") arcpy.SynchronizeChanges_management(_geodatabase_1_admin_conn_full_path, _replica_name, _geodatabase_2_admin_conn_full_path, _direction, _conflict_policy, _conflict_definition, "DO_NOT_RECONCILE") logger.info("done") except Exception as e: logger.error("Unable to synchronize replicas - " + str(e)) # Parallel Processing Config Function # ------------------------------------ # def parallel_process_config(logger, process_count): # logger.info("Configuring parallel processing settings") # # _process_count = process_count # # _multiprocess_handler = 


# Copy File Function
# -------------------
def copy_file(logger, source_directory, source_file_name, destination_directory, destination_file_name):
    from shutil import copyfile
    _source_directory = source_directory
    _source_file_name = source_file_name
    _destination_directory = destination_directory
    _destination_file_name = destination_file_name
    logger.info("_source_directory ==> " + _source_directory)
    logger.info("_source_file_name ==> " + _source_file_name)
    logger.info("_destination_directory ==> " + _destination_directory)
    logger.info("_destination_file_name ==> " + _destination_file_name)
    try:
        # Process: Copy file (os.path.join handles directories passed with or
        # without a trailing separator)
        _source_path = os.path.join(_source_directory, _source_file_name)
        _destination_path = os.path.join(_destination_directory, _destination_file_name)
        logger.info("Copying file to target location ==> " + _destination_path)
        copyfile(_source_path, _destination_path)
        logger.info("done")
    except Exception as e:
        logger.error("Unable to copy file - " + str(e))


# Get Item Modified Date Function
# --------------------------------
def get_modified_date(logger, target_item):
    import datetime
    _target_item = target_item
    logger.info("_target_item ==> " + _target_item)
    try:
        # Process: Get item modified date
        logger.info("Getting modified date for item ==> " + _target_item)
        _item_modified_date = os.path.getmtime(_target_item)
        logger.info("_item_modified_date ==> " + str(_item_modified_date))
        _item_modified_date_clean = datetime.datetime.fromtimestamp(_item_modified_date).strftime("%Y%m%d_%H%M")
        logger.info("_item_modified_date_clean ==> " + _item_modified_date_clean)
        return _item_modified_date_clean
    except Exception as e:
        logger.error("Unable to get item modified date - " + str(e))
        _item_modified_date_clean = ""
        return _item_modified_date_clean


# Zip File or Directory Function
# -------------------------------
def manage_zip_archive(logger, source_item, target_zip_file, file_or_directory):
    _source_item = source_item
    _target_zip_file = target_zip_file
    _file_or_directory = file_or_directory  # Options are "file" or "directory" only
    logger.info("_source_item ==> " + _source_item)
    logger.info("_target_zip_file ==> " + _target_zip_file)
    logger.info("_file_or_directory ==> " + _file_or_directory)
    if _file_or_directory == "directory":
        import shutil
        _root_directory = _source_item.rsplit("\\", 1)[0] + "\\"
        _base_directory = _source_item.rsplit("\\", 1)[1]
        logger.info("_root_directory ==> " + _root_directory)
        logger.info("_base_directory ==> " + _base_directory)
        # Zip the contents of the entire directory; make_archive appends the
        # ".zip" extension itself, so strip it from the target name first
        logger.info("Creating directory zip file ==> " + _target_zip_file)
        logger.info("Adding files to zip archive from directory ==> " + _source_item)
        _archive_base = os.path.splitext(_target_zip_file)[0]
        shutil.make_archive(_archive_base, "zip", _root_directory, _base_directory)
    else:
        import zipfile
        # Check for the file before opening it, because append mode creates it
        if not os.path.exists(_target_zip_file):
            logger.info("Creating zip file ==> " + _target_zip_file)
        _zip_file = zipfile.ZipFile(_target_zip_file, "a", zipfile.ZIP_DEFLATED)
        # Add the item to the zip file
        logger.info("Adding item to zip file ==> " + _source_item)
        _zip_file.write(_source_item, os.path.basename(_source_item))
        _zip_file.close()
    logger.info("Zip process complete")


# Get Portal Token from Portal Function
# --------------------------------------
def get_portal_token(logger, username, password, portal_url, expiration_time):
    import requests
    from requests.packages.urllib3.exceptions import InsecureRequestWarning
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    _target_url = portal_url + "sharing/rest/generateToken"
    _expiration_time = expiration_time
    logger.info("_target_url ==> " + _target_url)
    logger.info("_expiration_time ==> " + _expiration_time)
    # Set up the data payload for Portal
    data = {'username': username,
            'password': password,
            'client': 'requestip',
            'expiration': _expiration_time,
            'f': 'json'}
    headers = {"Content-type": "application/x-www-form-urlencoded",
               "Accept": "text/plain"}
    try:
        # Connect to URL and post parameters
        r = requests.post(_target_url, data=data, headers=headers, verify=False)
        # Read response
        if r.status_code != 200:
            logger.error("Error while fetching tokens from admin URL. Please check the URL and try again.")
            return "Error while fetching tokens from admin URL. Please check the URL and try again."
        else:
            # Extract the token
            _token = r.json()
            logger.info("_token ==> " + _token['token'])
            return _token['token']
    except Exception as e:
        logger.error("Unable to generate token - " + str(e))
        return "Failed"
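

# Example Usage: Generate a Token (hedged sketch)
# ------------------------------------------------
# A minimal sketch of calling get_portal_token. The credentials and portal
# URL are hypothetical placeholders; the expiration is passed as a string
# (minutes) because the function logs it via string concatenation.
def example_get_token():
    logger = configure_logging("C:\\temp\\utility.log")  # hypothetical log path
    _token = get_portal_token(logger, "portaladmin", "changeme",
                              "https://portal.example.com/arcgis/", "60")
    return _token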


# Update Portal Item File Function
# ---------------------------------
def update_portal_item_file(logger, portal_url, token, portal_item_owner, portal_folder_id,
                            portal_item_id, file_to_upload, file_type):
    import requests
    from requests.packages.urllib3.exceptions import InsecureRequestWarning
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    _portal_url = portal_url
    _token = token
    _portal_item_owner = portal_item_owner
    _portal_folder_id = portal_folder_id
    _portal_item_id = portal_item_id
    _file_to_upload = file_to_upload
    _file_type = file_type  # Acceptable options are: File geodatabase, CSV, Shapefile (for now)
    logger.info("_portal_url ==> " + _portal_url)
    logger.info("_token ==> " + _token)
    logger.info("_portal_item_owner ==> " + _portal_item_owner)
    logger.info("_portal_folder_id ==> " + _portal_folder_id)
    logger.info("_portal_item_id ==> " + _portal_item_id)
    logger.info("_file_to_upload ==> " + _file_to_upload)
    logger.info("_file_type ==> " + _file_type)
    _full_url = _portal_url + "sharing/rest/content/users/" + _portal_item_owner + "/" + _portal_folder_id + \
                "/items/" + _portal_item_id + "/update"
    logger.info("_full_url ==> " + _full_url)
    # Set up the request
    # -------------------
    params = {"token": _token,
              "f": "json",
              "overwrite": "true",
              "type": _file_type}
    logger.info("params ==> " + str(params))
    try:
        # Connect to the URL and get results; the "with" block ensures the
        # uploaded file handle is closed
        with open(_file_to_upload, "rb") as _upload_file:
            _files = {"file": _upload_file}
            r = requests.post(_full_url, data=params, files=_files)
        # Read response
        if r.status_code != 200:
            logger.error("Error while attempting to post to the server")
        else:
            response = r.json()
            if "error" not in response:
                logger.info("Post to server was successful. Response ==> " + str(response))
            else:
                logger.error("Error posting to the server. Response ==> " + str(response))
    except Exception as e:
        logger.error("The post_to_server function returned an error - " + str(e))
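

# Example Usage: Refresh a Portal Item's File (hedged sketch)
# ------------------------------------------------------------
# A minimal end-to-end sketch: generate a token, then overwrite the file
# behind an existing Portal item. Every credential, ID, and path below is a
# hypothetical placeholder; on failure get_portal_token returns "Failed" or
# an error message rather than a token, so the result is checked first.
def example_update_item():
    logger = configure_logging("C:\\temp\\utility.log")  # hypothetical log path
    _portal_url = "https://portal.example.com/arcgis/"
    _token = get_portal_token(logger, "portaladmin", "changeme", _portal_url, "60")
    if _token == "Failed" or _token.startswith("Error"):
        logger.error("Token request failed; skipping item update")
        return
    update_portal_item_file(logger, _portal_url, _token, "portaladmin",
                            "0123456789abcdef", "abcdef0123456789",
                            "C:\\temp\\sample_table.csv", "CSV")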