video-upload-patterns

Video Upload Patterns

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "video-upload-patterns" with this command: npx skills add mindmorass/reflex/mindmorass-reflex-video-upload-patterns

Video Upload Patterns

Best practices for uploading videos to major platforms.

YouTube Upload

Basic Upload

from googleapiclient.discovery import build from googleapiclient.http import MediaFileUpload from google.oauth2.credentials import Credentials

def upload_video_youtube( credentials: Credentials, file_path: str, title: str, description: str, tags: list[str], category_id: str = "22", # People & Blogs privacy: str = "private" ) -> dict: """Upload video to YouTube.""" youtube = build('youtube', 'v3', credentials=credentials)

body = {
    "snippet": {
        "title": title,
        "description": description,
        "tags": tags,
        "categoryId": category_id
    },
    "status": {
        "privacyStatus": privacy,
        "selfDeclaredMadeForKids": False
    }
}

media = MediaFileUpload(
    file_path,
    mimetype='video/*',
    resumable=True,
    chunksize=1024*1024  # 1MB chunks
)

request = youtube.videos().insert(
    part="snippet,status",
    body=body,
    media_body=media
)

response = None
while response is None:
    status, response = request.next_chunk()
    if status:
        print(f"Uploading: {int(status.progress() * 100)}%")

return response

def set_thumbnail( credentials: Credentials, video_id: str, thumbnail_path: str ): """Set custom thumbnail for video.""" youtube = build('youtube', 'v3', credentials=credentials)

media = MediaFileUpload(thumbnail_path, mimetype='image/jpeg')

return youtube.thumbnails().set(
    videoId=video_id,
    media_body=media
).execute()

def schedule_video( credentials: Credentials, video_id: str, publish_at: str # ISO 8601 format ): """Schedule video for future publication.""" youtube = build('youtube', 'v3', credentials=credentials)

return youtube.videos().update(
    part="status",
    body={
        "id": video_id,
        "status": {
            "privacyStatus": "private",
            "publishAt": publish_at
        }
    }
).execute()

Resumable Upload with Progress

import os import time from googleapiclient.errors import HttpError

class YouTubeUploader: MAX_RETRIES = 10 RETRIABLE_STATUS_CODES = [500, 502, 503, 504]

def __init__(self, credentials):
    self.youtube = build('youtube', 'v3', credentials=credentials)

def upload_with_retry(
    self,
    file_path: str,
    metadata: dict,
    progress_callback=None
) -> dict:
    """Upload with automatic retry on failure."""
    file_size = os.path.getsize(file_path)

    media = MediaFileUpload(
        file_path,
        mimetype='video/*',
        resumable=True,
        chunksize=5 * 1024 * 1024  # 5MB chunks
    )

    request = self.youtube.videos().insert(
        part="snippet,status",
        body=metadata,
        media_body=media
    )

    response = None
    retry = 0

    while response is None:
        try:
            status, response = request.next_chunk()
            if status:
                progress = status.progress()
                bytes_uploaded = int(file_size * progress)
                if progress_callback:
                    progress_callback(progress, bytes_uploaded, file_size)

        except HttpError as e:
            if e.resp.status in self.RETRIABLE_STATUS_CODES:
                retry += 1
                if retry > self.MAX_RETRIES:
                    raise
                sleep_seconds = 2 ** retry
                print(f"Retry {retry}, sleeping {sleep_seconds}s")
                time.sleep(sleep_seconds)
            else:
                raise

    return response

TikTok Upload

Content Posting API

import requests from dataclasses import dataclass from typing import Optional

@dataclass class TikTokVideo: video_url: str # URL to video file title: str privacy_level: str = "SELF_ONLY" # SELF_ONLY, MUTUAL_FOLLOW_FRIENDS, FOLLOWER_OF_CREATOR, PUBLIC_TO_EVERYONE disable_duet: bool = False disable_comment: bool = False disable_stitch: bool = False

class TikTokUploader: def init(self, access_token: str): self.access_token = access_token self.base_url = "https://open.tiktokapis.com/v2" self.headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json" }

def init_video_upload(self, video: TikTokVideo) -> dict:
    """Initialize video upload and get upload URL."""
    response = requests.post(
        f"{self.base_url}/post/publish/video/init/",
        headers=self.headers,
        json={
            "post_info": {
                "title": video.title,
                "privacy_level": video.privacy_level,
                "disable_duet": video.disable_duet,
                "disable_comment": video.disable_comment,
                "disable_stitch": video.disable_stitch
            },
            "source_info": {
                "source": "PULL_FROM_URL",
                "video_url": video.video_url
            }
        }
    )
    return response.json()

def check_publish_status(self, publish_id: str) -> dict:
    """Check status of video publishing."""
    response = requests.post(
        f"{self.base_url}/post/publish/status/fetch/",
        headers=self.headers,
        json={"publish_id": publish_id}
    )
    return response.json()

def upload_from_file(self, file_path: str, video: TikTokVideo) -> dict:
    """Upload video from local file (requires file hosting)."""
    # TikTok requires video URL, so you need to host the file first
    # This is a placeholder for the workflow
    raise NotImplementedError(
        "TikTok requires video URL. Host file and use init_video_upload()"
    )

Direct Upload (Chunk-based)

class TikTokDirectUploader: """Direct upload using chunk-based approach."""

def __init__(self, access_token: str):
    self.access_token = access_token
    self.base_url = "https://open.tiktokapis.com/v2"

def init_upload(
    self,
    file_size: int,
    chunk_size: int = 10 * 1024 * 1024  # 10MB
) -> dict:
    """Initialize direct upload."""
    total_chunks = (file_size + chunk_size - 1) // chunk_size

    response = requests.post(
        f"{self.base_url}/post/publish/inbox/video/init/",
        headers={
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        },
        json={
            "source_info": {
                "source": "FILE_UPLOAD",
                "video_size": file_size,
                "chunk_size": chunk_size,
                "total_chunk_count": total_chunks
            }
        }
    )
    return response.json()

def upload_chunk(
    self,
    upload_url: str,
    chunk_data: bytes,
    chunk_index: int,
    total_chunks: int
):
    """Upload a single chunk."""
    response = requests.put(
        upload_url,
        headers={
            "Content-Type": "video/mp4",
            "Content-Range": f"bytes {chunk_index * len(chunk_data)}-{(chunk_index + 1) * len(chunk_data) - 1}/{total_chunks * len(chunk_data)}"
        },
        data=chunk_data
    )
    return response

def upload_file(self, file_path: str) -> dict:
    """Upload complete file in chunks."""
    import os

    file_size = os.path.getsize(file_path)
    chunk_size = 10 * 1024 * 1024  # 10MB

    # Initialize
    init_response = self.init_upload(file_size, chunk_size)
    upload_url = init_response['data']['upload_url']
    publish_id = init_response['data']['publish_id']

    # Upload chunks
    with open(file_path, 'rb') as f:
        chunk_index = 0
        total_chunks = (file_size + chunk_size - 1) // chunk_size

        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break

            self.upload_chunk(upload_url, chunk, chunk_index, total_chunks)
            chunk_index += 1
            print(f"Uploaded chunk {chunk_index}/{total_chunks}")

    return {"publish_id": publish_id}

Vimeo Upload

import vimeo

class VimeoUploader: def init(self, token: str, key: str, secret: str): self.client = vimeo.VimeoClient( token=token, key=key, secret=secret )

def upload_video(
    self,
    file_path: str,
    name: str,
    description: str = "",
    privacy: str = "nobody"  # anybody, nobody, password, disable
) -> dict:
    """Upload video to Vimeo."""
    uri = self.client.upload(file_path, data={
        'name': name,
        'description': description,
        'privacy': {
            'view': privacy
        }
    })

    # Get video details
    response = self.client.get(uri)
    return response.json()

def upload_with_progress(
    self,
    file_path: str,
    name: str,
    progress_callback=None
) -> dict:
    """Upload with progress tracking."""
    def progress_handler(uploaded, total):
        if progress_callback:
            progress_callback(uploaded / total, uploaded, total)

    uri = self.client.upload(
        file_path,
        data={'name': name},
        progress=progress_handler
    )

    return self.client.get(uri).json()

def replace_video(self, video_id: str, file_path: str) -> dict:
    """Replace existing video file."""
    uri = f"/videos/{video_id}"
    return self.client.replace(uri, file_path)

def set_thumbnail(self, video_id: str, time_code: float) -> dict:
    """Set thumbnail from video frame."""
    response = self.client.post(
        f"/videos/{video_id}/pictures",
        data={'time': time_code, 'active': True}
    )
    return response.json()

def add_to_folder(self, video_id: str, folder_id: str):
    """Add video to folder/project."""
    self.client.put(f"/me/projects/{folder_id}/videos/{video_id}")

Bulk Upload Manager

import asyncio from dataclasses import dataclass from enum import Enum from typing import Callable, Optional from pathlib import Path

class UploadPlatform(Enum): YOUTUBE = "youtube" TIKTOK = "tiktok" VIMEO = "vimeo"

class UploadStatus(Enum): PENDING = "pending" UPLOADING = "uploading" PROCESSING = "processing" COMPLETE = "complete" FAILED = "failed"

@dataclass class VideoUploadJob: file_path: Path title: str description: str tags: list[str] platforms: list[UploadPlatform] thumbnail_path: Optional[Path] = None scheduled_time: Optional[str] = None status: UploadStatus = UploadStatus.PENDING results: dict = None

def __post_init__(self):
    self.results = {}

class BulkUploadManager: def init( self, youtube_creds=None, tiktok_token=None, vimeo_client=None ): self.youtube_creds = youtube_creds self.tiktok_token = tiktok_token self.vimeo_client = vimeo_client self.jobs: list[VideoUploadJob] = []

def add_job(self, job: VideoUploadJob):
    """Add upload job to queue."""
    self.jobs.append(job)

async def process_job(self, job: VideoUploadJob):
    """Process single upload job."""
    job.status = UploadStatus.UPLOADING

    for platform in job.platforms:
        try:
            if platform == UploadPlatform.YOUTUBE and self.youtube_creds:
                result = upload_video_youtube(
                    self.youtube_creds,
                    str(job.file_path),
                    job.title,
                    job.description,
                    job.tags
                )
                job.results['youtube'] = result

            elif platform == UploadPlatform.VIMEO and self.vimeo_client:
                result = self.vimeo_client.upload_video(
                    str(job.file_path),
                    job.title,
                    job.description
                )
                job.results['vimeo'] = result

        except Exception as e:
            job.results[platform.value] = {"error": str(e)}

    job.status = UploadStatus.COMPLETE

async def process_all(self, concurrent_limit: int = 3):
    """Process all jobs with concurrency limit."""
    semaphore = asyncio.Semaphore(concurrent_limit)

    async def process_with_limit(job):
        async with semaphore:
            await self.process_job(job)

    await asyncio.gather(
        *[process_with_limit(job) for job in self.jobs]
    )

def get_status_report(self) -> dict:
    """Get status of all jobs."""
    return {
        "total": len(self.jobs),
        "pending": sum(1 for j in self.jobs if j.status == UploadStatus.PENDING),
        "uploading": sum(1 for j in self.jobs if j.status == UploadStatus.UPLOADING),
        "complete": sum(1 for j in self.jobs if j.status == UploadStatus.COMPLETE),
        "failed": sum(1 for j in self.jobs if j.status == UploadStatus.FAILED),
        "jobs": [
            {
                "file": str(j.file_path),
                "status": j.status.value,
                "results": j.results
            }
            for j in self.jobs
        ]
    }

Metadata Templates

from dataclasses import dataclass from typing import Optional from string import Template

@dataclass class VideoMetadataTemplate: title_template: str description_template: str tags: list[str] category: str

def render(self, **kwargs) -> dict:
    """Render template with variables."""
    return {
        "title": Template(self.title_template).safe_substitute(**kwargs),
        "description": Template(self.description_template).safe_substitute(**kwargs),
        "tags": self.tags,
        "category": self.category
    }

Example templates

GAMING_TEMPLATE = VideoMetadataTemplate( title_template="$game_name - $episode_title | Episode $episode_num", description_template=""" $episode_title

Welcome back to $game_name! In this episode, we $episode_summary.

Timestamps: $timestamps

Subscribe for more $game_name content!

#$game_tag #gaming #letsplay """.strip(), tags=["gaming", "letsplay", "gameplay"], category="20" # Gaming )

TUTORIAL_TEMPLATE = VideoMetadataTemplate( title_template="$topic Tutorial - $subtitle | $series_name", description_template=""" Learn $topic in this comprehensive tutorial!

In this video: $outline

Resources: $resources

Don't forget to like and subscribe! """.strip(), tags=["tutorial", "howto", "learn"], category="27" # Education )

Upload Validation

import subprocess import os from dataclasses import dataclass

@dataclass class VideoValidation: is_valid: bool duration: float resolution: str codec: str file_size: int errors: list[str]

def validate_video(file_path: str, platform: str = "youtube") -> VideoValidation: """Validate video meets platform requirements.""" errors = []

# Get video info
probe = subprocess.run([
    "ffprobe", "-v", "quiet",
    "-print_format", "json",
    "-show_format", "-show_streams",
    file_path
], capture_output=True, text=True)

import json
info = json.loads(probe.stdout)
video_stream = next(s for s in info['streams'] if s['codec_type'] == 'video')

duration = float(info['format']['duration'])
file_size = int(info['format']['size'])
resolution = f"{video_stream['width']}x{video_stream['height']}"
codec = video_stream['codec_name']

# Platform-specific validation
if platform == "youtube":
    if duration > 12 * 3600:  # 12 hours
        errors.append("Video exceeds 12 hour limit")
    if file_size > 256 * 1024 * 1024 * 1024:  # 256GB
        errors.append("File exceeds 256GB limit")

elif platform == "tiktok":
    if duration > 10 * 60:  # 10 minutes
        errors.append("Video exceeds 10 minute limit")
    if file_size > 4 * 1024 * 1024 * 1024:  # 4GB
        errors.append("File exceeds 4GB limit")

elif platform == "vimeo":
    if duration > 12 * 3600:
        errors.append("Video exceeds 12 hour limit")

return VideoValidation(
    is_valid=len(errors) == 0,
    duration=duration,
    resolution=resolution,
    codec=codec,
    file_size=file_size,
    errors=errors
)

References

  • YouTube Data API

  • TikTok Content Posting API

  • Vimeo API

  • FFprobe Documentation

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

ffmpeg-patterns

No summary provided by upstream source.

Repository SourceNeeds Review
General

site-crawler

No summary provided by upstream source.

Repository SourceNeeds Review
General

ai-video-generation

No summary provided by upstream source.

Repository SourceNeeds Review
General

n8n-patterns

No summary provided by upstream source.

Repository SourceNeeds Review