Python
CGPT SDK Docs
ChainGPT AI News Generator SDK Documentation
The ChainGPT AI News Generator provides access to curated blockchain and cryptocurrency news with advanced filtering capabilities. You can retrieve the latest news articles, filter by categories, tokens, and search terms, and implement pagination for large datasets.
Table of Contents
Installation
Quick Start
Initialization and Setup
NewsService API Reference
Data Models Reference
Usage Examples
Error Handling
Common Category and Token IDs
Best Practices
Installation
Install the ChainGPT SDK via pip:
pip install chaingpt
Or add it to your requirements.txt:
chaingpt>=1.1.3
For environment variable management (recommended):
pip install python-dotenv
Quick Start
import asyncio
import os
from chaingpt.client import ChainGPTClient
async def main():
    """Fetch the five latest news articles and print each title with its creation time."""
    # Initialize client
    client = ChainGPTClient(api_key=os.getenv("CHAINGPT_API_KEY"))
    try:
        # Get latest news
        news = await client.news.get_news(limit=5)
        # Display results
        for article in news.data:
            print(f"{article.title} - {article.createdAt}")
    finally:
        # Close the client even when the request or the printing fails
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())
Initialization and Setup
Environment Variables
Set up your API key as an environment variable for security:
export CHAINGPT_API_KEY="your_api_key_here"
Or use a .env file:
CHAINGPT_API_KEY=your_api_key_here
Basic Client Setup
import asyncio
import os
from chaingpt.client import ChainGPTClient
from chaingpt.exceptions import ChainGPTError, APIError, ValidationError
from dotenv import load_dotenv # Optional: for .env file support
# Load environment variables
load_dotenv()
async def main():
    """Build a client from CHAINGPT_API_KEY and obtain its news service.

    Raises:
        ValueError: If CHAINGPT_API_KEY is unset or empty.
    """
    # Resolve the key first so a missing value fails before a client exists
    key = os.getenv("CHAINGPT_API_KEY")
    if key is None or key == "":
        raise ValueError("CHAINGPT_API_KEY environment variable is required")

    client = ChainGPTClient(api_key=key)
    try:
        # Handle to the news service
        news_service = client.news
        # Your code here
    finally:
        # Executed on every exit path, so the client is always closed
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())
NewsService API Reference
get_news()
Method
get_news()
Method
Retrieves AI-generated news articles with comprehensive filtering and pagination capabilities.
Method Signature
async def get_news(
self,
category_id: Optional[Union[int, List[int]]] = None,
sub_category_id: Optional[Union[int, List[int]]] = None,
token_id: Optional[Union[int, List[int]]] = None,
search_query: Optional[str] = None,
fetch_after: Optional[str] = None,
limit: Optional[int] = 10,
offset: Optional[int] = 0,
sort_by: Optional[str] = "createdAt",
) -> GetNewsResponseModel
Parameters
category_id
Optional[Union[int, List[int]]]
None
Filter by one or more category IDs. Examples: 8 (NFT), [8, 12] (multiple categories)
sub_category_id
Optional[Union[int, List[int]]]
None
Filter by one or more sub-category IDs. Examples: 15 (Ethereum), [15, 39] (multiple)
token_id
Optional[Union[int, List[int]]]
None
Filter by specific token IDs. Examples: 79 (Bitcoin), [79, 1027] (multiple tokens)
search_query
Optional[str]
None
Keyword search for title/description. Examples: "halving", "defi", "nft"
fetch_after
Optional[str]
None
Date filter in YYYY-MM-DD format. Only returns articles published after this date.
limit
Optional[int]
10
Maximum articles to return (pagination). Range: 1-100
offset
Optional[int]
0
Number of articles to skip (pagination)
sort_by
Optional[str]
"createdAt"
Sort field. Currently supports "createdAt"
Return Value
Returns a GetNewsResponseModel object containing:
statusCode
Optional[int]
HTTP status code (200 for success)
message
Optional[str]
Response message
data
List[NewsArticleModel]
Array of news articles
limit
Optional[int]
Applied limit parameter
offset
Optional[int]
Applied offset parameter
total
Optional[int]
Total number of articles available
Raises
ValidationError: Invalid parameters or malformed request
APIError: Server-side errors or invalid responses
AuthenticationError: Invalid API key
RateLimitError: Too many requests
ChainGPTError: Other SDK-related errors
Data Models Reference
NewsArticleModel
Represents a single news article with complete metadata.
class NewsArticleModel(BaseModel):
    """A single news article together with its metadata and related objects."""

    # Basic article information
    id: int  # Unique article identifier
    title: str  # Article headline
    description: str  # Article summary/description
    pubDate: str  # Publication date (ISO format)
    author: str  # Article author name
    imageUrl: str  # Featured image URL
    createdAt: str  # Creation timestamp
    updatedAt: str  # Last update timestamp
    # Publishing information
    isPublished: bool  # Publication status
    isFeatured: int  # Featured article flag (0/1)
    isTopStory: int  # Top story flag (0/1)
    viewsCount: int  # Number of article views
    # Classification
    categoryId: Optional[int]  # Primary category ID
    subCategoryId: Optional[int]  # Sub-category ID
    tokenId: Optional[int]  # Related token/cryptocurrency ID
    # Relationships (populated objects)
    category: Optional[CategoryModel]  # Category details
    subCategory: Optional[SubCategoryModel]  # Sub-category details
    token: Optional[TokenModel]  # Token details
    media: Optional[MediaModel]  # Media/image details
    newsTags: List[Any]  # Article tags
    # Internal fields
    userId: Optional[int]  # Author user ID
    mediaId: int  # Media file ID
    deletedAt: Optional[str]  # Deletion timestamp (if soft-deleted)
CategoryModel
class CategoryModel(BaseModel):
    """Category record attached to an article via its ``category`` field."""

    id: int  # Category identifier
    name: str  # Category name (e.g., "NFT", "DeFi")
    isBlockchain: bool  # Blockchain-related category flag
    isToken: bool  # Token-specific category flag
    createdAt: str  # Creation timestamp
    updatedAt: str  # Last update timestamp
    deletedAt: Optional[str]  # Deletion timestamp (if applicable)
SubCategoryModel
class SubCategoryModel(BaseModel):
    """Sub-category record attached to an article via its ``subCategory`` field."""

    id: int  # Sub-category identifier
    name: str  # Sub-category name (e.g., "Ethereum", "Bitcoin")
    isBlockchain: bool  # Blockchain-related flag
    isToken: bool  # Token-specific flag
    createdAt: str  # Creation timestamp
    updatedAt: str  # Last update timestamp
    deletedAt: Optional[str]  # Deletion timestamp (if applicable)
TokenModel
class TokenModel(BaseModel):
    """Token record attached to an article via its ``token`` field."""

    id: int  # Token identifier
    name: str  # Token name (e.g., "Bitcoin", "Ethereum")
    isBlockchain: bool  # Blockchain-related flag
    isToken: bool  # Token classification flag
    createdAt: str  # Creation timestamp
    updatedAt: str  # Last update timestamp
    deletedAt: Optional[str]  # Deletion timestamp (if applicable)
MediaModel
class MediaModel(BaseModel):
    """Media file record attached to an article via its ``media`` field."""

    id: int  # Media file identifier
    fileName: str  # Original filename
    fileDescriptor: str  # File description
    mimeType: str  # MIME type (e.g., "image/jpeg")
    fileSize: int  # File size in bytes
    createdAt: Optional[str]  # Creation timestamp
    updatedAt: Optional[str]  # Last update timestamp
    deletedAt: Optional[str]  # Deletion timestamp (if applicable)
GetNewsResponseModel
class GetNewsResponseModel(BaseModel):
    """Response envelope declared as the return type of ``get_news()``."""

    statusCode: Optional[int]  # HTTP status code
    message: Optional[str]  # Response message
    data: List[NewsArticleModel]  # Array of news articles
    limit: Optional[int]  # Applied limit parameter
    offset: Optional[int]  # Applied offset parameter
    total: Optional[int]  # Total articles available
Usage Examples
Basic News Retrieval
async def get_latest_news():
    """Print the five most recent articles with author, date, views and classification.

    SDK errors are caught and printed; the client is closed in every case.
    """
    client = ChainGPTClient(api_key=os.getenv("CHAINGPT_API_KEY"))
    try:
        # Request the 5 newest articles
        response = await client.news.get_news(limit=5)
        print(f"Found {response.total} total articles")
        print(f"Showing {len(response.data)} articles:")
        for item in response.data:
            print(f"\n📰 {item.title}")
            print(f" 👤 {item.author}")
            print(f" 📅 {item.createdAt}")
            print(f" 👀 {item.viewsCount} views")
            print(f" 📝 {item.description[:100]}...")
            # Related objects are optional; print only the ones that are set
            if item.category:
                print(f" 🏷️ Category: {item.category.name}")
            if item.subCategory:
                print(f" 🏷️ Sub-category: {item.subCategory.name}")
            if item.token:
                print(f" 🪙 Token: {item.token.name}")
    except ChainGPTError as err:
        print(f"Error: {err}")
    finally:
        await client.close()
Filtering by Category and Sub-Category
async def get_nft_ethereum_news():
    """Print up to ten articles filtered by category 8 and sub-categories 15 and 39.

    SDK errors are caught and printed; the client is closed in every case.
    """
    client = ChainGPTClient(api_key=os.getenv("CHAINGPT_API_KEY"))
    try:
        # NFT category combined with the Ethereum-related sub-categories
        filters = {
            "category_id": 8,
            "sub_category_id": [15, 39],
            "limit": 10,
        }
        response = await client.news.get_news(**filters)
        print(f"Found {response.total} NFT/Ethereum articles:")
        for item in response.data:
            print(f"\n🎨 {item.title}")
            print(f" 📅 Published: {item.pubDate}")
            # Fall back to 'N/A' when the related object is missing
            category_label = item.category.name if item.category else 'N/A'
            print(f" 🏷️ Category: {category_label}")
            sub_category_label = item.subCategory.name if item.subCategory else 'N/A'
            print(f" 🏷️ Sub-category: {sub_category_label}")
    except ChainGPTError as err:
        print(f"Error: {err}")
    finally:
        await client.close()
Search with Keywords
async def search_bitcoin_halving():
    """Print up to five articles matching "halving" for token ID 79.

    SDK errors are caught and printed; the client is closed in every case.
    """
    client = ChainGPTClient(api_key=os.getenv("CHAINGPT_API_KEY"))
    try:
        # Keyword search restricted to the Bitcoin token ID
        query = {"search_query": "halving", "token_id": 79, "limit": 5}
        response = await client.news.get_news(**query)
        print(f"Found {response.total} Bitcoin halving articles:")
        for item in response.data:
            print(f"\n₿ {item.title}")
            print(f" 📅 {item.createdAt}")
            # Fall back to 'N/A' when no token object is attached
            token_label = item.token.name if item.token else 'N/A'
            print(f" 🪙 Token: {token_label}")
            print(f" 📝 {item.description}")
    except ChainGPTError as err:
        print(f"Error: {err}")
    finally:
        await client.close()
Date Range Filtering
async def get_recent_news():
    """Print up to ten articles fetched with a ``fetch_after`` date 30 days back.

    SDK errors are caught and printed; the client is closed in every case.
    """
    client = ChainGPTClient(api_key=os.getenv("CHAINGPT_API_KEY"))
    try:
        # Get news from the last month
        from datetime import datetime, timedelta
        # Cut-off date, 30 days before the current local time, as YYYY-MM-DD
        cutoff = datetime.now() - timedelta(days=30)
        since = cutoff.strftime("%Y-%m-%d")
        response = await client.news.get_news(
            fetch_after=since,
            limit=10,
            sort_by="createdAt"
        )
        print(f"Found {response.total} articles from the last 30 days:")
        for item in response.data:
            print(f"\n📰 {item.title}")
            print(f" 📅 {item.createdAt}")
            print(f" 👀 {item.viewsCount} views")
    except ChainGPTError as err:
        print(f"Error: {err}")
    finally:
        await client.close()
Pagination Implementation
async def paginate_news():
    """Interactively page through news, five articles per page.

    Each iteration fetches one page, prints it, and asks on stdin whether to
    continue. The loop ends when a page comes back empty or the user answers
    'n', 'q' or 'quit'. SDK errors are caught and printed; the client is
    closed in every case.
    """
    import asyncio  # local import keeps this snippet self-contained

    client = ChainGPTClient(api_key=os.getenv("CHAINGPT_API_KEY"))
    try:
        page_size = 5
        current_page = 0
        loop = asyncio.get_running_loop()
        while True:
            offset = current_page * page_size
            news = await client.news.get_news(
                limit=page_size,
                offset=offset
            )
            if not news.data:
                print("No more articles to display")
                break
            print(f"\n{'='*50}")
            print(f"Page {current_page + 1}")
            print(f"Showing articles {offset + 1}-{offset + len(news.data)} of {news.total}")
            print(f"{'='*50}")
            for i, article in enumerate(news.data, 1):
                print(f"\n{offset + i}. {article.title}")
                print(f" 📅 {article.createdAt}")
                print(f" 👀 {article.viewsCount} views")
            # User interaction: input() blocks, so run it in the default
            # executor to keep the event loop free while waiting for the user
            user_input = await loop.run_in_executor(
                None, input, "\nShow next page? (y/n/q to quit): "
            )
            # Tolerate surrounding whitespace and upper-case answers
            if user_input.strip().lower() in ('n', 'q', 'quit'):
                break
            current_page += 1
    except ChainGPTError as e:
        print(f"Error: {e}")
    finally:
        await client.close()
Complex Multi-Filter Query
async def complex_news_search():
    """Run a multi-filter query and print the results grouped by category name.

    Combines category, token, keyword and date filters in one request, then
    groups the returned articles by their category name ("Uncategorized"
    when an article has no category object). SDK errors are caught and
    printed; the client is closed in every case.
    """
    client = ChainGPTClient(api_key=os.getenv("CHAINGPT_API_KEY"))
    try:
        # Complex query: Multiple categories, tokens, and search terms
        news = await client.news.get_news(
            category_id=[8, 12],  # NFT and DeFi categories
            token_id=[79, 1027],  # Bitcoin and Ethereum
            search_query="market",  # Market-related news
            fetch_after="2024-01-01",  # Only articles published after this date
            limit=15
        )
        print(f"Complex search results: {news.total} articles found")
        print("Filters applied:")
        print(" - Categories: NFT, DeFi")
        print(" - Tokens: Bitcoin, Ethereum")
        print(" - Search: 'market'")
        print(" - Date: After 2024-01-01")
        print(f"{'='*50}")
        # Group by category for better display; setdefault creates the
        # bucket on first use, and dict order keeps first-seen category order
        by_category = {}
        for article in news.data:
            cat_name = article.category.name if article.category else "Uncategorized"
            by_category.setdefault(cat_name, []).append(article)
        for category, articles in by_category.items():
            print(f"\n📂 {category} ({len(articles)} articles):")
            for article in articles:
                print(f" • {article.title}")
                print(f" 🪙 {article.token.name if article.token else 'N/A'}")
                print(f" 👀 {article.viewsCount} views")
    except ChainGPTError as e:
        print(f"Error: {e}")
    finally:
        await client.close()
Error Handling
Exception Types
The SDK provides specific exception types for different error scenarios:
from chaingpt.exceptions import (
ChainGPTError, # Base exception
APIError, # API response errors
ValidationError, # Request validation errors
AuthenticationError, # Authentication failures
RateLimitError, # Rate limit exceeded
InsufficientCreditsError, # Insufficient account credits
NotFoundError, # Endpoint not found
ServerError, # Server-side errors
TimeoutError, # Request timeout
ConfigurationError # SDK configuration errors
)
Comprehensive Error Handling
async def robust_news_fetching():
    """Fetch category-8 news and report each kind of SDK failure separately.

    Prints the article titles on success. A non-200 ``statusCode`` is
    reported and the function returns early. Every exception is caught and
    printed rather than propagated; the client is closed in every case.
    """
    client = ChainGPTClient(api_key=os.getenv("CHAINGPT_API_KEY"))
    try:
        news = await client.news.get_news(
            category_id=8,
            limit=10
        )
        # Check response status
        if news.statusCode != 200:
            print(f"API returned status {news.statusCode}: {news.message}")
            return
        # Process successful response
        for article in news.data:
            print(f"📰 {article.title}")
    # Handlers are tried top to bottom, so the order below is significant.
    # NOTE(review): if any later exception type derives from an earlier one
    # (e.g. TimeoutError from APIError), its handler is unreachable —
    # confirm against the SDK's exception hierarchy.
    except ValidationError as e:
        print(f"❌ Validation Error: {e.message}")
        if hasattr(e, 'field') and e.field:
            print(f" Field: {e.field}")
        # Log details for debugging
        if e.details:
            print(f" Details: {e.details}")
    except AuthenticationError as e:
        print(f"🔐 Authentication Error: {e.message}")
        print(" Please check your API key")
    except RateLimitError as e:
        print(f"⏰ Rate Limit Error: {e.message}")
        if hasattr(e, 'retry_after') and e.retry_after:
            print(f" Retry after: {e.retry_after} seconds")
    except InsufficientCreditsError as e:
        print(f"💳 Insufficient Credits: {e.message}")
        print(" Please check your account balance")
    except APIError as e:
        print(f"🌐 API Error: {e.message}")
        if e.status_code:
            print(f" Status Code: {e.status_code}")
        if e.details:
            print(f" Details: {e.details}")
    # TimeoutError is the name imported from chaingpt.exceptions in the
    # import list above, which shadows the builtin of the same name
    except TimeoutError as e:
        print(f"⏱️ Timeout Error: {e.message}")
        print(" Try again or increase timeout settings")
    except ChainGPTError as e:
        print(f"🔧 SDK Error: {e.message}")
        if e.details:
            print(f" Details: {e.details}")
    # Last-resort handler for anything that is not an SDK exception
    except Exception as e:
        print(f"💥 Unexpected Error: {str(e)}")
        print(" Please report this issue")
    finally:
        await client.close()
Error Handling with Retry Logic
import asyncio
from typing import Optional
async def fetch_news_with_retry(
    client: ChainGPTClient,
    max_retries: int = 3,
    delay: float = 1.0
) -> Optional[GetNewsResponseModel]:
    """Request ten articles, retrying rate-limit, timeout and server errors.

    Rate-limit errors wait for the error's ``retry_after`` value when it is
    set, otherwise ``delay * 2 ** attempt`` seconds; timeout and server
    errors always use that exponential delay. The final attempt re-raises
    instead of waiting. Authentication, validation and insufficient-credit
    errors are re-raised immediately.

    Returns:
        The response of the first successful request, or None when the loop
        ends without one (e.g. ``max_retries`` of 0).
    """
    last_attempt = max_retries - 1
    for attempt in range(max_retries):
        out_of_retries = attempt == last_attempt
        try:
            return await client.news.get_news(limit=10)
        except RateLimitError as rate_err:
            if out_of_retries:
                raise
            # Prefer the retry-after period when the error carries one
            wait_time = rate_err.retry_after or delay * (2 ** attempt)
            print(f"Rate limited. Waiting {wait_time} seconds before retry {attempt + 1}/{max_retries}")
            await asyncio.sleep(wait_time)
        except (TimeoutError, ServerError) as temp_err:
            if out_of_retries:
                raise
            # Exponential backoff
            wait_time = delay * (2 ** attempt)
            print(f"Temporary error: {temp_err.message}. Retrying in {wait_time} seconds...")
            await asyncio.sleep(wait_time)
        except (AuthenticationError, ValidationError, InsufficientCreditsError):
            # Retrying cannot fix these; hand them straight to the caller
            raise
    return None
Common Category and Token IDs
Categories
8
NFT
Non-Fungible Token related news
12
DeFi
Decentralized Finance news
Sub-Categories (Ethereum-related)
15
Ethereum
Ethereum blockchain news
39
Ethereum
Alternative Ethereum category
Tokens
79
Bitcoin
BTC
Bitcoin cryptocurrency
1027
Ethereum
ETH
Ethereum cryptocurrency
Note: These IDs are examples and may change. Use the API responses to discover current category, sub-category, and token IDs. Consider caching this information for better performance.
Discovering Available IDs
async def discover_categories():
    """Print the category, sub-category and token IDs seen in a 100-article sample.

    SDK errors are caught and printed; the client is closed in every case.
    """
    client = ChainGPTClient(api_key=os.getenv("CHAINGPT_API_KEY"))
    try:
        # One large request serves as the sample to harvest IDs from
        sample = await client.news.get_news(limit=100)
        # id -> name maps; repeated IDs simply overwrite with the same name
        seen_categories = {}
        seen_sub_categories = {}
        seen_tokens = {}
        for entry in sample.data:
            if entry.category:
                seen_categories[entry.category.id] = entry.category.name
            if entry.subCategory:
                seen_sub_categories[entry.subCategory.id] = entry.subCategory.name
            if entry.token:
                seen_tokens[entry.token.id] = entry.token.name

        print("Available Categories:")
        for ident, label in sorted(seen_categories.items()):
            print(f" {ident}: {label}")
        print("\nAvailable Sub-Categories:")
        for ident, label in sorted(seen_sub_categories.items()):
            print(f" {ident}: {label}")
        print("\nAvailable Tokens:")
        for ident, label in sorted(seen_tokens.items()):
            print(f" {ident}: {label}")
    except ChainGPTError as err:
        print(f"Error discovering categories: {err}")
    finally:
        await client.close()
Best Practices
1. Efficient API Usage
# ✅ Good: Use specific filters to reduce response size
news = await client.news.get_news(
    category_id=8,  # Specific category
    limit=10,  # Reasonable limit
    fetch_after="2024-01-01"  # Recent news only
)
# ❌ Avoid: Fetching all news without filters
# (the get_news() parameter reference documents limit's range as 1-100)
news = await client.news.get_news(limit=1000)  # Too large, may timeout
2. Proper Pagination
# ✅ Good: Implement proper pagination
async def get_all_news(client, category_id, *, page_size=50, delay=0.1):
    """Collect every article of a category by paging through the API.

    Args:
        client: Object exposing ``news.get_news``.
        category_id: Category filter forwarded to each request.
        page_size: Number of articles requested per page.
        delay: Seconds to pause between page requests.

    Returns:
        A list with the articles of all pages, in the order received.
    """
    all_articles = []
    offset = 0
    while True:
        news = await client.news.get_news(
            category_id=category_id,
            limit=page_size,
            offset=offset
        )
        if not news.data:
            break
        all_articles.extend(news.data)
        # Advance by what was actually received, so a page shorter than
        # requested does not cause articles to be skipped
        offset += len(news.data)
        # When the response reports a total, stop as soon as it is reached
        # instead of spending one more request (and pause) on an empty page
        if news.total is not None and offset >= news.total:
            break
        # Respect rate limits
        await asyncio.sleep(delay)
    return all_articles
3. Error Handling and Logging
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def fetch_news_safely():
    """Fetch ten articles, logging the outcome, and re-raise SDK errors.

    NOTE(review): relies on ``client`` and ``logger`` existing at module
    level; only ``logger`` is defined in this snippet.
    """
    try:
        result = await client.news.get_news(limit=10)
        article_count = len(result.data)
        logger.info(f"Successfully fetched {article_count} articles")
        return result
    except ChainGPTError as err:
        logger.error(f"ChainGPT API error: {err.message}")
        # Details are logged at debug level only
        if err.details:
            logger.debug(f"Error details: {err.details}")
        raise
4. Resource Management
# ✅ Good: Always close the client
async def process_news():
    """Fetch news with default parameters, closing the client afterwards."""
    sdk_client = ChainGPTClient(api_key=os.getenv("CHAINGPT_API_KEY"))
    try:
        # Your news processing code
        result = await sdk_client.news.get_news()
    finally:
        # Runs whether the request succeeded or raised
        await sdk_client.close()
    return result
# ✅ Better: Use async context manager if available
async def process_news_with_context():
    """Fetch news with default parameters inside an ``async with`` block."""
    api_key = os.getenv("CHAINGPT_API_KEY")
    async with ChainGPTClient(api_key=api_key) as client:
        return await client.news.get_news()
5. Caching for Performance
import asyncio
from datetime import datetime, timedelta
from typing import Dict, Optional
class NewsCache:
    """In-memory cache for news responses with a time-to-live per entry.

    Entries are keyed by the keyword arguments of the query. An entry older
    than the TTL is treated as a miss and removed. Expired entries are also
    purged on every write, so keys that are never read again do not
    accumulate.
    """

    def __init__(self, ttl_minutes: int = 5):
        """Create an empty cache whose entries live for ``ttl_minutes`` minutes."""
        self.cache: Dict[str, tuple] = {}  # key -> (response, time stored)
        self.ttl = timedelta(minutes=ttl_minutes)

    def _make_key(self, **kwargs) -> str:
        """Build a cache key that does not depend on keyword order."""
        return str(sorted(kwargs.items()))

    def _is_fresh(self, stored_at: datetime, now: datetime) -> bool:
        """Return True while an entry stored at ``stored_at`` is within the TTL."""
        return now - stored_at < self.ttl

    # The model name is quoted so the class can be defined without importing it.
    def get(self, **kwargs) -> Optional["GetNewsResponseModel"]:
        """Return the cached response for these arguments, or None on a miss.

        An expired entry is deleted and reported as a miss.
        """
        key = self._make_key(**kwargs)
        entry = self.cache.get(key)
        if entry is None:
            return None
        data, stored_at = entry
        if self._is_fresh(stored_at, datetime.now()):
            return data
        del self.cache[key]
        return None

    def set(self, data: "GetNewsResponseModel", **kwargs):
        """Store ``data`` under the key built from the keyword arguments."""
        now = datetime.now()
        # Drop every expired entry first; collect keys before deleting so the
        # dict is not modified while it is being iterated
        expired = [
            key for key, (_, stored_at) in self.cache.items()
            if not self._is_fresh(stored_at, now)
        ]
        for key in expired:
            del self.cache[key]
        self.cache[self._make_key(**kwargs)] = (data, now)
# Usage
cache = NewsCache(ttl_minutes=10)
async def get_cached_news(client, **kwargs):
    """Return news for the given filters, using the module-level ``cache`` first.

    On a miss the request is sent through ``client`` and the response is
    stored under the same keyword arguments.
    """
    # Try cache first; compare with None so that a cached response is
    # returned even if the object happens to evaluate as falsy
    cached = cache.get(**kwargs)
    if cached is not None:
        return cached
    # Fetch from API
    news = await client.news.get_news(**kwargs)
    cache.set(news, **kwargs)
    return news
6. Rate Limiting
import asyncio
from datetime import datetime, timedelta
class RateLimiter:
    """Sliding-window limiter admitting at most ``calls_per_minute`` calls per minute."""

    def __init__(self, calls_per_minute: int = 60):
        """Create a limiter with an empty call history.

        Raises:
            ValueError: If ``calls_per_minute`` is less than 1.
        """
        if calls_per_minute < 1:
            raise ValueError("calls_per_minute must be at least 1")
        self.calls_per_minute = calls_per_minute
        self.calls = []  # datetimes of the calls admitted within the last minute

    async def wait_if_needed(self):
        """Sleep until a call is allowed, then record it in ``self.calls``."""
        window = timedelta(minutes=1)
        while True:
            now = datetime.now()
            # Remove calls older than 1 minute
            self.calls = [call_time for call_time in self.calls
                          if now - call_time < window]
            if len(self.calls) < self.calls_per_minute:
                break
            # Wait until the oldest call is more than 1 minute old, then loop
            # to re-read the clock and prune again. Every remaining entry is
            # younger than the window, so the sleep time is always positive.
            sleep_time = (window - (now - self.calls[0])).total_seconds()
            await asyncio.sleep(sleep_time)
        # ``now`` was read after any waiting, so the recorded time reflects
        # when the call is actually admitted, not when waiting began
        self.calls.append(now)
# Usage
rate_limiter = RateLimiter(calls_per_minute=30)
async def rate_limited_request(client, **kwargs):
    """Wait on the module-level ``rate_limiter``, then forward the query to ``get_news``."""
    await rate_limiter.wait_if_needed()
    response = await client.news.get_news(**kwargs)
    return response
Last updated
Was this helpful?