Next.js + Tus + Uppy + OSS

11 days ago

Prerequisites

Alibaba Cloud OSS provides API operations that are compatible with Amazon S3, so an S3 data store can use an OSS bucket as its backend.

Dependencies

@uppy/tus, @uppy/core, @tus/server, @tus/s3-store

What is Uppy?

Uppy is a file uploader with built-in tus support (through the @uppy/tus plugin). You can simply think of it as a tus client.

Get Started

  1. Add the example code from Framework integrations to your Next.js project.
  2. The one difference: use @tus/s3-store instead of FileStore, like this:
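// /pages/api/upload/[[...file]].ts
import { Server } from "@tus/server";
import { S3Store } from "@tus/s3-store";

// const tusServer = new Server({
// 	// `path` needs to match the route declared by the next file router
// 	path: '/api/upload',
// 	datastore: new FileStore({ directory: './files' }),
// });

const s3Store = new S3Store({
  partSize: 8 * 1024 * 1024, // Each uploaded part will have ~8MiB
  s3ClientConfig: {
    bucket: process.env.AWS_BUCKET,
    region: process.env.AWS_REGION,
    // For OSS, point the S3 client at the OSS endpoint instead of AWS
    // (assumed env var, e.g. https://oss-cn-hangzhou.aliyuncs.com)
    endpoint: process.env.OSS_ENDPOINT,
    credentials: {
      accessKeyId: process.env.AWS_ACCESS_KEY_ID,
      secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
    },
  },
});
// `path` must still match the route of this file, as in the FileStore version
const server = new Server({ path: "/api/upload", datastore: s3Store });
// ...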

The client and server implementations have now both been added to your Next.js project.

How do they work?

@tus/s3-store wraps the S3 API calls internally and provides a simple way to integrate S3-compatible storage with your application.
Most developers don't need to care about the internal implementation.
If you want to know how resumable uploads and multipart uploads work, read on.

Let's explore the entire upload process in the source code.

// route.ts (App Router: one exported handler per HTTP method)
import type { NextRequest } from "next/server";
// ...
export const GET = async (req: NextRequest) => server.handleWeb(req);
export const PATCH = async (req: NextRequest) => server.handleWeb(req);
export const POST = async (req: NextRequest) => server.handleWeb(req);
export const DELETE = async (req: NextRequest) => server.handleWeb(req);
export const OPTIONS = async (req: NextRequest) => server.handleWeb(req);
export const HEAD = async (req: NextRequest) => server.handleWeb(req);

// All requests are forwarded to, and processed by, handleWeb
// @tus/server: server.ts
import {EventEmitter} from 'node:events'
// ...
export class Server extends EventEmitter {
  constructor(options: ServerOptions & {datastore: DataStore}) {
    super()
    // ...
    this.handlers = {
      // GET handlers should be written in the implementations
      GET: new GetHandler(this.datastore, this.options),
      // These methods are handled under the tus protocol
      HEAD: new HeadHandler(this.datastore, this.options),
      OPTIONS: new OptionsHandler(this.datastore, this.options),
      PATCH: new PatchHandler(this.datastore, this.options),
      POST: new PostHandler(this.datastore, this.options),
      DELETE: new DeleteHandler(this.datastore, this.options),
    }
  }

  // ...
  async handleWeb(req: Request) {
    return this.handler(req)
  }

  private async handler(req: Request) {
    // ...
    // Invoke the handler for the method requested
    const handler = this.handlers[req.method as keyof Handlers]
    if (handler) {
      return handler.send(req, context, headers).catch(onError)
    }
    // No handler for this method: respond 404
    return this.write(context, headers, 404, 'Not found\n')
  }
}
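The dispatch in `handler` is just a lookup table keyed by HTTP method, with 404 as the fallback. A minimal sketch of the same idea outside the library; `makeDispatcher`, `HandlerFn` and the simplified response shape are hypothetical names, and the handlers are stubs rather than tus logic.

```typescript
// Hypothetical sketch of method-based dispatch, modelled on Server.handler.
type TusMethod = "GET" | "HEAD" | "OPTIONS" | "PATCH" | "POST" | "DELETE";

interface SimpleResponse {
  status: number;
  body: string;
}

type HandlerFn = (url: string) => SimpleResponse;

function makeDispatcher(handlers: Partial<Record<TusMethod, HandlerFn>>) {
  return (method: string, url: string): SimpleResponse => {
    // Only own keys count, so inherited names such as "toString" are not handlers.
    const handler = Object.prototype.hasOwnProperty.call(handlers, method)
      ? handlers[method as TusMethod]
      : undefined;
    if (handler) {
      return handler(url);
    }
    // Same fallback as Server.handler: no handler for this method means 404.
    return { status: 404, body: "Not found\n" };
  };
}

// Usage: only HEAD and POST are registered here (stub handlers, not tus logic).
const dispatch = makeDispatcher({
  HEAD: () => ({ status: 200, body: "" }),
  POST: () => ({ status: 201, body: "" }),
});
```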

1. Client sends a POST request

TODO: This request is not sent if the client has cached the upload URL.
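A tus client can skip the POST when it already knows an upload URL for the file. tus-js-client, which @uppy/tus builds on, stores such URLs under a fingerprint of the file; the sketch below imitates that decision with an in-memory `Map`. The fingerprint format and the names `fingerprint` and `firstRequest` are assumptions for illustration, not the client's actual code.

```typescript
// Hypothetical sketch: choose between creating an upload (POST) and resuming
// one (HEAD) from a cache of fingerprint -> upload URL. The fingerprint format
// is an assumption, not tus-js-client's actual format.
interface FileInfo {
  name: string;
  size: number;
  lastModified: number;
}

interface FirstRequest {
  method: "POST" | "HEAD";
  url: string;
}

function fingerprint(file: FileInfo, endpoint: string): string {
  return ["tus", file.name, file.size, file.lastModified, endpoint].join("-");
}

function firstRequest(file: FileInfo, endpoint: string, urlCache: Map<string, string>): FirstRequest {
  const cachedUrl = urlCache.get(fingerprint(file, endpoint));
  if (cachedUrl !== undefined) {
    // The upload already exists on the server: ask for its offset instead.
    return { method: "HEAD", url: cachedUrl };
  }
  // Unknown file: create a new upload at the endpoint.
  return { method: "POST", url: endpoint };
}
```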

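Request headers include Tus-Resumable, Upload-Length (or Upload-Defer-Length: 1 when the size is not known yet) and Upload-Metadata.

Response headers include Tus-Resumable and Location (the URL of the new upload); the status is 201 Created.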
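The Upload-Metadata header is a comma-separated list of pairs, each made of a key, a space, and the Base64-encoded value (the value, and its separating space, may be omitted). A small sketch of building and parsing it; `encodeMetadata` and `parseMetadata` are hypothetical helpers, and the `filename`/`filetype` keys are just example metadata.

```typescript
// Hypothetical helpers for the tus Upload-Metadata header format.
function toBase64(text: string): string {
  // Encode as UTF-8 first so non-ASCII file names survive btoa.
  const bytes = new TextEncoder().encode(text);
  let binary = "";
  for (let i = 0; i < bytes.length; i++) binary += String.fromCharCode(bytes[i]);
  return btoa(binary);
}

function fromBase64(encoded: string): string {
  const binary = atob(encoded);
  const bytes = new Uint8Array(binary.length);
  for (let i = 0; i < binary.length; i++) bytes[i] = binary.charCodeAt(i);
  return new TextDecoder().decode(bytes);
}

function encodeMetadata(meta: Record<string, string>): string {
  // "key base64value" pairs joined by commas
  return Object.keys(meta)
    .map((key) => `${key} ${toBase64(meta[key])}`)
    .join(",");
}

function parseMetadata(header: string): Record<string, string> {
  const result: Record<string, string> = {};
  for (const pair of header.split(",")) {
    const [key, value] = pair.trim().split(" ");
    if (!key) continue;
    // A key without a value is allowed; treat it as an empty string.
    result[key] = value ? fromBase64(value) : "";
  }
  return result;
}
```

For example, `encodeMetadata({ filename: "photo.jpg" })` yields `filename cGhvdG8uanBn`.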

// @tus/utils: Upload.ts
export class Upload {
  id: TUpload['id']
  metadata: TUpload['metadata']
  size: TUpload['size']
  offset: TUpload['offset']
  creation_date: TUpload['creation_date']
  storage: TUpload['storage']

  constructor(upload: TUpload) {
    if (!upload.id) {
      throw new Error('[File] constructor must be given an ID')
    }

    this.id = upload.id
    this.size = upload.size
    this.offset = upload.offset
    this.metadata = upload.metadata
    this.storage = upload.storage

    this.creation_date = upload.creation_date ?? new Date().toISOString()
  }

  get sizeIsDeferred(): boolean {
    return this.size === undefined
  }
}

// PostHandler.ts
export class PostHandler extends BaseHandler {
  // ...

  async send() {
    // ...
    const upload = new Upload({
      id,
      size: upload_length ? Number.parseInt(upload_length, 10) : undefined,
      offset: 0,
      metadata,
    })
    // ...
    await this.store.create(upload)
    url = this.generateUrl(req, upload.id)
    // ...
    return this.write(
      responseData.status,
      responseData.headers,
      responseData.body
    )
  }
}

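// s3-store/index.ts
export class S3Store extends DataStore {
  constructor(options: S3StoreConfig) {
    super();
    // ...
    this.client = new S3(...);
  }
  public async create(upload: Upload) {
    const res = await this.client.createMultipartUpload(request);
    // res.UploadId uniquely identifies this multipart upload on the OSS server;
    // it is saved as the "upload-id" metadata of the xxx.info object,
    // so that later requests can find the multipart upload again
    // ...
  }
}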
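To make the POST flow concrete, here is an in-memory model of what the store leaves behind after `create`: a multipart upload for the object key `<id>`, and an `<id>.info` object whose metadata holds the multipart UploadId. `FakeBucket`, `createUpload` and the generated `mpu-N` UploadIds are hypothetical; the real server also builds an absolute Location URL, while this sketch returns a path.

```typescript
// Hypothetical in-memory model of the objects S3Store.create leaves in the bucket.
interface StoredObject {
  body: string;
  metadata: Record<string, string>;
}

class FakeBucket {
  objects = new Map<string, StoredObject>();
  multipartUploads = new Map<string, string>(); // UploadId -> object key
  private counter = 0;

  createMultipartUpload(key: string): string {
    // A real store returns an UploadId chosen by the server; "mpu-N" is made up.
    const uploadId = `mpu-${++this.counter}`;
    this.multipartUploads.set(uploadId, key);
    return uploadId;
  }

  putObject(key: string, body: string, metadata: Record<string, string>): void {
    this.objects.set(key, { body, metadata });
  }
}

const infoKey = (id: string): string => `${id}.info`;

function createUpload(bucket: FakeBucket, id: string, size: number | undefined, basePath: string) {
  // 1. Start a multipart upload whose final object key is the upload id.
  const uploadId = bucket.createMultipartUpload(id);
  // 2. Save the upload record in `<id>.info`, keeping the UploadId in its
  //    metadata so later PATCH/HEAD/DELETE requests can find the multipart upload.
  bucket.putObject(infoKey(id), JSON.stringify({ id, size, offset: 0 }), {
    "upload-id": uploadId,
    "tus-version": "1.0.0",
  });
  // 3. 201 Created plus the URL the client will PATCH to (a path, for simplicity).
  return { status: 201, location: `${basePath}/${id}` };
}
```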

2. Client sends a PATCH request

Headers:

The first PATCH request:

Request Headers: Upload-Offset: 0
Response Headers: Upload-Offset: 1048576

The second PATCH request:

Request Headers: Upload-Offset: 1048576
Response Headers: Upload-Offset: 2097152

As you can see:
Each PATCH request carries the offset the server has already received, and the response returns the new offset, which the client sends with the next PATCH request. Here each request carries 1 MiB (1048576 bytes).
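The offset bookkeeping above can be modelled on the client side as a loop over fixed-size chunks. `planPatches` is a hypothetical helper that only computes header values and sends nothing; with a 1 MiB chunk size it reproduces the two PATCH requests shown above.

```typescript
// Hypothetical client-side model of the PATCH loop (no network involved).
interface PatchStep {
  requestOffset: number; // Upload-Offset sent by the client
  bodyLength: number; // bytes in this request body
  responseOffset: number; // Upload-Offset returned by the server
}

function planPatches(fileSize: number, chunkSize: number, startOffset = 0): PatchStep[] {
  if (chunkSize <= 0) throw new Error("chunkSize must be positive");
  const steps: PatchStep[] = [];
  let offset = startOffset;
  while (offset < fileSize) {
    const bodyLength = Math.min(chunkSize, fileSize - offset);
    steps.push({ requestOffset: offset, bodyLength, responseOffset: offset + bodyLength });
    // The next request starts exactly where the server said this one ended.
    offset += bodyLength;
  }
  return steps;
}

const MiB = 1024 * 1024;
const steps = planPatches(2.5 * MiB, MiB);
```

A 2.5 MiB file produces three requests (0 to 1048576, 1048576 to 2097152, 2097152 to 2621440); the last one carries the remaining 524288 bytes. Passing a `startOffset` models resuming from an offset obtained later.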

You may wonder:
How does the client know the offset if the upload is interrupted, for example when the application is closed and the upload restarted?
The answer is the HEAD request, explained in step 3.

Payload: the raw bytes of the chunk, sent with Content-Type: application/offset+octet-stream.

Let's see what happens in PatchHandler.ts:

export class PatchHandler extends BaseHandler {
  async send(...) {
    // ...
    const id = this.getFileIdFromRequest(req); // decodes the ID with decodeURIComponent internally
    // ...
    const offset = Number.parseInt(req.headers.get('upload-offset') as string, 10);
    // ...
    upload = await this.store.getUpload(id);
    // The offsets must match, otherwise the server responds 409 Conflict
    if (upload.offset !== offset) {
      throw ERRORS.INVALID_OFFSET;
    }
    // ...
    // Upload-Length is only sent here when the size was deferred at creation
    const upload_length = req.headers.get('upload-length');
    if (upload_length !== null) {
      const size = Number.parseInt(upload_length, 10);
      // ...
      // declareUploadLength rewrites the xxx.info object in OSS:
      // => this.client.getObject() gets Metadata and Body of xxx.info
      // => this.client.putObject({... Key: "xxx.info", Metadata: {"upload-id": uploadId, "tus-version": xxx}})
      await this.store.declareUploadLength(id, size);
      upload.size = size;
    }
    // ...
    // upper bound on how many bytes this request body may contain
    const maxBodySize = await this.calculateMaxBodySize(req, upload, maxFileSize);

    newOffset = await this.writeToStore(req.body, upload, maxBodySize, context);
    // writeToStore pipes the body through a StreamLimiter into
    // this.store.write(stream as StreamLimiter, upload.id, upload.offset), which:
    // => this.client.getObject(xxx.info) gets Metadata from the .info file in OSS
    // => retrieveParts => this.client.listParts
    // => downloadIncompletePart => this.client.getObject(xxx.part) gets the incomplete part
    // => if (incompletePart) => this.deleteIncompletePart(id) => this.client.deleteObject(xxx.part)
    // => await this.uploadParts(...)
    // => return newOffset
    // ...
    // response status & headers & body
  }
}
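On the server side, the key rule is that the Upload-Offset header must equal the offset the server already has; otherwise the tus protocol requires 409 Conflict and the upload is left unchanged. A minimal in-memory sketch of that check; `MemoryUpload` and `handlePatch` are hypothetical, and the 400 (missing or malformed header) and 413 (body overruns Upload-Length) codes are this sketch's own choices, not necessarily what @tus/server returns.

```typescript
// Hypothetical in-memory PATCH handling, focused on the offset check.
interface MemoryUpload {
  size?: number; // Upload-Length, undefined while deferred
  chunks: Uint8Array[];
  offset: number;
}

interface PatchResult {
  status: number;
  uploadOffset?: number; // value for the Upload-Offset response header
}

function handlePatch(upload: MemoryUpload, headerOffset: string | null, body: Uint8Array): PatchResult {
  const offset = Number.parseInt(headerOffset ?? "", 10);
  if (Number.isNaN(offset) || offset < 0) return { status: 400 };
  // Client and server disagree on how much was received: reject, change nothing.
  if (offset !== upload.offset) return { status: 409 };
  // This sketch rejects bodies that would overrun the declared length.
  if (upload.size !== undefined && offset + body.length > upload.size) return { status: 413 };
  upload.chunks.push(body);
  upload.offset += body.length;
  // 204 No Content, carrying the new offset for the next request
  return { status: 204, uploadOffset: upload.offset };
}
```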
// s3-store/index.ts
export class S3Store extends DataStore {
  // ...
  /**
   * Gets the number of complete parts/chunks already uploaded to S3.
   * Retrieves only consecutive parts.
   */
  protected async retrieveParts(
    id: string,
    partNumberMarker?: string
  ): Promise<Array<AWS.Part>> {
    const metadata = await this.getMetadata(id);

    const params: AWS.ListPartsCommandInput = {
      Bucket: this.bucket,
      Key: id,
      UploadId: metadata["upload-id"],
      PartNumberMarker: partNumberMarker,
    };

    const data = await this.client.listParts(params);
    // Response elements: https://help.aliyun.com/zh/oss/developer-reference/listparts?spm=a2c4g.11186623.help-menu-31815.d_5_1_6_1_6.7ee260ffBA8WMy
    // The ListParts interface is used to list all successfully uploaded Parts that belong to the specified Upload ID.

    let parts = data.Parts ?? [];

    if (data.IsTruncated) {
      // If not all parts were returned in this response, it contains a
      // NextPartNumberMarker element: the PartNumberMarker value for the next request.
      const rest = await this.retrieveParts(id, data.NextPartNumberMarker);
      parts = [...parts, ...rest];
    }

    if (!partNumberMarker) {
      parts.sort((a, b) => a.PartNumber! - b.PartNumber!);
    }

    return parts;
  }

  // ...
  protected async getIncompletePart(id: string): Promise<Readable | undefined> {
    // What's the difference between an incomplete part and a complete part?
    // Incomplete part:
    //   the last block of data that has not reached the minimum part size and cannot be
    //   uploaded as a regular OSS part yet (because it is not the last part of the upload).
    //   It is stored in OSS as a separate object named "xxx.part".
    //   It appears when a PATCH request ends before a full part has arrived,
    //   e.g. because the upload was interrupted or the client sends small chunks.
    // Complete part:
    //   a block of data that has been successfully uploaded as part of an OSS multipart upload.
    //   OSS: every part except the last must be at least 5MB; the last part has no size limit.
    //   https://doc.oss.aliyuncs.com/#_Toc336676775
    //   (@tus/s3-store itself uses S3's 5 MiB minimum part size.)
    try {
      const data = await this.client.getObject({
        Bucket: this.bucket,
        Key: this.partKey(id, true), // key: xxxxx.part
      });
      // get .part from oss
      return data.Body as Readable;
    } catch (error) {
      if (error instanceof NoSuchKey) {
        return undefined;
      }
      throw error;
    }
  }

  // ...
  protected async getIncompletePartSize(
    id: string
  ): Promise<number | undefined> {
    try {
      // more info about headObject:
      // https://help.aliyun.com/zh/oss/developer-reference/headobject?spm=a2c4g.11186623.help-menu-31815.d_5_1_6_0_6.5e406534c0QjAe
      // The HeadObject interface is used to get metadata about a file (Object). Using this interface does not return the contents of the file.
      const data = await this.client.headObject({
        Bucket: this.bucket,
        Key: this.partKey(id, true), // like: xxx.part
      });
      return data.ContentLength;
    } catch (error) {
      if (error instanceof NotFound) {
        return undefined;
      }
      throw error;
    }
  }

  // ...
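  public async getUpload(id: string): Promise<Upload> {
    const metadata = await this.getMetadata(id);
    // ...
    const parts = await this.retrieveParts(id);
    const offset = calcOffsetFromParts(parts);

    // ...
    const incompletePartSize = await this.getIncompletePartSize(id);
    // The offset reported to the client = complete parts + incomplete part
    return new Upload({
      ...metadata.file,
      offset: offset + (incompletePartSize ?? 0),
      // ...
    });
  }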

  // ...
  public async write(src: Readable, id: string, offset: number): Promise<number> {
    const metadata = await this.getMetadata(id);
    const parts = await this.retrieveParts(id);
    const partNumber: number = parts.length > 0 ? parts[parts.length - 1].PartNumber! : 0;
    const nextPartNumber = partNumber + 1;
    const incompletePart = await this.downloadIncompletePart(id);
    // downloadIncompletePart => getIncompletePart() fetches xxx.part and writes it to a temp file named 'tus-s3-incomplete-part-xxxx'.
    const requestedOffset = offset;
    if (incompletePart) {
      await this.deleteIncompletePart(id);
      // invokes client.deleteObject(xxx.part) to delete the incomplete part in OSS
      // ...then the incomplete part is prepended to `src` and `offset` moves back by its size
    }

    // ...
    const bytesUploaded = await this.uploadParts(metadata, src, nextPartNumber, offset); // upload parts to OSS
    // The size of the incomplete part should not be counted, because the
    // process of the incomplete part should be fully transparent to the user.
    const newOffset = requestedOffset + bytesUploaded - (incompletePart?.size ?? 0);
    if (metadata.file.size === newOffset) {
      const parts = await this.retrieveParts(id);
      await this.finishMultipartUpload(metadata, parts);
      // finishMultipartUpload invokes client.completeMultipartUpload to merge the parts into one object in OSS
      await this.completeMetadata(metadata.file);
      // ...
    }
    return newOffset;
  }
}

function calcOffsetFromParts(parts?: Array<AWS.Part>) {
  // Size is optional in the SDK types, so treat a missing size as 0
  return parts && parts.length > 0 ? parts.reduce((a, b) => a + (b.Size ?? 0), 0) : 0;
}
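The interplay of complete parts and the incomplete `.part` object can be modelled with sizes alone. The sketch below is a simplification of `write`/`uploadParts`, assuming pieces are cut at `partSize` and that a piece becomes a regular part only if it is at least 5 MiB (S3's minimum part size) or it completes the upload; anything smaller is kept as `<id>.part`. `planWrite` is a hypothetical name. It also shows how 1 MiB PATCH requests accumulate in the incomplete part until enough bytes for a regular part have arrived.

```typescript
// Simplified, size-only model of S3Store.write/uploadParts (assumptions above).
const MIN_PART_SIZE = 5 * 1024 * 1024; // 5 MiB

interface WritePlan {
  parts: number[]; // sizes sent with UploadPart, in order
  incompletePart: number; // size stored as `<id>.part` afterwards (0 = none)
  newOffset: number; // Upload-Offset returned to the client
}

function planWrite(
  requestOffset: number, // Upload-Offset of this PATCH (already counts the old `.part`)
  incompletePartSize: number, // size of the existing `<id>.part`, 0 if none
  bodyLength: number, // bytes in this PATCH body
  uploadSize: number, // Upload-Length of the whole upload
  partSize: number, // configured part size, e.g. 8 MiB
): WritePlan {
  if (partSize < MIN_PART_SIZE) throw new Error("partSize must be at least 5 MiB");
  // The old `.part` is downloaded, deleted and prepended to the body, so
  // writing restarts incompletePartSize bytes before the requested offset.
  let offset = requestOffset - incompletePartSize;
  let remaining = incompletePartSize + bodyLength;
  const parts: number[] = [];
  let incompletePart = 0;
  while (remaining > 0) {
    const piece = Math.min(partSize, remaining);
    remaining -= piece;
    offset += piece;
    const isFinalPart = offset === uploadSize;
    if (piece >= MIN_PART_SIZE || isFinalPart) {
      parts.push(piece);
    } else {
      // Only the last piece can be this small; it waits for more data.
      incompletePart = piece;
    }
  }
  // The re-sent `.part` bytes are not counted twice.
  return { parts, incompletePart, newOffset: requestOffset + bodyLength };
}
```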

3. Client sends a HEAD request

This request is sent when an upload was interrupted or failed for another reason: before resuming, the client asks the server for the current Upload-Offset.

Let's see what happens in HeadHandler.ts:

export class HeadHandler extends BaseHandler {
  async send(...) {
    const id = this.getFileIdFromRequest(req)
    // ...
    file = await this.store.getUpload(id) // explained above
    // ...
    // response headers contains: Cache-Control, Upload-Offset, Upload-Length, Upload-Metadata ...
  }
}
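In short, HEAD answers with the offset `getUpload` computes: the sizes of all complete parts returned by ListParts plus the size of `<id>.part`. A sketch of that calculation and of the byte range a client still has to send afterwards; `headOffset`, `remainingRange` and `ListedPart` (a trimmed-down stand-in for the SDK's part type) are hypothetical.

```typescript
// Hypothetical sketch of the HEAD offset and of where the client resumes.
interface ListedPart {
  PartNumber: number;
  Size?: number;
}

function headOffset(parts: ListedPart[], incompletePartSize?: number): number {
  // complete parts (as listed by ListParts) + the incomplete `.part` object
  const complete = parts.reduce((sum, p) => sum + (p.Size ?? 0), 0);
  return complete + (incompletePartSize ?? 0);
}

// After HEAD, the client PATCHes [uploadOffset, fileSize); null means nothing is left.
function remainingRange(fileSize: number, uploadOffset: number): [number, number] | null {
  return uploadOffset < fileSize ? [uploadOffset, fileSize] : null;
}
```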

4. Client sends a DELETE request

A DELETE request is sent when the client wants to terminate an upload and delete its data.

Let's see what happens in DeleteHandler.ts:

export class DeleteHandler extends BaseHandler {
  async send(req: Request) {
    const id = this.getFileIdFromRequest(req)
    // ...
    const upload = await this.store.getUpload(id)
    await this.store.remove(id)
    // ...
    // on success the server responds 204 No Content
  }
}
// s3-store/index.ts
export class S3Store extends DataStore {
  public async remove(id: string): Promise<void> {
    try {
      const { "upload-id": uploadId } = await this.getMetadata(id);
      if (uploadId) {
        // more info about abortMultipartUpload: 
        // https://help.aliyun.com/zh/oss/developer-reference/abortmultipartupload?spm=a2c4g.11186623.0.0.24192382SnrAWr#reference-txp-bvx-wdb
        // AbortMultipartUpload cancels a multipart upload and deletes its uploaded parts.
        await this.client.abortMultipartUpload({
          Bucket: this.bucket,
          Key: id,
          UploadId: uploadId,
        });
      }
    } catch (error) {
      // ... (a missing upload is reported as "file not found")
      throw error;
    }

    // ...
    // delete the object xxx and its metadata object xxx.info
    await this.client.deleteObjects({
      Bucket: this.bucket,
      Delete: {
        Objects: [{ Key: id }, { Key: this.infoKey(id) }],
      },
    });
    // ...
  }
}
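Termination, reduced to its effect on storage: abort the multipart upload (which discards its uploaded parts), delete `<id>` and `<id>.info`, then answer 204 No Content as the tus termination extension specifies. An in-memory sketch; `Bucket` and `terminate` are hypothetical stand-ins for OSS and `S3Store.remove`.

```typescript
// Hypothetical in-memory model of what termination removes.
class Bucket {
  objects = new Set<string>();
  multipartUploads = new Map<string, number>(); // UploadId -> number of uploaded parts

  abortMultipartUpload(uploadId: string): void {
    // Aborting discards the multipart upload together with all its parts.
    this.multipartUploads.delete(uploadId);
  }

  deleteObjects(keys: string[]): void {
    for (const key of keys) this.objects.delete(key);
  }
}

function terminate(bucket: Bucket, id: string, uploadId: string | undefined): number {
  if (uploadId) bucket.abortMultipartUpload(uploadId);
  // Same two keys as the deleteObjects call above.
  bucket.deleteObjects([id, `${id}.info`]);
  return 204; // tus termination: 204 No Content
}
```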

TODO: other requests
