🚀 Quick Start
Installation
Copy
pip install snake-query-sdk
Basic Usage
Copy
from snake_query_sdk import SnakeQuery, SchemaBuilder
import os
# Initialize client with your API key
client = SnakeQuery(api_key=os.environ.get('SNAKE_QUERY_API_KEY'))
# Define the expected response structure
product_schema = SchemaBuilder.create() \
.array(
SchemaBuilder.create() \
.object() \
.add_string_property('name') \
.add_number_property('price', minimum=0) \
.required(['name', 'price']) \
.build()
) \
.build()
# Make the query
result = client.query(
query='Find products under $100',
fetch_url='https://api.store.com/products',
response_schema=product_schema
)
print(result['response']) # Structured data
print(result['usageCount']) # Token usage info
🎯 What is SnakeQuery?
SnakeQuery transforms natural language into structured data queries using AI. Instead of writing complex data operations, simply describe what you want in plain English. Example:
Copy
# Instead of complex Python:
result = sorted(
[
{'name': item['title'], 'price': item['price'], 'rating': item['rating']}
for item in data
if item['price'] < 500 and item['category'] == 'electronics'
],
key=lambda x: x['rating'],
reverse=True
)[:5]
# Use natural language:
result = client.query(
query='Find top 5 electronics under $500, show name, price and rating, sort by rating',
data=products
)
✨ Features
- 🧠 Natural Language Processing: Write queries in plain English
- 🏗️ Schema-Driven: Type-safe, validated responses
- 🌐 Multiple Data Sources: Query lists, dictionaries, or REST APIs
- ⚡ High Performance: Optimized for production use
- 🔒 Secure: Built-in authentication and error handling
🔐 Authentication
Get your API key from the SnakeQuery Dashboard:
Copy
from snake_query_sdk import SnakeQuery
client = SnakeQuery(api_key='1234567890abcdef...')
Environment Variables (Recommended)
Copy
export SNAKE_QUERY_API_KEY="1234567890abcdef..."
Copy
import os
client = SnakeQuery(api_key=os.environ.get('SNAKE_QUERY_API_KEY'))
📚 API Reference
SnakeQuery Class
Constructor
Copy
SnakeQuery(api_key: str)
Methods
query()
Main method for all query operations:
Copy
def query(
self,
query: str,
data: Any = None,
fetch_url: str = None,
response_schema: dict = None
) -> dict:
Parameters:
- query (str): Natural language query
- data (Any, optional): Direct data to query
- fetch_url (str, optional): URL to fetch data from
- response_schema (dict, optional): Response structure schema
Returns a dictionary with the following structure:
Copy
{
'usageCount': {
'inputTokens': int,
'outputTokens': int,
'totalTokens': int
},
'response': Any # Your structured data
}
🏗️ Schema Building
SchemaBuilder Class
Create structured response schemas using a fluent API:
Basic Types
Copy
from snake_query_sdk import SchemaBuilder
# String field
string_schema = SchemaBuilder.create() \
.string(min_length=1, max_length=100) \
.build()
# Number field
number_schema = SchemaBuilder.create() \
.number(minimum=0, maximum=1000) \
.build()
# Boolean field
boolean_schema = SchemaBuilder.create() \
.boolean() \
.build()
Objects
Copy
user_schema = SchemaBuilder.create() \
.object() \
.add_string_property('name') \
.add_number_property('age', minimum=0) \
.add_string_property('email') \
.required(['name', 'age']) \
.build()
Arrays
Copy
# Array of objects
products_schema = SchemaBuilder.create() \
.array(
SchemaBuilder.create() \
.object() \
.add_string_property('title') \
.add_number_property('price', minimum=0) \
.required(['title', 'price']) \
.build()
) \
.build()
# Array of simple types
tags_schema = SchemaBuilder.create() \
.array_of_strings() \
.build()
Complex Nested Structures
Copy
order_schema = SchemaBuilder.create() \
.object() \
.add_string_property('orderId') \
.add_object_property('customer', {
'name': {'type': 'string'},
'email': {'type': 'string'}
}) \
.add_array_property('items', {
'type': 'object',
'properties': {
'product': {'type': 'string'},
'quantity': {'type': 'number', 'minimum': 1},
'price': {'type': 'number', 'minimum': 0}
}
}) \
.required(['orderId', 'customer', 'items']) \
.build()
💻 Examples
Your First Query
Copy
from snake_query_sdk import SnakeQuery
import os
def first_query():
    """Query a small in-memory product list and print the outcome.

    Prints the structured response and the total token usage. Any
    exception raised while querying or printing is reported on stdout
    instead of propagating.
    """
    # Initialize client (the API key comes from the environment)
    api_key = os.environ.get('SNAKE_QUERY_API_KEY')
    client = SnakeQuery(api_key=api_key)

    # Sample data
    products = [
        {'name': 'iPhone 14', 'price': 999, 'category': 'electronics'},
        {'name': 'Nike Shoes', 'price': 129, 'category': 'fashion'},
        {'name': 'Coffee Mug', 'price': 19, 'category': 'home'},
    ]

    try:
        outcome = client.query(query='Find products under $200', data=products)
        print('Results:', outcome['response'])
        usage = outcome['usageCount']
        print('Tokens used:', usage['totalTokens'])
    except Exception as exc:
        print('Error:', str(exc))
first_query()
Query Direct Data with Schema
Copy
from snake_query_sdk import SnakeQuery, SchemaBuilder
import os
def data_query_demo():
    """Query an in-memory product list with a response schema and print the matches."""
    api_key = os.environ.get('SNAKE_QUERY_API_KEY')
    client = SnakeQuery(api_key=api_key)

    products = [
        {'name': 'iPhone 14', 'price': 999, 'category': 'electronics', 'brand': 'Apple', 'stock': 50},
        {'name': 'Samsung TV', 'price': 1299, 'category': 'electronics', 'brand': 'Samsung', 'stock': 25},
        {'name': 'Nike Shoes', 'price': 129, 'category': 'fashion', 'brand': 'Nike', 'stock': 100},
        {'name': 'MacBook Pro', 'price': 2399, 'category': 'electronics', 'brand': 'Apple', 'stock': 15},
    ]

    # Define response schema: an array of product objects.
    product_schema = (
        SchemaBuilder.create()
        .array(
            SchemaBuilder.create()
            .object()
            .add_string_property('productName')
            .add_number_property('price', minimum=0)
            .add_string_property('brand')
            .add_string_property('category')
            .required(['productName', 'price', 'brand'])
            .build()
        )
        .build()
    )

    result = client.query(
        query='Find all products that cost less than $500 and show their names, prices, brands and categories',
        data=products,
        response_schema=product_schema,
    )

    matches = result['response']
    print(f"Found {len(matches)} products under $500")
    print('Results:', matches)
data_query_demo()
Query External API
Copy
from snake_query_sdk import SnakeQuery, SchemaBuilder
import os
def url_query_demo():
    """Query a remote REST endpoint with a response schema and print the matches."""
    api_key = os.environ.get('SNAKE_QUERY_API_KEY')
    client = SnakeQuery(api_key=api_key)

    # Define response schema: an array of product objects.
    product_schema = (
        SchemaBuilder.create()
        .array(
            SchemaBuilder.create()
            .object()
            .add_string_property('productTitle')
            .add_number_property('price', minimum=0)
            .add_string_property('categoryName')
            .required(['productTitle', 'price', 'categoryName'])
            .build()
        )
        .build()
    )

    # The data is fetched from fetch_url rather than passed in directly.
    result = client.query(
        query='Show me products that cost less than $100, include title, price and category name',
        fetch_url='https://api.escuelajs.co/api/v1/products',
        response_schema=product_schema,
    )

    matches = result['response']
    print(f"Found {len(matches)} products under $100")
    print('Results:', matches)
url_query_demo()
Data Analysis Example
Copy
import os

import pandas as pd
from snake_query_sdk import SnakeQuery, SchemaBuilder


def analyze_sales_data():
    """Aggregate sample sales records by rep and by region and print both tables.

    The query response is requested in the shape described by
    ``analytics_schema`` and each of its two arrays is loaded into a
    pandas DataFrame for further analysis.
    """
    # `os` must be imported for this lookup; the API key is read from the environment.
    client = SnakeQuery(api_key=os.environ.get('SNAKE_QUERY_API_KEY'))

    # Sample sales data
    sales_data = [
        {'date': '2024-01-01', 'amount': 1500, 'rep': 'John', 'region': 'North'},
        {'date': '2024-01-02', 'amount': 2200, 'rep': 'Jane', 'region': 'South'},
        {'date': '2024-01-03', 'amount': 800, 'rep': 'John', 'region': 'North'},
        {'date': '2024-01-04', 'amount': 1800, 'rep': 'Mike', 'region': 'West'},
    ]

    # Define analytics schema: one object holding two arrays of aggregates.
    analytics_schema = (
        SchemaBuilder.create()
        .object()
        .add_array_property('salesByRep', {
            'type': 'object',
            'properties': {
                'rep': {'type': 'string'},
                'totalSales': {'type': 'number'},
                'avgSale': {'type': 'number'},
            },
        })
        .add_array_property('salesByRegion', {
            'type': 'object',
            'properties': {
                'region': {'type': 'string'},
                'totalSales': {'type': 'number'},
            },
        })
        .required(['salesByRep', 'salesByRegion'])
        .build()
    )

    result = client.query(
        query='Calculate total sales and average sale by rep, and total sales by region',
        data=sales_data,
        response_schema=analytics_schema,
    )

    # Convert to DataFrame for further analysis
    sales_by_rep = pd.DataFrame(result['response']['salesByRep'])
    sales_by_region = pd.DataFrame(result['response']['salesByRegion'])

    print("Sales by Representative:")
    print(sales_by_rep)
    print("\nSales by Region:")
    print(sales_by_region)
🎯 Common Use Cases
1. Data Science & Analytics
Copy
# Analyze customer data
result = client.query(
query='Find customer segments with highest lifetime value and their characteristics',
data=customer_data,
response_schema=customer_segments_schema
)
# Process survey responses
result = client.query(
query='Categorize feedback by sentiment and extract key themes',
data=survey_responses,
response_schema=sentiment_analysis_schema
)
2. E-commerce Applications
Copy
# Product recommendations
result = client.query(
query='Find similar products based on user preferences and purchase history',
data=product_catalog,
response_schema=recommendations_schema
)
# Inventory analysis
result = client.query(
query='Identify products with low stock that need reordering',
fetch_url='https://api.store.com/inventory',
response_schema=inventory_alert_schema
)
3. Financial Analysis
Copy
# Portfolio analysis
result = client.query(
query='Calculate risk metrics and performance indicators for each asset',
data=portfolio_data,
response_schema=risk_metrics_schema
)
# Expense categorization
result = client.query(
query='Categorize expenses and identify cost-saving opportunities',
data=expense_data,
response_schema=expense_analysis_schema
)
⚠️ Error Handling
Using Exception Handling
Copy
from snake_query_sdk import SnakeQueryError
try:
result = client.query(
query='Analyze sales data',
fetch_url='https://api.example.com/sales'
)
except SnakeQueryError as e:
if e.status == 401:
print('Invalid API key')
elif e.status == 402:
print('Insufficient credits')
elif e.status == 500:
print('Server error:', e.message)
elif e.status == 504:
print('Request timeout - try a simpler query')
else:
print(f'Unknown error: {e.message}')
except Exception as e:
print(f'Network or other error: {str(e)}')
Robust Error Handling
Copy
import time


def robust_query(client, query_options, max_retries=3):
    """Run ``client.query(**query_options)``, retrying transient failures.

    A ``SnakeQueryError`` with status 500, 503 or 504, or any other
    exception whose text contains "timeout", is retried with exponential
    backoff (1s, 2s, 4s, ...). Every other error, and the error from the
    final attempt, is re-raised unchanged.

    ``SnakeQueryError`` must already be imported from ``snake_query_sdk``.

    Args:
        client: Object exposing a ``query(**kwargs)`` method.
        query_options: Keyword arguments forwarded to ``client.query``.
        max_retries: Total number of attempts to make; must be at least 1.

    Returns:
        Whatever ``client.query`` returns on the first successful attempt.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
    """
    # Without this guard the loop body never runs and None is returned
    # silently, which is indistinguishable from a real (empty) result.
    if max_retries < 1:
        raise ValueError('max_retries must be at least 1')

    for attempt in range(max_retries):
        can_retry = attempt < max_retries - 1
        try:
            return client.query(**query_options)
        except SnakeQueryError as e:
            if e.status in (500, 503, 504) and can_retry:
                print(f'Retry {attempt + 1}/{max_retries}...')
                time.sleep(2 ** attempt)  # Exponential backoff
                continue
            raise  # bare raise keeps the original traceback
        except Exception as e:
            if 'timeout' in str(e).lower() and can_retry:
                print(f'Network timeout, retry {attempt + 1}/{max_retries}...')
                time.sleep(2 ** attempt)
                continue
            raise
✨ Best Practices
1. Always Use Schemas for Production
Copy
# ❌ Don't rely on free-form responses
result = client.query(
query='Get user data',
data=users
)
# ✅ Use structured schemas
result = client.query(
query='Get user data',
data=users,
response_schema=user_schema
)
2. Environment Variables for API Keys
Copy
# ❌ Don't hardcode API keys
client = SnakeQuery(api_key='1234567890abcdef...')
# ✅ Use environment variables
import os
client = SnakeQuery(api_key=os.environ.get('SNAKE_QUERY_API_KEY'))
3. Handle Errors Gracefully
Copy
# ✅ Always wrap queries in try/except
def safe_query(client, options):
    """Run a query and return its response, or None if the SDK reports an error.

    Args:
        client: Object exposing a ``query(**kwargs)`` method.
        options: Keyword arguments forwarded to ``client.query``.

    Returns:
        The ``'response'`` entry of the query result, or ``None`` when a
        ``SnakeQueryError`` is raised (the failure is printed).
    """
    try:
        result = client.query(**options)
        return result['response']
    except SnakeQueryError as error:
        print(f'Query failed: {error.message}')
        return None  # or default value
4. Optimize for Performance
Copy
# ✅ Be specific in your queries
result = client.query(
query='Find the 5 most recent completed orders',
fetch_url=api_url,
response_schema=order_schema
)
# ❌ Avoid overly broad queries
result = client.query(
query='Give me everything about all orders',
fetch_url=api_url
)
🐍 Python-Specific Features
Integration with Pandas
Copy
import pandas as pd
from snake_query_sdk import SnakeQuery
def query_to_dataframe(client, query, data=None, fetch_url=None, response_schema=None):
    """Run a query and return its response as a pandas DataFrame.

    Args:
        client: Object exposing a ``query(**kwargs)`` method.
        query: Natural language query string.
        data: Optional data to query directly.
        fetch_url: Optional URL to fetch the data from.
        response_schema: Optional schema describing the response structure;
            forwarded unchanged to ``client.query``.

    Returns:
        A DataFrame built from the response. A list response is used as the
        rows; any other response becomes a single-row DataFrame.
    """
    result = client.query(
        query=query,
        data=data,
        fetch_url=fetch_url,
        response_schema=response_schema,
    )

    # Convert response to DataFrame
    payload = result['response']
    rows = payload if isinstance(payload, list) else [payload]
    return pd.DataFrame(rows)
# Usage
df = query_to_dataframe(
client,
'Find products by category with average prices',
data=products
)
print(df.head())
Working with NumPy Arrays
Copy
import numpy as np
def query_numpy_data(client, numpy_array, query):
    """Query array-like data and return the response.

    Anything exposing a ``tolist()`` method is converted with it before
    being sent; other inputs are passed to the client unchanged.
    """
    # Convert numpy array to list for querying
    if hasattr(numpy_array, 'tolist'):
        payload = numpy_array.tolist()
    else:
        payload = numpy_array

    return client.query(query=query, data=payload)['response']
Async Support (Future Feature)
Copy
# Coming soon: async support for high-performance applications
import asyncio
import os

from snake_query_sdk import AsyncSnakeQuery


async def async_query_example():
    """Await a query on the (future) async client and return its response.

    NOTE: ``large_dataset`` is not defined in this snippet; it is assumed
    to be supplied by the surrounding module.
    """
    # `os` must be imported for this lookup; the API key is read from the environment.
    client = AsyncSnakeQuery(api_key=os.environ.get('SNAKE_QUERY_API_KEY'))
    result = await client.query(
        query='Analyze data asynchronously',
        data=large_dataset,
    )
    return result['response']
📊 Response Format
All successful queries return a dictionary:
Copy
{
'usageCount': {
'inputTokens': 150,
'outputTokens': 75,
'totalTokens': 225
},
'response': [
# Your structured data here
{
'productName': 'iPhone 14',
'price': 999,
'category': 'Electronics'
}
]
}
🚀 Advanced Examples
Run the included examples:
Copy
# Set your API key
export SNAKE_QUERY_API_KEY="your-key-here"
# Run data query demo
python -c "
from examples.data_query_demo import data_query_demo
data_query_demo()
"
# Run URL query demo
python -c "
from examples.url_query_demo import url_query_demo
url_query_demo()
"
📝 Requirements
- Python 3.7+
- requests library
- Valid SnakeQuery API key
🎉 Next Steps
- Learn about Schema Building patterns
- Explore Fetch API for browser applications
- Check out Node.js SDK for server-side JavaScript
🔗 Additional Resources
- PyPI Package
- SnakeQuery Dashboard
- GitHub Repository
- GitHub Discussions - Ask questions and share ideas