In Python, I always got stuck when it came to downloading files — they’d fail midway, get corrupted, or never finish. So I built my own solution that finally just works. Here’s how I solved it.
Are you tired of running into these frustrating download issues in Python?
Good CLIs and libraries exist, but they're not always reliable. Take requests with stream:
import requests
requests.get("https...", stream=True)
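For comparison, the usual streaming pattern with requests looks roughly like this (a minimal sketch with a placeholder URL, not my downloader). It works until the connection drops, and then you start again from byte zero:
import requests

url = "https://example.com/big-file.zip"  # placeholder URL for illustration

with requests.get(url, stream=True, timeout=300) as response:
    response.raise_for_status()
    with open("big-file.zip", "wb") as file:
        for chunk in response.iter_content(chunk_size=1024 * 1024):
            file.write(chunk)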
So I thought: why not build my own and try to overcome these limitations? I did, and along the way I fixed a lot of the issues.
I'm already using it in my Telegram bots to handle downloads.
And guess what?
It has never disappointed me.
GitHub Gist of the code
Progress Callback Function
Easily track how much has been downloaded and how much remains.
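The callback is just an async function that receives the bytes downloaded so far, the total size, and a status string. A minimal sketch (the full listing at the end uses this same signature):
async def progress_callback(downloaded: int, total: int, status: str):
    # Called after every chunk; total comes from the server's content-length
    if total > 0:
        print(f"{status}: {downloaded / total * 100:.1f}% ({downloaded}/{total} bytes)")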
Smart Request Handling
Tries a HEAD request first to read the file size and range support, then falls back to a plain GET when the server blocks HEAD.
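Roughly, the probe works like this (a simplified sketch of what the full code below does; probe is just an illustrative helper name):
import aiohttp

async def probe(session: aiohttp.ClientSession, url: str) -> tuple[int, bool]:
    # Returns (total_size, supports_ranges); size 0 means it must be read from the GET response
    async with session.head(url) as resp:
        if resp.status in (403, 405):  # HEAD blocked: fall back to plain GET
            return 0, False
        resp.raise_for_status()
        size = int(resp.headers.get("content-length", 0))
        ranges = resp.headers.get("Accept-Ranges", "none").lower() == "bytes"
        return size, ranges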
Resume Support
Compares the local file size with the size reported by the server (see the sketch after this list):
✅ If server file size equals local size: skip download
✅ If local file is partially downloaded: resume from where it stopped
✅ If no file exists: start from scratch
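At its core, resuming is just an HTTP Range header: ask the server for bytes starting at the local file's current size and append to it. A stripped-down sketch (the resume function name is illustrative; the full code adds retries, extension handling, and progress reporting):
import os
import aiohttp

async def resume(url: str, path: str) -> None:
    done = os.path.getsize(path) if os.path.exists(path) else 0
    headers = {"Range": f"bytes={done}-"} if done else None
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers) as resp:
            # 206 means the server honoured the range; anything else: start over
            mode = "ab" if resp.status == 206 else "wb"
            with open(path, mode) as file:
                async for chunk in resp.content.iter_chunked(1024 * 1024):
                    file.write(chunk)
Here's the complete implementation: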
import asyncio
import logging
import mimetypes
import os
import sys
import time
import traceback
from typing import Awaitable, Callable, Optional

import aiohttp
from aiohttp import ClientTimeout

# aiohttp can hit "event loop is closed" errors with the default Proactor loop
# on Windows, so use the selector loop there.
if sys.platform.startswith("win"):
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
# Map common Content-Type values to file extensions, used to correct the
# output file's extension when it doesn't match what the server actually sends.
MIME_TYPE_MAP = {
    "application/zip": ".zip",
    "application/x-tar": ".tar",
    "application/gzip": ".gz",
    "application/pdf": ".pdf",
    "application/json": ".json",
    "application/xml": ".xml",
    "application/msword": ".doc",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx",
    "application/vnd.ms-excel": ".xls",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx",
    "application/x-7z-compressed": ".7z",
    "image/jpeg": ".jpg",
    "image/png": ".png",
    "image/gif": ".gif",
    "image/webp": ".webp",
    "video/mp4": ".mp4",
    "video/mpeg": ".mpeg",
    "audio/mpeg": ".mp3",
    "audio/wav": ".wav",
    "text/plain": ".txt",
    "text/html": ".html",
    "text/csv": ".csv",
    "application/octet-stream": ".ts",
}
async def download_file(
    url: str,
    output_file: str,
    callback: Optional[Callable[[int, int, str], Awaitable[None]]] = None,
) -> Optional[str]:
    retries = 0
    timeout: int = 300
    max_retries: int = 5
    downloaded_size = 0
    chunk_size: int = 1 * 1024 * 1024  # 1 MiB per chunk
    # Browser-like headers: some servers reject the default client user agent.
    headersList = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:137.0) Gecko/20100101 Firefox/137.0",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
        "Accept-Encoding": "gzip, deflate, br, zstd",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-User": "?1",
        "Priority": "u=0, i",
        "TE": "trailers",
    }
    # Resume support: if a partial file already exists, continue from its current size.
    if os.path.exists(output_file):
        downloaded_size = os.path.getsize(output_file)
    while retries < max_retries:
        try:
            async with aiohttp.ClientSession(
                connector=aiohttp.TCPConnector(ssl=False),
                headers=headersList,
                # cookies=cookies,  # Depends upon needs
            ) as session:
                total_size = 0
                accept_ranges = "none"
                # Probe with HEAD to learn the size and range support;
                # fall back to GET when the server blocks HEAD.
                try:
                    async with session.head(
                        url, timeout=ClientTimeout(total=30)
                    ) as head_response:
                        if head_response.status in [403, 405]:
                            print(
                                "HEAD request not allowed (403/405), proceeding with GET request."
                            )
                        else:
                            if head_response.status != 200:
                                raise Exception(
                                    f"Failed to access file: HTTP {head_response.status}"
                                )
                            total_size = int(
                                head_response.headers.get("content-length", 0)
                            )
                            content_type = head_response.headers.get("Content-Type", "")
                            if total_size == 0:
                                raise Exception("Server did not provide content-length")
                            accept_ranges = head_response.headers.get(
                                "Accept-Ranges", "none"
                            ).lower()
                except Exception as e:
                    logging.info(f"Failed to access file: {e}")
                    raise
                # No byte-range support advertised: restart from scratch.
                if accept_ranges != "bytes":
                    downloaded_size = 0
                # Local file already matches the server's size: nothing to download.
                if total_size > 0 and downloaded_size == total_size:
                    return output_file
                mode = "ab" if downloaded_size > 0 else "wb"
                headers_range = (
                    {"Range": f"bytes={downloaded_size}-"}
                    if downloaded_size > 0
                    else None
                )
                async with session.get(
                    url,
                    headers=headers_range,
                    timeout=ClientTimeout(total=timeout),
                ) as response:
                    if response.status not in (200, 206):
                        raise Exception(
                            f"Failed to download file: HTTP {response.status}"
                        )
                    # Server ignored the Range header and sent the full file:
                    # overwrite instead of appending, or the file would be corrupted.
                    if downloaded_size > 0 and response.status == 200:
                        downloaded_size = 0
                        mode = "wb"
                    if total_size == 0:
                        total_size = int(response.headers.get("content-length", 0))
                        content_type = response.headers.get("Content-Type", "")
                    if total_size == 0:
                        raise Exception("Server did not provide content-length")
                    # Fix the output extension to match the reported Content-Type.
                    ext = MIME_TYPE_MAP.get(
                        content_type, mimetypes.guess_extension(content_type) or ".bin"
                    )
                    base_name, current_ext = os.path.splitext(output_file)
                    if not current_ext or current_ext.lower() != ext.lower():
                        output_file = base_name + ext
                    with open(output_file, mode) as file:
                        async for chunk in response.content.iter_chunked(chunk_size):
                            file.write(chunk)
                            downloaded_size += len(chunk)
                            if callback:
                                await callback(
                                    downloaded_size, total_size, "Downloading"
                                )
                    # Verify we received exactly as many bytes as advertised.
                    if downloaded_size != total_size:
                        raise Exception(
                            f"Download incomplete: {downloaded_size}/{total_size} bytes"
                        )
                    return output_file
        except (aiohttp.ClientError, asyncio.TimeoutError, Exception) as e:
            logging.debug(traceback.format_exc())
            retries += 1
            print(f"Attempt {retries}/{max_retries} failed: {str(e)}")
            if "HTTP 403" in str(e):
                if os.path.exists(output_file):
                    os.remove(output_file)
                return None
            elif isinstance(e, asyncio.TimeoutError):
                print("Timeout occurred, retrying...")
            if retries >= max_retries:
                print("Max retries reached. Download failed.")
                if os.path.exists(output_file):
                    os.remove(output_file)
                return None
            # Exponential backoff before the next attempt.
            await asyncio.sleep(2**retries)
    return None
async def progress_callback(downloaded: int, total: int, status: str):
    if total > 0:
        progress = (downloaded / total) * 100
        print(f"Status: {status}, Progress: {progress:.2f}%")


async def main():
    url = "https://file-examples.com/storage/fe0707c5116828d4b9ad356/2017/04/file_example_MP4_640_3MG.mp4"
    output_file = "file_example_MP4_640_3MG.mp4"
    start_time = time.monotonic()
    await download_file(url, output_file, callback=progress_callback)
    end_time = time.monotonic()
    print(f"Total download time: {end_time - start_time:.2f} seconds")


try:
    asyncio.run(main())
except KeyboardInterrupt:
    print("Download interrupted by user.")
except Exception as e:
    print(f"Download failed: {e}")
I hope this makes your download work easier!
Give it a try and let me know how it works for you.
And don’t forget to join the newsletter so you won’t miss any upcoming updates! 🎉