Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/amp/src/Models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ export const DatasetConfig = Schema.Struct({
identifier: "DatasetConfig",
description: "Configuration associated with a dataset."
})
export type DatasetConfig = typeof DatasetConfig.Type

/**
* Represents information about a table.
Expand Down
4 changes: 2 additions & 2 deletions packages/amp/src/admin/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -471,9 +471,9 @@ export class ProviderGroup extends HttpApiGroup.make("provider")
// =============================================================================

/**
* The api definition for the admin api.
* The specification for the Amp administration API.
*/
export class Api extends HttpApi.make("admin")
export class Api extends HttpApi.make("AmpAdminApi")
.add(DatasetGroup)
.add(JobGroup)
.add(WorkerGroup)
Expand Down
56 changes: 28 additions & 28 deletions packages/amp/src/admin/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import * as Layer from "effect/Layer"
import * as Option from "effect/Option"
import * as Auth from "../Auth.ts"
import type * as Models from "../Models.ts"
import * as AdminApiDefinition from "./api.ts"
import type * as AdminApiDomain from "./domain.ts"
import * as Api from "./api.ts"
import type * as Domain from "./domain.ts"

// =============================================================================
// Admin API Service Types
Expand Down Expand Up @@ -59,16 +59,16 @@ export class AdminApi extends Context.Tag("Amp/AdminApi")<AdminApi, {
name: Models.DatasetName,
manifest: Models.DatasetManifest,
version?: Models.DatasetRevision | undefined
) => Effect.Effect<void, HttpError | AdminApiDefinition.RegisterDatasetError>
) => Effect.Effect<void, HttpError | Api.RegisterDatasetError>

/**
* Get all datasets.
*
* @return The list of all datasets.
*/
readonly getDatasets: () => Effect.Effect<
AdminApiDomain.GetDatasetsResponse,
HttpError | AdminApiDefinition.GetDatasetsError
Domain.GetDatasetsResponse,
HttpError | Api.GetDatasetsError
>

/**
Expand All @@ -82,8 +82,8 @@ export class AdminApi extends Context.Tag("Amp/AdminApi")<AdminApi, {
namespace: Models.DatasetNamespace,
name: Models.DatasetName
) => Effect.Effect<
AdminApiDomain.GetDatasetVersionsResponse,
HttpError | AdminApiDefinition.GetDatasetVersionsError
Domain.GetDatasetVersionsResponse,
HttpError | Api.GetDatasetVersionsError
>

/**
Expand All @@ -99,8 +99,8 @@ export class AdminApi extends Context.Tag("Amp/AdminApi")<AdminApi, {
name: Models.DatasetName,
revision: Models.DatasetRevision
) => Effect.Effect<
AdminApiDomain.GetDatasetVersionResponse,
HttpError | AdminApiDefinition.GetDatasetVersionError
Domain.GetDatasetVersionResponse,
HttpError | Api.GetDatasetVersionError
>

/**
Expand All @@ -122,8 +122,8 @@ export class AdminApi extends Context.Tag("Amp/AdminApi")<AdminApi, {
workerId?: string | undefined
} | undefined
) => Effect.Effect<
AdminApiDomain.DeployDatasetResponse,
HttpError | AdminApiDefinition.DeployDatasetError
Domain.DeployDatasetResponse,
HttpError | Api.DeployDatasetError
>

/**
Expand All @@ -138,7 +138,7 @@ export class AdminApi extends Context.Tag("Amp/AdminApi")<AdminApi, {
namespace: Models.DatasetNamespace,
name: Models.DatasetName,
revision: Models.DatasetRevision
) => Effect.Effect<Models.DatasetManifest, HttpError | AdminApiDefinition.GetDatasetManifestError>
) => Effect.Effect<Models.DatasetManifest, HttpError | Api.GetDatasetManifestError>

/**
* Retrieves sync progress information for a specific dataset revision,
Expand All @@ -154,8 +154,8 @@ export class AdminApi extends Context.Tag("Amp/AdminApi")<AdminApi, {
name: Models.DatasetName,
revision: Models.DatasetRevision
) => Effect.Effect<
AdminApiDomain.GetDatasetSyncProgressResponse,
HttpError | AdminApiDefinition.GetDatasetSyncProgressError
Domain.GetDatasetSyncProgressResponse,
HttpError | Api.GetDatasetSyncProgressError
>

/**
Expand All @@ -169,8 +169,8 @@ export class AdminApi extends Context.Tag("Amp/AdminApi")<AdminApi, {
lastJobId?: number | undefined
status?: string | undefined
}) => Effect.Effect<
AdminApiDomain.GetJobsResponse,
HttpError | AdminApiDefinition.GetJobsError
Domain.GetJobsResponse,
HttpError | Api.GetJobsError
>

/**
Expand All @@ -181,7 +181,7 @@ export class AdminApi extends Context.Tag("Amp/AdminApi")<AdminApi, {
*/
readonly getJobById: (
jobId: number
) => Effect.Effect<Models.JobInfo, HttpError | AdminApiDefinition.GetJobByIdError>
) => Effect.Effect<Models.JobInfo, HttpError | Api.GetJobByIdError>

/**
* Stop a job by ID.
Expand All @@ -191,7 +191,7 @@ export class AdminApi extends Context.Tag("Amp/AdminApi")<AdminApi, {
*/
readonly stopJob: (
jobId: number
) => Effect.Effect<void, HttpError | AdminApiDefinition.StopJobError>
) => Effect.Effect<void, HttpError | Api.StopJobError>

/**
* Delete a job by ID.
Expand All @@ -201,24 +201,24 @@ export class AdminApi extends Context.Tag("Amp/AdminApi")<AdminApi, {
*/
readonly deleteJob: (
jobId: number
) => Effect.Effect<void, HttpError | AdminApiDefinition.DeleteJobError>
) => Effect.Effect<void, HttpError | Api.DeleteJobError>

/**
* Get all workers.
*
* @return The list of workers.
*/
readonly getWorkers: () => Effect.Effect<
AdminApiDomain.GetWorkersResponse,
HttpError | AdminApiDefinition.GetWorkersError
Domain.GetWorkersResponse,
HttpError | Api.GetWorkersError
>

/**
* Get all providers.
*
* @return The list of providers.
*/
readonly getProviders: () => Effect.Effect<AdminApiDomain.GetProvidersResponse, HttpError>
readonly getProviders: () => Effect.Effect<Domain.GetProvidersResponse, HttpError>

/**
* Register a manifest.
Expand All @@ -229,8 +229,8 @@ export class AdminApi extends Context.Tag("Amp/AdminApi")<AdminApi, {
readonly registerManifest: (
manifest: unknown
) => Effect.Effect<
AdminApiDomain.RegisterManifestResponse,
HttpError | AdminApiDefinition.RegisterManifestError
Domain.RegisterManifestResponse,
HttpError | Api.RegisterManifestError
>

/**
Expand All @@ -240,10 +240,10 @@ export class AdminApi extends Context.Tag("Amp/AdminApi")<AdminApi, {
* @returns An effect that resolves to the schema response.
*/
readonly getOutputSchema: (
request: AdminApiDomain.GetOutputSchemaPayload
request: Domain.GetOutputSchemaPayload
) => Effect.Effect<
AdminApiDomain.GetOutputSchemaResponse,
HttpError | AdminApiDefinition.GetOutputSchemaError
Domain.GetOutputSchemaResponse,
HttpError | Api.GetOutputSchemaError
>
}>() {}

Expand All @@ -256,7 +256,7 @@ const make = Effect.fnUntraced(function*(options: MakeOptions) {

const auth = yield* Effect.serviceOption(Auth.Auth)

const client = yield* HttpApiClient.make(AdminApiDefinition.Api, {
const client = yield* HttpApiClient.make(Api.Api, {
baseUrl: options.url,
transformClient: Option.match(auth, {
onNone: constUndefined,
Expand Down
7 changes: 5 additions & 2 deletions packages/amp/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ export * as Auth from "./Auth.ts"
/**
* Operations for interacting with the Amp administration API.
*/
export * as AdminApi from "./AdminApi.ts"
export * as AdminApi from "./admin/api.ts"

export { AmpRegistryApiV1 } from "./registry/api.ts"
/**
* Operations for interacting with the Amp registry API.
*/
export * as RegistryApi from "./registry/api.ts"
156 changes: 156 additions & 0 deletions packages/amp/src/manifest-builder/service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
import * as Context from "effect/Context"
import * as Data from "effect/Data"
import * as Effect from "effect/Effect"
import * as Layer from "effect/Layer"
import * as Predicate from "effect/Predicate"
import * as Schema from "effect/Schema"
import * as AdminApi from "../admin/service.ts"
import * as Models from "../Models.ts"

export const ManifestBuildResult = Schema.Struct({
metadata: Models.DatasetMetadata,
manifest: Models.DatasetDerived
})
export type ManifestBuildResult = typeof ManifestBuildResult.Type

export class ManifestBuilderError extends Data.TaggedError("ManifestBuilderError")<{
readonly cause: unknown
readonly message: string
readonly table: string
}> {}

export class ManifestBuilder extends Context.Tag("Amp/ManifestBuilder")<ManifestBuilder, {
readonly build: (config: Models.DatasetConfig) => Effect.Effect<ManifestBuildResult, ManifestBuilderError>
}>() {}

const make = Effect.gen(function*() {
const admin = yield* AdminApi.AdminApi

const build = Effect.fn("ManifestBuilder.build")(
function*(config: Models.DatasetConfig) {
// Extract metadata
const metadata = Models.DatasetMetadata.make({
namespace: config.namespace ?? Models.DatasetNamespace.make("_"),
name: config.name,
readme: config.readme,
repository: config.repository,
description: config.description,
keywords: config.keywords,
license: config.license,
visibility: config.private ? "private" : "public",
sources: config.sources
})

// Build manifest tables - send all tables in one request
const tables = yield* Effect.gen(function*() {
const configTables = config.tables ?? {}
const configFunctions = config.functions ?? {}

// Build function definitions map from config
const functionsMap: Record<string, Models.FunctionDefinition> = {}
for (const [name, func] of Object.entries(configFunctions)) {
functionsMap[name] = Models.FunctionDefinition.make({
source: func.source,
inputTypes: func.inputTypes,
outputType: func.outputType
})
}

// If no tables and no functions, skip schema request entirely
if (Object.keys(configTables).length === 0 && Object.keys(functionsMap).length === 0) {
return []
}

// If no tables but we have functions, still skip schema request
// (when functions-only validation happens server-side, returns empty schema)
if (Object.keys(configTables).length === 0) {
return []
}

// Prepare all table SQL queries
const tableSqlMap: Record<string, string> = {}
for (const [name, table] of Object.entries(configTables)) {
tableSqlMap[name] = table.sql
}

// Call schema endpoint with all tables and functions at once
const response = yield* admin.getOutputSchema({
tables: tableSqlMap,
dependencies: config.dependencies,
functions: Object.keys(functionsMap).length > 0 ? functionsMap : undefined
}).pipe(Effect.catchAll((cause) =>
new ManifestBuilderError({
cause,
message: "Failed to get schemas",
table: "(all tables)"
})
))

// Process each table's schema
const tables: Array<[name: string, table: Models.Table]> = []
for (const [name, table] of Object.entries(configTables)) {
const tableSchema = response.schemas[name]

if (Predicate.isUndefined(tableSchema)) {
return yield* new ManifestBuilderError({
message: `No schema returned for table ${name}`,
table: name,
cause: undefined
})
}

if (tableSchema.networks.length !== 1) {
return yield* new ManifestBuilderError({
cause: undefined,
message: `Expected 1 network for SQL query, got ${tableSchema.networks}`,
table: name
})
}

const network = Models.Network.make(tableSchema.networks[0])
const input = Models.TableInput.make({ sql: table.sql })
const output = Models.Table.make({ input, schema: tableSchema.schema, network })

tables.push([name, output])
}

return tables
})

// Build manifest functions
const functions: Array<[name: string, manifest: Models.FunctionManifest]> = []
for (const [name, func] of Object.entries(config.functions ?? {})) {
const { inputTypes, outputType, source } = func
const manifest = Models.FunctionManifest.make({ name, source, inputTypes, outputType })

functions.push([name, manifest])
}

const manifest = Models.DatasetDerived.make({
kind: "manifest",
startBlock: config.startBlock,
dependencies: config.dependencies,
tables: Object.fromEntries(tables),
functions: Object.fromEntries(functions)
})

return ManifestBuildResult.make({
metadata,
manifest
})
}
)

return {
build
} as const
})

/**
* Layer for creating a `ManifestBuilder`.
*/
export const layer: Layer.Layer<
ManifestBuilder,
never,
AdminApi.AdminApi
> = Layer.effect(ManifestBuilder, make)
4 changes: 2 additions & 2 deletions packages/amp/src/registry/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -584,9 +584,9 @@ export class MyDatasetsApiGroup extends HttpApiGroup.make("myDatasets")
// =============================================================================

/**
* The specification for the Amp Registry v1 API.
* The specification for the Amp Registry API (v1).
*/
export class AmpRegistryApiV1 extends HttpApi.make("AmpRegistryApiV1")
export class ApiV1 extends HttpApi.make("AmpRegistryApiV1")
.add(HealthApiGroup)
.add(DatasetsApiGroup)
.add(OwnedDatasetsApiGroup)
Expand Down
11 changes: 10 additions & 1 deletion packages/amp/test/arrow-test-harness/BufferUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,16 @@ const encodeFloat16 = (value: number): number => {
if (exponent <= 0) return (sign << 15)

mantissa = Math.round(mantissa * 1024)
return (sign << 15) | (exponent << 10) | (mantissa & 0x3FF)

// Handle mantissa overflow - carry to exponent
if (mantissa >= 1024) {
mantissa = 0
exponent += 1
// Check if exponent overflowed to infinity
if (exponent >= 31) return (sign << 15) | 0x7C00
}

return (sign << 15) | (exponent << 10) | mantissa
}

export const createFloat16DataBuffer = (values: ReadonlyArray<number | null>): Uint8Array => {
Expand Down
Loading