使用 grpc stream 上传文件
// The stream service definition.
service StreamService {
// Uploads a file
rpc UploadFile (stream FileChunk) returns (UploadStatus) {}
}
// The request message containing a file chunk.
message FileChunk {
bytes content = 1;
}
// The response message containing the upload status.
message UploadStatus {
bool success = 1;
}
这个方法的目的是接收一个文件块的流,并返回上传状态。你可以参考¹来了解如何使用gRPC的客户端流式传输来上传文件。
下面是一个可能的实现,你可以根据你的需求进行修改:
func (services *StreamServices)UploadFile(stream proto.StreamService_UploadFileServer) error {
// 创建一个临时文件
file, err := ioutil.TempFile("/tmp", "upload-*.png")
if err != nil {
return status.Errorf(codes.Internal, "cannot create temp file: %v", err)
}
// 关闭并删除文件
defer func() {
file.Close()
os.Remove(file.Name())
}()
// 获取第一个请求,包含图片信息
req, err := stream.Recv()
if err != nil {
return status.Errorf(codes.Unknown, "cannot receive image info: %v", err)
}
// 获取图片信息
info := req.GetInfo()
if info == nil {
return status.Errorf(codes.InvalidArgument, "image info is missing")
}
// 获取图片ID和类型
imageID := info.GetImageId()
imageType := info.GetImageType()
// 验证图片ID和类型
if len(imageID) == 0 {
return status.Errorf(codes.InvalidArgument, "image ID is missing")
}
if len(imageType) == 0 {
return status.Errorf(codes.InvalidArgument, "image type is missing")
}
// 初始化上传状态
status := &proto.UploadStatus{
ImageId: imageID,
Success: false,
}
// 初始化上传大小
size := 0
// 循环接收文件块并写入临时文件
for {
// 接收下一个请求
req, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return status.Errorf(codes.Unknown, "cannot receive chunk data: %v", err)
}
// 获取文件块数据
chunk := req.GetChunkData()
if len(chunk) == 0 {
continue
}
// 写入临时文件
n, err := file.Write(chunk)
if err != nil {
return status.Errorf(codes.Internal, "cannot write chunk data: %v", err)
}
// 更新上传大小
size += n
}
// 重命名临时文件为目标文件名,包含图片ID和类型
targetPath := fmt.Sprintf("/tmp/%s%s", imageID, imageType)
err = os.Rename(file.Name(), targetPath)
if err != nil {
return status.Errorf(codes.Internal, "cannot save image to file: %v", err)
}
// 更新上传状态为成功,并返回给客户端
status.Success = true
status.Size = uint32(size)
return stream.SendAndClose(status)
}
package main
import (
"context"
"fmt"
"net"
"time"
)
func main() {
resolver := &net.Resolver{
PreferGo: true,
}
ctx, cancel := context.WithTimeout(context.Background(), /*5*time.Second*/ 9000*time.Millisecond)
defer cancel()
ips, err := resolver.LookupIPAddr(ctx, "www.baidu.com")
if err != nil {
fmt.Println(err)
return
}
for _, ip := range ips {
fmt.Println(ip.String())
}
}