RAG Any Docs with Fused

TL;DR: With a few Fused UDFs you can turn any public docs site into a fully searchable RAG pipeline — connect data, process it, and expose it as an API.

This example shows how to build a RAG pipeline over Overture Maps documentation using Fused — from ingestion to a queryable API endpoint. The same pattern works for any public docs site.

Figure: RAG pipeline Canvas overview

Try it out on Fused Canvas

How it works

Overview

The pipeline follows a crawl → embed → search pattern, implemented as four UDFs:

  • site_to_llmstxt: crawls any docs site via its sitemap and writes a clean llms.txt to S3
  • docs_ragging: embeds the llms.txt in chunks and stores the vectors in a LanceDB table at /mount/lancedb/
  • overture_docs_searching: embeds an incoming question and returns the most relevant chunks
  • ai_utils: shared helpers for embedding, LanceDB reads/writes, and agentic tool calling

Step 1: Crawl Any Docs Site into llms.txt

The first challenge is getting clean text out of a docs site. HTML is noisy: navbars, footers, sidebars, and scripts all get in the way. The site_to_llmstxt UDF accepts any docs site URL, crawls every page in parallel via crawl_page.map(), strips boilerplate, and writes a single clean llms.txt to S3.
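As a rough illustration of the sitemap step described above, the sketch below parses a small inline sitemap with the standard library. The sample XML, the docs.example.org URLs, and the function name `page_urls_from_sitemap` are hypothetical; the real UDF additionally follows nested sitemap indexes and fetches over HTTP.

```python
from xml.etree import ElementTree

# Hypothetical two-page sitemap, inlined so the example runs offline.
SAMPLE_SITEMAP = """<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
  <url><loc>https://docs.example.org/</loc></url>
  <url><loc> https://docs.example.org/getting-started/ </loc></url>
</urlset>"""

# Sitemap elements live in this XML namespace, so lookups must be namespaced.
SITEMAP_NS = {"sm": "http://www.sitemaps.org/schemas/sitemap/0.9"}


def page_urls_from_sitemap(xml_text: str) -> list[str]:
    """Return every <loc> found under a <url> entry, or [] if the XML is malformed."""
    try:
        root = ElementTree.fromstring(xml_text)
    except ElementTree.ParseError:
        return []
    urls = []
    for url_tag in root.findall("sm:url", SITEMAP_NS):
        loc = url_tag.find("sm:loc", SITEMAP_NS)
        if loc is not None and loc.text:
            urls.append(loc.text.strip())  # strip stray whitespace around the URL
    return urls


urls = page_urls_from_sitemap(SAMPLE_SITEMAP)
print(urls)
```

Forgetting the namespace mapping is a common reason a sitemap appears to contain zero URLs, since `findall("url")` without it matches nothing.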

site_to_llmstxt UDF code:
@fused.udf(cache_max_age='1d')  # Caching for 1d to prevent accidental reruns
def udf(
    base_url: str = "https://docs.overturemaps.org/",
    site_name: str = "Overture_maps_documentation",
    mode: str = "full",  # "curated" | "full"
    sitemap_path: str = "/sitemap.xml",
    max_pages: int = 0,  # 0 = no limit, useful for testing
    output_dir: str = "s3://fused-asset/demos/llms_txt/",
):
    """Generate an llms.txt from any public docs site with a sitemap."""
    import re
    import time
    import requests
    from xml.etree import ElementTree

    def fetch(url: str, retries: int = 3, timeout: int = 15) -> str | None:
        headers = {"User-Agent": "llms-txt-generator/1.0 (sitemap crawler)"}
        for attempt in range(retries):
            try:
                r = requests.get(url, headers=headers, timeout=timeout)
                r.raise_for_status()
                return r.text
            except Exception as e:
                if attempt == retries - 1:
                    print(f" ✗ Failed {url}: {e}")
                    return None
                time.sleep(1.5 ** attempt)

    def parse_sitemap(xml_text: str) -> list[str]:
        """Parse a sitemap or sitemap index and return all page URLs."""
        try:
            root = ElementTree.fromstring(xml_text)
        except ElementTree.ParseError:
            return []
        ns = {"sm": "http://www.sitemaps.org/schemas/sitemap/0.9"}
        urls = []
        for sitemap_tag in root.findall("sm:sitemap", ns):
            loc = sitemap_tag.find("sm:loc", ns)
            if loc is not None and loc.text:
                sub_xml = fetch(loc.text.strip())
                if sub_xml:
                    urls.extend(parse_sitemap(sub_xml))
        for url_tag in root.findall("sm:url", ns):
            loc = url_tag.find("sm:loc", ns)
            if loc is not None and loc.text:
                urls.append(loc.text.strip())
        return urls

    sitemap_url = base_url.rstrip("/") + sitemap_path
    print(f"Fetching sitemap: {sitemap_url}")
    sitemap_xml = fetch(sitemap_url)
    if not sitemap_xml:
        raise RuntimeError(f"Could not fetch sitemap at {sitemap_url}")

    all_urls = parse_sitemap(sitemap_xml)
    print(f"Found {len(all_urls)} URLs in sitemap")

    skip_patterns = re.compile(
        r"/(tags?|search|404|sitemap|assets|_|static)/|"
        r"\.(xml|json|txt|png|jpg|svg|css|js)$",
        re.IGNORECASE,
    )
    page_urls = [u for u in all_urls if not skip_patterns.search(u)]
    page_urls = sorted(set(page_urls))

    if max_pages > 0:
        page_urls = page_urls[:max_pages]

    print(f"Processing {len(page_urls)} doc pages (mode={mode})")

    print(f"Crawling {len(page_urls)} pages in parallel...")
    results = crawl_page.map(
        [{"url": u} for u in page_urls],
        engine="local",
        max_workers=10,
    ).df()

    pages = results.to_dict("records")
    print(f"Successfully extracted {len(pages)} pages")

    header = f"# {site_name}\n\n"

    if mode == "curated":
        lines = [header, "## Pages\n"]
        for p in pages:
            lines.append(f"- [{p['title']}]({p['url']})")
        lines.append(f"\n---\n\nGenerated from {base_url} sitemap. Total pages: {len(pages)}")
        output = "\n".join(lines)
    else:
        sep = "=" * 80
        sections = [header, sep + "\n"]
        for p in pages:
            sections.append(f"## {p['title']}")
            sections.append(f"URL: {p['url']}\n")
            if p["content"]:
                sections.append(p["content"])
            sections.append("\n" + sep + "\n")
        sections.append(f"\n---\n\nGenerated from {base_url} sitemap. Total pages: {len(pages)}")
        output = "\n".join(sections)

    if output_dir:
        import fsspec
        safe_name = re.sub(r"[^\w]+", "_", site_name.lower()).strip("_")
        output_path = output_dir.rstrip("/") + f"/{safe_name}/llms.txt"
        with fsspec.open(output_path, "w") as f:
            f.write(output)
        print(f"Written to {output_path}")
        return output_path
    else:
        return output


@fused.udf
def crawl_page(url: str = "https://docs.overturemaps.org/"):
    """Fetch and extract content from a single docs page. Called via crawl_page.map()."""
    import re
    import time
    import requests
    import pandas as pd
    from bs4 import BeautifulSoup

    # Fetch the page, retrying up to three times with a growing delay.
    headers = {"User-Agent": "llms-txt-generator/1.0 (sitemap crawler)"}
    for attempt in range(3):
        try:
            r = requests.get(url, headers=headers, timeout=15)
            r.raise_for_status()
            html = r.text
            break
        except Exception:
            if attempt == 2:
                return None
            time.sleep(1.5 ** attempt)
    else:
        return None

    # Remove navigation, footers, sidebars, and other boilerplate elements.
    soup = BeautifulSoup(html, "html.parser")
    for tag in soup.select(
        "nav, footer, aside, .sidebar, .toc, .table-of-contents, "
        ".navbar, .pagination, .edit-this-page, script, style, "
        "[class*='sidebar'], [class*='navbar'], [class*='footer'], "
        "[class*='toc'], [class*='pagination'], [class*='breadcrumb'], "
        "[aria-hidden='true']"
    ):
        tag.decompose()

    # Prefer the first <h1> as the title; fall back to <title> minus any site suffix.
    title = ""
    h1 = soup.find("h1")
    if h1:
        title = h1.get_text(strip=True)
    if not title:
        title_tag = soup.find("title")
        if title_tag:
            title = re.sub(r"\s*[|–—-].*$", "", title_tag.get_text(strip=True))

    content_el = (
        soup.find("article")
        or soup.find(class_=re.compile(r"md-content|markdown|content|documentation"))
        or soup.find("main")
        or soup.body
    )

    if content_el is None:
        return pd.DataFrame([{"url": url, "title": title, "description": "", "content": ""}])

    def extract_content(el) -> str:
        from bs4 import NavigableString, Tag

        BLOCK_TAGS = {"p", "div", "section", "li", "dt", "dd", "blockquote",
                      "h1", "h2", "h3", "h4", "h5", "h6", "tr", "thead", "tbody"}

        def walk(node) -> str:
            if isinstance(node, NavigableString):
                return str(node)
            if not isinstance(node, Tag):
                return ""
            tag = node.name.lower() if node.name else ""
            if tag == "pre":
                inner_code = node.find("code")
                lang = ""
                if inner_code and inner_code.get("class"):
                    for cls in inner_code.get("class", []):
                        m = re.match(r"language-(\w+)", cls)
                        if m:
                            lang = m.group(1)
                            break
                code_text = (inner_code or node).get_text()
                return f"\n\n```{lang}\n{code_text.strip()}\n```\n\n"
            if tag == "code":
                text = node.get_text()
                if "\n" in text:
                    return f"\n\n```\n{text.strip()}\n```\n\n"
                return f"`{text}`"
            if tag in {"h1", "h2", "h3", "h4", "h5", "h6"}:
                level = int(tag[1])
                return f"\n\n{'#' * level} {node.get_text(strip=True)}\n\n"
            parts = [walk(child) for child in node.children]
            text = "".join(parts)
            if tag in BLOCK_TAGS:
                text = text.strip()
                return f"\n\n{text}\n\n" if text else ""
            return text

        raw = walk(el)
        return re.sub(r"\n{3,}", "\n\n", raw).strip()

    content = extract_content(content_el)

    # Use the first reasonably long prose line as a short description.
    description = ""
    for line in content.splitlines():
        line = line.strip()
        if len(line) >= 40 and not line.startswith("#") and not line.startswith("`"):
            description = line[:160] + ("..." if len(line) > 160 else "")
            break

    return pd.DataFrame([{"url": url, "title": title, "description": description, "content": content}])
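
To make the URL filtering step in site_to_llmstxt easier to reason about, here is a small standalone sketch that applies the same skip pattern to a handful of made-up URLs. The docs.example.org URLs are hypothetical; only the regular expression is copied from the UDF.

```python
import re

# Same pattern as in the UDF: skip utility paths and non-HTML assets.
skip_patterns = re.compile(
    r"/(tags?|search|404|sitemap|assets|_|static)/|"
    r"\.(xml|json|txt|png|jpg|svg|css|js)$",
    re.IGNORECASE,
)

# Hypothetical sitemap entries, including one duplicate.
candidate_urls = [
    "https://docs.example.org/guides/intro/",
    "https://docs.example.org/tags/release/",       # dropped: /tags/ path
    "https://docs.example.org/assets/logo.png",     # dropped: /assets/ path and .png
    "https://docs.example.org/guides/intro/",       # duplicate, removed by the set
    "https://docs.example.org/schema/index.json",   # dropped: .json extension
    "https://docs.example.org/blog/search-tips/",   # kept: "search-tips" is not "/search/"
]

# Filter, de-duplicate, and sort, mirroring the order of operations in the UDF.
page_urls = sorted({u for u in candidate_urls if not skip_patterns.search(u)})
print(page_urls)
```

Because the path alternatives must be enclosed by slashes, a page such as /blog/search-tips/ survives the filter while /search/ itself does not.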

Step 2: Chunk, Embed, and Store in LanceDB

The docs_ragging UDF fetches the llms.txt from S3, splits it into overlapping chunks, embeds each chunk using the Qwen3-Embedding-8B model via OpenRouter, and writes the result to a LanceDB table on the shared volume at /mount/lancedb/.

docs_ragging UDF code:
# ===================== CONFIGURATION =====================
LANCEDB_TABLE_NAME = "overture_docs_17_04_2026"
LANCEDB_BASE_PATH = "/mount/lancedb/"
EMBEDDING_MODEL = "qwen/qwen3-embedding-8b"
SOURCE_S3_PATH = "s3://fused-asset/demos/llms_txt/overture_maps_documentation/llms.txt"
CHUNK_SIZE = 1300
OVERLAP_PERCENT = 0.15
OVERLAP_SIZE = int(CHUNK_SIZE * OVERLAP_PERCENT)
EMBEDDING_BATCH_SIZE = 400
# =========================================================


@fused.udf(cache_max_age=0)
def udf(
    table_name: str = LANCEDB_TABLE_NAME,
    overwrite: bool = True,
):
    import pandas as pd
    import fsspec

    ai = fused.load("ai_utils")

    print(f"Fetching docs from: {SOURCE_S3_PATH}")
    with fsspec.open(SOURCE_S3_PATH, "r") as f:
        content = f.read()
    print(f"Fetched {len(content):,} characters")

    chunks = chunk_with_overlap(content, CHUNK_SIZE, OVERLAP_SIZE)
    print(f"Created {len(chunks)} chunks")

    df = pd.DataFrame(
        {
            "chunk_id": list(range(1, len(chunks) + 1)),
            "code": chunks,
            "source_url": SOURCE_S3_PATH,
        }
    )

    print("Embedding chunks via ai_utils.embed_df...")
    df = ai.embed_df(
        df,
        text_col="code",
        embedding_col="embedding",
        provider="qwen",
        embedding_model=EMBEDDING_MODEL,
        batch_size=EMBEDDING_BATCH_SIZE,
        max_workers=16,
    )

    print(f"Writing {len(df)} rows to LanceDB table: {table_name} at {LANCEDB_BASE_PATH}")
    table_path = ai.write_table(
        df,
        table_name=table_name,
        embedding_col="embedding",
        base_path=LANCEDB_BASE_PATH,
        overwrite=overwrite,
    )

    visible_tables = ai.list_tables(base_path=LANCEDB_BASE_PATH)
    table_exists_now = ai.table_exists(table_name, base_path=LANCEDB_BASE_PATH)

    return pd.DataFrame(
        {
            "status": ["success"],
            "table_name": [table_name],
            "table_path": [table_path],
            "lancedb_base_path": [LANCEDB_BASE_PATH],
            "table_exists_after_write": [table_exists_now],
            "source_url": [SOURCE_S3_PATH],
            "content_chars": [len(content)],
            "chunks": [len(chunks)],
            "chunk_size": [CHUNK_SIZE],
            "overlap_size": [OVERLAP_SIZE],
        }
    )


def chunk_with_overlap(text, chunk_size, overlap_size):
    """Split text into chunks with fixed overlap."""
    chunks = []
    start = 0
    text_length = len(text)

    while start < text_length:
        end = start + chunk_size
        chunk = text[start:end]
        chunks.append(chunk)
        start = end - overlap_size
        if start + chunk_size >= text_length and start < text_length:
            final_chunk = text[start:]
            if final_chunk and final_chunk not in chunks and len(final_chunk) > overlap_size:
                chunks.append(final_chunk)
            break

    return chunks

After running, the UDF returns a status table:

status   table_name                table_path                               chunks  chunk_size  overlap_size  content_chars
success  overture_docs_17_04_2026  /mount/lancedb/overture_docs_17_04_2026  639     1300        195           706121

Embeddings are generated in batches of 400 chunks with up to 16 concurrent workers. Switching to a different embedding model is a one-line change in the EMBEDDING_MODEL constant at the top of the UDF.
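The numbers in the status table follow from the chunking arithmetic: each window is 1300 characters, consecutive windows share 195 characters, so the window start advances by 1105 characters each step. The sketch below is a simplified stand-in for the UDF's chunker (the name `sliding_chunks` is hypothetical, and it is not the function used above), intended only to make the overlap visible on a short string and to reproduce the chunk count on a synthetic text of the same length.

```python
def sliding_chunks(text: str, chunk_size: int, overlap_size: int) -> list[str]:
    """Fixed-size windows whose start advances by chunk_size - overlap_size."""
    if not 0 <= overlap_size < chunk_size:
        raise ValueError("overlap_size must be non-negative and smaller than chunk_size")
    stride = chunk_size - overlap_size
    chunks = []
    for start in range(0, len(text), stride):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reaches the end of the text
    return chunks


# Small demo: 10-character windows with 3 characters of overlap.
demo = sliding_chunks("abcdefghijklmnopqrstuvwxyz", chunk_size=10, overlap_size=3)
print(demo)  # ['abcdefghij', 'hijklmnopq', 'opqrstuvwx', 'vwxyz']

# Same settings as the UDF, applied to a synthetic text of 706,121 characters.
CHUNK_SIZE = 1300
OVERLAP_SIZE = int(CHUNK_SIZE * 0.15)
n_chunks = len(sliding_chunks("x" * 706_121, CHUNK_SIZE, OVERLAP_SIZE))
print(OVERLAP_SIZE, CHUNK_SIZE - OVERLAP_SIZE, n_chunks)  # 195 1105 639
```

In the demo, each chunk begins with the last three characters of the previous one ("hij", "opq", "vwx"), which is the property that keeps a sentence split at a boundary retrievable from at least one chunk.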


Step 3: Semantic Search as a Fused API

The overture_docs_searching UDF accepts a natural-language question, embeds it with the same model, and runs a cosine similarity search against the stored chunks in LanceDB. Because it's a standard Fused UDF, it's immediately available as an HTTP endpoint.
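For readers unfamiliar with cosine similarity search, the following toy sketch ranks three hand-written vectors against a query vector in pure Python. The vectors, chunk ids, and function names are made up for illustration; in the pipeline the vectors come from the embedding model and LanceDB performs the search.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query_vec: list[float], rows: list[tuple[int, list[float]]], k: int = 2):
    """Return the k (chunk_id, similarity) pairs most similar to the query."""
    scored = [(cosine_similarity(query_vec, vec), chunk_id) for chunk_id, vec in rows]
    scored.sort(reverse=True)  # highest similarity first
    return [(chunk_id, round(score, 3)) for score, chunk_id in scored[:k]]


# Toy 3-dimensional "embeddings" keyed by a hypothetical chunk id.
rows = [
    (1, [1.0, 0.0, 0.0]),
    (2, [0.0, 1.0, 0.0]),
    (3, [0.7, 0.7, 0.0]),
]
query = [1.0, 0.2, 0.0]

results = top_k(query, rows, k=2)
print(results)  # chunk 1 ranks first, then chunk 3
```

The question and the stored chunks must be embedded with the same model, otherwise the vectors live in different spaces and the similarity scores are meaningless.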

overture_docs_searching UDF code:
@fused.udf
def udf(
    question: str = "Latest release?",
):
    """Searches Overture docs RAG for matching areas in the documentation."""
    import pandas as pd

    collection_name = "overture_docs_17_04_2026"
    limit = 30
    base_path = "/mount/lancedb/"

    ai = fused.load("ai_utils")

    if not ai.table_exists(collection_name, base_path=base_path):
        available = ai.list_tables(base_path=base_path)
        return pd.DataFrame(
            {
                "status": ["error"],
                "error": [f"LanceDB table '{collection_name}' was not found."],
                "base_path": [base_path],
                "available_tables": [", ".join(available) if available else "(none)"],
            }
        )

    df = ai.search(question, collection_name, top_k=limit)

    if df is None or len(df) == 0:
        return pd.DataFrame({"status": ["empty"], "message": ["No vector matches found."]})

    cols_to_drop = [c for c in ["_distance", "_relevance_score", "vector", "embedding"] if c in df.columns]
    if cols_to_drop:
        df = df.drop(columns=cols_to_drop)

    return df.head(5)

Asking "Latest release?" returns a ranked table of the most relevant doc chunks:

chunk_id  similarity  code
215       0.123       ## Overture Maps Engineering Blog ... August 20, 2025 — 2025-08-20.0 release ...
164       0.112       The 2025-08-20.0 release of Overture data and v1.11.0 of the schema are now available ...
214       0.095       April 22 - Exploring our beta release / May 16 - 2024-05-16-beta.0 release notes ...
213       0.092       ### 2026 — January 21 - 2026-01-21 release notes / February 11 - Overture Has ...
165       0.092       The base, buildings, divisions, places, and transportation themes are in GA ...
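
Since the search UDF is reachable over HTTP, a client only needs to build a URL carrying the question. The sketch below assumes that UDF parameters are passed as query-string arguments and uses a placeholder endpoint; the real URL, including any shared token, should be copied from the UDF's sharing settings rather than from this example.

```python
from urllib.parse import urlencode

# Placeholder only: replace with the endpoint URL shown for your shared UDF.
ENDPOINT_URL = "https://example.invalid/shared-udf-endpoint"


def build_search_url(question: str, endpoint_url: str = ENDPOINT_URL) -> str:
    """Append the question as a URL-encoded query parameter."""
    return f"{endpoint_url}?{urlencode({'question': question})}"


url = build_search_url("Latest release?")
print(url)  # https://example.invalid/shared-udf-endpoint?question=Latest+release%3F

# With a real endpoint, the request itself could then be made with, for example:
#   import requests
#   response = requests.get(url, timeout=30)
```

Encoding the question with `urlencode` matters because natural-language questions routinely contain spaces, question marks, and ampersands that would otherwise corrupt the query string.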

Speeding up repeated queries

Every search request re-embeds the question, even when the same question was asked moments earlier. Wrap the UDF with @fused.cache to cache results for repeated queries, which skips the embedding call and the vector search and so reduces latency.
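The effect of caching repeated queries can be shown with the standard library. This sketch deliberately uses `functools.lru_cache` as a stand-in rather than @fused.cache, and `fake_embed` and `cached_search` are hypothetical names; it only demonstrates that an identical question triggers the expensive step once.

```python
from functools import lru_cache

embed_calls = 0  # counts how often the "expensive" step actually runs


def fake_embed(question: str) -> tuple[float, ...]:
    """Stand-in for a real embedding call; returns a deterministic toy vector."""
    global embed_calls
    embed_calls += 1
    return (float(len(question)), float(question.count(" ")))


@lru_cache(maxsize=256)
def cached_search(question: str) -> tuple[float, ...]:
    # In the real pipeline this would embed the question and query LanceDB.
    return fake_embed(question)


cached_search("Latest release?")
cached_search("Latest release?")      # identical question: served from the cache
cached_search("What themes are GA?")  # new question: computed

print(embed_calls)  # 2
```

Note that this kind of cache keys on the exact question string, so "Latest release?" and "latest release" count as different queries unless the input is normalized first.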


Using This for Any Site

To RAG a different docs site, update these values:

  1. In site_to_llmstxt, update the base_url parameter. Also set site_name and output_dir if needed, since together they determine where llms.txt is written:
base_url: str = "https://docs.myproject.io/",  # URL of the docs site to crawl
  2. In docs_ragging, update the two constants at the top of the file:
SOURCE_S3_PATH = "s3://your-bucket/myproject/llms.txt"  # S3 path where site_to_llmstxt wrote the output
LANCEDB_TABLE_NAME = "myproject_docs_v1"  # unique name for this project's vector table
  3. In overture_docs_searching, update collection_name to match:
collection_name = "myproject_docs_v1"  # must match LANCEDB_TABLE_NAME above
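
When changing these values, SOURCE_S3_PATH has to point at the file that site_to_llmstxt actually wrote. The helper below (the name `llms_txt_path` is hypothetical) mirrors the path logic from the crawl UDF, so the expected location can be checked before running the embedding step; the bucket names in the second call are placeholders.

```python
import re


def llms_txt_path(site_name: str, output_dir: str) -> str:
    """Mirror site_to_llmstxt's output path: <output_dir>/<safe_name>/llms.txt."""
    # Lowercase the name and collapse every run of non-word characters to "_".
    safe_name = re.sub(r"[^\w]+", "_", site_name.lower()).strip("_")
    return output_dir.rstrip("/") + f"/{safe_name}/llms.txt"


# Defaults from the tutorial reproduce the SOURCE_S3_PATH used in docs_ragging.
print(llms_txt_path("Overture_maps_documentation", "s3://fused-asset/demos/llms_txt/"))
# s3://fused-asset/demos/llms_txt/overture_maps_documentation/llms.txt

# A hypothetical project with spaces and punctuation in its name.
print(llms_txt_path("My Project Docs!", "s3://your-bucket/docs"))
# s3://your-bucket/docs/my_project_docs/llms.txt
```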

Considerations

  • Large document sets may take time to index — crawling and embedding hundreds of pages runs in parallel but is still I/O and compute-bound
  • Re-indexing overwrites the existing LanceDB table and re-embeds all chunks, which incurs additional embedding API costs; only re-index when the source docs change significantly

Connect to Slack

Once the search UDF is live as a Fused API endpoint, you can connect it to Slack so your team can query the docs directly from a channel. The Fused Slack integration lets you wire any UDF endpoint to a Slack bot — no extra infrastructure needed.

Figure: Fused Bot answering Overture docs questions in Slack


Next Steps

To go further with this pipeline, explore these pages:

  • Caching — cache embedding and search results to avoid recomputing on repeated queries
  • UDFs as API — expose any UDF as an HTTP endpoint with a shared token
  • Slack integration — route search queries through a Slack bot
  • Building for Agents — use UDF endpoints as tools in an agentic workflow

Try the full pipeline on Fused Canvas to see all four UDFs wired together.