Python

CGPT SDK Docs

ChainGPT AI News Generator SDK Documentation

The ChainGPT AI News Generator provides access to curated blockchain and cryptocurrency news with advanced filtering capabilities. You can retrieve the latest news articles, filter by categories, tokens, and search terms, and implement pagination for large datasets.

Table of Contents

  1. Installation

  2. Quick Start

  3. Initialization and Setup

  4. NewsService API Reference

  5. Data Models Reference

  6. Usage Examples

  7. Error Handling

  8. Common Category and Token IDs

  9. Best Practices


Installation

Install the ChainGPT SDK via pip:

pip install chaingpt

Or add to your requirements.txt:

chaingpt>=1.1.3

For environment variable management (recommended):

pip install python-dotenv

Quick Start

import asyncio
import os
from chaingpt.client import ChainGPTClient

async def main():
    # Initialize client
    client = ChainGPTClient(api_key=os.getenv("CHAINGPT_API_KEY"))
    
    # Get latest news
    news = await client.news.get_news(limit=5)
    
    # Display results
    for article in news.data:
        print(f"{article.title} - {article.createdAt}")
    
    await client.close()

if __name__ == "__main__":
    asyncio.run(main())

Initialization and Setup

Environment Variables

Set up your API key as an environment variable for security:

export CHAINGPT_API_KEY="your_api_key_here"

Or use a .env file:

CHAINGPT_API_KEY=your_api_key_here

Basic Client Setup

import asyncio
import os
from chaingpt.client import ChainGPTClient
from chaingpt.exceptions import ChainGPTError, APIError, ValidationError
from dotenv import load_dotenv  # Optional: for .env file support

# Load environment variables
load_dotenv()

async def main():
    # Initialize the client
    api_key = os.getenv("CHAINGPT_API_KEY")
    if not api_key:
        raise ValueError("CHAINGPT_API_KEY environment variable is required")
    
    client = ChainGPTClient(api_key=api_key)
    
    try:
        # Access the news service
        news_service = client.news
        
        # Your code here
        
    finally:
        # Always close the client
        await client.close()

if __name__ == "__main__":
    asyncio.run(main())

NewsService API Reference

get_news() Method

Retrieves AI-generated news articles with comprehensive filtering and pagination capabilities.

Method Signature

async def get_news(
    self,
    category_id: Optional[Union[int, List[int]]] = None,
    sub_category_id: Optional[Union[int, List[int]]] = None,
    token_id: Optional[Union[int, List[int]]] = None,
    search_query: Optional[str] = None,
    fetch_after: Optional[str] = None,
    limit: Optional[int] = 10,
    offset: Optional[int] = 0,
    sort_by: Optional[str] = "createdAt",
) -> GetNewsResponseModel

Parameters

Parameter
Type
Default
Description

category_id

Optional[Union[int, List[int]]]

None

Filter by one or more category IDs. Examples: 8 (NFT), [8, 12] (multiple categories)

sub_category_id

Optional[Union[int, List[int]]]

None

Filter by one or more sub-category IDs. Examples: 15 (Ethereum), [15, 39] (multiple)

token_id

Optional[Union[int, List[int]]]

None

Filter by specific token IDs. Examples: 79 (Bitcoin), [79, 1027] (multiple tokens)

search_query

Optional[str]

None

Keyword search for title/description. Examples: "halving", "defi", "nft"

fetch_after

Optional[str]

None

Date filter in YYYY-MM-DD format. Only returns articles published after this date

limit

Optional[int]

10

Maximum articles to return (pagination). Range: 1-100

offset

Optional[int]

0

Number of articles to skip (pagination)

sort_by

Optional[str]

"createdAt"

Sort field. Currently supports "createdAt"

Return Value

Returns a GetNewsResponseModel object containing:

Field
Type
Description

statusCode

Optional[int]

HTTP status code (200 for success)

message

Optional[str]

Response message

data

List[NewsArticleModel]

Array of news articles

limit

Optional[int]

Applied limit parameter

offset

Optional[int]

Applied offset parameter

total

Optional[int]

Total number of articles available

Raises

  • ValidationError: Invalid parameters or malformed request

  • APIError: Server-side errors or invalid responses

  • AuthenticationError: Invalid API key

  • RateLimitError: Too many requests

  • ChainGPTError: Other SDK-related errors


Data Models Reference

NewsArticleModel

Represents a single news article with complete metadata.

class NewsArticleModel(BaseModel):
    # Basic article information
    id: int                           # Unique article identifier
    title: str                        # Article headline
    description: str                  # Article summary/description
    pubDate: str                      # Publication date (ISO format)
    author: str                       # Article author name
    imageUrl: str                     # Featured image URL
    createdAt: str                    # Creation timestamp
    updatedAt: str                    # Last update timestamp
    
    # Publishing information
    isPublished: bool                 # Publication status
    isFeatured: int                   # Featured article flag (0/1)
    isTopStory: int                   # Top story flag (0/1)
    viewsCount: int                   # Number of article views
    
    # Classification
    categoryId: Optional[int]         # Primary category ID
    subCategoryId: Optional[int]      # Sub-category ID
    tokenId: Optional[int]            # Related token/cryptocurrency ID
    
    # Relationships (populated objects)
    category: Optional[CategoryModel]     # Category details
    subCategory: Optional[SubCategoryModel]  # Sub-category details
    token: Optional[TokenModel]           # Token details
    media: Optional[MediaModel]           # Media/image details
    newsTags: List[Any]                   # Article tags
    
    # Internal fields
    userId: Optional[int]             # Author user ID
    mediaId: int                      # Media file ID
    deletedAt: Optional[str]          # Deletion timestamp (if soft-deleted)

CategoryModel

class CategoryModel(BaseModel):
    id: int                    # Category identifier
    name: str                  # Category name (e.g., "NFT", "DeFi")
    isBlockchain: bool         # Blockchain-related category flag
    isToken: bool              # Token-specific category flag
    createdAt: str             # Creation timestamp
    updatedAt: str             # Last update timestamp
    deletedAt: Optional[str]   # Deletion timestamp (if applicable)

SubCategoryModel

class SubCategoryModel(BaseModel):
    id: int                    # Sub-category identifier
    name: str                  # Sub-category name (e.g., "Ethereum", "Bitcoin")
    isBlockchain: bool         # Blockchain-related flag
    isToken: bool              # Token-specific flag
    createdAt: str             # Creation timestamp
    updatedAt: str             # Last update timestamp
    deletedAt: Optional[str]   # Deletion timestamp (if applicable)

TokenModel

class TokenModel(BaseModel):
    id: int                    # Token identifier
    name: str                  # Token name (e.g., "Bitcoin", "Ethereum")
    isBlockchain: bool         # Blockchain-related flag
    isToken: bool              # Token classification flag
    createdAt: str             # Creation timestamp
    updatedAt: str             # Last update timestamp
    deletedAt: Optional[str]   # Deletion timestamp (if applicable)

MediaModel

class MediaModel(BaseModel):
    id: int                         # Media file identifier
    fileName: str                   # Original filename
    fileDescriptor: str             # File description
    mimeType: str                   # MIME type (e.g., "image/jpeg")
    fileSize: int                   # File size in bytes
    createdAt: Optional[str]        # Creation timestamp
    updatedAt: Optional[str]        # Last update timestamp
    deletedAt: Optional[str]        # Deletion timestamp (if applicable)

GetNewsResponseModel

class GetNewsResponseModel(BaseModel):
    statusCode: Optional[int]       # HTTP status code
    message: Optional[str]          # Response message
    data: List[NewsArticleModel]    # Array of news articles
    limit: Optional[int]            # Applied limit parameter
    offset: Optional[int]           # Applied offset parameter
    total: Optional[int]            # Total articles available

Usage Examples

Basic News Retrieval

async def get_latest_news():
    client = ChainGPTClient(api_key=os.getenv("CHAINGPT_API_KEY"))
    
    try:
        # Get latest 5 articles
        news = await client.news.get_news(limit=5)
        
        print(f"Found {news.total} total articles")
        print(f"Showing {len(news.data)} articles:")
        
        for article in news.data:
            print(f"\n📰 {article.title}")
            print(f"   👤 {article.author}")
            print(f"   📅 {article.createdAt}")
            print(f"   👀 {article.viewsCount} views")
            print(f"   📝 {article.description[:100]}...")
            
            # Show category information if available
            if article.category:
                print(f"   🏷️ Category: {article.category.name}")
            if article.subCategory:
                print(f"   🏷️ Sub-category: {article.subCategory.name}")
            if article.token:
                print(f"   🪙 Token: {article.token.name}")
                
    except ChainGPTError as e:
        print(f"Error: {e}")
    finally:
        await client.close()

Filtering by Category and Sub-Category

async def get_nft_ethereum_news():
    client = ChainGPTClient(api_key=os.getenv("CHAINGPT_API_KEY"))
    
    try:
        # Get NFT news related to Ethereum
        news = await client.news.get_news(
            category_id=8,              # NFT category
            sub_category_id=[15, 39],   # Ethereum-related subcategories
            limit=10
        )
        
        print(f"Found {news.total} NFT/Ethereum articles:")
        
        for article in news.data:
            print(f"\n🎨 {article.title}")
            print(f"   📅 Published: {article.pubDate}")
            print(f"   🏷️ Category: {article.category.name if article.category else 'N/A'}")
            print(f"   🏷️ Sub-category: {article.subCategory.name if article.subCategory else 'N/A'}")
            
    except ChainGPTError as e:
        print(f"Error: {e}")
    finally:
        await client.close()

Search with Keywords

async def search_bitcoin_halving():
    client = ChainGPTClient(api_key=os.getenv("CHAINGPT_API_KEY"))
    
    try:
        # Search for Bitcoin halving news
        news = await client.news.get_news(
            search_query="halving",
            token_id=79,  # Bitcoin token ID
            limit=5
        )
        
        print(f"Found {news.total} Bitcoin halving articles:")
        
        for article in news.data:
            print(f"\n₿ {article.title}")
            print(f"   📅 {article.createdAt}")
            print(f"   🪙 Token: {article.token.name if article.token else 'N/A'}")
            print(f"   📝 {article.description}")
            
    except ChainGPTError as e:
        print(f"Error: {e}")
    finally:
        await client.close()

Date Range Filtering

async def get_recent_news():
    client = ChainGPTClient(api_key=os.getenv("CHAINGPT_API_KEY"))
    
    try:
        # Get news from the last month
        from datetime import datetime, timedelta
        
        # Calculate date 30 days ago
        thirty_days_ago = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d")
        
        news = await client.news.get_news(
            fetch_after=thirty_days_ago,
            limit=10,
            sort_by="createdAt"
        )
        
        print(f"Found {news.total} articles from the last 30 days:")
        
        for article in news.data:
            print(f"\n📰 {article.title}")
            print(f"   📅 {article.createdAt}")
            print(f"   👀 {article.viewsCount} views")
            
    except ChainGPTError as e:
        print(f"Error: {e}")
    finally:
        await client.close()

Pagination Implementation

async def paginate_news():
    client = ChainGPTClient(api_key=os.getenv("CHAINGPT_API_KEY"))
    
    try:
        page_size = 5
        current_page = 0
        
        while True:
            offset = current_page * page_size
            
            news = await client.news.get_news(
                limit=page_size,
                offset=offset
            )
            
            if not news.data:
                print("No more articles to display")
                break
                
            print(f"\n{'='*50}")
            print(f"Page {current_page + 1}")
            print(f"Showing articles {offset + 1}-{offset + len(news.data)} of {news.total}")
            print(f"{'='*50}")
            
            for i, article in enumerate(news.data, 1):
                print(f"\n{offset + i}. {article.title}")
                print(f"    📅 {article.createdAt}")
                print(f"    👀 {article.viewsCount} views")
            
            # User interaction
            user_input = input(f"\nShow next page? (y/n/q to quit): ").lower()
            if user_input in ['n', 'q', 'quit']:
                break
                
            current_page += 1
            
    except ChainGPTError as e:
        print(f"Error: {e}")
    finally:
        await client.close()

Complex Multi-Filter Query

async def complex_news_search():
    client = ChainGPTClient(api_key=os.getenv("CHAINGPT_API_KEY"))
    
    try:
        # Complex query: Multiple categories, tokens, and search terms
        news = await client.news.get_news(
            category_id=[8, 12],        # NFT and DeFi categories
            token_id=[79, 1027],        # Bitcoin and Ethereum
            search_query="market",      # Market-related news
            fetch_after="2024-01-01",   # This year only
            limit=15
        )
        
        print(f"Complex search results: {news.total} articles found")
        print("Filters applied:")
        print("  - Categories: NFT, DeFi")
        print("  - Tokens: Bitcoin, Ethereum")
        print("  - Search: 'market'")
        print("  - Date: After 2024-01-01")
        print(f"{'='*50}")
        
        # Group by category for better display
        by_category = {}
        for article in news.data:
            cat_name = article.category.name if article.category else "Uncategorized"
            if cat_name not in by_category:
                by_category[cat_name] = []
            by_category[cat_name].append(article)
        
        for category, articles in by_category.items():
            print(f"\n📂 {category} ({len(articles)} articles):")
            for article in articles:
                print(f"  • {article.title}")
                print(f"    🪙 {article.token.name if article.token else 'N/A'}")
                print(f"    👀 {article.viewsCount} views")
                
    except ChainGPTError as e:
        print(f"Error: {e}")
    finally:
        await client.close()

Error Handling

Exception Types

The SDK provides specific exception types for different error scenarios:

from chaingpt.exceptions import (
    ChainGPTError,          # Base exception
    APIError,               # API response errors
    ValidationError,        # Request validation errors
    AuthenticationError,    # Authentication failures
    RateLimitError,         # Rate limit exceeded
    InsufficientCreditsError,  # Insufficient account credits
    NotFoundError,          # Endpoint not found
    ServerError,            # Server-side errors
    TimeoutError,           # Request timeout
    ConfigurationError      # SDK configuration errors
)

Comprehensive Error Handling

async def robust_news_fetching():
    client = ChainGPTClient(api_key=os.getenv("CHAINGPT_API_KEY"))
    
    try:
        news = await client.news.get_news(
            category_id=8,
            limit=10
        )
        
        # Check response status
        if news.statusCode != 200:
            print(f"API returned status {news.statusCode}: {news.message}")
            return
            
        # Process successful response
        for article in news.data:
            print(f"📰 {article.title}")
            
    except ValidationError as e:
        print(f"❌ Validation Error: {e.message}")
        if hasattr(e, 'field') and e.field:
            print(f"   Field: {e.field}")
        # Log details for debugging
        if e.details:
            print(f"   Details: {e.details}")
            
    except AuthenticationError as e:
        print(f"🔐 Authentication Error: {e.message}")
        print("   Please check your API key")
        
    except RateLimitError as e:
        print(f"⏰ Rate Limit Error: {e.message}")
        if hasattr(e, 'retry_after') and e.retry_after:
            print(f"   Retry after: {e.retry_after} seconds")
            
    except InsufficientCreditsError as e:
        print(f"💳 Insufficient Credits: {e.message}")
        print("   Please check your account balance")
        
    except APIError as e:
        print(f"🌐 API Error: {e.message}")
        if e.status_code:
            print(f"   Status Code: {e.status_code}")
        if e.details:
            print(f"   Details: {e.details}")
            
    except TimeoutError as e:
        print(f"⏱️ Timeout Error: {e.message}")
        print("   Try again or increase timeout settings")
        
    except ChainGPTError as e:
        print(f"🔧 SDK Error: {e.message}")
        if e.details:
            print(f"   Details: {e.details}")
            
    except Exception as e:
        print(f"💥 Unexpected Error: {str(e)}")
        print("   Please report this issue")
        
    finally:
        await client.close()

Error Handling with Retry Logic

import asyncio
from typing import Optional

async def fetch_news_with_retry(
    client: ChainGPTClient,
    max_retries: int = 3,
    delay: float = 1.0
) -> Optional[GetNewsResponseModel]:
    """Fetch news with automatic retry on certain errors."""
    
    for attempt in range(max_retries):
        try:
            news = await client.news.get_news(limit=10)
            return news
            
        except RateLimitError as e:
            if attempt == max_retries - 1:
                raise
            
            # Wait for the retry-after period if specified
            wait_time = e.retry_after if e.retry_after else delay * (2 ** attempt)
            print(f"Rate limited. Waiting {wait_time} seconds before retry {attempt + 1}/{max_retries}")
            await asyncio.sleep(wait_time)
            
        except (TimeoutError, ServerError) as e:
            if attempt == max_retries - 1:
                raise
                
            wait_time = delay * (2 ** attempt)  # Exponential backoff
            print(f"Temporary error: {e.message}. Retrying in {wait_time} seconds...")
            await asyncio.sleep(wait_time)
            
        except (AuthenticationError, ValidationError, InsufficientCreditsError):
            # Don't retry on these errors
            raise
            
    return None

Common Category and Token IDs

Categories

ID
Name
Description

8

NFT

Non-Fungible Token related news

12

DeFi

Decentralized Finance news

ID
Name
Description

15

Ethereum

Ethereum blockchain news

39

Ethereum

Alternative Ethereum category

Tokens

ID
Name
Symbol
Description

79

Bitcoin

BTC

Bitcoin cryptocurrency

1027

Ethereum

ETH

Ethereum cryptocurrency

Note: These IDs are examples and may change. Use the API responses to discover current category, sub-category, and token IDs. Consider caching this information for better performance.

Discovering Available IDs

async def discover_categories():
    """Example function to discover available categories from API responses."""
    client = ChainGPTClient(api_key=os.getenv("CHAINGPT_API_KEY"))
    
    try:
        # Fetch a large sample of news to discover categories
        news = await client.news.get_news(limit=100)
        
        categories = {}
        sub_categories = {}
        tokens = {}
        
        for article in news.data:
            # Collect category information
            if article.category:
                categories[article.category.id] = article.category.name
            
            # Collect sub-category information
            if article.subCategory:
                sub_categories[article.subCategory.id] = article.subCategory.name
                
            # Collect token information
            if article.token:
                tokens[article.token.id] = article.token.name
        
        print("Available Categories:")
        for cat_id, cat_name in sorted(categories.items()):
            print(f"  {cat_id}: {cat_name}")
            
        print("\nAvailable Sub-Categories:")
        for sub_id, sub_name in sorted(sub_categories.items()):
            print(f"  {sub_id}: {sub_name}")
            
        print("\nAvailable Tokens:")
        for token_id, token_name in sorted(tokens.items()):
            print(f"  {token_id}: {token_name}")
            
    except ChainGPTError as e:
        print(f"Error discovering categories: {e}")
    finally:
        await client.close()

Best Practices

1. Efficient API Usage

# ✅ Good: Use specific filters to reduce response size
news = await client.news.get_news(
    category_id=8,      # Specific category
    limit=10,           # Reasonable limit
    fetch_after="2024-01-01"  # Recent news only
)

# ❌ Avoid: Fetching all news without filters
news = await client.news.get_news(limit=1000)  # Too large, may timeout

2. Proper Pagination

# ✅ Good: Implement proper pagination
async def get_all_news(client, category_id):
    all_articles = []
    offset = 0
    page_size = 50
    
    while True:
        news = await client.news.get_news(
            category_id=category_id,
            limit=page_size,
            offset=offset
        )
        
        if not news.data:
            break
            
        all_articles.extend(news.data)
        offset += page_size
        
        # Respect rate limits
        await asyncio.sleep(0.1)
    
    return all_articles

3. Error Handling and Logging

import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def fetch_news_safely():
    try:
        news = await client.news.get_news(limit=10)
        logger.info(f"Successfully fetched {len(news.data)} articles")
        return news
        
    except ChainGPTError as e:
        logger.error(f"ChainGPT API error: {e.message}")
        if e.details:
            logger.debug(f"Error details: {e.details}")
        raise

4. Resource Management

# ✅ Good: Always close the client
async def process_news():
    client = ChainGPTClient(api_key=os.getenv("CHAINGPT_API_KEY"))
    try:
        # Your news processing code
        news = await client.news.get_news()
        return news
    finally:
        await client.close()

# ✅ Better: Use async context manager if available
async def process_news_with_context():
    async with ChainGPTClient(api_key=os.getenv("CHAINGPT_API_KEY")) as client:
        news = await client.news.get_news()
        return news

5. Caching for Performance

import asyncio
from datetime import datetime, timedelta
from typing import Dict, Optional

class NewsCache:
    def __init__(self, ttl_minutes: int = 5):
        self.cache: Dict[str, tuple] = {}
        self.ttl = timedelta(minutes=ttl_minutes)
    
    def _make_key(self, **kwargs) -> str:
        return str(sorted(kwargs.items()))
    
    def get(self, **kwargs) -> Optional[GetNewsResponseModel]:
        key = self._make_key(**kwargs)
        if key in self.cache:
            data, timestamp = self.cache[key]
            if datetime.now() - timestamp < self.ttl:
                return data
            else:
                del self.cache[key]
        return None
    
    def set(self, data: GetNewsResponseModel, **kwargs):
        key = self._make_key(**kwargs)
        self.cache[key] = (data, datetime.now())

# Usage
cache = NewsCache(ttl_minutes=10)

async def get_cached_news(client, **kwargs):
    # Try cache first
    cached = cache.get(**kwargs)
    if cached:
        return cached
    
    # Fetch from API
    news = await client.news.get_news(**kwargs)
    cache.set(news, **kwargs)
    return news

6. Rate Limiting

import asyncio
from datetime import datetime, timedelta

class RateLimiter:
    def __init__(self, calls_per_minute: int = 60):
        self.calls_per_minute = calls_per_minute
        self.calls = []
    
    async def wait_if_needed(self):
        now = datetime.now()
        # Remove calls older than 1 minute
        self.calls = [call_time for call_time in self.calls 
                     if now - call_time < timedelta(minutes=1)]
        
        if len(self.calls) >= self.calls_per_minute:
            # Wait until the oldest call is more than 1 minute old
            sleep_time = 60 - (now - self.calls[0]).total_seconds()
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)
        
        self.calls.append(now)

# Usage
rate_limiter = RateLimiter(calls_per_minute=30)

async def rate_limited_request(client, **kwargs):
    await rate_limiter.wait_if_needed()
    return await client.news.get_news(**kwargs)

Last updated

Was this helpful?