Go Code Snippets

使用 grpc stream 上传文件

// The stream service definition.
service StreamService {
  // Uploads a file
  rpc UploadFile (stream FileChunk) returns (UploadStatus) {}
}


// The request message containing a file chunk.
message FileChunk {
  bytes content = 1;
}

// The response message containing the upload status.
message UploadStatus {
  bool success = 1;
}

这个方法的目的是接收一个文件块的流,并返回上传状态。你可以参考¹来了解如何使用gRPC的客户端流式传输来上传文件。

下面是一个可能的实现,你可以根据你的需求进行修改:

func (services *StreamServices)UploadFile(stream proto.StreamService_UploadFileServer) error {
    // 创建一个临时文件
    file, err := ioutil.TempFile("/tmp", "upload-*.png")
    if err != nil {
        return status.Errorf(codes.Internal, "cannot create temp file: %v", err)
    }
    // 关闭并删除文件
    defer func() {
        file.Close()
        os.Remove(file.Name())
    }()

    // 获取第一个请求,包含图片信息
    req, err := stream.Recv()
    if err != nil {
        return status.Errorf(codes.Unknown, "cannot receive image info: %v", err)
    }

    // 获取图片信息
    info := req.GetInfo()
    if info == nil {
        return status.Errorf(codes.InvalidArgument, "image info is missing")
    }

    // 获取图片ID和类型
    imageID := info.GetImageId()
    imageType := info.GetImageType()

    // 验证图片ID和类型
    if len(imageID) == 0 {
        return status.Errorf(codes.InvalidArgument, "image ID is missing")
    }
    if len(imageType) == 0 {
        return status.Errorf(codes.InvalidArgument, "image type is missing")
    }

    // 初始化上传状态
    status := &proto.UploadStatus{
        ImageId: imageID,
        Success: false,
    }

    // 初始化上传大小
    size := 0

    // 循环接收文件块并写入临时文件
    for {
        // 接收下一个请求
        req, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return status.Errorf(codes.Unknown, "cannot receive chunk data: %v", err)
        }

        // 获取文件块数据
        chunk := req.GetChunkData()
        if len(chunk) == 0 {
            continue
        }

        // 写入临时文件
        n, err := file.Write(chunk)
        if err != nil {
            return status.Errorf(codes.Internal, "cannot write chunk data: %v", err)
        }

        // 更新上传大小
        size += n
    }

    // 重命名临时文件为目标文件名,包含图片ID和类型
    targetPath := fmt.Sprintf("/tmp/%s%s", imageID, imageType)
    err = os.Rename(file.Name(), targetPath)
    if err != nil {
        return status.Errorf(codes.Internal, "cannot save image to file: %v", err)
    }

    // 更新上传状态为成功,并返回给客户端
    status.Success = true
    status.Size = uint32(size)

    return stream.SendAndClose(status)
}
package main

import (
    "context"
    "fmt"
    "net"
    "time"
)

func main() {
    resolver := &net.Resolver{
        PreferGo: true,
    }

    ctx, cancel := context.WithTimeout(context.Background(), /*5*time.Second*/ 9000*time.Millisecond)
    defer cancel()

    ips, err := resolver.LookupIPAddr(ctx, "www.baidu.com")
    if err != nil {
        fmt.Println(err)
        return
    }

    for _, ip := range ips {
        fmt.Println(ip.String())
    }
}