grpc-expert

Expert guidance for gRPC services, Protocol Buffers, microservices communication, and streaming patterns.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "grpc-expert" with this command: npx skills add personamanagmentlayer/pcl/personamanagmentlayer-pcl-grpc-expert

gRPC Expert

Expert guidance for gRPC services, Protocol Buffers, microservices communication, and streaming patterns.

Core Concepts

gRPC Fundamentals

  • Protocol Buffers (protobuf)

  • Service definitions

  • RPC patterns (unary, server streaming, client streaming, bidirectional)

  • HTTP/2 transport

  • Code generation

  • Interceptors and middleware

Communication Patterns

  • Unary RPC (request-response)

  • Server streaming RPC

  • Client streaming RPC

  • Bidirectional streaming RPC

  • Deadline/timeout handling

  • Error handling and status codes

Production Features

  • Load balancing

  • Service discovery

  • Health checking

  • Authentication (TLS, tokens)

  • Monitoring and tracing

  • Retry policies

Protocol Buffer Definition

syntax = "proto3";

package user.v1;

import "google/protobuf/timestamp.proto";

service UserService { // Unary RPC rpc GetUser(GetUserRequest) returns (GetUserResponse);

// Server streaming rpc ListUsers(ListUsersRequest) returns (stream User);

// Client streaming rpc CreateUsers(stream CreateUserRequest) returns (CreateUsersResponse);

// Bidirectional streaming rpc Chat(stream ChatMessage) returns (stream ChatMessage); }

message User { string id = 1; string email = 2; string name = 3; google.protobuf.Timestamp created_at = 4; UserRole role = 5; }

enum UserRole { USER_ROLE_UNSPECIFIED = 0; USER_ROLE_USER = 1; USER_ROLE_ADMIN = 2; }

message GetUserRequest { string id = 1; }

message GetUserResponse { User user = 1; }

message ListUsersRequest { int32 page_size = 1; string page_token = 2; }

message CreateUserRequest { string email = 1; string name = 2; }

message CreateUsersResponse { repeated string user_ids = 1; int32 created_count = 2; }

message ChatMessage { string user_id = 1; string message = 2; google.protobuf.Timestamp timestamp = 3; }

Python gRPC Server

import grpc from concurrent import futures import logging from typing import Iterator

import user_pb2 import user_pb2_grpc

class UserService(user_pb2_grpc.UserServiceServicer): def init(self): self.users = {}

def GetUser(self, request, context):
    """Unary RPC"""
    user_id = request.id

    if user_id not in self.users:
        context.abort(grpc.StatusCode.NOT_FOUND, f"User {user_id} not found")

    user = self.users[user_id]
    return user_pb2.GetUserResponse(user=user)

def ListUsers(self, request, context):
    """Server streaming RPC"""
    page_size = request.page_size or 10

    for i, user in enumerate(self.users.values()):
        if i >= page_size:
            break
        yield user

def CreateUsers(self, request_iterator, context):
    """Client streaming RPC"""
    created_ids = []

    for request in request_iterator:
        user_id = self._generate_id()
        user = user_pb2.User(
            id=user_id,
            email=request.email,
            name=request.name
        )
        self.users[user_id] = user
        created_ids.append(user_id)

    return user_pb2.CreateUsersResponse(
        user_ids=created_ids,
        created_count=len(created_ids)
    )

def Chat(self, request_iterator, context):
    """Bidirectional streaming RPC"""
    for message in request_iterator:
        # Echo back with modification
        response = user_pb2.ChatMessage(
            user_id="server",
            message=f"Echo: {message.message}",
            timestamp=message.timestamp
        )
        yield response

def serve(): server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) user_pb2_grpc.add_UserServiceServicer_to_server(UserService(), server)

server.add_insecure_port('[::]:50051')
server.start()
print("Server started on port 50051")
server.wait_for_termination()

if name == 'main': logging.basicConfig() serve()

Go gRPC Server

package main

import ( "context" "io" "log" "net" "time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
pb "example.com/user/v1"

)

type userServer struct { pb.UnimplementedUserServiceServer users map[string]*pb.User }

func (s *userServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) { user, exists := s.users[req.Id] if !exists { return nil, status.Errorf(codes.NotFound, "user %s not found", req.Id) }

return &pb.GetUserResponse{User: user}, nil

}

func (s *userServer) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error { pageSize := req.PageSize if pageSize == 0 { pageSize = 10 }

count := 0
for _, user := range s.users {
    if count >= int(pageSize) {
        break
    }

    if err := stream.Send(user); err != nil {
        return err
    }
    count++
}

return nil

}

func (s *userServer) CreateUsers(stream pb.UserService_CreateUsersServer) error { var userIds []string

for {
    req, err := stream.Recv()
    if err == io.EOF {
        return stream.SendAndClose(&pb.CreateUsersResponse{
            UserIds:      userIds,
            CreatedCount: int32(len(userIds)),
        })
    }
    if err != nil {
        return err
    }

    userId := generateID()
    user := &pb.User{
        Id:    userId,
        Email: req.Email,
        Name:  req.Name,
    }

    s.users[userId] = user
    userIds = append(userIds, userId)
}

}

func (s *userServer) Chat(stream pb.UserService_ChatServer) error { for { msg, err := stream.Recv() if err == io.EOF { return nil } if err != nil { return err }

    response := &pb.ChatMessage{
        UserId:    "server",
        Message:   "Echo: " + msg.Message,
        Timestamp: msg.Timestamp,
    }

    if err := stream.Send(response); err != nil {
        return err
    }
}

}

func main() { lis, err := net.Listen("tcp", ":50051") if err != nil { log.Fatalf("failed to listen: %v", err) }

s := grpc.NewServer()
pb.RegisterUserServiceServer(s, &userServer{
    users: make(map[string]*pb.User),
})

log.Println("Server listening on :50051")
if err := s.Serve(lis); err != nil {
    log.Fatalf("failed to serve: %v", err)
}

}

gRPC Client with Interceptors

import grpc import user_pb2 import user_pb2_grpc from typing import Callable

def logging_interceptor( method: Callable, request, call_details: grpc.ClientCallDetails ): """Unary-unary interceptor for logging""" print(f"Calling method: {call_details.method}") print(f"Request: {request}")

response = method(request, call_details)

print(f"Response: {response}")
return response

class UserClient: def init(self, host: str = 'localhost:50051'): # Create channel with interceptor self.channel = grpc.insecure_channel(host) self.channel = grpc.intercept_channel( self.channel, logging_interceptor ) self.stub = user_pb2_grpc.UserServiceStub(self.channel)

def get_user(self, user_id: str) -> user_pb2.User:
    """Call unary RPC"""
    request = user_pb2.GetUserRequest(id=user_id)

    try:
        response = self.stub.GetUser(
            request,
            timeout=5.0  # 5 second timeout
        )
        return response.user
    except grpc.RpcError as e:
        print(f"RPC failed: {e.code()} - {e.details()}")
        raise

def list_users(self, page_size: int = 10):
    """Call server streaming RPC"""
    request = user_pb2.ListUsersRequest(page_size=page_size)

    try:
        for user in self.stub.ListUsers(request):
            yield user
    except grpc.RpcError as e:
        print(f"RPC failed: {e.code()} - {e.details()}")

def create_users_batch(self, users: list):
    """Call client streaming RPC"""
    def request_generator():
        for user in users:
            yield user_pb2.CreateUserRequest(
                email=user['email'],
                name=user['name']
            )

    try:
        response = self.stub.CreateUsers(request_generator())
        return response
    except grpc.RpcError as e:
        print(f"RPC failed: {e.code()} - {e.details()}")

def close(self):
    self.channel.close()

Best Practices

Design

  • Use semantic versioning for protobuf packages

  • Design backward-compatible changes

  • Use proper field numbering (never reuse)

  • Include metadata in messages

  • Use enums with explicit UNSPECIFIED value

  • Design for pagination in list operations

Performance

  • Enable HTTP/2 connection pooling

  • Use streaming for large data transfers

  • Implement proper timeouts

  • Use compression for large payloads

  • Batch operations when possible

  • Monitor and tune thread pool sizes

Production

  • Implement health checks

  • Use TLS for secure communication

  • Add authentication/authorization

  • Implement retry policies with backoff

  • Monitor gRPC metrics (latency, errors)

  • Use service discovery for dynamic endpoints

Anti-Patterns

❌ Breaking backward compatibility ❌ No timeout configuration ❌ Ignoring error status codes ❌ Not handling stream cancellation ❌ Over-sized messages ❌ No health checking ❌ Missing authentication

Resources

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

finance-expert

No summary provided by upstream source.

Repository SourceNeeds Review
General

trading-expert

No summary provided by upstream source.

Repository SourceNeeds Review
General

dart-expert

No summary provided by upstream source.

Repository SourceNeeds Review
General

postgresql-expert

No summary provided by upstream source.

Repository SourceNeeds Review