Connecting AI with Databases: A Complete Guide
Kartik Kalia

Integrating AI with databases opens up powerful possibilities for data analysis, query generation, and intelligent data retrieval. This guide explores two primary approaches and provides practical implementation examples.
Two Main Approaches
Demo: AI Database Integration in Action
The AI Explorer interface demonstrates both approaches: direct data attachment for quick queries and schema-based query generation for complex database operations.
Approach 1: Direct Data Attachment
When working with small datasets, you can attach the actual data and schema directly to your AI prompt.
Best for:
- Small datasets (roughly under 1,000 rows — whatever fits comfortably in the model's context window)
- Simple queries
- Quick prototyping
- When data privacy allows direct sharing
Example:
const OpenAI = require('openai');

// Single shared client; the key comes from the environment, never from source.
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

/**
 * Approach 1 (direct data attachment): put the schema and every row straight
 * into the prompt and ask a question about them.
 *
 * @returns {Promise<string>} the text of the model's first choice
 */
async function queryWithData() {
  const prompt = `Based on this data:
Schema: users(id, name, email, age, department)
Data:
1, John Doe, john@email.com, 30, Engineering
2, Jane Smith, jane@email.com, 25, Marketing
3, Bob Johnson, bob@email.com, 35, Engineering
Question: How many engineers are over 30?`;

  const completion = await openai.chat.completions.create({
    model: "gpt-4",
    messages: [{ role: "user", content: prompt }],
  });
  const [firstChoice] = completion.choices;
  return firstChoice.message.content;
}
import os

import openai

# Take the key from the environment, as the JS/curl/Java examples do. Assigning a
# placeholder string here would override a valid OPENAI_API_KEY and make every
# request fail authentication.
openai.api_key = os.environ.get("OPENAI_API_KEY")


def query_with_data():
    """Approach 1: answer a question about rows pasted directly into the prompt.

    Returns:
        The text content of the model's first choice.
    """
    response = openai.chat.completions.create(
        model="gpt-4",
        messages=[
            {
                "role": "user",
                "content": """Based on this data:
Schema: users(id, name, email, age, department)
Data:
1, John Doe, john@email.com, 30, Engineering
2, Jane Smith, jane@email.com, 25, Marketing
3, Bob Johnson, bob@email.com, 35, Engineering
Question: How many engineers are over 30?""",
            }
        ],
    )
    return response.choices[0].message.content
# Approach 1 over raw HTTP: schema and rows travel inside the user message.
curl https://api.openai.com/v1/chat/completions \
  --header "Content-Type: application/json" \
  --header "Authorization: Bearer $OPENAI_API_KEY" \
  --data '{
"model": "gpt-4",
"messages": [
{
"role": "user",
"content": "Based on this data:\n\nSchema: users(id, name, email, age, department)\nData: \n1, John Doe, john@email.com, 30, Engineering\n2, Jane Smith, jane@email.com, 25, Marketing\n3, Bob Johnson, bob@email.com, 35, Engineering\n\nQuestion: How many engineers are over 30?"
}
]
}'
import okhttp3.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
public class OpenAIQuery {
    private static final String API_KEY = System.getenv("OPENAI_API_KEY");
    private static final String API_URL = "https://api.openai.com/v1/chat/completions";
    // OkHttpClient owns connection and thread pools that are meant to be reused,
    // so share one instance instead of building a new client per request.
    private static final OkHttpClient CLIENT = new OkHttpClient();
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Approach 1: send the schema and the rows inline and ask a question about them.
     *
     * @return the raw JSON body of the Chat Completions response
     * @throws java.io.IOException if the call fails or the API answers with a non-2xx status
     */
    public String queryWithData() throws Exception {
        ObjectNode requestBody = MAPPER.createObjectNode();
        requestBody.put("model", "gpt-4");
        ArrayNode messages = requestBody.putArray("messages");
        ObjectNode message = messages.addObject();
        message.put("role", "user");
        message.put("content",
            "Based on this data:\n\n" +
            "Schema: users(id, name, email, age, department)\n" +
            "Data: \n" +
            "1, John Doe, john@email.com, 30, Engineering\n" +
            "2, Jane Smith, jane@email.com, 25, Marketing\n" +
            "3, Bob Johnson, bob@email.com, 35, Engineering\n\n" +
            "Question: How many engineers are over 30?"
        );
        return post(requestBody);
    }

    /** POSTs {@code requestBody} to the Chat Completions endpoint and returns the raw JSON reply. */
    private static String post(ObjectNode requestBody) throws Exception {
        RequestBody body = RequestBody.create(
            MAPPER.writeValueAsString(requestBody),
            MediaType.get("application/json")
        );
        Request request = new Request.Builder()
            .url(API_URL)
            .header("Authorization", "Bearer " + API_KEY)
            .post(body)
            .build();
        // try-with-resources: a Response that is never closed leaks its pooled connection.
        try (Response response = CLIENT.newCall(request).execute()) {
            ResponseBody responseBody = response.body();
            String raw = responseBody == null ? "" : responseBody.string();
            if (!response.isSuccessful()) {
                throw new java.io.IOException("OpenAI request failed with HTTP " + response.code() + ": " + raw);
            }
            return raw;
        }
    }
}
Approach 2: Schema-Based Query Generation
For larger datasets, provide only the schema and let the AI generate the appropriate database queries.
Best for:
- Large datasets
- Complex database structures
- Production environments
- When data security is paramount
Example:
const OpenAI = require('openai');

// Shared client configured from the environment.
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

/**
 * Approach 2 (schema only): describe the table and ask the model to write SQL,
 * so no row data ever leaves the database.
 *
 * @returns {Promise<string>} the text of the model's first choice
 */
async function generateSQLQuery() {
  const schemaPrompt = `Database Schema:
TABLE users (
id INTEGER PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100),
age INTEGER,
department VARCHAR(50)
);
Generate a SQL query to find all engineers over 30 years old.`;

  const userMessage = { role: "user", content: schemaPrompt };
  const completion = await openai.chat.completions.create({
    model: "gpt-4",
    messages: [userMessage],
  });
  return completion.choices[0].message.content;
}
import os

import openai

# Take the key from the environment; a hard-coded placeholder would override a
# valid OPENAI_API_KEY and make every request fail authentication.
openai.api_key = os.environ.get("OPENAI_API_KEY")


def generate_sql_query():
    """Approach 2: send only the table definition and ask the model to write SQL.

    Returns:
        The text content of the model's first choice.
    """
    response = openai.chat.completions.create(
        model="gpt-4",
        messages=[
            {
                "role": "user",
                "content": """Database Schema:
TABLE users (
id INTEGER PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100),
age INTEGER,
department VARCHAR(50)
);
Generate a SQL query to find all engineers over 30 years old.""",
            }
        ],
    )
    return response.choices[0].message.content
# Approach 2 over raw HTTP. Inside the single-quoted JSON, a newline is written as
# \n (one backslash); \\n would reach the model as a literal backslash followed by "n".
curl https://api.openai.com/v1/chat/completions \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $OPENAI_API_KEY" \
  -d '{
    "model": "gpt-4",
    "messages": [
      {
        "role": "user",
        "content": "Database Schema:\nTABLE users (\n id INTEGER PRIMARY KEY,\n name VARCHAR(100),\n email VARCHAR(100),\n age INTEGER,\n department VARCHAR(50)\n);\n\nGenerate a SQL query to find all engineers over 30 years old."
      }
    ]
  }'
import okhttp3.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
public class SQLGenerator {
    private static final String API_KEY = System.getenv("OPENAI_API_KEY");
    private static final String API_URL = "https://api.openai.com/v1/chat/completions";
    // OkHttpClient owns connection and thread pools meant for reuse: share one instance.
    private static final OkHttpClient CLIENT = new OkHttpClient();
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Ask the model to write SQL for {@code question} against {@code schema}.
     *
     * @param schema   DDL (or another textual description) of the tables
     * @param question natural-language description of the wanted query
     * @return the raw JSON body of the Chat Completions response
     * @throws java.io.IOException if the call fails or the API answers with a non-2xx status
     */
    public String generateSQLQuery(String schema, String question) throws Exception {
        ObjectNode requestBody = MAPPER.createObjectNode();
        requestBody.put("model", "gpt-4");
        ArrayNode messages = requestBody.putArray("messages");
        ObjectNode systemMessage = messages.addObject();
        systemMessage.put("role", "system");
        systemMessage.put("content", "You are a SQL expert. Generate accurate SQL queries based on the schema provided.");
        ObjectNode userMessage = messages.addObject();
        userMessage.put("role", "user");
        userMessage.put("content", "Schema: " + schema + "\n\nGenerate SQL for: " + question);
        return post(requestBody);
    }

    /** POSTs {@code requestBody} to the Chat Completions endpoint and returns the raw JSON reply. */
    private static String post(ObjectNode requestBody) throws Exception {
        RequestBody body = RequestBody.create(
            MAPPER.writeValueAsString(requestBody),
            MediaType.get("application/json")
        );
        Request request = new Request.Builder()
            .url(API_URL)
            .header("Authorization", "Bearer " + API_KEY)
            .post(body)
            .build();
        // try-with-resources: a Response that is never closed leaks its pooled connection.
        try (Response response = CLIENT.newCall(request).execute()) {
            ResponseBody responseBody = response.body();
            String raw = responseBody == null ? "" : responseBody.string();
            if (!response.isSuccessful()) {
                throw new java.io.IOException("OpenAI request failed with HTTP " + response.code() + ": " + raw);
            }
            return raw;
        }
    }

    // Usage
    public void example() throws Exception {
        String schema = """
            CREATE TABLE users (
            id INTEGER PRIMARY KEY,
            name VARCHAR(100),
            email VARCHAR(100),
            age INTEGER,
            department VARCHAR(50)
            );
            """;
        String sqlQuery = generateSQLQuery(schema, "Find average age by department");
        System.out.println(sqlQuery);
    }
}
Enhanced Schema-Based Query Generation with Response Types
For production applications, you can enhance the schema-based approach to return structured responses with different visualization types based on the user’s prompt.
Response Types:
- transaction_list: for detailed transaction records
- charts: for data visualization (bar charts, pie charts, line graphs)
- summary: for aggregated statistics and insights
Structured Response Format:
import openai
import json
from typing import Dict, List, Optional
from enum import Enum
class ResponseType(Enum):
    """The kinds of payload a query result can be rendered as.

    Member values are the exact strings used in the ``response_type`` field of
    the structured responses.
    """

    # Individual records, one row per item.
    TRANSACTION_LIST = "transaction_list"
    # Aggregated data intended for a bar, pie or line chart.
    CHARTS = "charts"
    # Headline statistics such as totals and averages.
    SUMMARY = "summary"
class EnhancedDatabaseQuery:
    """Generate SQL plus a UI-oriented response envelope from a natural-language question.

    ``generate_structured_query`` asks the model, in JSON mode, for three things:
    a SQL query, a response type (``transaction_list``, ``charts`` or ``summary``)
    and display metadata. ``execute_and_format_response`` also runs the query
    and attaches the returned rows.
    """

    # JSON mode (response_format={"type": "json_object"}) is rejected by the base
    # "gpt-4" model, so the default is a model that supports it.
    DEFAULT_MODEL = "gpt-4o"

    def __init__(self, api_key: str, model: str = DEFAULT_MODEL):
        """
        Args:
            api_key: OpenAI API key, assigned to the ``openai`` module-level client.
            model: Chat model to call; it must support JSON mode.
        """
        openai.api_key = api_key
        self.model = model

    def generate_structured_query(self, schema: str, question: str) -> Dict:
        """Generate SQL query with structured response format.

        Args:
            schema: DDL describing the tables the query may use.
            question: Natural-language question from the user.

        Returns:
            The JSON object produced by the model. If the reply is not a JSON
            object, a ``transaction_list`` envelope carrying the raw reply text
            as ``sql_query`` is returned instead.
        """
        # Enhanced system prompt for response type detection
        system_prompt = """
You are a database expert. Analyze the user's question and generate:
1. A valid SQL query
2. Determine the appropriate response type (transaction_list, charts, or summary)
3. Provide a short description
4. Identify relevant fields for the response
Response types:
- transaction_list: For detailed records/transactions
- charts: For data visualization (bar, pie, line charts)
- summary: For aggregated statistics and insights
Return your response as a JSON object with this structure:
{
"sql_query": "SELECT ...",
"response_type": "transaction_list|charts|summary",
"short_description": "Brief description of what this query returns",
"api_data": {
"type": "transaction_list|charts|summary",
"description": "Detailed description",
"data": {
"sql": "SELECT ...",
"fields": ["field1", "field2", ...]
}
}
}
"""
        response = openai.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": f"Database Schema:\n{schema}\n\nQuestion: {question}"},
            ],
            response_format={"type": "json_object"},
        )
        content = response.choices[0].message.content or ""
        try:
            result = json.loads(content)
        except json.JSONDecodeError:
            result = None
        # Valid JSON that is not an object (e.g. a bare list) is no more usable
        # than invalid JSON, so both go to the text fallback.
        if isinstance(result, dict):
            return result
        return self._fallback_result(content)

    def execute_and_format_response(self, schema: str, question: str, db_connection) -> Dict:
        """Generate a query, execute it on ``db_connection`` and attach the rows.

        Args:
            schema: DDL describing the tables.
            question: Natural-language question.
            db_connection: DB-API style connection (e.g. ``sqlite3.Connection``)
                providing ``cursor()``.

        Returns:
            On success, the envelope from ``generate_structured_query`` with
            ``api_data.data.fields`` (column names) and
            ``api_data.data.results`` (one dict per row) filled in. On any
            failure, including rejected SQL, a dict with ``error`` set and
            ``short_description`` "Query failed to execute".
        """
        query_result = self.generate_structured_query(schema, question)
        cursor = None
        try:
            sql = query_result.get("sql_query")
            # The SQL is untrusted model output: refuse anything but a single
            # read-only statement. This is a coarse guard; a read-only database
            # role is still the real protection.
            if not self._is_read_only_sql(sql):
                raise ValueError(f"Refusing to execute non-read-only SQL: {sql!r}")
            cursor = db_connection.cursor()
            cursor.execute(sql)
            columns = [description[0] for description in cursor.description]
            rows = cursor.fetchall()
            data = [dict(zip(columns, row)) for row in rows]
            # The model may omit or mangle the nested envelope; rebuild it
            # rather than failing with a bare KeyError.
            api_data = query_result.get("api_data")
            if not isinstance(api_data, dict):
                api_data = query_result["api_data"] = {}
            payload = api_data.get("data")
            if not isinstance(payload, dict):
                payload = api_data["data"] = {}
            payload["fields"] = columns
            payload["results"] = data
            return query_result
        except Exception as e:
            return {
                "error": str(e),
                "sql_query": query_result.get("sql_query", ""),
                "response_type": query_result.get("response_type", "transaction_list"),
                "short_description": "Query failed to execute",
                "api_data": query_result.get("api_data", {}),
            }
        finally:
            if cursor is not None:
                cursor.close()

    @staticmethod
    def _is_read_only_sql(sql) -> bool:
        """Return True for a single statement whose first keyword is SELECT or WITH.

        A trailing semicolon is allowed. Any other ``;`` is treated as a second
        statement and rejected, which also rejects semicolons inside string
        literals (a deliberate false positive).
        """
        if not isinstance(sql, str):
            return False
        statement = sql.strip()
        if statement.endswith(";"):
            statement = statement[:-1]
        if ";" in statement:
            return False
        words = statement.split(None, 1)
        return bool(words) and words[0].lower() in ("select", "with")

    @staticmethod
    def _fallback_result(content: str) -> Dict:
        """Wrap a non-JSON model reply in a default ``transaction_list`` envelope."""
        return {
            "sql_query": content,
            "response_type": "transaction_list",
            "short_description": "Query results",
            "api_data": {
                "type": "transaction_list",
                "description": "Query results",
                "data": {
                    "sql": content,
                    "fields": [],
                },
            },
        }
# Usage Example
if __name__ == "__main__":
    import os

    # Read the key from the environment like the other examples; a hard-coded
    # placeholder would override a valid OPENAI_API_KEY and fail authentication.
    db_query = EnhancedDatabaseQuery(os.environ["OPENAI_API_KEY"])
    schema = """
CREATE TABLE transaction_details (
id VARCHAR(50) PRIMARY KEY,
user_id VARCHAR(50),
account_name VARCHAR(100),
amount DECIMAL(10,2),
merchant_name VARCHAR(100),
date DATE,
category VARCHAR(50)
);
"""
    # One question per response type: transaction list, chart data, summary.
    examples = [
        ("Transaction List Response:", "Show me all transactions over €50"),
        ("Chart Response:", "Show spending by category as a pie chart"),
        ("Summary Response:", "Give me a summary of total spending this month"),
    ]
    for label, question in examples:
        result = db_query.generate_structured_query(schema, question)
        print(label, json.dumps(result, indent=2))
const OpenAI = require('openai');
class EnhancedDatabaseQuery {
  /**
   * @param {string} apiKey - OpenAI API key
   * @param {string} [model="gpt-4o"] - chat model to call. It must support JSON mode
   *   (`response_format: { type: "json_object" }`), which the base "gpt-4" model rejects.
   */
  constructor(apiKey, model = "gpt-4o") {
    this.openai = new OpenAI({ apiKey });
    this.model = model;
  }

  /**
   * Ask the model for a SQL query plus an envelope describing how to present it.
   *
   * @param {string} schema - DDL of the tables the query may use
   * @param {string} question - natural-language question
   * @returns {Promise<object>} the parsed JSON object. If the reply is not a JSON
   *   object, a `transaction_list` envelope carrying the raw reply text is returned.
   */
  async generateStructuredQuery(schema, question) {
    const systemPrompt = `
You are a database expert. Analyze the user's question and generate:
1. A valid SQL query
2. Determine the appropriate response type (transaction_list, charts, or summary)
3. Provide a short description
4. Identify relevant fields for the response
Response types:
- transaction_list: For detailed records/transactions
- charts: For data visualization (bar, pie, line charts)
- summary: For aggregated statistics and insights
Return your response as a JSON object with this structure:
{
"sql_query": "SELECT ...",
"response_type": "transaction_list|charts|summary",
"short_description": "Brief description of what this query returns",
"api_data": {
"type": "transaction_list|charts|summary",
"description": "Detailed description",
"data": {
"sql": "SELECT ...",
"fields": ["field1", "field2", ...]
}
}
}
`;
    const response = await this.openai.chat.completions.create({
      model: this.model,
      messages: [
        { role: "system", content: systemPrompt },
        { role: "user", content: `Database Schema:\n${schema}\n\nQuestion: ${question}` },
      ],
      response_format: { type: "json_object" },
    });
    const content = response.choices[0].message.content ?? "";

    let parsed;
    try {
      parsed = JSON.parse(content);
    } catch {
      parsed = undefined; // not JSON: handled by the fallback below
    }
    // A JSON value that is not a plain object (null, array, number) is as unusable as invalid JSON.
    if (parsed !== null && typeof parsed === "object" && !Array.isArray(parsed)) {
      return parsed;
    }
    return EnhancedDatabaseQuery.#fallbackResult(content);
  }

  /**
   * Generate a query, run it and attach the rows to the envelope.
   *
   * @param {string} schema - DDL of the tables
   * @param {string} question - natural-language question
   * @param {*} dbConnection - connection understood by `executeQuery` (node-sqlite3 style `all`)
   * @returns {Promise<object>} the envelope with `api_data.data.fields` / `.results` filled in.
   *   On failure (including rejected SQL) it returns an object with `error` set.
   */
  async executeAndFormatResponse(schema, question, dbConnection) {
    const queryResult = await this.generateStructuredQuery(schema, question);
    try {
      // The SQL is untrusted model output: only a single read-only statement is run.
      // This is a coarse guard; a read-only database role is still the real protection.
      if (!EnhancedDatabaseQuery.#isReadOnlySql(queryResult.sql_query)) {
        throw new Error(`Refusing to execute non-read-only SQL: ${queryResult.sql_query}`);
      }
      const results = await this.executeQuery(dbConnection, queryResult.sql_query);
      // The model may omit the nested envelope; rebuild it instead of throwing a TypeError.
      if (queryResult.api_data === null || typeof queryResult.api_data !== "object") {
        queryResult.api_data = {};
      }
      if (queryResult.api_data.data === null || typeof queryResult.api_data.data !== "object") {
        queryResult.api_data.data = {};
      }
      queryResult.api_data.data.fields = Object.keys(results[0] || {});
      queryResult.api_data.data.results = results;
      return queryResult;
    } catch (error) {
      return {
        error: error.message,
        sql_query: queryResult.sql_query,
        response_type: queryResult.response_type,
        short_description: "Query failed to execute",
        api_data: queryResult.api_data,
      };
    }
  }

  /**
   * Run `sqlQuery` and resolve with the result rows.
   * Implement based on your database library; this version targets node-sqlite3's `all`.
   */
  async executeQuery(dbConnection, sqlQuery) {
    return new Promise((resolve, reject) => {
      dbConnection.all(sqlQuery, (err, rows) => {
        if (err) reject(err);
        else resolve(rows);
      });
    });
  }

  /**
   * True for a single statement whose first keyword is SELECT or WITH.
   * One trailing ";" is allowed. Any other ";" counts as a second statement,
   * which also rejects ";" inside string literals (deliberately conservative).
   */
  static #isReadOnlySql(sql) {
    if (typeof sql !== "string") return false;
    const statement = sql.trim().replace(/;\s*$/, "");
    if (statement.includes(";")) return false;
    const firstWord = statement.split(/\s+/, 1)[0].toLowerCase();
    return firstWord === "select" || firstWord === "with";
  }

  /** Wrap a non-JSON model reply in a default `transaction_list` envelope. */
  static #fallbackResult(content) {
    return {
      sql_query: content,
      response_type: "transaction_list",
      short_description: "Query results",
      api_data: {
        type: "transaction_list",
        description: "Query results",
        data: {
          sql: content,
          fields: [],
        },
      },
    };
  }
}
// Usage Example: one question per response type, run one after another.
async function main() {
  const dbQuery = new EnhancedDatabaseQuery(process.env.OPENAI_API_KEY);
  const schema = `
CREATE TABLE transaction_details (
id VARCHAR(50) PRIMARY KEY,
user_id VARCHAR(50),
account_name VARCHAR(100),
amount DECIMAL(10,2),
merchant_name VARCHAR(100),
date DATE,
category VARCHAR(50)
);
`;
  const examples = [
    ["Transaction List Response:", "Show me all transactions over €50"],
    ["Chart Response:", "Show spending by category as a pie chart"],
    ["Summary Response:", "Give me a summary of total spending this month"],
  ];
  // Sequential on purpose: same request order and log order as separate awaits.
  for (const [label, question] of examples) {
    const result = await dbQuery.generateStructuredQuery(schema, question);
    console.log(label, JSON.stringify(result, null, 2));
  }
}

// Only run the demo when executed directly, not when required as a module.
if (require.main === module) {
  main().catch((err) => console.error(err));
}

module.exports = EnhancedDatabaseQuery;
import okhttp3.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.*;
public class EnhancedDatabaseQuery {
    private static final String API_KEY = System.getenv("OPENAI_API_KEY");
    private static final String API_URL = "https://api.openai.com/v1/chat/completions";
    // JSON mode (response_format = json_object) is rejected by the base "gpt-4" model.
    private static final String MODEL = "gpt-4o";
    private static final String SYSTEM_PROMPT = """
        You are a database expert. Analyze the user's question and generate:
        1. A valid SQL query
        2. Determine the appropriate response type (transaction_list, charts, or summary)
        3. Provide a short description
        4. Identify relevant fields for the response
        Response types:
        - transaction_list: For detailed records/transactions
        - charts: For data visualization (bar, pie, line charts)
        - summary: For aggregated statistics and insights
        Return your response as a JSON object with this structure:
        {
        "sql_query": "SELECT ...",
        "response_type": "transaction_list|charts|summary",
        "short_description": "Brief description of what this query returns",
        "api_data": {
        "type": "transaction_list|charts|summary",
        "description": "Detailed description",
        "data": {
        "sql": "SELECT ...",
        "fields": ["field1", "field2", ...]
        }
        }
        }
        """;

    private final OkHttpClient client;
    private final ObjectMapper mapper;

    public EnhancedDatabaseQuery() {
        this.client = new OkHttpClient();
        this.mapper = new ObjectMapper();
    }

    /**
     * Ask the model for a SQL query plus an envelope describing how to present it.
     *
     * @param schema   DDL of the tables the query may use
     * @param question natural-language question
     * @return the parsed JSON object. If the reply is not a JSON object, a
     *         {@code transaction_list} envelope carrying the raw reply text is returned.
     * @throws java.io.IOException if the HTTP call fails or returns a non-2xx status
     */
    public Map<String, Object> generateStructuredQuery(String schema, String question) throws Exception {
        ObjectNode requestBody = mapper.createObjectNode();
        requestBody.put("model", MODEL);
        // putObject replaces the deprecated put(String, JsonNode) overload.
        requestBody.putObject("response_format").put("type", "json_object");
        ArrayNode messages = requestBody.putArray("messages");
        ObjectNode systemMessage = messages.addObject();
        systemMessage.put("role", "system");
        systemMessage.put("content", SYSTEM_PROMPT);
        ObjectNode userMessage = messages.addObject();
        userMessage.put("role", "user");
        userMessage.put("content", "Database Schema:\n" + schema + "\n\nQuestion: " + question);

        RequestBody body = RequestBody.create(
            mapper.writeValueAsString(requestBody),
            MediaType.get("application/json")
        );
        // No "OpenAI-Beta: assistants=v2" header: it belongs to the Assistants API,
        // not to Chat Completions.
        Request request = new Request.Builder()
            .url(API_URL)
            .header("Authorization", "Bearer " + API_KEY)
            .post(body)
            .build();

        String content;
        try (Response response = client.newCall(request).execute()) {
            ResponseBody responseBody = response.body();
            String raw = responseBody == null ? "" : responseBody.string();
            if (!response.isSuccessful()) {
                throw new java.io.IOException("OpenAI request failed with HTTP " + response.code() + ": " + raw);
            }
            content = mapper.readTree(raw).path("choices").path(0).path("message").path("content").asText("");
        }

        try {
            @SuppressWarnings("unchecked")
            Map<String, Object> parsed = mapper.readValue(content, Map.class);
            if (parsed != null) {
                return parsed;
            }
        } catch (Exception e) {
            // Not a JSON object: fall through to the text fallback below.
        }
        return fallbackResult(content);
    }

    /**
     * Generate a query, execute it over JDBC and attach the rows to the envelope.
     *
     * @return on success, the envelope with {@code api_data.data.fields} and
     *         {@code api_data.data.results} filled in. On failure (including rejected
     *         SQL), a copy with {@code error} and {@code short_description} set.
     * @throws Exception if generating the query itself fails
     */
    public Map<String, Object> executeAndFormatResponse(String schema, String question, java.sql.Connection dbConnection) throws Exception {
        Map<String, Object> queryResult = generateStructuredQuery(schema, question);
        try {
            Object sqlValue = queryResult.get("sql_query");
            String sqlQuery = sqlValue instanceof String ? (String) sqlValue : "";
            // The SQL is untrusted model output: only a single read-only statement is run.
            if (!isReadOnlySql(sqlQuery)) {
                throw new IllegalArgumentException("Refusing to execute non-read-only SQL: " + sqlQuery);
            }
            // try-with-resources closes the ResultSet and Statement even when reading fails.
            try (java.sql.Statement stmt = dbConnection.createStatement();
                 java.sql.ResultSet rs = stmt.executeQuery(sqlQuery)) {
                java.sql.ResultSetMetaData metaData = rs.getMetaData();
                int columnCount = metaData.getColumnCount();
                List<String> fields = new ArrayList<>();
                for (int i = 1; i <= columnCount; i++) {
                    // getColumnLabel honours "AS alias"; getColumnName may return the
                    // underlying column or expression instead.
                    fields.add(metaData.getColumnLabel(i));
                }
                List<Map<String, Object>> results = new ArrayList<>();
                while (rs.next()) {
                    Map<String, Object> row = new LinkedHashMap<>(); // keep column order
                    for (int i = 1; i <= columnCount; i++) {
                        row.put(fields.get(i - 1), rs.getObject(i));
                    }
                    results.add(row);
                }
                Map<String, Object> data = childMap(childMap(queryResult, "api_data"), "data");
                data.put("fields", fields);
                data.put("results", results);
                return queryResult;
            }
        } catch (Exception e) {
            Map<String, Object> errorResult = new HashMap<>(queryResult);
            errorResult.put("error", e.getMessage());
            errorResult.put("short_description", "Query failed to execute");
            return errorResult;
        }
    }

    /** Wrap a non-JSON model reply in a default {@code transaction_list} envelope. */
    private static Map<String, Object> fallbackResult(String content) {
        Map<String, Object> data = new HashMap<>();
        data.put("sql", content);
        data.put("fields", new ArrayList<>());
        Map<String, Object> apiData = new HashMap<>();
        apiData.put("type", "transaction_list");
        apiData.put("description", "Query results");
        apiData.put("data", data);
        Map<String, Object> fallback = new HashMap<>();
        fallback.put("sql_query", content);
        fallback.put("response_type", "transaction_list");
        fallback.put("short_description", "Query results");
        fallback.put("api_data", apiData);
        return fallback;
    }

    /** Return {@code parent[key]} as a map, replacing a missing or non-map value with a new empty map. */
    @SuppressWarnings("unchecked")
    private static Map<String, Object> childMap(Map<String, Object> parent, String key) {
        Object value = parent.get(key);
        if (value instanceof Map) {
            return (Map<String, Object>) value;
        }
        Map<String, Object> created = new LinkedHashMap<>();
        parent.put(key, created);
        return created;
    }

    /**
     * True for a single statement starting with SELECT or WITH. One trailing ';' is
     * allowed; any other ';' is treated as a second statement (conservative).
     */
    private static boolean isReadOnlySql(String sql) {
        String statement = sql.strip();
        if (statement.endsWith(";")) {
            statement = statement.substring(0, statement.length() - 1);
        }
        if (statement.contains(";")) {
            return false;
        }
        String first = statement.split("\\s+", 2)[0].toLowerCase(Locale.ROOT);
        return first.equals("select") || first.equals("with");
    }

    // Usage Example
    public static void main(String[] args) throws Exception {
        EnhancedDatabaseQuery dbQuery = new EnhancedDatabaseQuery();
        String schema = """
            CREATE TABLE transaction_details (
            id VARCHAR(50) PRIMARY KEY,
            user_id VARCHAR(50),
            account_name VARCHAR(100),
            amount DECIMAL(10,2),
            merchant_name VARCHAR(100),
            date DATE,
            category VARCHAR(50)
            );
            """;
        String[][] examples = {
            {"Transaction List Response: ", "Show me all transactions over €50"},
            {"Chart Response: ", "Show spending by category as a pie chart"},
            {"Summary Response: ", "Give me a summary of total spending this month"},
        };
        for (String[] example : examples) {
            Map<String, Object> result = dbQuery.generateStructuredQuery(schema, example[1]);
            // main is static, so reach the mapper through the instance.
            System.out.println(example[0] + dbQuery.mapper.writerWithDefaultPrettyPrinter().writeValueAsString(result));
        }
    }
}
# Enhanced Schema-Based Query Generation with Response Types
# JSON mode ("response_format": {"type": "json_object"}) is rejected by the base
# "gpt-4" model, so this request uses a model that supports it.
curl https://api.openai.com/v1/chat/completions \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $OPENAI_API_KEY" \
  -d '{
    "model": "gpt-4o",
    "response_format": {"type": "json_object"},
    "messages": [
      {
        "role": "system",
        "content": "You are a database expert. Analyze the user question and generate: 1. A valid SQL query, 2. Determine response type (transaction_list, charts, or summary), 3. Provide short description, 4. Identify relevant fields. Return JSON with structure: {\"sql_query\": \"SELECT ...\", \"response_type\": \"transaction_list|charts|summary\", \"short_description\": \"Brief description\", \"api_data\": {\"type\": \"transaction_list|charts|summary\", \"description\": \"Detailed description\", \"data\": {\"sql\": \"SELECT ...\", \"fields\": [\"field1\", \"field2\"]}}}"
      },
      {
        "role": "user",
        "content": "Database Schema: CREATE TABLE transaction_details (id VARCHAR(50) PRIMARY KEY, user_id VARCHAR(50), account_name VARCHAR(100), amount DECIMAL(10,2), merchant_name VARCHAR(100), date DATE, category VARCHAR(50)); Question: Show me all transactions over €50"
      }
    ]
  }'
Response Type Detection Logic
The AI determines the response type based on keywords and context in the user’s question:
Transaction List Detection:
- Keywords: “show”, “list”, “all”, “transactions”, “records”, “details”
- Context: When user asks for specific records or detailed information
Chart Detection:
- Keywords: “chart”, “graph”, “pie”, “bar”, “line”, “visualize”, “by category”, “trend”
- Context: When user asks for data visualization or comparisons
Summary Detection:
- Keywords: “summary”, “total”, “average”, “count”, “overview”, “statistics”
- Context: When user asks for aggregated data or insights
Example Response Formats
Transaction List Response:
{
"sql_query": "SELECT * FROM transaction_details WHERE user_id = '0e212d25-d228-4673-a7f2-f680ba2b76be' AND amount > 50",
"response_type": "transaction_list",
"short_description": "Here are your transactions over €50.",
"api_data": {
"type": "transaction_list",
"description": "Here are your transactions over €50.",
"data": {
"sql": "SELECT * FROM transaction_details WHERE user_id = '0e212d25-d228-4673-a7f2-f680ba2b76be' AND amount > 50",
"fields": ["id", "user_id", "account_name", "amount", "merchant_name", "date", "category"]
}
}
}
Chart Response:
{
"sql_query": "SELECT category, SUM(amount) as total_spending FROM transaction_details GROUP BY category",
"response_type": "charts",
"short_description": "Spending breakdown by category for visualization.",
"api_data": {
"type": "charts",
"description": "Spending breakdown by category for visualization.",
"data": {
"sql": "SELECT category, SUM(amount) as total_spending FROM transaction_details GROUP BY category",
"fields": ["category", "total_spending"]
}
}
}
Summary Response:
{
"sql_query": "SELECT COUNT(*) as total_transactions, SUM(amount) as total_spending, AVG(amount) as avg_amount FROM transaction_details WHERE date >= '2024-01-01' AND date < '2024-02-01'",
"response_type": "summary",
"short_description": "Monthly spending summary with key metrics.",
"api_data": {
"type": "summary",
"description": "Monthly spending summary with key metrics.",
"data": {
"sql": "SELECT COUNT(*) as total_transactions, SUM(amount) as total_spending, AVG(amount) as avg_amount FROM transaction_details WHERE date >= '2024-01-01' AND date < '2024-02-01'",
"fields": ["total_transactions", "total_spending", "avg_amount"]
}
}
}
Integration with Frontend
This structured response format makes it easy to integrate with frontend applications:
// Frontend integration example
/**
 * Send a question to the backend and render the result with the renderer that
 * matches its `response_type`.
 *
 * @param {string} question - natural-language question from the user
 * @returns {Promise<object>} the parsed response envelope
 * @throws {Error} on a non-2xx HTTP status, a backend `error` field, or an
 *   unrecognised `response_type`
 */
async function handleDatabaseQuery(question) {
  const response = await fetch('/api/database-query', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ question }),
  });
  if (!response.ok) {
    throw new Error(`Database query failed: HTTP ${response.status}`);
  }
  const result = await response.json();
  // The backend reports SQL failures in-band (an `error` field); there are no rows to render then.
  if (result.error) {
    throw new Error(`Database query failed: ${result.error}`);
  }
  const rows = result.api_data?.data?.results ?? [];
  switch (result.response_type) {
    case 'transaction_list':
      renderTransactionList(rows);
      break;
    case 'charts':
      renderChart(rows);
      break;
    case 'summary':
      renderSummary(rows);
      break;
    default:
      throw new Error(`Unsupported response_type: ${result.response_type}`);
  }
  return result;
}
Implementation Methods
Method 1: Chat Completions API
Direct Implementation
const OpenAI = require('openai');

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});

/**
 * Ask a question about data that is serialised into the prompt.
 *
 * @param {string} dataContext - schema and rows rendered as text
 * @param {string} question - natural-language question about the data
 * @returns {Promise<string>} the text of the model's first choice
 */
async function queryWithData(dataContext, question) {
  const response = await openai.chat.completions.create({
    model: "gpt-4",
    messages: [
      {
        role: "system",
        content: "You are a data analyst. Analyze the provided data and answer questions accurately.",
      },
      {
        role: "user",
        content: `Data: ${dataContext}\n\nQuestion: ${question}`,
      },
    ],
  });
  return response.choices[0].message.content;
}

// Usage. This file uses `require` (CommonJS), where a top-level `await` is a
// SyntaxError, so the example runs inside an async function.
async function runQueryWithDataExample() {
  const schema = "users(id, name, email, age, department)";
  const data = "1,John,john@email.com,30,Engineering\n2,Jane,jane@email.com,25,Marketing";
  const result = await queryWithData(`Schema: ${schema}\nData: ${data}`, "Count users by department");
  console.log(result);
}

runQueryWithDataExample().catch(console.error);
import openai
def query_with_data(data_context, question):
    """Answer ``question`` about data serialised into the prompt.

    Args:
        data_context: Schema and rows rendered as text.
        question: Natural-language question about the data.

    Returns:
        The text content of the model's first choice.
    """
    response = openai.chat.completions.create(
        model="gpt-4",
        messages=[
            {
                "role": "system",
                "content": "You are a data analyst. Analyze the provided data and answer questions accurately.",
            },
            {
                "role": "user",
                "content": f"Data: {data_context}\n\nQuestion: {question}",
            },
        ],
    )
    return response.choices[0].message.content


# Usage: guarded so that importing this module does not fire an API request.
if __name__ == "__main__":
    schema = "users(id, name, email, age, department)"
    data = "1,John,john@email.com,30,Engineering\n2,Jane,jane@email.com,25,Marketing"
    result = query_with_data(f"Schema: {schema}\nData: {data}", "Count users by department")
    print(result)
# Method 1 over raw HTTP: system role sets the analyst persona, user message carries the data.
curl https://api.openai.com/v1/chat/completions \
  --header "Content-Type: application/json" \
  --header "Authorization: Bearer $OPENAI_API_KEY" \
  --data '{
"model": "gpt-4",
"messages": [
{
"role": "system",
"content": "You are a data analyst. Analyze the provided data and answer questions accurately."
},
{
"role": "user",
"content": "Data: Schema: users(id, name, email, age, department)\nData: 1,John,john@email.com,30,Engineering\n2,Jane,jane@email.com,25,Marketing\n\nQuestion: Count users by department"
}
]
}'
import okhttp3.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
public class DataAnalyzer {
    private static final String API_KEY = System.getenv("OPENAI_API_KEY");
    private static final String API_URL = "https://api.openai.com/v1/chat/completions";
    // OkHttpClient owns connection and thread pools meant for reuse: share one instance.
    private static final OkHttpClient CLIENT = new OkHttpClient();
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Ask a question about data serialised into the prompt.
     *
     * @param dataContext schema and rows rendered as text
     * @param question    natural-language question about the data
     * @return the raw JSON body of the Chat Completions response
     * @throws java.io.IOException if the call fails or the API answers with a non-2xx status
     */
    public String queryWithData(String dataContext, String question) throws Exception {
        ObjectNode requestBody = MAPPER.createObjectNode();
        requestBody.put("model", "gpt-4");
        ArrayNode messages = requestBody.putArray("messages");
        ObjectNode systemMessage = messages.addObject();
        systemMessage.put("role", "system");
        systemMessage.put("content", "You are a data analyst. Analyze the provided data and answer questions accurately.");
        ObjectNode userMessage = messages.addObject();
        userMessage.put("role", "user");
        userMessage.put("content", "Data: " + dataContext + "\n\nQuestion: " + question);
        return post(requestBody);
    }

    /** POSTs {@code requestBody} to the Chat Completions endpoint and returns the raw JSON reply. */
    private static String post(ObjectNode requestBody) throws Exception {
        RequestBody body = RequestBody.create(
            MAPPER.writeValueAsString(requestBody),
            MediaType.get("application/json")
        );
        Request request = new Request.Builder()
            .url(API_URL)
            .header("Authorization", "Bearer " + API_KEY)
            .post(body)
            .build();
        // try-with-resources: a Response that is never closed leaks its pooled connection.
        try (Response response = CLIENT.newCall(request).execute()) {
            ResponseBody responseBody = response.body();
            String raw = responseBody == null ? "" : responseBody.string();
            if (!response.isSuccessful()) {
                throw new java.io.IOException("OpenAI request failed with HTTP " + response.code() + ": " + raw);
            }
            return raw;
        }
    }

    // Usage
    public void example() throws Exception {
        String schema = "users(id, name, email, age, department)";
        String data = "1,John,john@email.com,30,Engineering\n2,Jane,jane@email.com,25,Marketing";
        String result = queryWithData("Schema: " + schema + "\nData: " + data, "Count users by department");
        System.out.println(result);
    }
}
SQL Query Generation
const OpenAI = require('openai');

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});

/**
 * Ask the model to write SQL for `question` against `schema`.
 *
 * @param {string} schema - DDL (or another textual description) of the tables
 * @param {string} question - natural-language description of the wanted query
 * @returns {Promise<string>} the text of the model's first choice
 */
async function generateSQLQuery(schema, question) {
  const response = await openai.chat.completions.create({
    model: "gpt-4",
    messages: [
      {
        role: "system",
        content: "You are a SQL expert. Generate accurate SQL queries based on the schema provided.",
      },
      {
        role: "user",
        content: `Schema: ${schema}\n\nGenerate SQL for: ${question}`,
      },
    ],
  });
  return response.choices[0].message.content;
}

// Usage. `require` makes this CommonJS, where a top-level `await` is a
// SyntaxError, so the example runs inside an async function.
async function runGenerateSQLQueryExample() {
  const schema = `
CREATE TABLE users (
id INTEGER PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100),
age INTEGER,
department VARCHAR(50)
);
`;
  const sqlQuery = await generateSQLQuery(schema, "Find average age by department");
  console.log(sqlQuery);
}

runGenerateSQLQueryExample().catch(console.error);
# This snippet is meant to run on its own, so it imports the client it uses
# (without this, the call below raises NameError).
import openai


def generate_sql_query(schema, question):
    """Ask the model to write SQL for ``question`` against ``schema``.

    Args:
        schema: DDL (or another textual description) of the tables.
        question: Natural-language description of the wanted query.

    Returns:
        The text content of the model's first choice.
    """
    response = openai.chat.completions.create(
        model="gpt-4",
        messages=[
            {
                "role": "system",
                "content": "You are a SQL expert. Generate accurate SQL queries based on the schema provided.",
            },
            {
                "role": "user",
                "content": f"Schema: {schema}\n\nGenerate SQL for: {question}",
            },
        ],
    )
    return response.choices[0].message.content


# Usage: guarded so that importing this module does not fire an API request.
if __name__ == "__main__":
    schema = """
CREATE TABLE users (
id INTEGER PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100),
age INTEGER,
department VARCHAR(50)
);
"""
    sql_query = generate_sql_query(schema, "Find average age by department")
    print(sql_query)
# Method 1 SQL generation over raw HTTP: only the schema is sent, never the rows.
curl https://api.openai.com/v1/chat/completions \
  --header "Content-Type: application/json" \
  --header "Authorization: Bearer $OPENAI_API_KEY" \
  --data '{
"model": "gpt-4",
"messages": [
{
"role": "system",
"content": "You are a SQL expert. Generate accurate SQL queries based on the schema provided."
},
{
"role": "user",
"content": "Schema: CREATE TABLE users (id INTEGER PRIMARY KEY, name VARCHAR(100), email VARCHAR(100), age INTEGER, department VARCHAR(50));\n\nGenerate SQL for: Find average age by department"
}
]
}'
import okhttp3.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
public class SQLQueryGenerator {
    private static final String API_KEY = System.getenv("OPENAI_API_KEY");
    private static final String API_URL = "https://api.openai.com/v1/chat/completions";
    // OkHttpClient owns connection and thread pools meant for reuse: share one instance.
    private static final OkHttpClient CLIENT = new OkHttpClient();
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Ask the model to write SQL for {@code question} against {@code schema}.
     *
     * @param schema   DDL (or another textual description) of the tables
     * @param question natural-language description of the wanted query
     * @return the raw JSON body of the Chat Completions response
     * @throws java.io.IOException if the call fails or the API answers with a non-2xx status
     */
    public String generateSQLQuery(String schema, String question) throws Exception {
        ObjectNode requestBody = MAPPER.createObjectNode();
        requestBody.put("model", "gpt-4");
        ArrayNode messages = requestBody.putArray("messages");
        ObjectNode systemMessage = messages.addObject();
        systemMessage.put("role", "system");
        systemMessage.put("content", "You are a SQL expert. Generate accurate SQL queries based on the schema provided.");
        ObjectNode userMessage = messages.addObject();
        userMessage.put("role", "user");
        userMessage.put("content", "Schema: " + schema + "\n\nGenerate SQL for: " + question);
        return post(requestBody);
    }

    /** POSTs {@code requestBody} to the Chat Completions endpoint and returns the raw JSON reply. */
    private static String post(ObjectNode requestBody) throws Exception {
        RequestBody body = RequestBody.create(
            MAPPER.writeValueAsString(requestBody),
            MediaType.get("application/json")
        );
        Request request = new Request.Builder()
            .url(API_URL)
            .header("Authorization", "Bearer " + API_KEY)
            .post(body)
            .build();
        // try-with-resources: a Response that is never closed leaks its pooled connection.
        try (Response response = CLIENT.newCall(request).execute()) {
            ResponseBody responseBody = response.body();
            String raw = responseBody == null ? "" : responseBody.string();
            if (!response.isSuccessful()) {
                throw new java.io.IOException("OpenAI request failed with HTTP " + response.code() + ": " + raw);
            }
            return raw;
        }
    }

    // Usage
    public void example() throws Exception {
        String schema = """
            CREATE TABLE users (
            id INTEGER PRIMARY KEY,
            name VARCHAR(100),
            email VARCHAR(100),
            age INTEGER,
            department VARCHAR(50)
            );
            """;
        String sqlQuery = generateSQLQuery(schema, "Find average age by department");
        System.out.println(sqlQuery);
    }
}
Method 2: Assistants API Approach
The Assistants API provides a more conversational, stateful approach to database interactions: each conversation lives in a persistent thread, so context is maintained across multiple queries.
Setup Assistant
import openai
import time
from typing import Dict, Any
def create_database_assistant():
    """Register a GPT-4 assistant specialised for database questions.

    The assistant gets a fixed instruction text and the code_interpreter
    tool. The created assistant object is returned unchanged.
    """
    database_instructions = """
You are a database expert assistant. You can:
1. Analyze data and answer questions when data is provided
2. Generate SQL queries when only schema is provided
3. Explain query results and provide insights
4. Maintain context across multiple related queries
Always be accurate and explain your reasoning.
When generating SQL, ensure queries are safe and valid.
"""
    return openai.beta.assistants.create(
        name="Database Query Assistant",
        instructions=database_instructions,
        model="gpt-4",
        tools=[{"type": "code_interpreter"}],
    )
def query_database_assistant(
    assistant_id: str,
    message: str,
    thread_id: "str | None" = None,
    poll_interval: float = 1.0,
    timeout: float = 300.0,
) -> Dict[str, Any]:
    """Send ``message`` to an assistant and wait for its reply.

    Args:
        assistant_id: ID of the assistant to run (e.g. from create_database_assistant()).
        message: User message appended to the thread.
        thread_id: Existing thread to continue; a new thread is created when None.
        poll_interval: Seconds to sleep between run-status checks.
        timeout: Maximum seconds to wait while the run is queued/in progress.

    Returns:
        On success: ``{"response": str, "thread_id": str, "status": "success"}``.
        Otherwise: ``{"error": str, "thread_id": str, "status": "error"}``.
    """
    # Create or use existing thread
    if thread_id is None:
        thread_id = openai.beta.threads.create().id

    openai.beta.threads.messages.create(
        thread_id=thread_id,
        role="user",
        content=message,
    )
    run = openai.beta.threads.runs.create(
        thread_id=thread_id,
        assistant_id=assistant_id,
    )

    # Poll with an upper bound so a stuck run cannot hang the caller forever.
    deadline = time.monotonic() + timeout
    while run.status in ("queued", "in_progress", "cancelling"):
        if time.monotonic() >= deadline:
            return {
                "error": f"Run did not finish within {timeout} seconds (last status: {run.status})",
                "thread_id": thread_id,
                "status": "error",
            }
        time.sleep(poll_interval)
        run = openai.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)

    if run.status != "completed":
        return {
            "error": f"Run failed with status: {run.status}",
            "thread_id": thread_id,
            "status": "error",
        }

    # Ask explicitly for newest-first so data[0] is the assistant's reply.
    messages = openai.beta.threads.messages.list(thread_id=thread_id, order="desc", limit=1)
    if not messages.data:
        return {
            "error": "Run completed but the thread has no messages",
            "thread_id": thread_id,
            "status": "error",
        }
    # A message may contain non-text parts (e.g. images); keep only the text ones.
    text_parts = [part.text.value for part in messages.data[0].content if part.type == "text"]
    return {
        "response": "\n".join(text_parts),
        "thread_id": thread_id,
        "status": "success",
    }
# Usage Example
if __name__ == "__main__":
    # Create assistant
    assistant = create_database_assistant()
    print(f"Created assistant: {assistant.id}")

    # Query with schema
    schema_message = """
I have a database with this schema:
CREATE TABLE users (
id INTEGER PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100),
age INTEGER,
department VARCHAR(50)
);
CREATE TABLE projects (
id INTEGER PRIMARY KEY,
name VARCHAR(100),
user_id INTEGER,
status VARCHAR(20),
FOREIGN KEY (user_id) REFERENCES users(id)
);
Generate SQL to find all users in Engineering department with their project count.
"""
    result = query_database_assistant(assistant.id, schema_message)
    # Failed runs carry "error" instead of "response"; check before indexing.
    if result["status"] != "success":
        print("Query failed:", result["error"])
    else:
        print("Assistant response:", result["response"])

        # Follow up question using same thread
        followup = "Now modify that query to only show users with more than 2 projects."
        result2 = query_database_assistant(assistant.id, followup, result["thread_id"])
        if result2["status"] != "success":
            print("Follow-up failed:", result2["error"])
        else:
            print("Follow-up response:", result2["response"])
const OpenAI = require('openai');
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY
});
/**
 * Thin wrapper around the Assistants API for schema-driven SQL questions.
 * Relies on a module-level `openai` client.
 */
class DatabaseAssistant {
  constructor() {
    this.assistantId = null;
  }

  /**
   * Creates the "Database Query Assistant" and remembers its id.
   * @returns {Promise<object>} the created assistant object
   */
  async createAssistant() {
    const assistant = await openai.beta.assistants.create({
      name: "Database Query Assistant",
      instructions: `
You are a database expert assistant. You can:
1. Analyze data and answer questions when data is provided
2. Generate SQL queries when only schema is provided
3. Explain query results and provide insights
4. Maintain context across multiple related queries
Always be accurate and explain your reasoning.
When generating SQL, ensure queries are safe and valid.
`,
      model: "gpt-4",
      tools: [{ type: "code_interpreter" }],
    });
    this.assistantId = assistant.id;
    return assistant;
  }

  /**
   * Posts `message` to a thread, runs the assistant and waits for the reply.
   *
   * @param {string} message - user message to send
   * @param {string|null} [threadId=null] - thread to continue; a new one is created when falsy
   * @param {{pollIntervalMs?: number, timeoutMs?: number}} [options]
   *   pollIntervalMs: delay between status checks; timeoutMs: give up after this long
   * @returns {Promise<{response?: string, error?: string, threadId?: string|null, status: string}>}
   *   Never rejects: failures are reported as `{ error, status: 'error' }`.
   */
  async queryAssistant(message, threadId = null, { pollIntervalMs = 1000, timeoutMs = 300000 } = {}) {
    const pendingStatuses = ['queued', 'in_progress', 'cancelling'];
    let activeThreadId = threadId;
    try {
      if (!this.assistantId) {
        throw new Error('Assistant not created yet: call createAssistant() first');
      }

      // Create or use existing thread
      if (!activeThreadId) {
        const thread = await openai.beta.threads.create();
        activeThreadId = thread.id;
      }

      await openai.beta.threads.messages.create(activeThreadId, {
        role: "user",
        content: message,
      });

      const run = await openai.beta.threads.runs.create(activeThreadId, {
        assistant_id: this.assistantId,
      });

      // Bounded polling so a stuck run cannot hang the caller forever.
      const deadline = Date.now() + timeoutMs;
      let runStatus = await openai.beta.threads.runs.retrieve(activeThreadId, run.id);
      while (pendingStatuses.includes(runStatus.status)) {
        if (Date.now() >= deadline) {
          return {
            error: `Run did not finish within ${timeoutMs} ms (last status: ${runStatus.status})`,
            threadId: activeThreadId,
            status: 'error',
          };
        }
        await new Promise((resolve) => setTimeout(resolve, pollIntervalMs));
        runStatus = await openai.beta.threads.runs.retrieve(activeThreadId, run.id);
      }

      if (runStatus.status !== 'completed') {
        return {
          error: `Run failed with status: ${runStatus.status}`,
          threadId: activeThreadId,
          status: 'error',
        };
      }

      const messages = await openai.beta.threads.messages.list(activeThreadId);
      // Keep only text parts; a message may also contain e.g. image parts.
      const response = (messages.data[0]?.content ?? [])
        .filter((part) => part.type === 'text')
        .map((part) => part.text.value)
        .join('\n');
      return {
        response,
        threadId: activeThreadId,
        status: 'success',
      };
    } catch (error) {
      return {
        error: error.message,
        threadId: activeThreadId,
        status: 'error',
      };
    }
  }
}
// Usage Example
async function main() {
  const dbAssistant = new DatabaseAssistant();

  // Create assistant
  const assistant = await dbAssistant.createAssistant();
  console.log(`Created assistant: ${assistant.id}`);

  // Query with schema
  const schemaMessage = `
I have a database with this schema:
CREATE TABLE users (
id INTEGER PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100),
age INTEGER,
department VARCHAR(50)
);
Generate SQL to find all users in Engineering department.
`;
  const result = await dbAssistant.queryAssistant(schemaMessage);
  if (result.status !== 'success') {
    console.error('Query failed:', result.error);
    return;
  }
  console.log('Assistant response:', result.response);

  // Follow-up question on the same thread: the assistant refines its previous query.
  const followup = 'Now modify that query to only include users older than 30.';
  const result2 = await dbAssistant.queryAssistant(followup, result.threadId);
  if (result2.status !== 'success') {
    console.error('Follow-up failed:', result2.error);
    return;
  }
  console.log('Follow-up response:', result2.response);
}

if (require.main === module) {
  main().catch(console.error);
}

module.exports = DatabaseAssistant;
# Assistants API with curl, step by step:
# create an assistant, open a thread, post a message, start a run,
# poll the run, then read the thread's messages.
# Every request sends the "OpenAI-Beta: assistants=v2" header.
# Values in braces ({thread_id}, {assistant_id}, {run_id}) are literal
# placeholders, not shell variables. Replace them with the "id" fields
# returned by the Create Thread, Create Assistant and Create Run calls.
# Create Assistant
curl https://api.openai.com/v1/assistants \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-H "OpenAI-Beta: assistants=v2" \
-d '{
"name": "Database Query Assistant",
"instructions": "You are a database expert assistant. Generate SQL queries and analyze data based on provided schemas.",
"model": "gpt-4",
"tools": [{"type": "code_interpreter"}]
}'
# Create Thread
# (empty JSON body; the response's "id" is the {thread_id} used below)
curl https://api.openai.com/v1/threads \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-H "OpenAI-Beta: assistants=v2" \
-d '{}'
# Add Message to Thread
curl https://api.openai.com/v1/threads/{thread_id}/messages \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-H "OpenAI-Beta: assistants=v2" \
-d '{
"role": "user",
"content": "Schema: CREATE TABLE users (id INTEGER, name VARCHAR(100), department VARCHAR(50)); Generate SQL to count users by department."
}'
# Create Run
# (the response's "id" is the {run_id} used below)
curl https://api.openai.com/v1/threads/{thread_id}/runs \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-H "OpenAI-Beta: assistants=v2" \
-d '{
"assistant_id": "{assistant_id}"
}'
# Check Run Status
# Repeat this call while "status" is queued, in_progress or cancelling,
# as the SDK examples above do in their polling loops.
curl https://api.openai.com/v1/threads/{thread_id}/runs/{run_id} \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-H "OpenAI-Beta: assistants=v2"
# List Messages (after run completes)
# The code examples read the assistant's reply from data[0] of this response.
curl https://api.openai.com/v1/threads/{thread_id}/messages \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-H "OpenAI-Beta: assistants=v2"
import okhttp3.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
 * Minimal Assistants API (v2) client that generates SQL from a schema
 * description over a persistent thread.
 */
public class DatabaseAssistant {
    private static final String API_BASE = "https://api.openai.com/v1";
    private static final MediaType JSON = MediaType.get("application/json");
    // Upper bound on how long createAndWaitForRun keeps polling a run.
    private static final long RUN_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(5);

    private final String apiKey;
    private final OkHttpClient client;
    private final ObjectMapper mapper;
    private String assistantId;

    /** @param apiKey OpenAI API key, sent as a Bearer token on every request */
    public DatabaseAssistant(String apiKey) {
        this.apiKey = apiKey;
        this.client = new OkHttpClient.Builder()
                .readTimeout(60, TimeUnit.SECONDS)
                .build();
        this.mapper = new ObjectMapper();
    }

    /**
     * Creates the "Database Query Assistant" and remembers its id.
     *
     * @return the new assistant id
     * @throws IOException on transport failure or a non-2xx API response
     */
    public String createAssistant() throws IOException {
        ObjectNode requestBody = mapper.createObjectNode();
        requestBody.put("name", "Database Query Assistant");
        requestBody.put("instructions",
                "You are a database expert assistant. Generate SQL queries and analyze data based on provided schemas.");
        requestBody.put("model", "gpt-4");
        ArrayNode tools = requestBody.putArray("tools");
        ObjectNode tool = tools.addObject();
        tool.put("type", "code_interpreter");

        JsonNode result = send(new Request.Builder()
                .url(API_BASE + "/assistants")
                .post(jsonBody(requestBody)));
        this.assistantId = requireText(result, "id");
        return this.assistantId;
    }

    /** Creates an empty thread and returns its id. */
    public String createThread() throws IOException {
        JsonNode result = send(new Request.Builder()
                .url(API_BASE + "/threads")
                .post(RequestBody.create("{}", JSON)));
        return requireText(result, "id");
    }

    /** Appends a user message to the given thread. */
    public void addMessage(String threadID, String content) throws IOException {
        ObjectNode requestBody = mapper.createObjectNode();
        requestBody.put("role", "user");
        requestBody.put("content", content);
        send(new Request.Builder()
                .url(API_BASE + "/threads/" + threadID + "/messages")
                .post(jsonBody(requestBody)));
    }

    /**
     * Starts a run of the assistant on {@code threadId} and polls once per
     * second until it leaves the queued/in_progress/cancelling states.
     *
     * @return the final run status (e.g. "completed")
     * @throws IllegalStateException if createAssistant() has not been called
     * @throws IOException           on API errors or if the run exceeds the polling timeout
     */
    public String createAndWaitForRun(String threadId) throws IOException, InterruptedException {
        if (assistantId == null) {
            throw new IllegalStateException("Call createAssistant() before starting a run");
        }
        ObjectNode requestBody = mapper.createObjectNode();
        requestBody.put("assistant_id", this.assistantId);
        JsonNode run = send(new Request.Builder()
                .url(API_BASE + "/threads/" + threadId + "/runs")
                .post(jsonBody(requestBody)));
        String runId = requireText(run, "id");

        long deadline = System.nanoTime() + RUN_TIMEOUT_NANOS;
        String status;
        do {
            if (System.nanoTime() - deadline > 0) {
                throw new IOException("Run " + runId + " did not finish within "
                        + TimeUnit.NANOSECONDS.toSeconds(RUN_TIMEOUT_NANOS) + " seconds");
            }
            Thread.sleep(1000);
            JsonNode statusResult = send(new Request.Builder()
                    .url(API_BASE + "/threads/" + threadId + "/runs/" + runId)
                    .get());
            status = requireText(statusResult, "status");
        } while (isPending(status));
        return status;
    }

    /**
     * Returns the text of the first message listed for the thread, or
     * "No response received" if there is none or it has no text parts.
     */
    public String getLatestMessage(String threadId) throws IOException {
        JsonNode messages = send(new Request.Builder()
                .url(API_BASE + "/threads/" + threadId + "/messages")
                .get()).path("data");
        if (messages.size() > 0) {
            StringBuilder text = new StringBuilder();
            // Skip non-text parts (e.g. images) instead of failing on a missing "text" node.
            for (JsonNode part : messages.get(0).path("content")) {
                if ("text".equals(part.path("type").asText())) {
                    if (text.length() > 0) {
                        text.append('\n');
                    }
                    text.append(part.path("text").path("value").asText());
                }
            }
            if (text.length() > 0) {
                return text.toString();
            }
        }
        return "No response received";
    }

    /**
     * Sends {@code message} (on a new thread when {@code threadId} is null),
     * runs the assistant and returns its reply or a failure description.
     */
    public String queryAssistant(String message, String threadId) throws IOException, InterruptedException {
        if (threadId == null) {
            threadId = createThread();
        }
        addMessage(threadId, message);
        String status = createAndWaitForRun(threadId);
        if ("completed".equals(status)) {
            return getLatestMessage(threadId);
        } else {
            return "Run failed with status: " + status;
        }
    }

    /** Adds the auth/beta headers, executes the call and returns the parsed JSON body. */
    private JsonNode send(Request.Builder builder) throws IOException {
        Request request = builder
                .header("Authorization", "Bearer " + apiKey)
                .header("OpenAI-Beta", "assistants=v2")
                .build();
        try (Response response = client.newCall(request).execute()) {
            ResponseBody responseBody = response.body();
            String payload = responseBody == null ? "" : responseBody.string();
            if (!response.isSuccessful()) {
                throw new IOException(request.method() + " " + request.url()
                        + " returned HTTP " + response.code() + ": " + payload);
            }
            return mapper.readTree(payload);
        }
    }

    /** Serializes a JSON node into an application/json request body. */
    private RequestBody jsonBody(JsonNode node) throws IOException {
        return RequestBody.create(mapper.writeValueAsString(node), JSON);
    }

    /** Reads a required text field, failing clearly instead of with a NullPointerException. */
    private static String requireText(JsonNode node, String field) throws IOException {
        JsonNode value = node.get(field);
        if (value == null || value.isNull()) {
            throw new IOException("API response is missing '" + field + "': " + node);
        }
        return value.asText();
    }

    /** True while a run is still being processed. */
    private static boolean isPending(String status) {
        return "queued".equals(status) || "in_progress".equals(status) || "cancelling".equals(status);
    }

    // Usage Example
    public static void main(String[] args) throws Exception {
        DatabaseAssistant assistant = new DatabaseAssistant(System.getenv("OPENAI_API_KEY"));

        // Create assistant
        String assistantId = assistant.createAssistant();
        System.out.println("Created assistant: " + assistantId);

        // Query with schema
        String schemaMessage = """
                I have a database with this schema:
                CREATE TABLE users (
                id INTEGER PRIMARY KEY,
                name VARCHAR(100),
                department VARCHAR(50)
                );
                Generate SQL to count users by department.
                """;
        String response = assistant.queryAssistant(schemaMessage, null);
        System.out.println("Assistant response: " + response);
    }
}
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)
// DatabaseAssistant holds the API key, the ID of the assistant created by
// CreateAssistant, and the HTTP client used for every Assistants API call.
type DatabaseAssistant struct {
	APIKey      string
	AssistantID string
	Client      *http.Client
}

// AssistantRequest is the JSON body sent to POST /assistants.
type AssistantRequest struct {
	Name         string `json:"name"`
	Instructions string `json:"instructions"`
	Model        string `json:"model"`
	Tools        []Tool `json:"tools"`
}

// Tool is one entry of an assistant's tools list, e.g. {"type":"code_interpreter"}.
type Tool struct {
	Type string `json:"type"`
}

// ThreadRequest marshals to an empty JSON object, the body for POST /threads.
type ThreadRequest struct{}

// MessageRequest is the JSON body for POST /threads/{id}/messages.
type MessageRequest struct {
	Role    string `json:"role"`
	Content string `json:"content"`
}

// RunRequest is the JSON body for POST /threads/{id}/runs.
type RunRequest struct {
	AssistantID string `json:"assistant_id"`
}

// APIResponse decodes the fields this client reads from assistant, thread
// and run responses: ID (all of them) and Status (runs).
// NOTE(review): Data does not appear to be read by any code in this example.
type APIResponse struct {
	ID     string          `json:"id"`
	Status string          `json:"status"`
	Data   json.RawMessage `json:"data"`
}

// Message is one entry of a thread's message list. Only the text value of
// each content part is decoded.
type Message struct {
	Content []struct {
		Text struct {
			Value string `json:"value"`
		} `json:"text"`
	} `json:"content"`
}

// MessagesResponse is the body of GET /threads/{id}/messages.
type MessagesResponse struct {
	Data []Message `json:"data"`
}
// NewDatabaseAssistant returns a client that authenticates with apiKey and
// uses an HTTP client with a 60-second per-request timeout. AssistantID is
// empty until CreateAssistant succeeds.
func NewDatabaseAssistant(apiKey string) *DatabaseAssistant {
	httpClient := &http.Client{Timeout: 60 * time.Second}
	da := &DatabaseAssistant{Client: httpClient}
	da.APIKey = apiKey
	return da
}
// makeRequest sends an authenticated Assistants API (v2) request to
// https://api.openai.com/v1+endpoint, JSON-encoding body when it is non-nil.
//
// On success the caller owns resp.Body and must close it. A non-2xx status
// is turned into an error that includes (up to 4 KiB of) the response body,
// and the body is closed here. Callers therefore never decode an API error
// payload as if it were a successful result, which would silently yield
// empty IDs.
func (da *DatabaseAssistant) makeRequest(method, endpoint string, body interface{}) (*http.Response, error) {
	var reqBody io.Reader
	if body != nil {
		jsonBody, err := json.Marshal(body)
		if err != nil {
			return nil, fmt.Errorf("encoding %s %s request: %w", method, endpoint, err)
		}
		reqBody = bytes.NewReader(jsonBody)
	}

	req, err := http.NewRequest(method, "https://api.openai.com/v1"+endpoint, reqBody)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+da.APIKey)
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("OpenAI-Beta", "assistants=v2")

	resp, err := da.Client.Do(req)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		defer resp.Body.Close()
		msg, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
		return nil, fmt.Errorf("%s %s: HTTP %d: %s", method, endpoint, resp.StatusCode, bytes.TrimSpace(msg))
	}
	return resp, nil
}
// CreateAssistant registers a "Database Query Assistant" (gpt-4 with the
// code_interpreter tool) and stores the returned ID in da.AssistantID.
func (da *DatabaseAssistant) CreateAssistant() error {
	resp, err := da.makeRequest("POST", "/assistants", AssistantRequest{
		Name:         "Database Query Assistant",
		Instructions: `You are a database expert assistant. Generate SQL queries and analyze data based on provided schemas.`,
		Model:        "gpt-4",
		Tools:        []Tool{{Type: "code_interpreter"}},
	})
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	var created APIResponse
	err = json.NewDecoder(resp.Body).Decode(&created)
	if err != nil {
		return err
	}
	da.AssistantID = created.ID
	return nil
}
// CreateThread opens a new, empty conversation thread and returns its ID.
func (da *DatabaseAssistant) CreateThread() (string, error) {
	resp, err := da.makeRequest("POST", "/threads", ThreadRequest{})
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	var thread APIResponse
	if decodeErr := json.NewDecoder(resp.Body).Decode(&thread); decodeErr != nil {
		return "", decodeErr
	}
	return thread.ID, nil
}
// AddMessage appends a user message with the given content to threadID.
// The response body is closed without being read.
func (da *DatabaseAssistant) AddMessage(threadID, content string) error {
	endpoint := "/threads/" + threadID + "/messages"
	resp, err := da.makeRequest("POST", endpoint, MessageRequest{Role: "user", Content: content})
	if err == nil {
		resp.Body.Close()
	}
	return err
}
// CreateAndWaitForRun starts a run of da.AssistantID on threadID and polls
// the run once per second while it is queued, in_progress or cancelling.
//
// It returns "completed", nil when the run completes. Any other status is
// treated as terminal and returned together with a non-nil error. Decoding
// failures are reported rather than ignored, so an unreadable status
// response can no longer masquerade as an empty status.
func (da *DatabaseAssistant) CreateAndWaitForRun(threadID string) (string, error) {
	if da.AssistantID == "" {
		return "", fmt.Errorf("assistant not created: call CreateAssistant first")
	}

	resp, err := da.makeRequest("POST", fmt.Sprintf("/threads/%s/runs", threadID), RunRequest{AssistantID: da.AssistantID})
	if err != nil {
		return "", err
	}
	var run APIResponse
	err = json.NewDecoder(resp.Body).Decode(&run)
	resp.Body.Close()
	if err != nil {
		return "", fmt.Errorf("decoding run: %w", err)
	}
	if run.ID == "" {
		return "", fmt.Errorf("create run returned no run ID")
	}

	// Wait for completion
	for {
		time.Sleep(1 * time.Second)
		statusResp, err := da.makeRequest("GET", fmt.Sprintf("/threads/%s/runs/%s", threadID, run.ID), nil)
		if err != nil {
			return "", err
		}
		var current APIResponse
		decodeErr := json.NewDecoder(statusResp.Body).Decode(&current)
		statusResp.Body.Close()
		if decodeErr != nil {
			return "", fmt.Errorf("decoding run status: %w", decodeErr)
		}

		switch current.Status {
		case "completed":
			return "completed", nil
		case "queued", "in_progress", "cancelling":
			// Still being processed; poll again.
		default:
			return current.Status, fmt.Errorf("run failed with status: %s", current.Status)
		}
	}
}
// GetLatestMessage lists the thread's messages and returns the text of the
// first content part of the first message. It returns "No response received"
// when the list or that message's content is empty.
func (da *DatabaseAssistant) GetLatestMessage(threadID string) (string, error) {
	resp, err := da.makeRequest("GET", fmt.Sprintf("/threads/%s/messages", threadID), nil)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	var list MessagesResponse
	if err = json.NewDecoder(resp.Body).Decode(&list); err != nil {
		return "", err
	}
	if len(list.Data) == 0 {
		return "No response received", nil
	}
	parts := list.Data[0].Content
	if len(parts) == 0 {
		return "No response received", nil
	}
	return parts[0].Text.Value, nil
}
// QueryAssistant posts message to threadID, creating a new thread when
// threadID is empty, and waits for the assistant's run. It returns the
// reply, the thread ID that was used, and any error.
func (da *DatabaseAssistant) QueryAssistant(message, threadID string) (string, string, error) {
	tid := threadID
	if tid == "" {
		created, err := da.CreateThread()
		if err != nil {
			return "", "", err
		}
		tid = created
	}

	if err := da.AddMessage(tid, message); err != nil {
		return "", tid, err
	}

	status, err := da.CreateAndWaitForRun(tid)
	switch {
	case err != nil:
		return "", tid, err
	case status != "completed":
		return fmt.Sprintf("Run failed with status: %s", status), tid, nil
	}

	reply, err := da.GetLatestMessage(tid)
	return reply, tid, err
}
// Usage Example
//
// main creates an assistant, asks it for a query against a small users
// schema, then refines that query on the same thread.
func main() {
	assistant := NewDatabaseAssistant("your-api-key")

	// Create assistant
	if err := assistant.CreateAssistant(); err != nil {
		fmt.Printf("Error creating assistant: %v\n", err)
		return
	}
	fmt.Printf("Created assistant: %s\n", assistant.AssistantID)

	// Query with schema
	schemaMessage := `
I have a database with this schema:
CREATE TABLE users (
id INTEGER PRIMARY KEY,
name VARCHAR(100),
department VARCHAR(50)
);
Generate SQL to count users by department.
`
	response, threadID, err := assistant.QueryAssistant(schemaMessage, "")
	if err != nil {
		fmt.Printf("Error querying assistant: %v\n", err)
		return
	}
	fmt.Printf("Assistant response: %s\n", response)
	fmt.Printf("Thread ID: %s\n", threadID)

	// Follow-up query on the same thread. It must only use columns from the
	// schema above (id, name, department); that schema has no age column.
	followup := "Now modify the query to only include departments with more than 5 users."
	response2, _, err := assistant.QueryAssistant(followup, threadID)
	if err != nil {
		fmt.Printf("Error with follow-up: %v\n", err)
		return
	}
	fmt.Printf("Follow-up response: %s\n", response2)
}
Complete Implementation Example
Here’s a comprehensive implementation that combines the Chat Completions and Assistants API approaches. The model generates SQL, and the code executes it against a local SQLite database:
import sqlite3
import openai
import time
from typing import Dict, List, Optional
from enum import Enum
class QueryMethod(Enum):
    """Selects how AIDatabase.natural_language_query obtains SQL from the model."""

    # One-shot Chat Completions request (no conversation state).
    CHAT_COMPLETION = "chat_completion"
    # Assistants API request on the instance's persistent thread.
    ASSISTANT_API = "assistant_api"
class AIDatabase:
def __init__(self, db_path: str, api_key: str):
self.conn = sqlite3.connect(db_path)
openai.api_key = api_key
self.assistant_id: Optional[str] = None
self.thread_id: Optional[str] = None
def get_schema(self) -> str:
cursor = self.conn.cursor()
cursor.execute("SELECT sql FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()
return "\n".join([table[0] for table in tables])
def execute_query(self, query: str) -> List[Dict]:
cursor = self.conn.cursor()
cursor.execute(query)
columns = [description[0] for description in cursor.description]
rows = cursor.fetchall()
return [dict(zip(columns, row)) for row in rows]
def setup_assistant(self) -> str:
"""Create and configure the database assistant"""
if self.assistant_id:
return self.assistant_id
assistant = openai.beta.assistants.create(
name="Database Query Assistant",
instructions=f"""
You are a database expert assistant. You have access to a database with the following schema:
{self.get_schema()}
Your tasks:
1. Generate valid SQL queries based on user questions
2. Explain query results in natural language
3. Provide insights and recommendations
4. Maintain context across multiple queries
Always ensure SQL queries are safe and valid.
""",
model="gpt-4",
tools=[{"type": "code_interpreter"}]
)
self.assistant_id = assistant.id
# Create persistent thread for conversation
thread = openai.beta.threads.create()
self.thread_id = thread.id
return self.assistant_id
def chat_completion_query(self, question: str) -> Dict:
"""Use ChatCompletion API for one-off queries"""
schema = self.get_schema()
# Generate SQL query
response = openai.chat.completions.create(
model="gpt-4",
messages=[
{
"role": "system",
"content": "Generate only valid SQL queries based on the schema. No explanations."
},
{
"role": "user",
"content": f"Schema:\n{schema}\n\nQuestion: {question}"
}
]
)
sql_query = response.choices[0].message.content.strip()
try:
results = self.execute_query(sql_query)
# Generate explanation
explanation_response = openai.chat.completions.create(
model="gpt-4",
messages=[
{
"role": "user",
"content": f"Explain these query results in natural language:\nQuery: {sql_query}\nResults: {results[:5]}..."
}
]
)
return {
"query": sql_query,
"results": results,
"explanation": explanation_response.choices[0].message.content,
"method": "chat_completion"
}
except Exception as e:
return {"error": str(e), "query": sql_query, "method": "chat_completion"}
def assistant_query(self, question: str) -> Dict:
"""Use Assistant API for conversational queries"""
if not self.assistant_id:
self.setup_assistant()
# Add message to thread
openai.beta.threads.messages.create(
thread_id=self.thread_id,
role="user",
content=f"Question: {question}\n\nPlease generate the SQL query and then execute it to provide results."
)
# Create and run
run = openai.beta.threads.runs.create(
thread_id=self.thread_id,
assistant_id=self.assistant_id
)
# Wait for completion
while run.status in ['queued', 'in_progress', 'cancelling']:
time.sleep(1)
run = openai.beta.threads.runs.retrieve(
thread_id=self.thread_id,
run_id=run.id
)
if run.status == 'completed':
messages = openai.beta.threads.messages.list(thread_id=self.thread_id)
assistant_response = messages.data[0].content[0].text.value
# Extract SQL query from response (if generated)
sql_query = self._extract_sql_from_response(assistant_response)
try:
results = self.execute_query(sql_query) if sql_query else []
return {
"assistant_response": assistant_response,
"query": sql_query,
"results": results,
"thread_id": self.thread_id,
"method": "assistant_api"
}
except Exception as e:
return {
"assistant_response": assistant_response,
"query": sql_query,
"error": str(e),
"method": "assistant_api"
}
else:
return {
"error": f"Assistant run failed with status: {run.status}",
"method": "assistant_api"
}
def _extract_sql_from_response(self, response: str) -> Optional[str]:
"""Extract SQL query from assistant response"""
import re
# Look for SQL code blocks
sql_pattern = r'```sql\s*(.*?)\s*```'
match = re.search(sql_pattern, response, re.DOTALL | re.IGNORECASE)
if match:
return match.group(1).strip()
# Look for SELECT, INSERT, UPDATE, DELETE statements
query_pattern = r'\b(SELECT|INSERT|UPDATE|DELETE)\b.*?(?=;|$)'
match = re.search(query_pattern, response, re.DOTALL | re.IGNORECASE)
if match:
return match.group(0).strip()
return None
def natural_language_query(self, question: str, method: QueryMethod = QueryMethod.CHAT_COMPLETION) -> Dict:
"""Main interface for natural language database queries"""
if method == QueryMethod.CHAT_COMPLETION:
return self.chat_completion_query(question)
elif method == QueryMethod.ASSISTANT_API:
return self.assistant_query(question)
else:
raise ValueError(f"Unknown query method: {method}")
def conversation_query(self, question: str) -> Dict:
"""Use for follow-up questions and conversational queries"""
return self.assistant_query(question)
def close(self):
"""Close database connection"""
self.conn.close()
# Usage Examples
if __name__ == "__main__":
    # Initialize AI Database
    ai_db = AIDatabase("my_database.db", "your-api-key")
    # try/finally guarantees the SQLite connection is closed even if an API call raises.
    try:
        print("=== ChatCompletion Method ===")
        result1 = ai_db.natural_language_query(
            "What's the average salary by department?",
            QueryMethod.CHAT_COMPLETION,
        )
        print(f"Query: {result1.get('query')}")
        if "error" in result1:
            print(f"Error: {result1['error']}")
        else:
            print(f"Explanation: {result1.get('explanation')}")

        print("\n=== Assistant API Method ===")
        result2 = ai_db.natural_language_query(
            "Show me the top 5 employees by salary",
            QueryMethod.ASSISTANT_API,
        )
        if "error" in result2:
            print(f"Error: {result2['error']}")
        print(f"Assistant Response: {result2.get('assistant_response')}")

        print("\n=== Follow-up Conversation ===")
        followup = ai_db.conversation_query(
            "Now filter those results to show only employees from the Engineering department"
        )
        if "error" in followup:
            print(f"Error: {followup['error']}")
        print(f"Follow-up Response: {followup.get('assistant_response')}")
    finally:
        ai_db.close()
const sqlite3 = require('sqlite3').verbose();
const OpenAI = require('openai');
/**
 * Natural-language front end for a local SQLite database. The model
 * generates SQL via Chat Completions (one-shot) or an Assistants API thread
 * (conversational), and the SQL is executed locally.
 */
class AIDatabase {
  /**
   * @param {string} dbPath - SQLite file to open
   * @param {string} apiKey - OpenAI API key
   */
  constructor(dbPath, apiKey) {
    this.db = new sqlite3.Database(dbPath);
    this.openai = new OpenAI({ apiKey });
    this.assistantId = null;
    this.threadId = null;
  }

  /** @returns {Promise<string>} stored CREATE TABLE statements, one per line */
  async getSchema() {
    return new Promise((resolve, reject) => {
      this.db.all("SELECT sql FROM sqlite_master WHERE type='table';", (err, rows) => {
        if (err) {
          reject(err);
          return;
        }
        // Drop null sql entries so they never appear as "null" in the schema text.
        resolve(rows.map((row) => row.sql).filter(Boolean).join('\n'));
      });
    });
  }

  /** @returns {Promise<object[]>} result rows of `query` */
  async executeQuery(query) {
    return new Promise((resolve, reject) => {
      this.db.all(query, (err, rows) => {
        if (err) reject(err);
        else resolve(rows);
      });
    });
  }

  /**
   * Creates the assistant (with the database schema in its instructions) and
   * a persistent thread, once.
   * @returns {Promise<string>} the assistant id
   */
  async setupAssistant() {
    if (this.assistantId) return this.assistantId;
    const schema = await this.getSchema();
    const assistant = await this.openai.beta.assistants.create({
      name: "Database Query Assistant",
      // The schema must be part of the instructions: assistantQuery() sends
      // only the user's question, so this is the assistant's only source for it.
      instructions: `
You are a database expert assistant. You can:
1. Analyze data and answer questions when data is provided
2. Generate SQL queries when only schema is provided
3. Explain query results and provide insights
4. Maintain context across multiple related queries
Always be accurate and explain your reasoning.
When generating SQL, ensure queries are safe and valid.
Put every SQL query you generate in a \`\`\`sql code block.
The database has the following schema:
${schema}
`,
      model: "gpt-4",
      tools: [{ type: "code_interpreter" }],
    });
    this.assistantId = assistant.id;
    const thread = await this.openai.beta.threads.create();
    this.threadId = thread.id;
    return this.assistantId;
  }

  /**
   * Returns the body of the first ``` fenced block in `text`, or `text` trimmed.
   * Models often wrap SQL in ```sql fences, which SQLite cannot parse.
   * @param {string} text
   * @returns {string}
   */
  stripCodeFence(text) {
    const fenced = text.match(/```(?:sqlite|sql)?\s*([\s\S]*?)\s*```/i);
    return (fenced ? fenced[1] : text).trim();
  }

  /**
   * One-shot question: generate SQL, run it, then ask for an explanation.
   * @returns {Promise<object>} `{ query, results, explanation, method }` or `{ error, query, method }`
   */
  async chatCompletionQuery(question) {
    const schema = await this.getSchema();
    const response = await this.openai.chat.completions.create({
      model: "gpt-4",
      messages: [
        {
          role: "system",
          content: "Generate only valid SQL queries based on the schema.",
        },
        {
          role: "user",
          content: `Schema:\n${schema}\n\nQuestion: ${question}`,
        },
      ],
    });
    const sqlQuery = this.stripCodeFence(response.choices[0].message.content ?? '');
    try {
      const results = await this.executeQuery(sqlQuery);
      // Generate explanation
      const explanationResponse = await this.openai.chat.completions.create({
        model: "gpt-4",
        messages: [
          {
            role: "user",
            content: `Explain these query results:\nQuery: ${sqlQuery}\nResults: ${JSON.stringify(results.slice(0, 5))}`,
          },
        ],
      });
      return {
        query: sqlQuery,
        results,
        explanation: explanationResponse.choices[0].message.content,
        method: 'chat_completion',
      };
    } catch (error) {
      return {
        error: error.message,
        query: sqlQuery,
        method: 'chat_completion',
      };
    }
  }

  /**
   * Conversational question on the persistent thread; the SQL found in the
   * reply is executed locally.
   * @param {string} question
   * @param {{pollIntervalMs?: number, timeoutMs?: number}} [options]
   * @returns {Promise<object>}
   */
  async assistantQuery(question, { pollIntervalMs = 1000, timeoutMs = 300000 } = {}) {
    const pendingStatuses = ['queued', 'in_progress', 'cancelling'];
    await this.setupAssistant();
    await this.openai.beta.threads.messages.create(this.threadId, {
      role: "user",
      content: question,
    });
    const run = await this.openai.beta.threads.runs.create(this.threadId, {
      assistant_id: this.assistantId,
    });

    // Bounded polling so a stuck run cannot hang the caller forever.
    const deadline = Date.now() + timeoutMs;
    let runStatus = await this.openai.beta.threads.runs.retrieve(this.threadId, run.id);
    while (pendingStatuses.includes(runStatus.status)) {
      if (Date.now() >= deadline) {
        return {
          error: `Assistant run did not finish within ${timeoutMs} ms (last status: ${runStatus.status})`,
          threadId: this.threadId,
          method: 'assistant_api',
        };
      }
      await new Promise((resolve) => setTimeout(resolve, pollIntervalMs));
      runStatus = await this.openai.beta.threads.runs.retrieve(this.threadId, run.id);
    }

    if (runStatus.status !== 'completed') {
      return {
        error: `Assistant run failed with status: ${runStatus.status}`,
        method: 'assistant_api',
      };
    }

    const messages = await this.openai.beta.threads.messages.list(this.threadId);
    // Keep only text parts; a message may also contain non-text parts.
    const assistantResponse = (messages.data[0]?.content ?? [])
      .filter((part) => part.type === 'text')
      .map((part) => part.text.value)
      .join('\n');
    // Extract SQL query from response (if generated)
    const sqlQuery = this.extractSQLFromResponse(assistantResponse);
    try {
      const results = sqlQuery ? await this.executeQuery(sqlQuery) : [];
      return {
        assistantResponse,
        query: sqlQuery,
        results,
        threadId: this.threadId,
        method: 'assistant_api',
      };
    } catch (error) {
      return {
        assistantResponse,
        query: sqlQuery,
        error: error.message,
        method: 'assistant_api',
      };
    }
  }

  /**
   * Extracts the first SQL statement from an assistant reply, or null.
   * A ```sql block wins. Otherwise a SELECT/INSERT/UPDATE/DELETE statement
   * that starts a line is taken, up to its first ';'. The line anchor keeps
   * prose such as "I will select ..." from being treated as SQL.
   * @param {string} response
   * @returns {string|null}
   */
  extractSQLFromResponse(response) {
    const sqlBlockMatch = response.match(/```sql\s*([\s\S]*?)\s*```/i);
    if (sqlBlockMatch) {
      return sqlBlockMatch[1].trim();
    }
    // With the m flag `$` means end-of-line, so end-of-input is (?![\s\S]).
    const queryMatch = response.match(/^\s*((?:SELECT|INSERT|UPDATE|DELETE)\b[\s\S]*?)(?=;|(?![\s\S]))/im);
    if (queryMatch) {
      return queryMatch[1].trim();
    }
    return null;
  }

  async naturalLanguageQuery(question, method = 'chat_completion') {
    if (method === 'chat_completion') {
      return await this.chatCompletionQuery(question);
    } else if (method === 'assistant_api') {
      return await this.assistantQuery(question);
    } else {
      throw new Error(`Unknown query method: ${method}`);
    }
  }

  async conversationQuery(question) {
    return await this.assistantQuery(question);
  }

  close() {
    this.db.close();
  }
}
// Usage Example
/**
 * Runs one Chat Completions query, one Assistants API query and a follow-up
 * on the same thread. Each result's error is printed instead of being
 * silently shown as `undefined` fields.
 */
async function main() {
  const aiDb = new AIDatabase('my_database.db', process.env.OPENAI_API_KEY);
  try {
    console.log('=== ChatCompletion Method ===');
    const result1 = await aiDb.naturalLanguageQuery(
      "What's the average salary by department?",
      'chat_completion'
    );
    console.log('Query:', result1.query);
    if (result1.error) {
      console.error('Error:', result1.error);
    } else {
      console.log('Explanation:', result1.explanation);
    }

    console.log('\n=== Assistant API Method ===');
    const result2 = await aiDb.naturalLanguageQuery(
      "Show me the top 5 employees by salary",
      'assistant_api'
    );
    if (result2.error) {
      console.error('Error:', result2.error);
    }
    console.log('Assistant Response:', result2.assistantResponse);

    console.log('\n=== Follow-up Conversation ===');
    const followup = await aiDb.conversationQuery(
      "Now filter those results to show only Engineering department employees"
    );
    if (followup.error) {
      console.error('Error:', followup.error);
    }
    console.log('Follow-up Response:', followup.assistantResponse);
  } catch (error) {
    console.error('Error:', error);
  } finally {
    aiDb.close();
  }
}

if (require.main === module) {
  main().catch(console.error);
}

module.exports = AIDatabase;
import java.sql.*;
import java.util.*;
import okhttp3.*;
import com.fasterxml.jackson.databind.*;
import com.fasterxml.jackson.databind.node.*;
/**
 * Minimal client for the OpenAI Assistants API (v2 beta) that asks a
 * "Database Query Assistant" to generate SQL and analyze data from schemas.
 *
 * <p>Every HTTP call fails fast with an {@link IOException} that carries the
 * HTTP status and response body when the API answers with a non-2xx status.
 * Without this, the caller would later hit a NullPointerException on a missing
 * JSON field.
 */
public class AIDatabase {
    private static final String API_BASE = "https://api.openai.com/v1";
    private static final MediaType JSON = MediaType.get("application/json");

    private final String apiKey;
    private final OkHttpClient client;
    private final ObjectMapper mapper;
    /** Set by {@link #createAssistant()}; required before a run can be started. */
    private String assistantId;

    /**
     * @param apiKey OpenAI API key, sent as a Bearer token on every request
     * @throws IllegalArgumentException if apiKey is null or empty (e.g. OPENAI_API_KEY unset)
     */
    public AIDatabase(String apiKey) {
        if (apiKey == null || apiKey.isEmpty()) {
            throw new IllegalArgumentException("OpenAI API key is missing (is OPENAI_API_KEY set?)");
        }
        this.apiKey = apiKey;
        this.client = new OkHttpClient.Builder()
                .readTimeout(60, TimeUnit.SECONDS)
                .build();
        this.mapper = new ObjectMapper();
    }

    /**
     * Creates the assistant (gpt-4 with the code_interpreter tool) and remembers
     * its id for subsequent runs.
     *
     * @return the new assistant id
     * @throws IOException on network failure or a non-2xx API response
     */
    public String createAssistant() throws IOException {
        ObjectNode requestBody = mapper.createObjectNode();
        requestBody.put("name", "Database Query Assistant");
        requestBody.put("instructions",
                "You are a database expert assistant. Generate SQL queries and analyze data based on provided schemas.");
        requestBody.put("model", "gpt-4");
        requestBody.putArray("tools").addObject().put("type", "code_interpreter");

        JsonNode result = execute(newRequest("/assistants").post(jsonBody(requestBody)).build());
        this.assistantId = requireText(result, "id");
        return this.assistantId;
    }

    /**
     * Creates an empty conversation thread.
     *
     * @return the new thread id
     * @throws IOException on network failure or a non-2xx API response
     */
    public String createThread() throws IOException {
        RequestBody body = RequestBody.create("{}", JSON);
        return requireText(execute(newRequest("/threads").post(body).build()), "id");
    }

    /**
     * Appends a user message to a thread.
     *
     * @param threadId thread to append to
     * @param content  message text
     * @throws IOException on network failure or a non-2xx API response (previously ignored)
     */
    public void addMessage(String threadId, String content) throws IOException {
        ObjectNode requestBody = mapper.createObjectNode();
        requestBody.put("role", "user");
        requestBody.put("content", content);
        execute(newRequest("/threads/" + threadId + "/messages").post(jsonBody(requestBody)).build());
    }

    /**
     * Starts a run of this assistant on the thread and polls once per second
     * until the run leaves the queued/in_progress/cancelling states.
     *
     * @param threadId thread to run
     * @return the final run status (e.g. "completed", "failed", "requires_action")
     * @throws IllegalStateException if {@link #createAssistant()} has not been called
     * @throws IOException           on network failure or a non-2xx API response
     * @throws InterruptedException  if interrupted while waiting between polls
     */
    public String createAndWaitForRun(String threadId) throws IOException, InterruptedException {
        if (assistantId == null) {
            throw new IllegalStateException("createAssistant() must be called before starting a run");
        }
        ObjectNode requestBody = mapper.createObjectNode();
        requestBody.put("assistant_id", this.assistantId);

        String runsPath = "/threads/" + threadId + "/runs";
        String runId = requireText(execute(newRequest(runsPath).post(jsonBody(requestBody)).build()), "id");

        String status;
        do {
            Thread.sleep(1000);
            JsonNode run = execute(newRequest(runsPath + "/" + runId).get().build());
            status = requireText(run, "status");
        } while (isPending(status));
        return status;
    }

    /**
     * Returns the text of the newest message in the thread.
     *
     * @param threadId thread to read
     * @return the first text part of the newest message, or "No response received"
     *         if the thread has no message with text content
     * @throws IOException on network failure or a non-2xx API response
     */
    public String getLatestMessage(String threadId) throws IOException {
        // Ask explicitly for newest-first order and only one message.
        Request request = newRequest("/threads/" + threadId + "/messages?order=desc&limit=1").get().build();
        JsonNode messages = execute(request).path("data");
        if (messages.size() > 0) {
            // Content parts may be non-text (e.g. image_file); return the first text part.
            for (JsonNode part : messages.get(0).path("content")) {
                if ("text".equals(part.path("type").asText())) {
                    return part.path("text").path("value").asText();
                }
            }
        }
        return "No response received";
    }

    /**
     * Sends a message and waits for the assistant's reply.
     *
     * @param message  user message text
     * @param threadId existing thread to continue, or null to start a new one
     * @return the assistant's reply, or "Run failed with status: ..." if the run did not complete
     * @throws IOException          on network failure or a non-2xx API response
     * @throws InterruptedException if interrupted while waiting for the run
     */
    public String queryAssistant(String message, String threadId) throws IOException, InterruptedException {
        if (threadId == null) {
            threadId = createThread();
        }
        addMessage(threadId, message);
        String status = createAndWaitForRun(threadId);
        if ("completed".equals(status)) {
            return getLatestMessage(threadId);
        }
        return "Run failed with status: " + status;
    }

    /** Starts a request to API_BASE + path with the auth and Assistants v2 beta headers. */
    private Request.Builder newRequest(String path) {
        return new Request.Builder()
                .url(API_BASE + path)
                .header("Authorization", "Bearer " + apiKey)
                .header("OpenAI-Beta", "assistants=v2");
    }

    /** Serializes a JSON node into an application/json request body. */
    private RequestBody jsonBody(JsonNode node) throws IOException {
        return RequestBody.create(mapper.writeValueAsString(node), JSON);
    }

    /** Executes the request, throwing IOException on a non-2xx status, and parses the JSON body. */
    private JsonNode execute(Request request) throws IOException {
        try (Response response = client.newCall(request).execute()) {
            ResponseBody responseBody = response.body();
            String text = responseBody == null ? "" : responseBody.string();
            if (!response.isSuccessful()) {
                throw new IOException("OpenAI API " + request.method() + " " + request.url()
                        + " failed with HTTP " + response.code() + ": " + text);
            }
            return mapper.readTree(text);
        }
    }

    /** Returns node[field] as text, or throws IOException if the field is absent. */
    private static String requireText(JsonNode node, String field) throws IOException {
        JsonNode value = node == null ? null : node.get(field);
        if (value == null || value.isNull()) {
            throw new IOException("OpenAI API response is missing '" + field + "': " + node);
        }
        return value.asText();
    }

    /** True while a run is still progressing and must be polled again. */
    private static boolean isPending(String status) {
        return "queued".equals(status) || "in_progress".equals(status) || "cancelling".equals(status);
    }

    // Usage Example
    public static void main(String[] args) throws Exception {
        AIDatabase aiDb = new AIDatabase(System.getenv("OPENAI_API_KEY"));

        // Create assistant
        String assistantId = aiDb.createAssistant();
        System.out.println("Created assistant: " + assistantId);

        // Query with schema
        String schemaMessage = """
                I have a database with this schema:
                CREATE TABLE users (
                id INTEGER PRIMARY KEY,
                name VARCHAR(100),
                department VARCHAR(50)
                );
                Generate SQL to count users by department.
                """;
        String response = aiDb.queryAssistant(schemaMessage, null);
        System.out.println("Assistant response: " + response);
    }
}
Best Practices
Security Considerations
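- Never include sensitive data in prompts sent to third-party AI APIs
- Store API keys in environment variables (or a secrets manager), never in source code
- Implement proper authentication and authorization
- Validate AI-generated SQL before execution (for example, allow only a single read-only SELECT statement), and run it under a read-only database account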
Performance Optimization
- Cache frequently used schemas
- Implement query result caching
- Use connection pooling for database connections
- Limit result set sizes
Error Handling
def _is_read_only_query(sql_query: str) -> bool:
    """Basic check that sql_query is a single read-only SELECT/WITH statement.

    This is deliberately conservative and may reject some legitimate queries
    (e.g. ones mentioning DELETE inside a string literal). It is a first line
    of defense only; executing under a read-only database account is the real
    safeguard.
    """
    import re

    statement = sql_query.strip()
    # A single trailing semicolon is harmless; drop it before looking for others.
    if statement.endswith(";"):
        statement = statement[:-1].rstrip()
    # Any remaining ';' means stacked statements, e.g. "SELECT 1; DROP TABLE users".
    if ";" in statement:
        return False
    # Whole-word match, so that e.g. "SELECTX ..." is not accepted.
    if not re.match(r"(SELECT|WITH)\b", statement, re.IGNORECASE):
        return False
    # Data-modifying CTEs ("WITH d AS (DELETE ...) SELECT ...") start with WITH but write.
    write_keywords = r"\b(INSERT|UPDATE|DELETE|DROP|ALTER|CREATE|TRUNCATE|GRANT|REVOKE|MERGE)\b"
    return re.search(write_keywords, statement, re.IGNORECASE) is None


def safe_ai_query(question: str, schema: str, max_retries: int = 3,
                  generate=None, execute=None):
    """Generate SQL for `question` with an AI model, validate it, and run it.

    Each attempt generates a fresh query. Any exception (an invalid query or a
    failed execution) triggers a retry, up to `max_retries` attempts in total.

    Args:
        question: Natural-language question to answer.
        schema: Database schema description passed to the query generator.
        max_retries: Total number of attempts; must be at least 1.
        generate: Optional callable (schema, question) -> SQL string.
            Defaults to generate_sql_query.
        execute: Optional callable (sql) -> result. Defaults to execute_query.

    Returns:
        Whatever `execute` returns for the first query that passes validation.

    Raises:
        ValueError: If max_retries < 1, or if the last generated query is not a
            single read-only SELECT/WITH statement.
        Exception: Whatever the last attempt's generator or executor raised.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    generate = generate if generate is not None else generate_sql_query
    execute = execute if execute is not None else execute_query

    for attempt in range(max_retries):
        try:
            sql_query = generate(schema, question)
            if not _is_read_only_query(sql_query):
                raise ValueError("Invalid query type generated")
            return execute(sql_query)
        except Exception:
            if attempt == max_retries - 1:
                raise  # bare raise preserves the original traceback
Choosing the Right Approach
Factor | Direct Data | Schema-Based |
---|---|---|
Data Size | < 1000 rows | Any size |
Security | Less secure | More secure |
Accuracy | Higher | Moderate |
Performance | Faster | Slower |
Complexity | Simple | Complex |
Conclusion
Both approaches have their place in AI-database integration. Start with direct data attachment for prototypes and small datasets, then move to schema-based query generation for production systems. The Assistants API keeps conversation state for you in server-side threads, while the Chat Completions API offers more direct control over each request.
Choose your approach based on your specific needs: data size, security requirements, and system complexity.
Written by Kartik Kalia
Web developer and technical writer passionate about creating exceptional digital experiences.