"""Methods to utilize for inputting credentials for the FFIEC data connection.
This module provides secure methods for inputting credentials for the FFIEC webservice data connection.
Credentials may be input via environment variables, or passing them as arguments into the class structure. Wherever possible, the credentials should not be stored in source code.
"""
import os
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, Optional
import requests
from ffiec_data_connect import constants
from ffiec_data_connect.exceptions import (
ConnectionError,
CredentialError,
raise_exception,
)
[docs]
class CredentialType(Enum):
    """Identify how a credentials object obtained its credentials.

    The credential classes in this module store one of these members in
    their ``credential_source`` attribute.
    """

    # No usable credentials have been supplied.
    NO_CREDENTIALS = 0
    # Credentials were passed as constructor arguments.
    SET_FROM_INIT = 1
    # Credentials were read from environment variables.
    SET_FROM_ENV = 2
[docs]
class OAuth2Credentials:
    """OAuth2-based credentials for REST API access - Phase 0 Implementation.

    This credential type supports the new FFIEC REST API that uses OAuth2
    Bearer tokens for authentication instead of username/password pairs.

    Key Features:
        - OAuth2 Bearer token authentication (90-day lifecycle)
        - Token expiration tracking and validation
        - Public attributes are read-only after initialization
        - Compatible with automatic protocol selection

    Args:
        username: FFIEC username (for UserID header)
        bearer_token: OAuth2 bearer token (90-day lifecycle)
        token_expires: Optional token expiration datetime. It may be naive
            or timezone-aware; expiry checks compare it against a "now" of
            the same kind.

    Example::

        # Create OAuth2 credentials for REST API
        creds = OAuth2Credentials(
            username="your_ffiec_username",
            bearer_token="your_90_day_bearer_token",
            token_expires=datetime(2024, 3, 15)  # Optional
        )
        # Use with existing methods (automatic REST API selection)
        periods = collect_reporting_periods(session, creds)
    """

    def __init__(
        self, username: str, bearer_token: str, token_expires: Optional[datetime] = None
    ):
        """Validate and store OAuth2 credentials for REST API access.

        All validation happens before any state is stored, so the instance
        is only marked as initialized once every check has passed.

        Args:
            username: FFIEC username (for UserID header)
            bearer_token: OAuth2 bearer token (90-day lifecycle)
            token_expires: Token expiration datetime (optional)

        Raises:
            CredentialError: Requested through ``raise_exception`` when the
                username or token is missing, blank, not a string, or the
                token does not have the expected format. The concrete
                exception type is decided by ``raise_exception``.
        """
        # Validate required parameters. Non-string values are rejected here as
        # well, so they are reported like any other invalid credential instead
        # of failing with AttributeError on ``.strip()``.
        if not isinstance(username, str) or not username.strip():
            raise_exception(
                CredentialError,
                "Username is required for OAuth2 credentials",
                "OAuth2 credentials require a valid FFIEC username for the UserID header. "
                "Please provide your FFIEC webservice username.",
                credential_source="oauth2_init",
            )
        if not isinstance(bearer_token, str) or not bearer_token.strip():
            raise_exception(
                CredentialError,
                "Bearer token is required for OAuth2 credentials",
                "OAuth2 credentials require a valid bearer token. "
                "Please obtain a 90-day bearer token from your FFIEC PWS account.",
                credential_source="oauth2_init",
            )

        cleaned_token = bearer_token.strip()

        # Validate token format (test-friendly JWT validation)
        looks_like_jwt = (
            cleaned_token.startswith("ey")
            and cleaned_token.endswith(".")
            and len(cleaned_token) > 16
        )
        if not looks_like_jwt:
            raise_exception(
                CredentialError,
                "Bearer token appears invalid (wrong format)",
                "Bearer token must start with 'ey', end with '.', and be longer than 16 characters. "
                "Please verify you copied the complete JWT token from your PWS account.",
                credential_source="oauth2_init",
            )

        # Store the validated credentials.
        self._username = username.strip()
        self._bearer_token = cleaned_token
        self._token_expires = token_expires
        # Set credential type for compatibility with the other credential class.
        self.credential_source = CredentialType.SET_FROM_INIT
        # From here on __setattr__ rejects writes to public attributes.
        self._initialized = True

    @property
    def username(self) -> str:
        """Get the FFIEC username.

        Returns:
            Username for UserID header
        """
        return self._username

    @property
    def bearer_token(self) -> str:
        """Get the OAuth2 bearer token.

        Returns:
            Bearer token for authentication
        """
        return self._bearer_token

    @property
    def token_expires(self) -> Optional[datetime]:
        """Get token expiration datetime.

        Returns:
            Token expiration datetime or None if not set
        """
        return self._token_expires

    def _reference_now(self) -> datetime:
        """Return the current time, matching the awareness of the expiry.

        Python refuses to compare or subtract naive and timezone-aware
        datetimes, so an aware ``token_expires`` needs an aware "now" (taken
        in the same time zone) and a naive one needs a naive "now".
        """
        expires = self._token_expires
        if expires is not None and expires.utcoffset() is not None:
            return datetime.now(expires.tzinfo)
        return datetime.now()

    @property
    def is_expired(self) -> bool:
        """Check if token is expired or expires within 24 hours.

        Returns:
            True if token is expired or expires soon, False otherwise.
            Always False when no expiration datetime was provided.
        """
        if not self._token_expires:
            # No expiration date set - assume valid
            return False
        # Consider expired if expires within 24 hours
        warning_time = self._reference_now() + timedelta(hours=24)
        return self._token_expires <= warning_time

    def test_credentials(self, session: "Optional[requests.Session]" = None) -> bool:
        """Test OAuth2 credentials against REST API (placeholder).

        Note: This will be implemented when REST adapter is available.
        For now, it only checks that a username and token are present and
        that the token is not expired (or about to expire). No network call
        is made and ``session`` is not used.

        Args:
            session: Optional requests session (for future compatibility)

        Returns:
            True if credentials appear valid, False otherwise
        """
        # Basic validation
        if not self._username or not self._bearer_token:
            return False
        # Check if token is expired
        if self.is_expired:
            print("Warning: OAuth2 token is expired or expires within 24 hours.")
            return False
        # TODO: Implement actual REST API test call when adapter is ready
        print("OAuth2 credentials appear valid (full validation pending REST adapter).")
        return True

    def __str__(self) -> str:
        """String representation masking sensitive data."""
        username_display = self._mask_sensitive_string(self._username)
        token_display = self._mask_sensitive_string(self._bearer_token)
        expiry_info = ""
        if self._token_expires:
            if self.is_expired:
                expiry_info = ", status='EXPIRED'"
            else:
                # Use the same kind of "now" as is_expired so aware expiry
                # datetimes do not raise TypeError on subtraction.
                days_remaining = (self._token_expires - self._reference_now()).days
                expiry_info = f", expires_in='{days_remaining} days'"
        return f"OAuth2Credentials(username='{username_display}', token='{token_display}'{expiry_info})"

    def __repr__(self) -> str:
        """Return the same masked representation as ``__str__``."""
        return self.__str__()

    def _mask_sensitive_string(self, value: str) -> str:
        """Mask sensitive string data, showing only first and last character."""
        if not value:
            return "***"
        if len(value) <= 2:
            return "*" * len(value)
        return f"{value[0]}{'*' * (len(value) - 2)}{value[-1]}"

    # Prevent modification after initialization (immutable for security)
    def __setattr__(self, name: str, value: Any) -> None:
        """Reject writes to public attributes once initialization finished.

        Names starting with an underscore and ``credential_source`` remain
        writable; every other name is refused after ``_initialized`` is set.
        """
        if (
            getattr(self, "_initialized", False)
            and not name.startswith("_")
            and name != "credential_source"
        ):
            raise_exception(
                CredentialError,
                f"Cannot modify {name} after initialization",
                f"Cannot modify {name} after initialization. OAuth2Credentials are immutable for security.",
                credential_source="oauth2_modification",
            )
        super().__setattr__(name, value)
[docs]
class WebserviceCredentials(object):
    """The WebserviceCredentials class. This class is used to store the credentials for the FFIEC webservice.

    Each credential is resolved on its own: the constructor argument is used
    when given, otherwise the matching environment variable is consulted.

    Args:
        username (str, optional): FFIEC Webservice username. Optional: If not provided, the credentials will be set from the environment variable `FFIEC_USERNAME`
        password (str, optional): FFIEC Webservice password. Optional: If not provided, the credentials will be set from the environment variable `FFIEC_PASSWORD`

    Returns:
        WebserviceCredentials: An instance of the WebserviceCredentials class.
    """

    def __init__(
        self, username: Optional[str] = None, password: Optional[str] = None
    ) -> None:
        """Resolve and store the username and password.

        ``credential_source`` is ``SET_FROM_INIT`` when both values came from
        the arguments and ``SET_FROM_ENV`` when at least one of them came
        from the environment.

        Raises:
            CredentialError: Requested through ``raise_exception`` when a
                username or password cannot be found in either place. The
                concrete exception type is decided by ``raise_exception``.
        """
        # Flag to track if credentials are initialized (for immutability).
        # It must stay False while the property setters are used below.
        self._initialized = False
        # Initialize credential_source
        self.credential_source: CredentialType = CredentialType.NO_CREDENTIALS

        # Explicit arguments win; the environment fills in whatever is absent.
        resolved_username = self._resolve(username, "FFIEC_USERNAME")
        resolved_password = self._resolve(password, "FFIEC_PASSWORD")

        # Provide helpful error message based on what's missing
        missing = []
        if not resolved_username:
            missing.append("username (set via argument or FFIEC_USERNAME env var)")
        if not resolved_password:
            missing.append("password (set via argument or FFIEC_PASSWORD env var)")
        if missing:
            raise_exception(
                CredentialError,
                f"Missing required credentials: {', '.join(missing)}",
                f"Missing required credentials: {', '.join(missing)}. "
                "Please provide credentials either as arguments or environment variables.",
                credential_source="none",
            )
            # raise_exception is expected to raise; should it ever return,
            # leave the instance in the NO_CREDENTIALS state.
            return

        self.username = resolved_username
        self.password = resolved_password
        if username and password:
            self.credential_source = CredentialType.SET_FROM_INIT
        else:
            self.credential_source = CredentialType.SET_FROM_ENV
        self._initialized = True  # Mark as initialized

    @staticmethod
    def _resolve(explicit: Optional[str], env_var: str) -> Optional[str]:
        """Return ``explicit`` when non-empty, else the value of ``env_var``.

        Returns None when neither source provides a non-empty value.
        """
        return explicit or os.getenv(env_var) or None

    def __str__(self) -> str:
        """String representation of the credentials.

        Shows the credential source and a masked username; the password is
        never included.
        """
        if (
            self.credential_source == CredentialType.NO_CREDENTIALS
            or self.credential_source is None
        ):
            return "WebserviceCredentials(status='not configured')"
        # Mask username for security
        stored_username = getattr(self, "_username", None)
        username_display = (
            self._mask_sensitive_string(stored_username)
            if stored_username
            else "not_set"
        )
        if self.credential_source == CredentialType.SET_FROM_INIT:
            return (
                f"WebserviceCredentials(source='init', username='{username_display}')"
            )
        elif self.credential_source == CredentialType.SET_FROM_ENV:
            return f"WebserviceCredentials(source='environment', username='{username_display}')"
        else:
            return "WebserviceCredentials(source='unknown')"

    def _mask_sensitive_string(self, value: str) -> str:
        """Mask sensitive string data, showing only first and last character."""
        if not value:
            return "***"
        if len(value) <= 2:
            return "*" * len(value)
        return f"{value[0]}{'*' * (len(value) - 2)}{value[-1]}"

    def __repr__(self) -> str:
        """Return the same masked representation as ``__str__``."""
        return self.__str__()

    def test_credentials(self, session: "requests.Session") -> bool:
        """Test the credentials with the FFIEC Webservice to determine if they are valid and accepted.

        | Note: The session argument can be generated directly from requests, or
        | using the helper class `FFIECConnection`

        Args:
            session (requests.Session): the connection to test the credentials

        Returns:
            bool: True if the credentials are valid, False otherwise

        Raises:
            CredentialError: Requested through ``raise_exception`` when the
                username or password is not set, when authentication fails,
                or for any other unclassified failure.
            ConnectionError: Requested through ``raise_exception`` when the
                failure message mentions a connection problem or timeout.
                In both cases the concrete exception type is decided by
                ``raise_exception``.
        """
        # check that we have a user name
        if self.username is None:
            raise_exception(
                CredentialError,
                "Username is not set",
                "Username is not set. Please provide username via constructor or FFIEC_USERNAME environment variable.",
                credential_source=str(self.credential_source),
            )
        # check that we have a password
        if self.password is None:
            raise_exception(
                CredentialError,
                "Password is not set",
                "Password is not set. Please provide password via constructor or FFIEC_PASSWORD environment variable.",
                credential_source=str(self.credential_source),
            )
        # we have a user name and password, so try to log in
        try:
            # Use cached SOAP client for better performance
            from ffiec_data_connect.soap_cache import get_soap_client

            soap_client = get_soap_client(self, session)
            print("Standby...testing your access.")
            has_access_response = soap_client.service.TestUserAccess()
            if has_access_response:
                print("Your credentials are valid.")
                return True
            print(
                "Your credentials are invalid. Please refer to the documentation for more information."
            )
            print(has_access_response)
            return False
        except Exception as e:
            # Classify the failure by its message to give a more descriptive error
            error_msg = str(e)
            lowered_msg = error_msg.lower()
            if "401" in error_msg or "unauthorized" in lowered_msg:
                raise_exception(
                    CredentialError,
                    "Authentication failed",
                    "Authentication failed: Invalid username or password. "
                    "Please verify your FFIEC credentials are correct.",
                    credential_source=str(self.credential_source),
                )
            elif "connection" in lowered_msg or "timeout" in lowered_msg:
                raise_exception(
                    ConnectionError,
                    "Failed to connect to FFIEC webservice",
                    "Failed to connect to FFIEC webservice. "
                    "Please check your internet connection and proxy settings.",
                    url=constants.WebserviceConstants.base_url,
                )
            else:
                raise_exception(
                    CredentialError,
                    f"Failed to validate credentials: {error_msg}",
                    f"Failed to validate credentials: {error_msg}. "
                    "Please refer to https://cdr.ffiec.gov/public/PWS/Home.aspx for account setup.",
                    credential_source=str(self.credential_source),
                )
        # This should never be reached due to exceptions above
        return False

    @property
    def username(self) -> str:
        """Returns the username from the WebserviceCredentials instance.

        Returns:
            str: the username stored in the WebserviceCredentials instance
        """
        return self._username

    @username.setter
    def username(self, username: str) -> None:
        """Sets the username in the WebserviceCredentials instance.

        Args:
            username (str): the username to set in the WebserviceCredentials instance

        Raises:
            CredentialError: Requested through ``raise_exception`` if the
                credentials are already initialized (immutable). The concrete
                exception type is decided by ``raise_exception``.
        """
        if getattr(self, "_initialized", False):
            raise_exception(
                CredentialError,
                "Cannot modify username after initialization",
                "Cannot modify username after initialization. WebserviceCredentials are immutable for security.",
                credential_source=str(getattr(self, "credential_source", "unknown")),
            )
        self._username = username

    @property
    def password(self) -> str:
        """Returns the password from the WebserviceCredentials instance.

        Returns:
            str: the password stored in the WebserviceCredentials instance
        """
        return self._password

    @password.setter
    def password(self, password: str) -> None:
        """Sets the password in the WebserviceCredentials instance.

        Args:
            password (str): the password to set in the WebserviceCredentials instance

        Raises:
            CredentialError: Requested through ``raise_exception`` if the
                credentials are already initialized (immutable). The concrete
                exception type is decided by ``raise_exception``.
        """
        if getattr(self, "_initialized", False):
            raise_exception(
                CredentialError,
                "Cannot modify password after initialization",
                "Cannot modify password after initialization. WebserviceCredentials are immutable for security.",
                credential_source=str(getattr(self, "credential_source", "unknown")),
            )
        self._password = password