# -*- coding: utf-8 -*-
"""
This module implements the ``pygitrepo`` Python development workflow
best practice logics.
"""
from __future__ import print_function, unicode_literals
try:
import typing
except ImportError: # pragma: no cover
pass
import os
import subprocess
import functools
from zipfile import ZipFile
from .pkg.mini_six import input
from .pkg.fingerprint import fingerprint
from .repo_config import RepoConfig
from .operation_system import (
IS_WINDOWS, IS_MACOS, IS_LINUX,
OPEN_COMMAND
)
from .helpers import (
remove_if_exists, makedir_if_not_exists,
join_s3_uri,
split_s3_uri,
make_s3_console_url,
s3_uri_to_url,
s3_key_smart_join,
ensure_s3_dir,
copy_python_code,
)
from .color_print import (
Fore, Style, TAB,
pgr_print, pgr_print_done, print_path, print_line, colorful_path,
)
def subcommand(
    name=None,
    help=None,
):
    """
    Decorator that flags a function / class method as a CLI sub command.

    :type name: str
    :param name: explicit sub command name; defaults to the decorated
        function's name with underscores converted to dashes.
    :type help: str
    :param help: help text displayed by the CLI for this sub command.
    """
    def real_deco(func):
        # dashes are the CLI-friendly spelling of python identifiers
        chosen = func.__name__ if name is None else name
        func._subcommand_name = chosen.replace("_", "-")
        func._subcommand_help = help
        func._is_subcommand = True

        # functools.wraps copies __dict__, so the marker attributes set
        # above are visible on the wrapper as well
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        return wrapper

    return real_deco
class Actions(object):
    """
    A class container that includes ``pgr`` CLI interface logic.

    Each command method wrapped with a :func:`subcommand` decorator is a
    underlying logic for ``pgr ${subcommand}``.

    Usually each function has a ``**kwargs`` optional keyword arguments.
    It can be used to store optional command line arguments.
    """
    # ==============================================================================
    # Python Package Development Related
    # ==============================================================================
[docs] @subcommand(
help="** Create virtualenv for python package development.",
)
def venv_up(self, config, _dry_run=False, **kwargs):
"""
:type config: RepoConfig
"""
pgr_print(
"{cyan}create a virtualenv at {reset}{dir_venv}".format(
cyan=Fore.CYAN,
reset=Style.RESET_ALL,
dir_venv=config.dir_venv,
)
)
if os.path.exists(config.dir_venv):
pgr_print(
"{cyan}{tab}skip, virtualenv directory already exists!".format(
cyan=Fore.CYAN,
tab=TAB,
)
)
else:
if _dry_run is False:
# call the global virtualenv command
subprocess.call([
"virtualenv",
"-p",
config.path_bin_global_python,
config.dir_venv,
])
# call pip install command
subprocess.call([
config.path_venv_bin_pip,
"install",
"--upgrade",
"pip",
])
pgr_print_done(indent=1)
[docs] @subcommand(
help="** Remove the virtualenv for this project.",
)
def venv_remove(self, config, _dry_run=False, **kwargs):
"""
:type config: RepoConfig
"""
pgr_print(
"{cyan}remove virtualenv at {reset}{dir_venv}".format(
cyan=Fore.CYAN,
reset=Style.RESET_ALL,
dir_venv=config.dir_venv,
)
)
if os.path.exists(config.dir_venv):
if _dry_run is False:
remove_if_exists(config.dir_venv)
pgr_print_done(indent=1)
else:
pgr_print(
"{cyan}{tab}skip, virtualenv directory doesn't exists!".format(
cyan=Fore.CYAN,
tab=TAB,
)
)
[docs] @subcommand(
help="** Clean temp files.",
)
def clean(
self,
config,
ignore_tox=False,
_dry_run=False,
**kwargs
):
"""
:type config: RepoConfig
"""
pgr_print("{cyan}remove all temp files ...".format(cyan=Fore.CYAN))
to_delete = [
config.dir_pypi_build,
config.dir_sphinx_doc_build,
config.dir_coverage_annotate,
config.dir_pytest_cache,
]
if ignore_tox is False:
to_delete.append(config.dir_tox_cache)
for p in to_delete:
pgr_print(
"{cyan} clean {reset}{path} {cyan}...".format(
cyan=Fore.CYAN,
reset=Style.RESET_ALL,
path=p,
)
)
if _dry_run is False:
remove_if_exists(p)
pgr_print_done(indent=1)
[docs] @subcommand(
help="Uninstall the package from virtualenv.",
)
def pip_uninstall(self, config, _dry_run=False, **kwargs):
"""
:type config: RepoConfig
"""
pgr_print(
"{cyan}uninstall '{package_name}' from virtualenv".format(
cyan=Fore.CYAN,
package_name=config.PACKAGE_NAME.get_value(),
)
)
if _dry_run is False:
subprocess.call([
config.path_venv_bin_pip,
"uninstall",
"-y",
config.PACKAGE_NAME.get_value(),
])
pgr_print_done(indent=1)
[docs] @subcommand(
help="** Install the package to virtualenv in editable mode.",
)
def pip_dev_install(self, config, _dry_run=False, **kwargs):
"""
:type config: RepoConfig
"""
self.pip_uninstall(config, _dry_run=_dry_run, **kwargs)
pgr_print(
"{cyan}install '{package_name}' to virtualenv in editable mode".format(
cyan=Fore.CYAN,
package_name=config.PACKAGE_NAME.get_value(),
)
)
if _dry_run is False:
subprocess.call([
config.path_venv_bin_pip,
"install",
"--editable",
config.dir_project_root,
])
pgr_print_done(indent=1)
[docs] @subcommand(
help="Install the package to virtualenv in regular mode.",
)
def pip_install(self, config, _dry_run=False, **kwargs):
"""
:type config: RepoConfig
"""
self.pip_uninstall(config, _dry_run=_dry_run, **kwargs)
pgr_print(
"{cyan}install '{package_name}' to virtualenv".format(
cyan=Fore.CYAN,
package_name=config.PACKAGE_NAME.get_value(),
)
)
if _dry_run is False:
subprocess.call([
config.path_venv_bin_pip,
"install",
config.dir_project_root,
])
pgr_print_done(indent=1)
[docs] @subcommand(
help="** Display useful information",
)
def info(self, config, _dry_run=False, **kwargs):
"""
:type config: RepoConfig
"""
pgr_print(
"{cyan}Display useful information, {green}path exists{cyan}, {red}path not exists".format(
cyan=Fore.CYAN,
green=Fore.GREEN,
red=Fore.RED,
)
)
print_path(
"virtual environment directory",
config.dir_venv,
)
print_path(
"venv python executable path",
config.path_venv_bin_python
)
print_path(
"venv pip executable path",
config.path_venv_bin_pip,
)
if IS_WINDOWS:
print_line(
"- {cyan}activate venv command: {path}".format(
cyan=Fore.CYAN,
path=colorful_path(config.path_venv_bin_activate),
)
)
elif IS_MACOS or IS_LINUX:
print_line(
"- {cyan}activate venv command: {reset}source {path}".format(
cyan=Fore.CYAN,
reset=Style.RESET_ALL,
path=colorful_path(config.path_venv_bin_activate),
)
)
else:
raise NotImplementedError
print_line(
"- {cyan}deactivate venv command: {reset}deactivate".format(
cyan=Fore.CYAN,
reset=Style.RESET_ALL,
)
)
print_path(
"site-packages directory",
config.dir_venv_site_packages,
)
print_path(
"site-packages64 directory",
config.dir_venv_site_packages_64,
)
[docs] @subcommand(
help="** Install dev dependencies in requirements-dev.txt",
)
def req_dev(self, config, _dry_run=False, **kwargs):
"""
:type config: RepoConfig
"""
pgr_print(
"{cyan}install dependencies in {reset}requirements-dev.txt to virtualenv".format(
cyan=Fore.CYAN,
reset=Style.RESET_ALL,
)
)
if _dry_run is False:
subprocess.call([
config.path_venv_bin_pip,
"install",
"-r",
config.path_requirements_dev_file,
])
pgr_print_done(indent=1)
[docs] @subcommand(
help="Install doc dependencies in requirements-doc.txt",
)
def req_doc(self, config, _dry_run=False, **kwargs):
"""
:type config: RepoConfig
"""
pgr_print(
"{cyan}install dependencies in {reset}requirements-doc.txt to virtualenv".format(
cyan=Fore.CYAN,
reset=Style.RESET_ALL,
)
)
if _dry_run is False:
subprocess.call([
config.path_venv_bin_pip,
"install",
"-r",
config.path_requirements_doc_file,
])
pgr_print_done(indent=1)
[docs] @subcommand(
help="Install dev dependencies in requirements-test.txt",
)
def req_test(self, config, _dry_run=False, **kwargs):
"""
:type config: RepoConfig
"""
pgr_print(
"{cyan}install dependencies in {reset}requirements-dev.txt to virtualenv".format(
cyan=Fore.CYAN,
reset=Style.RESET_ALL,
)
)
if _dry_run is False:
subprocess.call([
config.path_venv_bin_pip,
"install",
"-r",
config.path_requirements_test_file,
])
pgr_print_done(indent=1)
[docs] @subcommand(
help="Display requirements file content",
)
def req_info(self, config, _dry_run=False, **kwargs):
"""
:type config: RepoConfig
"""
pgr_print(
"{cyan}dependencies in {reset}requirements.txt".format(
cyan=Fore.CYAN,
reset=Style.RESET_ALL,
)
)
print()
subprocess.call([
"cat",
config.path_requirements_file,
])
print()
pgr_print(
"{cyan}dependencies in {reset}requirements-dev.txt".format(
cyan=Fore.CYAN,
reset=Style.RESET_ALL,
)
)
print()
subprocess.call([
"cat",
config.path_requirements_dev_file,
])
print()
pgr_print(
"{cyan}dependencies in {reset}requirements-doc.txt".format(
cyan=Fore.CYAN,
reset=Style.RESET_ALL,
)
)
print()
subprocess.call([
"cat",
config.path_requirements_doc_file,
])
print()
pgr_print(
"{cyan}dependencies in {reset}requirements-test.txt".format(
cyan=Fore.CYAN,
reset=Style.RESET_ALL,
)
)
print()
subprocess.call([
"cat",
config.path_requirements_test_file,
])
print()
[docs] @subcommand(
name="test-only",
help="Run unit test with pytest.",
)
def test_pytest_only(self, config, _dry_run=False, **kwargs): # pragma: no cover
"""
:type config: RepoConfig
"""
pgr_print(
"{cyan}Run unit test in {reset}{dir_tests}".format(
cyan=Fore.CYAN,
reset=Style.RESET_ALL,
dir_tests=config.dir_tests,
)
)
if _dry_run is False:
cwd = os.getcwd()
os.chdir(config.dir_project_root)
subprocess.call([
config.path_venv_bin_pytest,
config.dir_tests,
"-s",
])
os.chdir(cwd)
pgr_print_done(indent=1)
    @subcommand(
        name="test",
        help="** Run unit test with pytest. Start over, reuse nothing.",
    )
    def test_pytest(self, config, _dry_run=False, **kwargs):  # pragma: no cover
        """
        Run unit tests from scratch: (re)install test dependencies and the
        package itself, drop the pytest cache, then run pytest.

        :type config: RepoConfig
        """
        self.req_test(config, _dry_run=_dry_run, **kwargs)
        self.pip_dev_install(config, _dry_run=_dry_run, **kwargs)
        if _dry_run is False:
            # cold start: reuse nothing from previous runs
            remove_if_exists(config.dir_pytest_cache)
        self.test_pytest_only(config, _dry_run=_dry_run, **kwargs)
[docs] @subcommand(
name="cov-only",
help="Run code coverage test in pytest.",
)
def test_cov_only(self, config, _dry_run=False, **kwargs): # pragma: no cover
"""
:type config: RepoConfig
"""
pgr_print(
"{cyan}Run code coverage test in {reset}{dir_tests}".format(
cyan=Fore.CYAN,
reset=Style.RESET_ALL,
dir_tests=config.dir_tests,
)
)
if _dry_run is False:
cwd = os.getcwd()
os.chdir(config.dir_project_root)
subprocess.call([
config.path_venv_bin_pytest,
config.dir_tests,
"-s",
"--cov={}".format(config.PACKAGE_NAME.get_value()),
"--cov-report", "term-missing",
"--cov-report", "html:{}".format(config.dir_coverage_html),
])
os.chdir(cwd)
pgr_print_done(indent=1)
    @subcommand(
        name="cov",
        help="** Run code coverage test in pytest. Start over, reuse nothing.",
    )
    def test_cov(self, config, _dry_run=False, **kwargs):  # pragma: no cover
        """
        Run coverage test from scratch: (re)install test dependencies and
        the package, drop the pytest / coverage caches, then run the
        coverage test.

        :type config: RepoConfig
        """
        self.req_test(config, _dry_run=_dry_run, **kwargs)
        self.pip_dev_install(config, _dry_run=_dry_run, **kwargs)
        if _dry_run is False:
            # cold start: reuse nothing from previous runs
            remove_if_exists(config.dir_pytest_cache)
            remove_if_exists(config.dir_coverage_annotate)
        self.test_cov_only(config, _dry_run=_dry_run, **kwargs)
[docs] @subcommand(
help="** View code coverage test result in web browser.",
)
def view_cov(self, config, _dry_run=False, **kwargs):
"""
:type config: RepoConfig
"""
pgr_print(
"{cyan}Open code coverage test html result in web browser{reset}{cov_html_index}".format(
cyan=Fore.CYAN,
reset=Style.RESET_ALL,
cov_html_index=config.path_coverage_html_index,
)
)
if not os.path.exists(config.path_coverage_html_index):
raise ValueError("code coverage test results not available yet, you may run 'pgr cov' first!")
if _dry_run is False:
subprocess.call([OPEN_COMMAND, config.path_coverage_html_index])
pgr_print_done(indent=1)
[docs] @subcommand(
name="tox-only",
help="Run matrix test in tox with pytest",
)
def test_tox_only(self, config, _dry_run=False, **kwargs): # pragma: no cover
"""
:type config: RepoConfig
"""
pgr_print(
"{cyan}Run matrix test with tox settings in {reset}tox.ini".format(
cyan=Fore.CYAN,
reset=Style.RESET_ALL,
dir_tests=config.dir_tests,
)
)
if _dry_run is False:
subprocess.call([
config.path_venv_bin_tox,
"--workdir \"{}\"".format(config.dir_project_root),
])
pgr_print_done(indent=1)
    @subcommand(
        name="tox",
        help="** Run matrix test in tox with pytest. Start over, reuse nothing.",
    )
    def test_tox(self, config, _dry_run=False, **kwargs):  # pragma: no cover
        """
        Run the tox matrix test from scratch: (re)install test dependencies
        and the package, drop the tox cache, then run tox.

        :type config: RepoConfig
        """
        self.req_test(config, _dry_run=_dry_run, **kwargs)
        self.pip_dev_install(config, _dry_run=_dry_run, **kwargs)
        if _dry_run is False:
            # cold start: reuse nothing from previous runs
            remove_if_exists(config.dir_tox_cache)
        self.test_tox_only(config, _dry_run=_dry_run, **kwargs)
[docs] @subcommand(
help="Build local documents with sphinx-doc.",
)
def build_doc_only(self, config, _dry_run=False, **kwargs):
"""
:type config: RepoConfig
"""
pgr_print(
"{cyan}Build doc at {reset}{doc_build_html}".format(
cyan=Fore.CYAN,
reset=Style.RESET_ALL,
doc_build_html=config.dir_sphinx_doc_build_html,
)
)
if _dry_run is False:
# equivalent to source ./bin/activate
os.environ["PATH"] = config.dir_venv_bin + os.pathsep + os.environ.get("PATH", "")
os.environ["VIRTUAL_ENV"] = config.dir_venv
# call sphinx make html command
subprocess.call([
"make",
"-C", config.dir_sphinx_doc,
"html",
])
pgr_print_done(indent=1)
    @subcommand(
        help="** Build local documents with sphinx-doc, start over, reuse nothing.",
    )
    def build_doc(self, config, _dry_run=False, **kwargs):
        """
        Build the docs from scratch: install doc dependencies, reinstall
        the package, remove previous build artifacts, then run sphinx.

        :type config: RepoConfig
        """
        self.req_doc(config, _dry_run=_dry_run, **kwargs)
        self.pip_dev_install(config, _dry_run=_dry_run, **kwargs)
        if _dry_run is False:
            remove_if_exists(config.dir_sphinx_doc_build)
            # also drop the package source copy inside the doc source dir
            remove_if_exists(os.path.join(
                config.dir_sphinx_doc_source, config.PACKAGE_NAME.get_value()))
        self.build_doc_only(config, _dry_run=_dry_run, **kwargs)
[docs] @subcommand(
help="Clear recently built local documents.",
)
def clean_doc(self, config, _dry_run=False, **kwargs):
"""
:type config: RepoConfig
"""
pgr_print(
"{cyan}Clean recently built doc at {reset}{doc_build}".format(
cyan=Fore.CYAN,
reset=Style.RESET_ALL,
doc_build=config.dir_sphinx_doc_build,
)
)
if _dry_run is False:
remove_if_exists(config.dir_sphinx_doc_build)
pgr_print_done(indent=1)
[docs] @subcommand(
help="** View recently build local documents.",
)
def view_doc(self, config, _dry_run=False, **kwargs):
"""
:type config: RepoConfig
"""
pgr_print(
"{cyan}Open recently built local html doc {reset}{doc_build_html}".format(
cyan=Fore.CYAN,
reset=Style.RESET_ALL,
doc_build_html=config.dir_sphinx_doc_build_html,
)
)
if os.path.exists(
os.path.join(config.dir_sphinx_doc_build_html, "index.html")
):
path_doc_index_html = os.path.join(config.dir_sphinx_doc_build_html, "index.html")
elif os.path.exists(
os.path.join(config.dir_sphinx_doc_build_html, "README.html")
):
path_doc_index_html = os.path.join(config.dir_sphinx_doc_build_html, "README.html")
else:
raise ValueError("documentation index html file not exists!")
if _dry_run is False:
subprocess.call([OPEN_COMMAND, path_doc_index_html])
pgr_print_done(indent=1)
    def _deploy_doc_to_s3(
        self,
        config,
        doc_version,
        s3_uri_doc_dir,
        doc_host_aws_profile,
        _dry_run=False,
        **kwargs
    ):
        """
        Deploy local html doc to S3.

        Removes everything under ``s3_uri_doc_dir`` first, then syncs the
        locally built html doc into it via the ``aws`` CLI.

        :type config: RepoConfig
        :type doc_version: str
        :param doc_version: "latest" or "versioned"
        :type s3_uri_doc_dir: str
        :type doc_host_aws_profile: str
        :param doc_host_aws_profile: optional named AWS profile forwarded
            to the ``aws`` CLI; when None the default credentials are used.
        """
        # make sure the target S3 "directory" exists before writing to it
        ensure_s3_dir(s3_uri_doc_dir)
        pgr_print(
            "{cyan}deploy doc from local to s3 as {doc_version} doc ...".format(
                cyan=Fore.CYAN,
                doc_version=doc_version,
            )
        )
        # step 1: wipe the previously deployed doc
        pgr_print(
            "{cyan}{tab}aws s3 rm {reset}{s3_uri_doc_dir}".format(
                cyan=Fore.CYAN,
                reset=Style.RESET_ALL,
                tab=TAB,
                s3_uri_doc_dir=s3_uri_doc_dir,
            )
        )
        args = [
            "aws", "s3", "rm", s3_uri_doc_dir,
            "--recursive",
            "--only-show-errors",
        ]
        if doc_host_aws_profile is not None:
            args.extend(["--profile", doc_host_aws_profile])
        if _dry_run is False:
            subprocess.call(args)
        # step 2: upload the freshly built html doc
        pgr_print(
            "{cyan}{tab}aws s3 sync {reset}{dir_sphinx_doc_build_html} {s3_uri_doc_dir}".format(
                cyan=Fore.CYAN,
                reset=Style.RESET_ALL,
                tab=TAB,
                dir_sphinx_doc_build_html=config.dir_sphinx_doc_build_html,
                s3_uri_doc_dir=s3_uri_doc_dir,
            )
        )
        args = [
            "aws", "s3", "sync",
            config.dir_sphinx_doc_build_html, s3_uri_doc_dir,
            "--only-show-errors",
        ]
        if doc_host_aws_profile is not None:
            args.extend(["--profile", doc_host_aws_profile])
        if _dry_run is False:
            subprocess.call(args)
        # tell the user where to browse the deployed doc
        s3_console_url = s3_uri_to_url(s3_uri_doc_dir)
        pgr_print(
            "{cyan}{tab}view {doc_version} doc at {reset}{s3_console_url}".format(
                cyan=Fore.CYAN,
                reset=Style.RESET_ALL,
                tab=TAB,
                doc_version=doc_version,
                s3_console_url=s3_console_url,
            )
        )
        pgr_print_done(indent=1)
    @subcommand(
        help="Deploy local html doc to S3 as versioned document",
    )
    def deploy_doc_to_versioned(self, config, _dry_run=False, **kwargs):
        """
        Deploy the locally built html doc to the versioned S3 location.

        :type config: RepoConfig
        """
        self._deploy_doc_to_s3(
            config,
            doc_version="versioned",
            s3_uri_doc_dir=config.s3_uri_doc_dir_versioned,
            doc_host_aws_profile=config.DOC_HOST_AWS_PROFILE.get_value(),
            _dry_run=_dry_run,
            **kwargs
        )
    @subcommand(
        help="Deploy local html doc to S3 as latest document",
    )
    def deploy_doc_to_latest(self, config, _dry_run=False, **kwargs):
        """
        Deploy the locally built html doc to the "latest" S3 location.

        :type config: RepoConfig
        """
        self._deploy_doc_to_s3(
            config,
            doc_version="latest",
            s3_uri_doc_dir=config.s3_uri_doc_dir_latest,
            doc_host_aws_profile=config.DOC_HOST_AWS_PROFILE.get_value(),
            _dry_run=_dry_run,
            **kwargs
        )
[docs] @subcommand(
help="Deploy local html doc to S3 as versioned document, and also as latest document optionally.",
)
def deploy_doc(self, config, _dry_run=False, **kwargs):
"""
:type config: RepoConfig
"""
pgr_print("{cyan}deploy doc from local to s3 ...".format(cyan=Fore.CYAN))
self.deploy_doc_to_versioned(config, _dry_run=_dry_run, **kwargs)
pgr_print(
"{cyan}also deploy latest doc (y/n)?".format(
cyan=Fore.CYAN,
tab=TAB,
)
)
entered = str(input("")).lower().strip()
if entered in ["y", "yes"]:
self.deploy_doc_to_latest(config, _dry_run=_dry_run, **kwargs)
else:
pgr_print_done(indent=1)
[docs] @subcommand(
name="publish",
help="** Publish this Package to https://pypi.org/.",
)
def publish_to_pypi(self, config, _dry_run=False, **kwargs):
"""
:type config: RepoConfig
"""
self.req_dev(config, _dry_run=_dry_run, **kwargs)
self.pip_dev_install(config, _dry_run=_dry_run, **kwargs)
pgr_print(
"{cyan}Publish {package_name} to {reset}https://pypi.org/project/{package_name}/".format(
cyan=Fore.CYAN,
reset=Style.RESET_ALL,
package_name=config.PACKAGE_NAME.get_value()
)
)
if _dry_run is False:
remove_if_exists(config.dir_pypi_build)
remove_if_exists(config.dir_pypi_distribute)
remove_if_exists(config.dir_pypi_egg)
cwd = os.getcwd()
os.chdir(config.dir_project_root)
try:
if _dry_run is False:
subprocess.call([
config.path_venv_bin_python,
"setup.py",
"sdist",
"bdist_wheel",
"--universal",
])
subprocess.call([
config.path_venv_bin_twine,
"upload",
"dist/*",
])
except:
pass
os.chdir(cwd)
pgr_print_done(indent=1)
[docs] @subcommand(
help="Run Jupyter notebook locally.",
)
def run_jupyter_notebook(self, config, _dry_run=False, **kwargs):
"""
:type config: RepoConfig
"""
pgr_print("{cyan}run jupyter notebook ...".format(cyan=Fore.CYAN))
if _dry_run is False:
subprocess.call([
config.path_venv_bin_jupyter,
"notebook",
])
# ==============================================================================
# AWS Lambda Related
# ==============================================================================
[docs] @subcommand(
help="Build AWS Lambda source code zip file.",
)
def build_lambda_source_code(self, config, _dry_run=False, **kwargs):
"""
:type config: RepoConfig
"""
pgr_print(
"{cyan}build lambda source code at {reset}{path}".format(
cyan=Fore.CYAN,
reset=Style.RESET_ALL,
path=config.path_lambda_build_source
)
)
# filter all python source code need to add to the zip
to_zip_list = list()
for dirname, _, basename_list in os.walk(config.dir_python_lib):
# ignore __pycache__
if dirname.endswith("__pycache__"):
continue
for basename in basename_list:
# ignore .pyc, .pyo,
if basename.endswith(".pyc") or basename.endswith(".pyo"):
continue
source_path = os.path.join(dirname, basename)
archive_path = os.path.relpath(source_path, config.dir_project_root)
to_zip_list.append((source_path, archive_path))
if _dry_run is False:
makedir_if_not_exists(config.dir_lambda_build)
remove_if_exists(config.path_lambda_build_source)
with ZipFile(config.path_lambda_build_source, "w") as f:
for source_path, archive_path in to_zip_list:
f.write(source_path, archive_path)
pgr_print_done(indent=1)
    def _upload_lambda_zip(
        self,
        config,
        source_or_layer,
        path,
        _dry_run=False,
        **kwargs
    ):
        """
        Upload a built lambda zip file to S3, keyed by its md5 fingerprint.

        :type config: RepoConfig
        :type source_or_layer: str
        :param source_or_layer: "source code" or "layer"; selects which
            versioned S3 deploy directory to upload into.
        :type path: str
        :param path: local path of the zip file to upload.
        :raises ValueError: when ``source_or_layer`` is not a known value.
        """
        pgr_print(
            "{cyan}upload lambda {source_or_layer} from {reset}{path} {cyan}to AWS S3".format(
                cyan=Fore.CYAN,
                reset=Style.RESET_ALL,
                source_or_layer=source_or_layer,
                path=path,
            )
        )
        if source_or_layer == "source code":
            s3_uri_lambda_deploy_versioned_dir = config.s3_uri_lambda_deploy_versioned_source_dir
        elif source_or_layer == "layer":
            s3_uri_lambda_deploy_versioned_dir = config.s3_uri_lambda_deploy_versioned_layer_dir
        else:
            raise ValueError
        if os.path.exists(path):
            # name the S3 object after the file's md5 so each unique build
            # gets its own immutable key
            source_md5 = fingerprint.of_file(path)
            bucket, prefix = split_s3_uri(s3_uri_lambda_deploy_versioned_dir)
            key = s3_key_smart_join(
                parts=[prefix, "{}.zip".format(source_md5)],
                is_dir=False,
            )
            s3_uri = join_s3_uri(bucket, key)
            pgr_print(
                "{cyan}{tab}upload to {reset}{s3_uri}".format(
                    cyan=Fore.CYAN,
                    tab=TAB,
                    reset=Style.RESET_ALL,
                    s3_uri=s3_uri,
                )
            )
            args = [
                "aws", "s3", "cp",
                path, s3_uri,
            ]
            aws_profile = config.AWS_LAMBDA_DEPLOY_AWS_PROFILE.get_value()
            if aws_profile is not None:
                args.extend(["--profile", aws_profile])
            if _dry_run is False:
                subprocess.call(args)
            pgr_print_done(indent=1)
        else:
            # build artifact missing: warn instead of failing
            pgr_print(
                "{red}{tab}{path} {cyan}not found!".format(
                    tab=TAB,
                    red=Fore.RED,
                    cyan=Fore.CYAN,
                    path=path,
                )
            )
    @subcommand(
        help="Upload AWS Lambda source code zip file to S3.",
    )
    def upload_lambda_source_code(self, config, _dry_run=False, **kwargs):
        """
        Upload the built lambda source code zip to the versioned S3 dir.

        :type config: RepoConfig
        """
        self._upload_lambda_zip(
            config,
            source_or_layer="source code",
            path=config.path_lambda_build_source,
            _dry_run=_dry_run,
            **kwargs
        )
def _find_docker(self):
"""
Find the absolute path of the docker executable
"""
if IS_MACOS or IS_LINUX:
locations = [
os.path.join("/", "usr", "bin", "docker"),
os.path.join("/", "usr", "local", "bin", "docker"),
]
for p in locations:
if os.path.exists(p):
return p
raise EnvironmentError("We cannot find docker, are you sure docker is installed?")
else:
raise EnvironmentError("We can only find docker bin on MacOS or Linux!")
[docs] @subcommand(
help="Build lambda layer using a AWS lambda runtime compatible docker image.",
)
def build_lambda_layer(self, config, _dry_run=False, **kwargs):
"""
:type config: RepoConfig
"""
config.ensure_attr_not_none(config.AWS_LAMBDA_BUILD_DOCKER_IMAGE.name)
config.ensure_attr_not_none(config.AWS_LAMBDA_BUILD_DOCKER_IMAGE_WORKSPACE_DIR.name)
pgr_print(
"{cyan}build lambda layer in {reset}{docker_image} {cyan}docker container at {reset}{path}".format(
cyan=Fore.CYAN,
reset=Style.RESET_ALL,
docker_image=config.AWS_LAMBDA_BUILD_DOCKER_IMAGE.get_value(),
path=config.path_lambda_build_layer
)
)
makedir_if_not_exists(config.dir_lambda_build)
path_bin_docker = self._find_docker()
container_only_build_lbd_layer_script_path = os.path.join(
config.AWS_LAMBDA_BUILD_DOCKER_IMAGE_WORKSPACE_DIR.get_value(),
"bin",
"container-only-build-lambda-layer.sh",
)
if _dry_run is False:
subprocess.call([
path_bin_docker, "run",
"-v", "{}:{}".format(
config.dir_project_root,
config.AWS_LAMBDA_BUILD_DOCKER_IMAGE_WORKSPACE_DIR.get_value(),
),
"--rm", config.AWS_LAMBDA_BUILD_DOCKER_IMAGE.get_value(),
"bash", container_only_build_lbd_layer_script_path,
])
    @subcommand(
        help="Upload AWS Lambda layer zip file to S3.",
    )
    def upload_lambda_layer(self, config, _dry_run=False, **kwargs):
        """
        Upload the built lambda layer zip to the versioned S3 dir.

        :type config: RepoConfig
        """
        self._upload_lambda_zip(
            config,
            source_or_layer="layer",
            path=config.path_lambda_build_layer,
            _dry_run=_dry_run,
            **kwargs
        )
    @subcommand(
        help="Deploy recently built AWS lambda layer.",
    )
    def deploy_lambda_layer(self, config, _dry_run=False, **kwargs):
        """
        Publish a new lambda layer version from the zip previously
        uploaded to S3 (the object is keyed by the local zip's md5).

        :type config: RepoConfig
        """
        pgr_print(
            "{cyan}deploy a new version of lambda layer from AWS S3".format(
                cyan=Fore.CYAN,
                reset=Style.RESET_ALL,
            )
        )
        if os.path.exists(config.path_lambda_build_layer):
            # recompute the md5 to locate the matching S3 object uploaded
            # earlier by ``upload_lambda_layer``
            source_md5 = fingerprint.of_file(config.path_lambda_build_layer)
            bucket, prefix = split_s3_uri(config.s3_uri_lambda_deploy_versioned_layer_dir)
            key = s3_key_smart_join(
                parts=[prefix, "{}.zip".format(source_md5)],
                is_dir=False,
            )
            s3_console_url = make_s3_console_url(bucket=bucket, prefix=key)
            pgr_print(
                "{cyan}{tab}run {reset}'aws lambda publish-layer-version' {cyan}command, preview layer file at {reset}{s3_console_url}".format(
                    cyan=Fore.CYAN,
                    tab=TAB,
                    reset=Style.RESET_ALL,
                    s3_console_url=s3_console_url,
                )
            )
            args = [
                "aws", "lambda", "publish-layer-version",
                "--layer-name", config.aws_lambda_layer_name,
                "--description",
                "dependency layer for all functions in '{}' project".format(config.aws_lambda_layer_name),
                "--content", "S3Bucket={},S3Key={}".format(
                    config.AWS_LAMBDA_DEPLOY_S3_BUCKET.get_value(), key,
                ),
                # declare compatibility with the project's dev python version
                "--compatible-runtimes", "python{}.{}".format(
                    config.DEV_PY_VER_MAJOR.get_value(),
                    config.DEV_PY_VER_MINOR.get_value()
                ),
            ]
            aws_profile = config.AWS_LAMBDA_DEPLOY_AWS_PROFILE.get_value()
            if aws_profile is not None:
                args.extend(["--profile", aws_profile])
            if _dry_run is False:
                subprocess.call(args)
            pgr_print(
                "{cyan}{tab}open {reset}{url} {cyan}to view layer".format(
                    cyan=Fore.CYAN,
                    tab=TAB,
                    reset=Style.RESET_ALL,
                    url=config.url_lambda_layer_console,
                )
            )
            pgr_print_done(indent=1)
        else:
            # build artifact missing; nothing to deploy
            pgr_print(
                "{red}{tab}{path} {cyan}not found!".format(
                    tab=TAB,
                    red=Fore.RED,
                    cyan=Fore.CYAN,
                    path=config.path_lambda_build_layer,
                )
            )
[docs] @subcommand(
name="bud-lambda-layer",
help="** Build, upload and deploy a new AWS lambda layer.",
)
def build_upload_deploy_lambda_layer(self, config, _dry_run=False, **kwargs):
"""
:type config: RepoConfig
"""
self.build_lambda_layer(config, _dry_run=_dry_run)
self.upload_lambda_layer(config, _dry_run=_dry_run, **kwargs)
self.deploy_lambda_layer(config, _dry_run=_dry_run, **kwargs)
def _ensure_chalice_lambda_app_dir(self, config):
"""
:type config: RepoConfig
"""
files = [
config.path_aws_chalice_config_json,
config.path_aws_chalice_app_py,
]
for p in files:
if not os.path.exists(p):
raise ValueError("{} not exists".format(p))
[docs] @subcommand(
help="** Deploy AWS Lambda app with AWS Chalice.",
)
def chalice_deploy(self, config, _dry_run=False, **kwargs):
"""
:type config: RepoConfig
"""
pgr_print(
"{cyan}Deploy AWS Lambda app with AWS Chalice".format(
cyan=Fore.CYAN,
reset=Style.RESET_ALL,
)
)
self._ensure_chalice_lambda_app_dir(config)
# reset the ``vendor`` dir
remove_if_exists(config.dir_aws_chalice_vendor)
makedir_if_not_exists(config.dir_aws_chalice_vendor)
# copy source code to ``vendor`` dir
copy_python_code(
config.dir_python_lib,
config.dir_aws_chalice_vendor_source,
)
# invoke chalice cli
args = [
config.path_venv_bin_chalice,
"--project-dir", config.dir_lambda_app,
"deploy",
]
args.extend(kwargs["_args"])
aws_profile = config.AWS_LAMBDA_DEPLOY_AWS_PROFILE.get_value()
if aws_profile is not None:
args.extend(["--profile", aws_profile])
if _dry_run is False:
subprocess.call(args)
[docs] @subcommand(
help="** Delete AWS Lambda app with AWS Chalice.",
)
def chalice_delete(self, config, _dry_run=False, **kwargs):
"""
:type config: RepoConfig
"""
pgr_print(
"{cyan}Delete AWS Lambda app with AWS Chalice".format(
cyan=Fore.CYAN,
reset=Style.RESET_ALL,
)
)
self._ensure_chalice_lambda_app_dir(config)
args = [
config.path_venv_bin_chalice,
"--project-dir", config.dir_lambda_app,
"delete",
]
args.extend(kwargs["_args"])
aws_profile = config.AWS_LAMBDA_DEPLOY_AWS_PROFILE.get_value()
if aws_profile is not None:
args.extend(["--profile", aws_profile])
if _dry_run is False:
subprocess.call(args)
# module-level singleton consumed by the CLI entry point
actions = Actions()