[docs]defjoin_s3_uri(bucket,key):""" Join AWS S3 URI from bucket and key. :type bucket: str :type key: str :rtype: str """return"s3://{}/{}".format(bucket,key)
[docs]defs3_key_smart_join(parts,is_dir):""" Note, it assume that there's no such double slack in your path. It ensure that there's only one consecutive "/" in the s3 key. :type parts: typing.List[str] :param parts: list of s3 key path parts, could have "/" :type is_dir: bool :param is_dir: if True, the s3 key ends with "/". otherwise enforce no tailing "/". :rtype: str Example:: >>> s3_key_smart_join(parts=["/a/", "b/", "/c"], is_dir=True) a/b/c/ >>> s3_key_smart_join(parts=["/a/", "b/", "/c"], is_dir=False) a/b/c """new_parts=list()forpartinparts:new_parts.extend([chunkforchunkinpart.split("/")ifchunk])key="/".join(new_parts)ifis_dir:returnkey+"/"else:returnkey
[docs]defmake_s3_console_url(bucket=None,prefix=None,s3_uri=None):""" Return an AWS Console url that you can use to open it in your browser. :type bucket: str :type prefix: str :type s3_uri: str :rtype: str """ifs3_uriisNone:if(bucketisNone)or(prefixisNone):raiseValueErrorelse:passelse:if(bucketisnotNone)or(prefixisnotNone):raiseValueErrorbucket,prefix=split_s3_uri(s3_uri)ifprefix.endswith("/"):s3_type="buckets"else:s3_type="object"return"https://s3.console.aws.amazon.com/s3/{s3_type}/{bucket}?prefix={prefix}".format(s3_type=s3_type,bucket=bucket,prefix=prefix)
[docs]defs3_uri_to_url(s3_uri):""" Convert a S3 URI to AWS S3 Console url for preview. :type s3_uri :rtype: str """bucket,key=split_s3_uri(s3_uri)returnmake_s3_console_url(bucket=bucket,prefix=key)
[docs]defensure_s3_object(s3_key_or_uri):""" Raise exception if the string is not in valid format for a AWS S3 object """ifs3_key_or_uri.endswith("/"):raiseValueError("'{}' doesn't represent s3 object!".format(s3_key_or_uri))
[docs]defensure_s3_dir(s3_key_or_uri):""" Raise exception if the string is not in valid format for a AWS S3 directory """ifnots3_key_or_uri.endswith("/"):raiseValueError("'{}' doesn't represent s3 dir!".format(s3_key_or_uri))
[docs]defstrip_comment_line_with_symbol(line,start):# pragma: no cover""" Strip comments from line string. """parts=line.split(start)counts=[len(findall(r'(?:^|[^"\\]|(?:\\\\|\\")+)(")',part))forpartinparts]total=0fornr,countinenumerate(counts):total+=countiftotal%2==0:returnstart.join(parts[:nr+1]).rstrip()else:# pragma: no coverreturnline.rstrip()
[docs]defstrip_comments(string,comment_symbols=frozenset(('#','//'))):# pragma: no cover""" Strip comments from json string. :param string: A string containing json with comments started by comment_symbols. :param comment_symbols: Iterable of symbols that start a line comment (default # or //). :return: The string with the comments removed. """lines=string.splitlines()forkinrange(len(lines)):forsymbolincomment_symbols:lines[k]=strip_comment_line_with_symbol(lines[k],start=symbol)return'\n'.join(lines)
[docs]defremove_if_exists(abspath):""" Remove a file or a directory (and it's files) if exists. :type abspath: str """ifnotos.path.exists(abspath):returnifos.path.isdir(abspath):shutil.rmtree(abspath)else:os.remove(abspath)
[docs]defmakedir_if_not_exists(abspath):""" Make a directory and all required parent folder if not exists. :type abspath: str """ifos.path.exists(abspath):returnos.makedirs(abspath)
[docs]defcopy_python_code(from_dir,to_dir):""" Copy all python source code from one directory to another. Skip ``__pycache__``, ``.pyc`` and ``.pyo`` files. :type from_dir: str :type to_dir: str """remove_if_exists(to_dir)pycache="__pycache__"fordirname,_,basename_listinos.walk(from_dir):relpath=os.path.relpath(dirname,from_dir)target_dir=os.path.abspath(os.path.join(to_dir,relpath))iftarget_dir.endswith(pycache):continueelse:makedir_if_not_exists(target_dir)forbasenameinbasename_list:# ignore .pyc, the compiled byte code# and .pyo, the optimized import cacheifbasename.endswith(".pyc")orbasename.endswith(".pyo"):continueelse:source_path=os.path.join(dirname,basename)target_path=os.path.join(target_dir,basename)shutil.copyfile(source_path,target_path)