Feat (crypto): Add URL-safe encoding option to AESCipher (#2984)
This commit is contained in:
parent
4be01a405c
commit
7021add193
2 changed files with 33 additions and 11 deletions
|
|
@ -225,18 +225,34 @@ def create_access_token(data: dict, token_expire_minutes: Optional[int] = None)
|
|||
return jwt.encode(to_encode, settings.auth_secret_key, "HS256")
|
||||
|
||||
|
||||
def encrypt_internal_message(m: Optional[str] = None) -> Optional[str]:
|
||||
"""Encrypt message with the internal secret key"""
|
||||
def encrypt_internal_message(
|
||||
m: Optional[str] = None, urlsafe: bool = False
|
||||
) -> Optional[str]:
|
||||
"""
|
||||
Encrypt message with the internal secret key
|
||||
|
||||
Args:
|
||||
m: Message to encrypt
|
||||
urlsafe: Whether to use URL-safe base64 encoding
|
||||
"""
|
||||
if not m:
|
||||
return None
|
||||
return AESCipher(key=settings.auth_secret_key).encrypt(m.encode())
|
||||
return AESCipher(key=settings.auth_secret_key).encrypt(m.encode(), urlsafe=urlsafe)
|
||||
|
||||
|
||||
def decrypt_internal_message(m: Optional[str] = None) -> Optional[str]:
|
||||
"""Decrypt message with the internal secret key"""
|
||||
def decrypt_internal_message(
|
||||
m: Optional[str] = None, urlsafe: bool = False
|
||||
) -> Optional[str]:
|
||||
"""
|
||||
Decrypt message with the internal secret key
|
||||
|
||||
Args:
|
||||
m: Message to decrypt
|
||||
urlsafe: Whether the message uses URL-safe base64 encoding
|
||||
"""
|
||||
if not m:
|
||||
return None
|
||||
return AESCipher(key=settings.auth_secret_key).decrypt(m)
|
||||
return AESCipher(key=settings.auth_secret_key).decrypt(m, urlsafe=urlsafe)
|
||||
|
||||
|
||||
def filter_dict_keys(data: dict, filter_keys: Optional[list[str]]) -> dict:
|
||||
|
|
|
|||
|
|
@ -63,10 +63,15 @@ class AESCipher:
|
|||
final_key += key
|
||||
return final_key[:output]
|
||||
|
||||
def decrypt(self, encrypted: str) -> str:
|
||||
def decrypt(self, encrypted: str, urlsafe: bool = False) -> str:
|
||||
"""Decrypts a string using AES-256-CBC."""
|
||||
passphrase = self.passphrase
|
||||
encrypted_bytes = base64.b64decode(encrypted)
|
||||
|
||||
if urlsafe:
|
||||
encrypted_bytes = base64.urlsafe_b64decode(encrypted)
|
||||
else:
|
||||
encrypted_bytes = base64.b64decode(encrypted)
|
||||
|
||||
assert encrypted_bytes[0:8] == b"Salted__"
|
||||
salt = encrypted_bytes[8:16]
|
||||
key_iv = self.bytes_to_key(passphrase.encode(), salt, 32 + 16)
|
||||
|
|
@ -78,13 +83,14 @@ class AESCipher:
|
|||
except UnicodeDecodeError as exc:
|
||||
raise ValueError("Wrong passphrase") from exc
|
||||
|
||||
def encrypt(self, message: bytes) -> str:
|
||||
def encrypt(self, message: bytes, urlsafe: bool = False) -> str:
|
||||
passphrase = self.passphrase
|
||||
salt = Random.new().read(8)
|
||||
key_iv = self.bytes_to_key(passphrase.encode(), salt, 32 + 16)
|
||||
key = key_iv[:32]
|
||||
iv = key_iv[32:]
|
||||
aes = AES.new(key, AES.MODE_CBC, iv)
|
||||
return base64.b64encode(
|
||||
b"Salted__" + salt + aes.encrypt(self.pad(message))
|
||||
encoded = b"Salted__" + salt + aes.encrypt(self.pad(message))
|
||||
return (
|
||||
base64.urlsafe_b64encode(encoded) if urlsafe else base64.b64encode(encoded)
|
||||
).decode()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue