Hogflare is a Cloudflare Workers ingestion layer for PostHog SDKs. It supports PostHog-style ingestion, stateful persons/groups, and SDK feature flags, then streams events and person snapshots into Cloudflare Pipelines so data lands in R2 as Iceberg/Parquet.
- Ingestion endpoints: `/capture`, `/identify`, `/alias`, `/batch`, `/e`, `/engage`, `/groups`, `/s`
- Persons and groups: `$set`, `$set_once`, `$unset`, aliasing, and group properties
- SDK config and feature flags: `/array/:token/config`, `/flags`, and `/decide` are evaluated in the Worker
- Request enrichment: Cloudflare IP/geo fields added when missing
- Queryable people: append-only person snapshots can be written to a separate Iceberg table
```mermaid
flowchart TB
    SDKs["PostHog SDKs"]
    SDKs -->|"ingest"| Worker
    SDKs -->|"flags/decide"| Worker
    subgraph CF["Cloudflare Workers"]
        Worker["Hogflare Worker"]
        subgraph DOs["Durable Objects"]
            PersonsDO["Persons DO"]
            PersonIdDO["PersonID DO<br/>(seq counter)"]
            GroupsDO["Groups DO"]
        end
        Worker <-.->|"read/write"| PersonsDO
        Worker <-.->|"read/write"| GroupsDO
        PersonsDO -.-> PersonIdDO
    end
    Worker -->|"events"| EventsPipeline["Events Pipeline"]
    Worker -->|"person snapshots"| PersonsPipeline["Persons Pipeline"]
    EventsPipeline --> EventsR2["R2 Data Catalog<br/>events table"]
    PersonsPipeline --> PersonsR2["R2 Data Catalog<br/>persons table"]
```
PostHog is a nice-to-use web & product analytics platform. However, self-hosting PostHog is prohibitively complex, so most users seem to rely on the cloud offering. Hogflare is an alternative for cost-conscious data folks & businesses interested in a low-maintenance way to ingest web & product analytics directly into a managed data lake.
A hobby deployment of PostHog includes: postgres, redis, redis7, clickhouse, zookeeper, kafka, worker, web, plugins, proxy, objectstorage, seaweedfs, asyncmigrationscheck, temporal, elasticsearch, temporal-admin-tools, temporal-ui, temporal-django-worker, cyclotron-janitor, capture, replay-capture, property-defs-rs, livestream, feature-flags, cymbal
Admittedly, PostHog does a lot more than this package, but some folks really just want the basics!
- Create R2 Data Catalog-backed Pipelines resources.
- Copy `wrangler.toml.example` to `wrangler.toml` and set the stream endpoints.
- Set Wrangler secrets.
- Build and deploy the Worker.
- Send a capture/identify verification flow and query the Iceberg tables.
The examples below use stable table names for a fresh deployment: default.hogflare_events and default.hogflare_persons. If you use versioned names during migration, substitute those names consistently in the sink commands and queries.
Set these values before creating sinks:
```bash
export R2_BUCKET="<bucket-name>"
export R2_CATALOG_TOKEN="<r2-data-catalog-token>"
```

`R2_CATALOG_TOKEN` is the token used by R2 Data Catalog/R2 SQL clients such as DuckDB or PyIceberg. The bucket must have R2 Data Catalog enabled before creating `r2-data-catalog` sinks.
Create the events stream, sink, and pipeline:
```bash
bunx wrangler pipelines streams create hogflare_events_stream \
  --schema-file scripts/events-pipeline-schema.json \
  --http-enabled true \
  --http-auth true

bunx wrangler pipelines sinks create hogflare_events_sink \
  --type r2-data-catalog \
  --bucket "$R2_BUCKET" \
  --namespace default \
  --table hogflare_events \
  --catalog-token "$R2_CATALOG_TOKEN" \
  --roll-interval 60

bunx wrangler pipelines create hogflare_events_pipeline \
  --sql "INSERT INTO hogflare_events_sink SELECT * FROM hogflare_events_stream;"
```

Create the persons stream, sink, and pipeline if you want queryable people in Iceberg:
```bash
bunx wrangler pipelines streams create hogflare_persons_stream \
  --schema-file scripts/persons-pipeline-schema.json \
  --http-enabled true \
  --http-auth true

bunx wrangler pipelines sinks create hogflare_persons_sink \
  --type r2-data-catalog \
  --bucket "$R2_BUCKET" \
  --namespace default \
  --table hogflare_persons \
  --catalog-token "$R2_CATALOG_TOKEN" \
  --roll-interval 60

bunx wrangler pipelines create hogflare_persons_pipeline \
  --sql "INSERT INTO hogflare_persons_sink SELECT * FROM hogflare_persons_stream;"
```

Each stream creation command prints an HTTP endpoint like `https://<stream-id>.ingest.cloudflare.com`. Use those endpoints in `wrangler.toml`.
Copy the example and fill in the stream endpoints:
```bash
cp wrangler.toml.example wrangler.toml
```

```toml
name = "hogflare"
main = "build/index.js" # generated entrypoint from worker-build for the Rust worker
compatibility_date = "2025-01-09"

[vars]
CLOUDFLARE_PIPELINE_ENDPOINT = "https://<stream-id>.ingest.cloudflare.com"
CLOUDFLARE_PERSONS_PIPELINE_ENDPOINT = "https://<persons-stream-id>.ingest.cloudflare.com"
CLOUDFLARE_PIPELINE_TIMEOUT_SECS = "10"

# Optional
# POSTHOG_TEAM_ID = "1"
# POSTHOG_GROUP_TYPE_0 = "company"
# POSTHOG_GROUP_TYPE_1 = "team"
# POSTHOG_GROUP_TYPE_2 = "project"
# POSTHOG_GROUP_TYPE_3 = "org"
# POSTHOG_GROUP_TYPE_4 = "workspace"
# POSTHOG_SESSION_RECORDING_ENDPOINT = "/s/"

[[durable_objects.bindings]]
name = "PERSONS"
class_name = "PersonDurableObject"

[[durable_objects.bindings]]
name = "PERSON_ID_COUNTER"
class_name = "PersonIdCounterDurableObject"

[[durable_objects.bindings]]
name = "GROUPS"
class_name = "GroupDurableObject"

[[migrations]]
tag = "v1"
new_sqlite_classes = ["PersonDurableObject"]

[[migrations]]
tag = "v2"
new_sqlite_classes = ["PersonIdCounterDurableObject", "GroupDurableObject"]
```

| Setting | Required | Notes |
|---|---|---|
| `CLOUDFLARE_PIPELINE_ENDPOINT` | Yes | Events stream HTTP endpoint from `wrangler pipelines streams create`. |
| `CLOUDFLARE_PIPELINE_AUTH_TOKEN` | Yes, for authenticated streams | Bearer token used for events stream HTTP ingest. |
| `CLOUDFLARE_PERSONS_PIPELINE_ENDPOINT` | No | Persons stream endpoint. Set this to write person snapshots to Iceberg. |
| `CLOUDFLARE_PERSONS_PIPELINE_AUTH_TOKEN` | No | Falls back to `CLOUDFLARE_PIPELINE_AUTH_TOKEN` when omitted. |
| `CLOUDFLARE_PIPELINE_TIMEOUT_SECS` | No | Defaults to 10 seconds. |
| `POSTHOG_API_KEY` | No | Default project token returned by `/decide` when request/header token is absent. |
| `POSTHOG_TEAM_ID` | No | Optional team id attached to event and person rows. |
| `POSTHOG_GROUP_TYPE_0..4` | No | Maps PostHog group types to `group0..group4`; set `POSTHOG_GROUP_TYPE_0=company` to populate `group0` for company groups. |
| `POSTHOG_SESSION_RECORDING_ENDPOINT` | No | Returned in `/decide` session recording config. |
| `POSTHOG_SIGNING_SECRET` | No | Enables HMAC request signature checks. |
| `PERSON_DEBUG_TOKEN` | No | Enables `/__debug/person/:id` for deployment verification. |
| `HOGFLARE_FEATURE_FLAGS` | No | JSON flag config used by `/decide` and `/flags`. |
Use a Cloudflare API token that can write to Pipelines for CLOUDFLARE_PIPELINE_AUTH_TOKEN. The same token can usually be reused for the persons stream.
```bash
bunx wrangler secret put CLOUDFLARE_PIPELINE_AUTH_TOKEN

# Optional. If omitted, the persons pipeline uses CLOUDFLARE_PIPELINE_AUTH_TOKEN.
bunx wrangler secret put CLOUDFLARE_PERSONS_PIPELINE_AUTH_TOKEN

# Optional.
bunx wrangler secret put POSTHOG_SIGNING_SECRET
bunx wrangler secret put PERSON_DEBUG_TOKEN
bunx wrangler secret put HOGFLARE_FEATURE_FLAGS
```

Build and deploy the Worker:

```bash
worker-build --release
bunx wrangler deploy
```

Export values for the verification flow:

```bash
export HOGFLARE_URL="https://<your-worker>.workers.dev"
export HOGFLARE_API_KEY="phc_verify_$(date -u +%Y%m%d%H%M%S)"
export HOGFLARE_ANON_ID="${HOGFLARE_API_KEY}_anon"
export HOGFLARE_USER_ID="${HOGFLARE_API_KEY}_user"
```

Send an anonymous capture:
curl -X POST "$HOGFLARE_URL/capture" \
-H "Content-Type: application/json" \
-d "{
\"api_key\": \"$HOGFLARE_API_KEY\",
\"event\": \"verify-anon-capture\",
\"distinct_id\": \"$HOGFLARE_ANON_ID\",
\"properties\": {
\"\$set\": { \"initial_referrer\": \"docs\" },
\"\$set_once\": { \"first_seen_source\": \"readme\" }
}
}"Identify the user and link the anonymous ID:
curl -X POST "$HOGFLARE_URL/identify" \
-H "Content-Type: application/json" \
-d "{
\"api_key\": \"$HOGFLARE_API_KEY\",
\"distinct_id\": \"$HOGFLARE_USER_ID\",
\"properties\": {
\"\$anon_distinct_id\": \"$HOGFLARE_ANON_ID\",
\"\$set\": { \"email\": \"verify@example.com\", \"plan\": \"pro\" },
\"\$set_once\": { \"signup_source\": \"readme\" }
}
}"Send a post-identify capture:
curl -X POST "$HOGFLARE_URL/capture" \
-H "Content-Type: application/json" \
-d "{
\"api_key\": \"$HOGFLARE_API_KEY\",
\"event\": \"verify-identified-capture\",
\"distinct_id\": \"$HOGFLARE_USER_ID\",
\"properties\": { \"button\": \"verify\" }
}"Wait for the sink roll interval, then query R2 SQL:
```bash
export R2_WAREHOUSE="<account-id>_<bucket-name>"
export WRANGLER_R2_SQL_AUTH_TOKEN="$R2_CATALOG_TOKEN"

bunx wrangler r2 sql query "$R2_WAREHOUSE" \
  "select event, distinct_id, person_id, person_properties
   from default.hogflare_events
   where api_key = '$HOGFLARE_API_KEY'
   order by created_at asc"

bunx wrangler r2 sql query "$R2_WAREHOUSE" \
  "select operation, canonical_distinct_id, person_id, distinct_ids, merged_properties
   from default.hogflare_persons
   where api_key = '$HOGFLARE_API_KEY'
   order by updated_at asc"
```

Expected result: the three event rows share one `person_id`, and the persons table has capture, identify, capture snapshots. After identify, `distinct_ids` should include both the anonymous and identified IDs.
If POSTHOG_SIGNING_SECRET is set, requests must include a valid signature.
```bash
payload='[
  {
    "api_key": "phc_example",
    "event": "purchase",
    "distinct_id": "user_12345",
    "properties": { "amount": 29.99 }
  }
]'

signature=$(printf '%s' "$payload" | openssl dgst -sha256 -hmac "$POSTHOG_SIGNING_SECRET" | awk '{print $2}')

curl -X POST https://<your-worker>.workers.dev/capture \
  -H "Content-Type: application/json" \
  -H "X-POSTHOG-SIGNATURE: sha256=$signature" \
  -d "$payload"
```

Note: `X-HUB-SIGNATURE` with `sha1=` is also accepted for GitHub-style webhook compatibility.
import posthog from "posthog-js";
posthog.init("<project_api_key>", {
api_host: "https://<your-worker>.workers.dev",
capture_pageview: true,
});import { PostHog } from "posthog-node";
const client = new PostHog("<project_api_key>", {
host: "https://<your-worker>.workers.dev",
});
client.capture({
distinctId: "user_123",
event: "purchase",
properties: { amount: 29.99 },
});
await client.shutdown();Set the SDK host/base URL to your Worker (https://<your-worker>.workers.dev) and use your project API key. Most SDKs use either api_host (browser/mobile) or host (server).
The repo includes a lightweight fake pipeline (FastAPI + DuckDB) used by tests.
```bash
docker compose up --build -d fake-pipeline
```

```bash
# .env.local (not committed)
CLOUDFLARE_PIPELINE_ENDPOINT=http://127.0.0.1:8088/
CLOUDFLARE_PERSONS_PIPELINE_ENDPOINT=http://127.0.0.1:8088/
CLOUDFLARE_PIPELINE_TIMEOUT_SECS=5
```

```bash
cargo run
```

To inspect the Iceberg tables directly with DuckDB:

```sql
INSTALL httpfs;
INSTALL iceberg;
LOAD httpfs;
LOAD iceberg;
CREATE SECRET r2_catalog_secret (
TYPE ICEBERG,
TOKEN '<CLOUDFLARE_API_TOKEN>'
);
ATTACH '<ACCOUNT_ID>_<BUCKET>' AS iceberg_catalog (
TYPE ICEBERG,
ENDPOINT 'https://catalog.cloudflarestorage.com/<ACCOUNT_ID>/<BUCKET>'
);
SELECT count(*) FROM iceberg_catalog.default.hogflare_events;
SELECT count(*) FROM iceberg_catalog.default.hogflare_persons;
SELECT * FROM iceberg_catalog.default.hogflare_persons LIMIT 5;
```

If you used versioned table names during a migration, substitute those names here.
Delete Pipelines resources in dependency order: pipelines first, then streams and sinks.
```bash
bunx wrangler pipelines list
bunx wrangler pipelines delete <pipeline-id> --force

bunx wrangler pipelines streams list
bunx wrangler pipelines streams delete <stream-id> --force

bunx wrangler pipelines sinks list
bunx wrangler pipelines sinks delete <sink-id> --force
```

`wrangler r2 sql query` is read-only. To drop an Iceberg table from R2 Data Catalog, use the Iceberg catalog API. One local option is PyIceberg:
```bash
R2_CATALOG_TOKEN="<r2-data-catalog-token>" uv run --with pyiceberg python - <<'PY'
import os
from pyiceberg.catalog.rest import RestCatalog

catalog = RestCatalog(
    name="hogflare",
    warehouse="<account-id>_<bucket-name>",
    uri="https://catalog.cloudflarestorage.com/<account-id>/<bucket-name>",
    token=os.environ["R2_CATALOG_TOKEN"],
)
catalog.drop_table(("default", "<table-name>"), purge_requested=True)
PY
```

Hogflare includes a host-side importer for backfilling an existing PostHog project into the same Cloudflare Pipeline sink used by the Worker. It reads PostHog's private API with a personal API key, then writes normalized rows to the pipeline:
- persons as `$identify` rows
- groups as `$groupidentify` rows
- historical events from HogQL with original `timestamp`, `created_at`, and PostHog event `uuid` when available
The importer writes historical rows directly to the pipeline. It does not mutate Worker Durable Object state.
Required inputs:
```bash
export POSTHOG_PROJECT_ID="<project_id>"
export POSTHOG_PERSONAL_API_KEY="phx_..."
export CLOUDFLARE_PIPELINE_ENDPOINT="https://<stream-id>.ingest.cloudflare.com"
export CLOUDFLARE_PIPELINE_AUTH_TOKEN="<pipeline token>"  # if your stream requires it
```

Optional inputs:
```bash
export POSTHOG_HOST="https://us.posthog.com"        # or https://eu.posthog.com / self-hosted URL
export POSTHOG_ENVIRONMENT_ID="<environment_id>"    # recommended for current PostHog persons/groups APIs
export HOGFLARE_API_KEY="phc_..."
export POSTHOG_TEAM_ID="1"
export POSTHOG_GROUP_TYPE_0="company"
export IMPORT_FROM="2025-01-01"
export IMPORT_TO="2025-02-01"
export IMPORT_BATCH_SIZE="500"
export IMPORT_PERSONS_OFFSET="0"                     # resume guardrails
export IMPORT_EVENTS_OFFSET="0"
export IMPORT_EVENTS_AFTER_TIMESTAMP="2024-09-21T03:24:11Z"
export IMPORT_EVENTS_AFTER_UUID="0192129b-c354-77b4-b496-9be7ec571fb4"
export IMPORT_EVENT_UUIDS_FILE="/tmp/missing-event-uuids.txt"
export IMPORT_EVENT_WINDOW_DAYS="7"
export IMPORT_EVENT_WINDOW_HOURS="6"                 # use days or hours, not both
export IMPORT_MAX_PERSONS="1000"                     # optional guardrails for smoke tests
export IMPORT_MAX_GROUPS="1000"
export IMPORT_MAX_EVENTS="1000"
export IMPORT_STATE_FILE=".hogflare-import-state.jsonl"
export IMPORT_TARGET_ACCOUNT_ID="<cloudflare_account_id>"
export IMPORT_TARGET_BUCKET="<r2_bucket>"
export IMPORT_TARGET_TABLE="default.hogflare_events_v3"
export WRANGLER_R2_SQL_AUTH_TOKEN="<r2 sql token>"
export IMPORT_CLOUDFLARE_API_TOKEN="<token with Pipelines read>"  # optional auto flush discovery
export IMPORT_PIPELINE_FLUSH_SECS="300"              # fallback if Pipelines read is unavailable
```

Production imports require R2 SQL target checks by default. The importer uses stable import keys, queries the target before each batch, and skips rows that are already present. Cloudflare Pipeline/R2 is append-only and does not enforce uniqueness by itself. Passing `--no-target-check` or `IMPORT_TARGET_CHECKS=false` opts out and should only be used for local tests.
Retry behavior is intentionally conservative. Import sends are not blindly retried after a transport or response error because the pipeline may have accepted the batch even if the client did not receive the response. The importer aligns its wait window to the Cloudflare Pipeline sink rolling policy when `IMPORT_CLOUDFLARE_API_TOKEN` can read Pipelines. Without that API access, it uses `IMPORT_PIPELINE_FLUSH_SECS`, defaulting conservatively to 300 seconds. The wait is `max(60s, 2 * flush + 30s)`, unless `IMPORT_TARGET_WAIT_SECS` is set explicitly.
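As a concrete check of that formula: with the default flush of 300 seconds, the importer waits `max(60, 2 * 300 + 30) = 630` seconds before querying the target. A tiny sketch of the arithmetic (the function name is illustrative, not the importer's actual API):

```ts
// Illustrative only: mirrors the documented wait window, not the importer's internal code.
function targetWaitSecs(flushSecs: number, explicitWaitSecs?: number): number {
  if (explicitWaitSecs !== undefined) return explicitWaitSecs; // IMPORT_TARGET_WAIT_SECS wins
  return Math.max(60, 2 * flushSecs + 30);
}

console.log(targetWaitSecs(300)); // 630 seconds with the conservative default flush
```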
The local state file makes normal same-machine resumes cheap, but it is not a substitute for target checks if the state file is lost, multiple importers run concurrently, or a send has an unknown commit state.
Run a dry run first:
```bash
cargo run --bin import_posthog -- --dry-run
```

Run the import:

```bash
cargo run --bin import_posthog
```

You can also pass flags instead of env vars:
```bash
cargo run --bin import_posthog -- \
  --posthog-host https://us.posthog.com \
  --project-id 12345 \
  --environment-id 67890 \
  --personal-api-key "$POSTHOG_PERSONAL_API_KEY" \
  --pipeline-endpoint "$CLOUDFLARE_PIPELINE_ENDPOINT" \
  --pipeline-auth-token "$CLOUDFLARE_PIPELINE_AUTH_TOKEN" \
  --hogflare-api-key phc_example \
  --from 2025-01-01 \
  --to 2025-02-01 \
  --persons-offset 0 \
  --events-offset 0 \
  --events-after-timestamp 2024-09-21T03:24:11Z \
  --events-after-uuid 0192129b-c354-77b4-b496-9be7ec571fb4 \
  --event-uuids-file /tmp/missing-event-uuids.txt \
  --event-window-hours 6 \
  --max-persons 1000 \
  --max-groups 1000 \
  --max-events 1000 \
  --import-state-file .hogflare-import-state.jsonl \
  --target-account-id "$CLOUDFLARE_ACCOUNT_ID" \
  --target-bucket hogflare \
  --target-table default.hogflare_events_v3 \
  --target-auth-token "$WRANGLER_R2_SQL_AUTH_TOKEN" \
  --cloudflare-api-token "$CLOUDFLARE_API_TOKEN"
```

Use `--skip-persons`, `--skip-groups`, or `--skip-events` to import only part of the project. Use `--skip-person-output` when resuming an event import after person rows were already written; it still loads people for event hydration.
- `/capture` (single or batch payloads)
- `/identify`
- `/alias`
- `/batch` (mixed events)
- `/e` (event payloads)
- `/engage`
- `/groups`
- `/s` (session replay payloads)
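As an illustration of a mixed `/batch` request, a minimal sketch assuming the usual PostHog batch envelope (the URL, token, and event contents are placeholders):

```ts
// Hypothetical /batch payload; field names follow the PostHog batch envelope.
await fetch("https://<your-worker>.workers.dev/batch", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({
    api_key: "phc_example",
    batch: [
      { event: "page_view", distinct_id: "user_123", properties: { path: "/" } },
      { event: "$identify", distinct_id: "user_123", properties: { $set: { plan: "pro" } } },
    ],
  }),
});
```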
Identify, capture `$set` / `$set_once` / `$unset`, and alias events update a person record stored in a Durable Object. The record tracks `distinct_id` aliases, person properties, and a sequential id plus a UUID. Events include:

- `person_id` (the person UUID)
- `person_created_at`
- `person_properties`
The Durable Object is the source of truth for the current person record. When CLOUDFLARE_PERSONS_PIPELINE_ENDPOINT is configured, Hogflare also writes append-only person snapshots to the persons pipeline so the state is queryable in Iceberg.
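A sketch of the property-merge semantics described above, assuming PostHog's usual precedence (`$set` overwrites, `$set_once` only fills missing keys, `$unset` removes); this is illustrative, not the Worker's actual code:

```ts
type Props = Record<string, unknown>;

// Illustrative merge of $set / $set_once / $unset into a person's properties.
function mergePersonProperties(
  current: Props,
  set: Props = {},
  setOnce: Props = {},
  unset: string[] = [],
): Props {
  const next: Props = { ...current };
  for (const [k, v] of Object.entries(setOnce)) {
    if (!(k in next)) next[k] = v; // $set_once never overwrites existing values
  }
  for (const [k, v] of Object.entries(set)) {
    next[k] = v; // $set always overwrites
  }
  for (const k of unset) {
    delete next[k]; // $unset removes the property entirely
  }
  return next;
}
```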
- `/groups` (`$groupidentify` payloads) are forwarded.
- Group properties are stored in a Group DO and attached to events as `group_properties`.
- Group slots (`group0..group4`) are mapped by `POSTHOG_GROUP_TYPE_0..4`.
- SDK config advertises `sessionRecording: false` when `POSTHOG_SESSION_RECORDING_ENDPOINT` is unset, so the Worker can keep replay off remotely. Set `POSTHOG_SESSION_RECORDING_ENDPOINT=/s/` to turn replay on and route uploads through Hogflare's ingestion path.
- `/s` accepts PostHog replay payloads, including gzip/gzip-js compressed browser SDK requests.
- Modern `$snapshot` payloads are normalized to `$snapshot_items` rows before they are sent through Cloudflare Pipelines into R2.
- Legacy raw chunk payloads are still accepted as `$snapshot` rows.
Feature flags and SDK remote config are evaluated in the Worker and exposed via /array/:token/config, /decide, and /flags.
Configuration is a JSON blob in HOGFLARE_FEATURE_FLAGS. It can be either:
{ "flags": [ ... ] }[ ... ](array of flag definitions)
Supported fields per flag:
| Field | Type | Notes |
|---|---|---|
| `key` | string | Flag key |
| `active` | bool | Defaults to `true` |
| `type` | `"boolean"` \| `"multivariate"` | Defaults to `boolean` |
| `rollout_percentage` | number | 0–100 |
| `variants` | array | `[{ key, rollout_percentage, payload? }]` |
| `payload` | json | Used for boolean flags |
| `variant_payloads` | map | `{ "variant_key": { ... } }` |
| `conditions` | array | See filters below |
| `group_type` | string | Enables group-based rollout |
| `evaluation_environments` | array | Optional env gating |
| `salt` | string | Optional bucketing salt |
| `id`, `version`, `description` | metadata | Returned in flag details |
Filters support these operators:
- `eq` (default), `is_not`
- `in`, `not_in`
- `contains`
- `regex`
- `is_set`
- `gt`, `gte`, `lt`, `lte`
Value comparisons coerce strings/booleans/numbers when possible (e.g. "21" >= 18).
Request fields honored by /flags and /decide:
- `flag_keys_to_evaluate` — only evaluate these keys
- `evaluation_environments` — only evaluate flags whose `evaluation_environments` includes one of these
- `person_properties`, `group_properties`, `groups` — override state for evaluation
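A minimal sketch of a `/flags` request using those fields; the request fields are the ones listed above, while the URL, token, and exact response shape are assumptions:

```ts
// Hypothetical /flags call; only the documented request fields are sent.
const res = await fetch("https://<your-worker>.workers.dev/flags", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({
    api_key: "phc_example",
    distinct_id: "user_123",
    flag_keys_to_evaluate: ["pro-flag"],
    evaluation_environments: ["production"],
    person_properties: { plan: "pro", age: 21 },
  }),
});
console.log(await res.json());
```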
Rollout bucketing is deterministic:
- Hash: `sha1("{salt}:{hash_id}")`
- `hash_id` is `distinct_id` for person flags, or the group key when `group_type` is set
- Bucket = `hash % 100` (0–99)
- `salt` defaults to the flag `key` if not provided
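A sketch of that bucketing in Node/TypeScript; interpreting the full SHA-1 hex digest as a big integer before taking modulo 100 is an assumption of this sketch, not a statement of the Worker's exact arithmetic:

```ts
import { createHash } from "node:crypto";

// Deterministic rollout bucket: sha1("{salt}:{hash_id}") % 100, per the rules above.
// Reading the whole hex digest as a big integer is this sketch's assumption.
function rolloutBucket(salt: string, hashId: string): number {
  const hex = createHash("sha1").update(`${salt}:${hashId}`).digest("hex");
  return Number(BigInt(`0x${hex}`) % 100n);
}

const bucket = rolloutBucket("pro-flag-salt", "user_123");
// A rollout_percentage of 30 would then typically match buckets 0–29.
console.log(bucket, bucket < 30);
```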
Example:
```json
{
"flags": [
{
"key": "pro-flag",
"active": true,
"rollout_percentage": 100,
"id": 12,
"version": 3,
"description": "Pro users",
"salt": "pro-flag-salt",
"conditions": [
{
"properties": [
{ "key": "plan", "value": ["pro", "enterprise"], "operator": "in" },
{ "key": "age", "value": 18, "operator": "gte" }
]
}
],
"payload": { "tier": "pro" }
}
]
}
```

Limitations: cohorts and event-based filters are not supported.
- If `POSTHOG_SIGNING_SECRET` is set, requests must include a valid HMAC signature.
Hogflare adds Cloudflare request data into properties when those keys are not already present:
- `$ip` from `CF-Connecting-IP`
- `$geoip_*` from Cloudflare request metadata (country, city, region, lat/long, timezone)
- `cf_*` fields: `cf_asn`, `cf_as_organization`, `cf_colo`, `cf_metro_code`, `cf_ray`
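For illustration, an enriched event's properties might look like the following; the `$geoip_*` key names and values here are illustrative assumptions, and the Worker only fills keys that are not already present:

```ts
// Illustrative enriched properties; cf_* names come from the list above,
// while the specific $geoip_* key names shown are assumptions.
const enrichedProperties = {
  button: "verify",               // original SDK property, left untouched
  $ip: "203.0.113.7",             // from CF-Connecting-IP
  $geoip_country_name: "Germany", // Cloudflare request metadata (illustrative key)
  $geoip_city_name: "Berlin",
  $geoip_time_zone: "Europe/Berlin",
  cf_asn: 13335,
  cf_as_organization: "Cloudflare",
  cf_colo: "TXL",
  cf_ray: "8a1b2c3d4e5f0000-TXL",
};
```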
Each row is a `PipelineEvent` with these columns:

| Field | Type / Notes |
|---|---|
| `uuid` | string (UUID v4) |
| `team_id` | int64 (optional) |
| `source` | string |
| `event` | string |
| `distinct_id` | string |
| `timestamp` | RFC3339 timestamp (optional) |
| `created_at` | RFC3339 timestamp |
| `properties` | JSON |
| `context` | JSON |
| `person_id` | string (person UUID) |
| `person_created_at` | RFC3339 timestamp |
| `person_properties` | JSON |
| `group0..group4` | string (group key slots) |
| `group_properties` | JSON (by group type) |
| `api_key` | string |
| `extra` | JSON |
Each row is a `PersonPipelineRecord` snapshot with these columns:

| Field | Type / Notes |
|---|---|
| `uuid` | string (snapshot UUID v4) |
| `team_id` | int64 (optional) |
| `source` | string |
| `operation` | `capture`, `identify`, `alias`, `engage`, `session_recording` |
| `person_id` | string (person UUID) |
| `person_int_id` | int64 |
| `canonical_distinct_id` | string |
| `distinct_ids` | string list / array |
| `created_at` | person creation timestamp |
| `updated_at` | snapshot timestamp |
| `version` | person version |
| `properties` | JSON `$set` properties |
| `properties_set_once` | JSON `$set_once` properties |
| `merged_properties` | JSON merged person properties |
| `api_key` | string |
| `source_event_uuid` | event row UUID that produced the snapshot |
