当前位置: 首页 > news >正文

Go开发一个简单的数据库mcp

以下是参考别人写的,本来想弄一个自然语言查询,发现太难了。。。

// Command go-mcp-mysql serves a small set of MySQL tools (schema inspection,
// read queries and, unless read-only mode is enabled, write queries) over the
// Model Context Protocol using stdio transport.
package main

import (
	"context"
	"encoding/csv"
	"flag"
	"fmt"
	"log"
	"strings"

	_ "github.com/go-sql-driver/mysql"
	"github.com/jmoiron/sqlx"
	"github.com/mark3labs/mcp-go/mcp"
	"github.com/mark3labs/mcp-go/server"
)

// Statement types passed as the `expect` argument of HandleQuery/HandleExec.
// The empty value disables the EXPLAIN check for that call.
const (
	StatementTypeNoExplainCheck = ""
	StatementTypeSelect         = "SELECT"
	StatementTypeInsert         = "INSERT"
	StatementTypeUpdate         = "UPDATE"
	StatementTypeDelete         = "DELETE"
)

// Command-line configuration and the lazily created connection handle.
var (
	Host string
	User string
	Pass string
	Port int
	Db   string

	// DSN, when non-empty, overrides Host/User/Pass/Port/Db.
	DSN string

	ReadOnly         bool
	WithExplainCheck bool

	// DB is populated by the first successful GetDB call.
	DB *sqlx.DB
)

// ExplainResult maps one row of MySQL `EXPLAIN` output. Every column is a
// pointer because any of them may be NULL.
type ExplainResult struct {
	Id           *string `db:"id"`
	SelectType   *string `db:"select_type"`
	Table        *string `db:"table"`
	Partitions   *string `db:"partitions"`
	Type         *string `db:"type"`
	PossibleKeys *string `db:"possible_keys"`
	Key          *string `db:"key"`
	KeyLen       *string `db:"key_len"`
	Ref          *string `db:"ref"`
	Rows         *string `db:"rows"`
	Filtered     *string `db:"filtered"`
	Extra        *string `db:"Extra"`
}

// ShowCreateTableResult maps one row of `SHOW CREATE TABLE` output.
type ShowCreateTableResult struct {
	Table       string `db:"Table"`
	CreateTable string `db:"Create Table"`
}

// toolHandler is the callback signature accepted by the MCP server's AddTool.
type toolHandler = func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error)

func main() {
	flag.StringVar(&Host, "host", "localhost", "MySQL hostname")
	flag.StringVar(&User, "user", "root", "MySQL username")
	flag.StringVar(&Pass, "pass", "", "MySQL password")
	flag.IntVar(&Port, "port", 3306, "MySQL port")
	flag.StringVar(&Db, "db", "", "MySQL database")
	flag.StringVar(&DSN, "dsn", "", "MySQL DSN")
	flag.BoolVar(&ReadOnly, "read-only", false, "Enable read-only mode")
	flag.BoolVar(&WithExplainCheck, "with-explain-check", false, "Check query plan with `EXPLAIN` before executing")
	flag.Parse()

	// Build a DSN from the individual flags only when none was given explicitly.
	if len(DSN) == 0 {
		DSN = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true&loc=Local", User, Pass, Host, Port, Db)
	}

	s := server.NewMCPServer(
		"go-mcp-mysql",
		"1.0",
	)

	// Schema tools.
	listDatabaseTool := mcp.NewTool(
		"list_database",
		mcp.WithDescription("展示所有的数据库"),
	)
	listTableTool := mcp.NewTool(
		"list_table",
		mcp.WithDescription("展示所有的表"),
	)
	createTableTool := mcp.NewTool(
		"create_table",
		mcp.WithDescription("创建一个新表"),
		mcp.WithString("query",
			mcp.Required(),
			mcp.Description("查询刚才创建的表"),
		),
	)
	alterTableTool := mcp.NewTool(
		"alter_table",
		mcp.WithDescription("修改表结构"),
		mcp.WithString("query",
			mcp.Required(),
			mcp.Description("查询刚才的修改"),
		),
	)
	descTableTool := mcp.NewTool(
		"desc_table",
		mcp.WithDescription("查看建表语句"),
		mcp.WithString("name",
			mcp.Required(),
			mcp.Description("The name of the table to describe"),
		),
	)

	// Data tools.
	readQueryTool := mcp.NewTool(
		"read_query",
		mcp.WithDescription("查询"),
		mcp.WithString("query",
			mcp.Required(),
			mcp.Description("The SQL query to execute"),
		),
	)
	writeQueryTool := mcp.NewTool(
		"write_query",
		mcp.WithDescription("新增"),
		mcp.WithString("query",
			mcp.Required(),
			mcp.Description("The SQL query to execute"),
		),
	)
	updateQueryTool := mcp.NewTool(
		"update_query",
		mcp.WithDescription("修改"),
		mcp.WithString("query",
			mcp.Required(),
			mcp.Description("The SQL query to execute"),
		),
	)
	deleteQueryTool := mcp.NewTool(
		"delete_query",
		mcp.WithDescription("删除"),
		mcp.WithString("query",
			mcp.Required(),
			mcp.Description("The SQL query to execute"),
		),
	)

	// Registration order matches the original program; mutating tools are
	// only registered when read-only mode is off.
	s.AddTool(listDatabaseTool, fixedQueryHandler("SHOW DATABASES"))
	s.AddTool(listTableTool, fixedQueryHandler("SHOW TABLES"))
	if !ReadOnly {
		s.AddTool(createTableTool, execHandler(StatementTypeNoExplainCheck))
		s.AddTool(alterTableTool, execHandler(StatementTypeNoExplainCheck))
	}
	s.AddTool(descTableTool, argumentHandler("name", HandleDescTable))
	s.AddTool(readQueryTool, argumentHandler("query", func(query string) (string, error) {
		return HandleQuery(query, StatementTypeSelect)
	}))
	if !ReadOnly {
		s.AddTool(writeQueryTool, execHandler(StatementTypeInsert))
		s.AddTool(updateQueryTool, execHandler(StatementTypeUpdate))
		s.AddTool(deleteQueryTool, execHandler(StatementTypeDelete))
	}

	if err := server.ServeStdio(s); err != nil {
		log.Fatalf("Server error: %v", err)
	}
}

// toolResult converts a (text, error) pair into an MCP tool result. Failures
// are reported as tool-level error results, never as Go errors, so the server
// keeps running.
func toolResult(text string, err error) (*mcp.CallToolResult, error) {
	if err != nil {
		return mcp.NewToolResultError(err.Error()), nil
	}
	return mcp.NewToolResultText(text), nil
}

// stringArgument extracts a string argument from a tool call. It returns an
// error instead of panicking when the argument object or the value has an
// unexpected type or is missing.
func stringArgument(request mcp.CallToolRequest, key string) (string, error) {
	args, ok := request.Params.Arguments.(map[string]interface{})
	if !ok {
		return "", fmt.Errorf("invalid tool arguments: expected an object")
	}
	value, ok := args[key].(string)
	if !ok {
		return "", fmt.Errorf("argument %q is required and must be a string", key)
	}
	return value, nil
}

// fixedQueryHandler returns a handler that runs a constant query without the
// EXPLAIN check and returns the rows as CSV.
func fixedQueryHandler(query string) toolHandler {
	return func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
		return toolResult(HandleQuery(query, StatementTypeNoExplainCheck))
	}
}

// argumentHandler returns a handler that reads the string argument named key
// and passes it to run.
func argumentHandler(key string, run func(value string) (string, error)) toolHandler {
	return func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
		value, err := stringArgument(request, key)
		if err != nil {
			return mcp.NewToolResultError(err.Error()), nil
		}
		return toolResult(run(value))
	}
}

// execHandler returns a handler that executes the "query" argument through
// HandleExec with the given expected statement type.
func execHandler(expect string) toolHandler {
	return argumentHandler("query", func(query string) (string, error) {
		return HandleExec(query, expect)
	})
}

// GetDB returns the shared connection handle, connecting with DSN on first use.
func GetDB() (*sqlx.DB, error) {
	if DB != nil {
		return DB, nil
	}

	db, err := sqlx.Connect("mysql", DSN)
	if err != nil {
		return nil, fmt.Errorf("failed to establish database connection: %w", err)
	}

	DB = db
	return DB, nil
}

// HandleQuery runs query via DoQuery and renders the result as CSV text with
// a header row.
func HandleQuery(query, expect string) (string, error) {
	result, headers, err := DoQuery(query, expect)
	if err != nil {
		return "", err
	}

	s, err := MapToCSV(result, headers)
	if err != nil {
		return "", err
	}
	return s, nil
}

// DoQuery runs query and returns every row as a column-name -> value map
// together with the column names in result order. When expect is non-empty
// the statement is first checked with HandleExplain. []byte values are
// converted to string; other values are returned as scanned.
func DoQuery(query, expect string) ([]map[string]interface{}, []string, error) {
	db, err := GetDB()
	if err != nil {
		return nil, nil, err
	}

	if len(expect) > 0 {
		if err := HandleExplain(query, expect); err != nil {
			return nil, nil, err
		}
	}

	rows, err := db.Queryx(query)
	if err != nil {
		return nil, nil, err
	}
	// Release the underlying connection on every return path.
	defer rows.Close()

	cols, err := rows.Columns()
	if err != nil {
		return nil, nil, err
	}

	result := []map[string]interface{}{}
	for rows.Next() {
		row, err := rows.SliceScan()
		if err != nil {
			return nil, nil, err
		}

		resultRow := make(map[string]interface{}, len(cols))
		for i, col := range cols {
			switch v := row[i].(type) {
			case []byte:
				resultRow[col] = string(v)
			default:
				resultRow[col] = v
			}
		}
		result = append(result, resultRow)
	}
	// Next returning false may mean an error rather than end of data.
	if err := rows.Err(); err != nil {
		return nil, nil, err
	}

	return result, cols, nil
}

// HandleExec executes a statement and reports the number of affected rows.
// For StatementTypeInsert the last insert id is reported as well. When expect
// is non-empty the statement is first checked with HandleExplain.
func HandleExec(query, expect string) (string, error) {
	db, err := GetDB()
	if err != nil {
		return "", err
	}

	if len(expect) > 0 {
		if err := HandleExplain(query, expect); err != nil {
			return "", err
		}
	}

	result, err := db.Exec(query)
	if err != nil {
		return "", err
	}

	ra, err := result.RowsAffected()
	if err != nil {
		return "", err
	}

	switch expect {
	case StatementTypeInsert:
		li, err := result.LastInsertId()
		if err != nil {
			return "", err
		}
		return fmt.Sprintf("%d rows affected, last insert id: %d", ra, li), nil
	default:
		return fmt.Sprintf("%d rows affected", ra), nil
	}
}

// HandleExplain verifies, via `EXPLAIN`, that query is of the expected
// statement type. It is a no-op (returns nil) unless WithExplainCheck is set,
// so without that flag callers get no statement-type enforcement.
//
// The plan must consist of exactly one row. For INSERT/UPDATE/DELETE the
// row's select_type must equal expect; for any other expect value it must not
// be one of those three. A NULL select_type cannot be verified and is denied.
func HandleExplain(query, expect string) error {
	if !WithExplainCheck {
		return nil
	}

	db, err := GetDB()
	if err != nil {
		return err
	}

	rows, err := db.Queryx(fmt.Sprintf("EXPLAIN %s", query))
	if err != nil {
		return err
	}
	defer rows.Close()

	result := []ExplainResult{}
	for rows.Next() {
		var row ExplainResult
		if err := rows.StructScan(&row); err != nil {
			return err
		}
		result = append(result, row)
	}
	if err := rows.Err(); err != nil {
		return err
	}

	if len(result) != 1 || result[0].SelectType == nil {
		return fmt.Errorf("unable to check query plan, denied")
	}
	selectType := *result[0].SelectType

	match := false
	switch expect {
	case StatementTypeInsert, StatementTypeUpdate, StatementTypeDelete:
		match = selectType == expect
	default:
		// For SELECT the select_type can take many values, so only check
		// that it is not one of the mutating statement types.
		match = selectType != StatementTypeInsert &&
			selectType != StatementTypeUpdate &&
			selectType != StatementTypeDelete
	}

	if !match {
		return fmt.Errorf("query plan does not match expected pattern, denied")
	}
	return nil
}

// quoteIdentifier renders a table name, optionally qualified as "db.table",
// as backtick-quoted MySQL identifiers so caller-supplied text cannot break
// out of the identifier position. Parts that are already wrapped in backticks
// are unwrapped first. The name is split on every '.', so identifiers that
// themselves contain a dot are not supported.
func quoteIdentifier(name string) (string, error) {
	parts := strings.Split(strings.TrimSpace(name), ".")
	if len(parts) > 2 {
		return "", fmt.Errorf("invalid table name %q", name)
	}

	for i, part := range parts {
		part = strings.TrimSpace(part)
		if len(part) >= 2 && strings.HasPrefix(part, "`") && strings.HasSuffix(part, "`") {
			part = strings.ReplaceAll(part[1:len(part)-1], "``", "`")
		}
		if part == "" {
			return "", fmt.Errorf("invalid table name %q", name)
		}
		// A literal backtick inside a quoted identifier is written doubled.
		parts[i] = "`" + strings.ReplaceAll(part, "`", "``") + "`"
	}
	return strings.Join(parts, "."), nil
}

// HandleDescTable returns the `SHOW CREATE TABLE` statement text for the
// named table, or an error if the statement yields no rows.
func HandleDescTable(name string) (string, error) {
	db, err := GetDB()
	if err != nil {
		return "", err
	}

	quoted, err := quoteIdentifier(name)
	if err != nil {
		return "", err
	}

	rows, err := db.Queryx(fmt.Sprintf("SHOW CREATE TABLE %s", quoted))
	if err != nil {
		return "", err
	}
	defer rows.Close()

	result := []ShowCreateTableResult{}
	for rows.Next() {
		var row ShowCreateTableResult
		if err := rows.StructScan(&row); err != nil {
			return "", err
		}
		result = append(result, row)
	}
	if err := rows.Err(); err != nil {
		return "", err
	}

	if len(result) == 0 {
		return "", fmt.Errorf("table %s does not exist", name)
	}
	return result[0].CreateTable, nil
}

// MapToCSV renders rows as CSV: one header line followed by one line per map,
// with values formatted via %v in header order. It fails if a row lacks any
// of the header keys.
func MapToCSV(m []map[string]interface{}, headers []string) (string, error) {
	var csvBuf strings.Builder
	writer := csv.NewWriter(&csvBuf)

	if err := writer.Write(headers); err != nil {
		return "", fmt.Errorf("failed to write headers: %v", err)
	}

	for _, item := range m {
		row := make([]string, len(headers))
		for i, header := range headers {
			value, exists := item[header]
			if !exists {
				return "", fmt.Errorf("key '%s' not found in map", header)
			}
			row[i] = fmt.Sprintf("%v", value)
		}

		if err := writer.Write(row); err != nil {
			return "", fmt.Errorf("failed to write row: %v", err)
		}
	}

	writer.Flush()
	if err := writer.Error(); err != nil {
		return "", fmt.Errorf("error flushing CSV writer: %v", err)
	}

	return csvBuf.String(), nil
}

执行go mod tidy 安装需要的包

编译 go build

然后添加到自定义mcp里面

{
  "mcpServers": {
    "mysql": {
      "command": "xxx.exe",
      "args": ["--dsn", "username:password@tcp(localhost:3306)/mydb?parseTime=true&loc=Local"]
    }
  }
}

使用

image
image

Schema Tools
list_database: List all databases in the MySQL server.
Parameters: None
Returns: A list of matching database names.
list_table: List all tables in the MySQL server.
Parameters:
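name: If provided, list tables with the specified name, same as SQL SHOW TABLES LIKE '%name%'. Otherwise, list all tables. (Note: the list_table tool registered in the code above declares no name parameter and always runs SHOW TABLES.)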
Returns: A list of matching table names.
create_table: Create a new table in the MySQL server.
Parameters:
query: The SQL query to create the table.
Returns: x rows affected.
alter_table: Alter an existing table in the MySQL server. The LLM is informed not to drop an existing table or column.
Parameters:
query: The SQL query to alter the table.
Returns: x rows affected.
desc_table: Describe the structure of a table.
Parameters:
name: The name of the table to describe.
Returns: The structure of the table.
Data Tools
read_query: Execute a read-only SQL query.
Parameters:
query: The SQL query to execute.
Returns: The result of the query.
write_query: Execute a write SQL query.
Parameters:
query: The SQL query to execute.
Returns: x rows affected, last insert id: <last_insert_id>.
update_query: Execute an update SQL query.
Parameters:
query: The SQL query to execute.
Returns: x rows affected.
delete_query: Execute a delete SQL query.
Parameters:
query: The SQL query to execute.
Returns: x rows affected.
http://www.sczhlp.com/news/42610/

相关文章:

  • CF Round 1045 (Div. 2) 比赛总结
  • 好的网站模板优化大师免费版下载
  • 女生做网站主题有哪些sem专业培训公司
  • 小企业怎么做网站营销策略模板
  • 凌晨网站建设公司今日头条热搜
  • 甘露园网站建设seo服务公司怎么收费
  • 重庆做app开发的公司关键词诊断优化全部关键词
  • 2025.8.27模拟赛
  • Adobe Photoshop 2025 详细安装教程(附带安装包)PS2025
  • 测试文章标题
  • 免费推广网站大全黄色站长推荐黄色
  • 美食网站html代码微信群二维码推广平台
  • 网站设计原型工具网络推广价格
  • 苏州有哪些做网站公司好香港服务器
  • 做app网站seo教程技术资源
  • 武汉网站制作在线友情链接发布网
  • 做网站可以把文字做成图片吗上海网站建设seo
  • 南昌网站排名优化软件百度公司图片
  • 做买家秀的网站电子商务营销策略有哪些
  • html如何做购物网站网络推广图片大全
  • 青岛做网站哪里好广告联盟平台
  • 厦门旅游网站设计网络推广工作好吗
  • 网站设计的价格百度云盘搜索引擎入口
  • 吕梁网站建设网络公司排行榜
  • 做网站设计是什么专业廊坊seo
  • 阿坝州住房和城乡建设厅网站seo产品优化推广
  • 怎么自己创立网站网站降权查询工具
  • 惠州做网络推广的公司个人网站如何优化关键词
  • 烟台网站建设工资爱站网长尾关键词挖掘工具福利片
  • 服务网络是什么意思seo主要优化哪些