/*! @license
* Shaka Player
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
goog.provide('shaka.msf.ControlStream');
goog.require('shaka.log');
goog.require('shaka.msf.BufferControlWriter');
goog.require('shaka.msf.Utils');
goog.require('shaka.util.Mutex');
goog.requireType('shaka.config.MsfFilterType');
goog.requireType('shaka.msf.Reader');
goog.requireType('shaka.msf.Writer');
shaka.msf.ControlStream = class {
  /**
   * Bundles the inbound decoder and the outbound encoder of one control
   * stream.  Both are created for the same protocol version.
   *
   * @param {!shaka.msf.Reader} reader Source of incoming control bytes.
   * @param {!shaka.msf.Writer} writer Sink for outgoing control bytes.
   * @param {!shaka.msf.Utils.Version} version
   */
  constructor(reader, writer, version) {
    /** @private {!shaka.util.Mutex} */
    this.mutex_ = new shaka.util.Mutex();

    /** @private {!shaka.msf.ControlStreamEncoder} */
    this.encoder_ = new shaka.msf.ControlStreamEncoder(writer, version);

    /** @private {!shaka.msf.ControlStreamDecoder} */
    this.decoder_ = new shaka.msf.ControlStreamDecoder(reader, version);
  }

  /**
   * Decodes the next control message from the reader.
   *
   * Unlike send(), this path takes no lock: it will error if two messages
   * are read at once, so callers must not overlap calls.
   *
   * @return {!Promise<shaka.msf.Utils.Message>}
   */
  async receive() {
    return this.decoder_.message();
  }

  /**
   * Encodes and writes one control message.
   *
   * The mutex is held from before the message is logged until the encoder
   * has finished (or failed), and is always released afterwards.
   *
   * @param {shaka.msf.Utils.Message} msg
   * @return {!Promise}
   */
  async send(msg) {
    await this.mutex_.acquire('ControlStream.send');
    try {
      shaka.log.debug('Sending control message:', msg);
      await this.encoder_.message(msg);
    } finally {
      // Release even when encoding or writing throws, so later sends are
      // not left waiting on the lock.
      this.mutex_.release();
    }
  }
};
shaka.msf.ControlStreamDecoder = class {
  /**
   * Decodes control messages from a reader, one message per message() call.
   *
   * @param {!shaka.msf.Reader} reader
   * @param {!shaka.msf.Utils.Version} version Selects between the draft-16
   *   and the earlier wire layouts where the two differ.
   */
  constructor(reader, version) {
    /** @private {!shaka.msf.Reader} */
    this.reader_ = reader;
    /** @private {!shaka.msf.Utils.Version} */
    this.version_ = version;
  }

  /**
   * Reads the message header (type id followed by a 16-bit big-endian
   * length) and maps the numeric type id to its MessageType.
   *
   * The length is only logged; it is not used to bound the payload read.
   *
   * NOTE(review): on an unknown type id this throws after consuming only the
   * header.  If the caller keeps using the same reader afterwards, decoding
   * would resume in the middle of the unread payload -- confirm callers tear
   * the stream down on error.
   *
   * @return {!Promise<shaka.msf.Utils.MessageType>}
   * @private
   */
  async messageType_() {
    const type = await this.reader_.u53();
    // Read the 16-bit MSB length field
    const lengthBytes = await this.reader_.read(2);
    const messageLength = (lengthBytes[0] << 8) | lengthBytes[1]; // MSB format
    // Separate arguments instead of a multi-line template literal, so that
    // no raw newline / source indentation ends up in the log output.
    shaka.log.v1(
        `Raw message type: 0x${type.toString(16)}`,
        `Message length (16-bit MSB): ${messageLength} bytes,`,
        `actual length: ${this.reader_.getByteLength()}`);

    // The mapping is spelled out case by case (rather than looked up by key
    // name) so it stays valid if enum keys are renamed by the compiler.
    let msgType;
    switch (type) {
      case shaka.msf.Utils.MessageTypeId.GOAWAY:
        msgType = shaka.msf.Utils.MessageType.GOAWAY;
        break;
      case shaka.msf.Utils.MessageTypeId.MAX_REQUEST_ID:
        msgType = shaka.msf.Utils.MessageType.MAX_REQUEST_ID;
        break;
      case shaka.msf.Utils.MessageTypeId.REQUESTS_BLOCKED:
        msgType = shaka.msf.Utils.MessageType.REQUESTS_BLOCKED;
        break;
      case shaka.msf.Utils.MessageTypeId.SUBSCRIBE:
        msgType = shaka.msf.Utils.MessageType.SUBSCRIBE;
        break;
      case shaka.msf.Utils.MessageTypeId.SUBSCRIBE_OK:
        msgType = shaka.msf.Utils.MessageType.SUBSCRIBE_OK;
        break;
      case shaka.msf.Utils.MessageTypeId.SUBSCRIBE_ERROR:
        msgType = shaka.msf.Utils.MessageType.SUBSCRIBE_ERROR;
        break;
      case shaka.msf.Utils.MessageTypeId.SUBSCRIBE_UPDATE:
        msgType = shaka.msf.Utils.MessageType.SUBSCRIBE_UPDATE;
        break;
      case shaka.msf.Utils.MessageTypeId.UNSUBSCRIBE:
        msgType = shaka.msf.Utils.MessageType.UNSUBSCRIBE;
        break;
      case shaka.msf.Utils.MessageTypeId.PUBLISH_DONE:
        msgType = shaka.msf.Utils.MessageType.PUBLISH_DONE;
        break;
      case shaka.msf.Utils.MessageTypeId.PUBLISH:
        msgType = shaka.msf.Utils.MessageType.PUBLISH;
        break;
      case shaka.msf.Utils.MessageTypeId.PUBLISH_OK:
        msgType = shaka.msf.Utils.MessageType.PUBLISH_OK;
        break;
      case shaka.msf.Utils.MessageTypeId.PUBLISH_ERROR:
        msgType = shaka.msf.Utils.MessageType.PUBLISH_ERROR;
        break;
      case shaka.msf.Utils.MessageTypeId.FETCH:
        msgType = shaka.msf.Utils.MessageType.FETCH;
        break;
      case shaka.msf.Utils.MessageTypeId.FETCH_OK:
        msgType = shaka.msf.Utils.MessageType.FETCH_OK;
        break;
      case shaka.msf.Utils.MessageTypeId.FETCH_ERROR:
        msgType = shaka.msf.Utils.MessageType.FETCH_ERROR;
        break;
      case shaka.msf.Utils.MessageTypeId.FETCH_CANCEL:
        msgType = shaka.msf.Utils.MessageType.FETCH_CANCEL;
        break;
      case shaka.msf.Utils.MessageTypeId.TRACK_STATUS:
        msgType = shaka.msf.Utils.MessageType.TRACK_STATUS;
        break;
      case shaka.msf.Utils.MessageTypeId.TRACK_STATUS_OK:
        msgType = shaka.msf.Utils.MessageType.TRACK_STATUS_OK;
        break;
      case shaka.msf.Utils.MessageTypeId.TRACK_STATUS_ERROR:
        msgType = shaka.msf.Utils.MessageType.TRACK_STATUS_ERROR;
        break;
      case shaka.msf.Utils.MessageTypeId.PUBLISH_NAMESPACE:
        msgType = shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE;
        break;
      case shaka.msf.Utils.MessageTypeId.PUBLISH_NAMESPACE_OK:
        msgType = shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE_OK;
        break;
      case shaka.msf.Utils.MessageTypeId.PUBLISH_NAMESPACE_ERROR:
        msgType = shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE_ERROR;
        break;
      case shaka.msf.Utils.MessageTypeId.PUBLISH_NAMESPACE_DONE:
        msgType = shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE_DONE;
        break;
      case shaka.msf.Utils.MessageTypeId.PUBLISH_NAMESPACE_CANCEL:
        msgType = shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE_CANCEL;
        break;
      case shaka.msf.Utils.MessageTypeId.SUBSCRIBE_NAMESPACE:
        msgType = shaka.msf.Utils.MessageType.SUBSCRIBE_NAMESPACE;
        break;
      case shaka.msf.Utils.MessageTypeId.SUBSCRIBE_NAMESPACE_OK:
        msgType = shaka.msf.Utils.MessageType.SUBSCRIBE_NAMESPACE_OK;
        break;
      case shaka.msf.Utils.MessageTypeId.SUBSCRIBE_NAMESPACE_ERROR:
        msgType = shaka.msf.Utils.MessageType.SUBSCRIBE_NAMESPACE_ERROR;
        break;
      case shaka.msf.Utils.MessageTypeId.UNSUBSCRIBE_NAMESPACE:
        msgType = shaka.msf.Utils.MessageType.UNSUBSCRIBE_NAMESPACE;
        break;
      default:
        throw new Error(`Unknown message type: 0x${type.toString(16)}`);
    }
    shaka.log.v1(`Parsed message type: ${msgType} (0x${type.toString(16)})`);
    return msgType;
  }

  /**
   * Reads one complete control message: the header first, then the payload
   * via the decoder matching the message type.
   *
   * FETCH, FETCH_CANCEL and the TRACK_STATUS family are recognised but have
   * no payload decoder here; they throw "Unsupported message type".
   *
   * NOTE(review): as with unknown type ids, the payload of an unsupported
   * message is left unread when this throws -- confirm the caller does not
   * continue reading from the same stream.
   *
   * @return {!Promise<shaka.msf.Utils.Message>}
   */
  async message() {
    const type = await this.messageType_();
    /** @type {shaka.msf.Utils.Message} */
    let result;
    switch (type) {
      case shaka.msf.Utils.MessageType.GOAWAY:
        result = await this.goaway_();
        break;
      case shaka.msf.Utils.MessageType.MAX_REQUEST_ID:
        result = await this.maxRequestId_();
        break;
      case shaka.msf.Utils.MessageType.REQUESTS_BLOCKED:
        result = await this.requestsBlocked_();
        break;
      case shaka.msf.Utils.MessageType.SUBSCRIBE:
        result = await this.subscribe_();
        break;
      case shaka.msf.Utils.MessageType.SUBSCRIBE_OK:
        result = await this.subscribeOk_();
        break;
      case shaka.msf.Utils.MessageType.SUBSCRIBE_ERROR:
        result = await this.subscribeError_();
        break;
      case shaka.msf.Utils.MessageType.SUBSCRIBE_UPDATE:
        result = await this.subscribeUpdate_();
        break;
      case shaka.msf.Utils.MessageType.UNSUBSCRIBE:
        result = await this.unsubscribe_();
        break;
      case shaka.msf.Utils.MessageType.PUBLISH_DONE:
        result = await this.publishDone_();
        break;
      case shaka.msf.Utils.MessageType.PUBLISH:
        result = await this.publish_();
        break;
      case shaka.msf.Utils.MessageType.PUBLISH_OK:
        result = await this.publishOk_();
        break;
      case shaka.msf.Utils.MessageType.PUBLISH_ERROR:
        result = await this.publishError_();
        break;
      case shaka.msf.Utils.MessageType.FETCH_OK:
        result = await this.fetchOk_();
        break;
      case shaka.msf.Utils.MessageType.FETCH_ERROR:
        result = await this.fetchError_();
        break;
      case shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE:
        result = await this.publishNamespace_();
        break;
      case shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE_OK:
        result = await this.publishNamespaceOk_();
        break;
      case shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE_ERROR:
        result = await this.publishNamespaceError_();
        break;
      case shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE_DONE:
        result = await this.publishNamespaceDone_();
        break;
      case shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE_CANCEL:
        result = await this.publishNamespaceCancel_();
        break;
      case shaka.msf.Utils.MessageType.SUBSCRIBE_NAMESPACE:
        result = await this.subscribeNamespace_();
        break;
      case shaka.msf.Utils.MessageType.SUBSCRIBE_NAMESPACE_OK:
        result = await this.subscribeNamespaceOk_();
        break;
      case shaka.msf.Utils.MessageType.SUBSCRIBE_NAMESPACE_ERROR:
        result = await this.subscribeNamespaceError_();
        break;
      case shaka.msf.Utils.MessageType.UNSUBSCRIBE_NAMESPACE:
        result = await this.unsubscribeNamespace_();
        break;
      // Known message types without a decoder are listed explicitly so the
      // gap is visible; they share the error of the default branch.
      case shaka.msf.Utils.MessageType.FETCH:
      case shaka.msf.Utils.MessageType.FETCH_CANCEL:
      case shaka.msf.Utils.MessageType.TRACK_STATUS:
      case shaka.msf.Utils.MessageType.TRACK_STATUS_OK:
      case shaka.msf.Utils.MessageType.TRACK_STATUS_ERROR:
      default:
        throw new Error(`Unsupported message type: ${type}`);
    }
    shaka.log.debug(`Successfully parsed ${type} message:`, result);
    return result;
  }

  /**
   * Read params using the appropriate decoding for the current version:
   * delta-encoded key/value pairs for draft 16, plain key/value pairs
   * otherwise.
   *
   * NOTE(review): only publishNamespace_() goes through this helper; the
   * other decoders call reader_.keyValuePairs() directly regardless of
   * version.  Confirm that is intended for draft 16.
   *
   * @return {!Promise<!Array<shaka.msf.Utils.KeyValuePair>>}
   * @private
   */
  readParams_() {
    if (shaka.msf.Utils.isDraft16(this.version_)) {
      return this.reader_.deltaKeyValuePairs();
    }
    return this.reader_.keyValuePairs();
  }

  /**
   * Looks up an integer-valued parameter.
   *
   * @param {!Array<shaka.msf.Utils.KeyValuePair>} params
   * @param {bigint} key Parameter type to search for.
   * @param {bigint} defaultValue Returned when the key is absent or its
   *   value is not a bigint.
   * @return {bigint}
   * @private
   */
  findParamVarInt_(params, key, defaultValue) {
    const match = params.find((param) => param.type === key);
    if (match && typeof match.value === 'bigint') {
      return match.value;
    }
    return defaultValue;
  }

  /**
   * Looks up a byte-valued parameter.
   *
   * @param {!Array<shaka.msf.Utils.KeyValuePair>} params
   * @param {bigint} key Parameter type to search for.
   * @return {Uint8Array} The value, or null when the key is absent or its
   *   value is not a typed-array view.
   * @private
   */
  findParamBytes_(params, key) {
    const match = params.find((param) => param.type === key);
    if (match && ArrayBuffer.isView(match.value)) {
      const bytes = /** @type {!Uint8Array} */ (match.value);
      return bytes;
    }
    return null;
  }

  /**
   * Maps a numeric group-order code (0, 1, 2) to its GroupOrder value.
   *
   * @param {number} orderCode
   * @return {!shaka.msf.Utils.GroupOrder}
   * @throws {Error} If the code is not 0, 1 or 2.
   * @private
   */
  parseGroupOrder_(orderCode) {
    switch (orderCode) {
      case 0:
        return shaka.msf.Utils.GroupOrder.PUBLISHER;
      case 1:
        return shaka.msf.Utils.GroupOrder.ASCENDING;
      case 2:
        return shaka.msf.Utils.GroupOrder.DESCENDING;
      default:
        throw new Error(`Invalid GroupOrder value: ${orderCode}`);
    }
  }

  /**
   * Decodes a Location from a byte array holding two consecutive var ints:
   * the group followed by the object.
   *
   * @param {!Uint8Array} bytes
   * @return {!shaka.msf.Utils.Location}
   * @throws {Error} If either var int is missing or truncated.
   * @private
   */
  parseLocationFromBytes_(bytes) {
    let offset = 0;
    const {value: group, bytesRead: gb} =
        this.decodeVarIntFromBytes_(bytes, offset);
    offset += gb;
    const {value: object} = this.decodeVarIntFromBytes_(bytes, offset);
    return {group, object};
  }

  /**
   * Decodes one variable-length integer from a byte array.
   *
   * The two most significant bits of the first byte select the total
   * encoded length (1, 2, 4 or 8 bytes).  The remaining 6 bits, followed by
   * the subsequent bytes in big-endian order, form the value.
   *
   * @param {!Uint8Array} bytes
   * @param {number} offset Index of the first byte of the var int.
   * @return {!{ value: bigint, bytesRead: number }}
   * @throws {Error} If the var int starts outside of, or runs past the end
   *   of, the byte array.
   * @private
   */
  decodeVarIntFromBytes_(bytes, offset) {
    const available = bytes.length - offset;
    // Without this guard a missing first byte would be read as undefined
    // and silently decode to 0.
    if (offset < 0 || available < 1) {
      throw new Error(
          `Truncated var int: no byte at offset ${offset} ` +
          `(buffer length ${bytes.length})`);
    }
    const first = bytes[offset];
    // first is a byte, so the prefix is always in 0..3 and maps to a length
    // of 1, 2, 4 or 8.
    const prefix = first >> 6;
    const length = 1 << prefix;
    if (length > available) {
      throw new Error(
          `Truncated var int at offset ${offset}: needs ${length} bytes, ` +
          `only ${available} available`);
    }
    let value = BigInt(first & 0x3f);
    for (let i = 1; i < length; i++) {
      value = (value << BigInt(8)) | BigInt(bytes[offset + i]);
    }
    return {value, bytesRead: length};
  }

  /**
   * Reads a one-byte group-order code from the stream and converts it.
   *
   * @return {!Promise<shaka.msf.Utils.GroupOrder>}
   * @private
   */
  async decodeGroupOrder_() {
    const orderCode = await this.reader_.u8();
    shaka.log.debug(`Raw group order code: ${orderCode}`);
    // Single source of truth for the code -> GroupOrder mapping.
    return this.parseGroupOrder_(orderCode);
  }

  /**
   * Reads a Location from the stream: group first, then object.
   *
   * @return {!Promise<shaka.msf.Utils.Location>}
   * @private
   */
  async location_() {
    return {
      group: await this.reader_.u62(),
      object: await this.reader_.u62(),
    };
  }

  /**
   * @return {!Promise<shaka.msf.Utils.Goaway>}
   * @private
   */
  async goaway_() {
    const newSessionUri = await this.reader_.string();
    return {
      kind: shaka.msf.Utils.MessageType.GOAWAY,
      newSessionUri,
    };
  }

  /**
   * @return {!Promise<shaka.msf.Utils.MaxRequestId>}
   * @private
   */
  async maxRequestId_() {
    const requestId = await this.reader_.u62();
    return {
      kind: shaka.msf.Utils.MessageType.MAX_REQUEST_ID,
      requestId,
    };
  }

  /**
   * @return {!Promise<shaka.msf.Utils.RequestsBlocked>}
   * @private
   */
  async requestsBlocked_() {
    const maximumRequestId = await this.reader_.u62();
    return {
      kind: shaka.msf.Utils.MessageType.REQUESTS_BLOCKED,
      maximumRequestId,
    };
  }

  /**
   * Decodes a SUBSCRIBE payload.  The start location is only present for
   * the ABSOLUTE_START and ABSOLUTE_RANGE filters, and the end group only
   * for ABSOLUTE_RANGE; otherwise those fields are left undefined.
   *
   * @return {!Promise<shaka.msf.Utils.Subscribe>}
   * @private
   */
  async subscribe_() {
    const requestId = await this.reader_.u62();
    const namespace = await this.reader_.tuple();
    const name = await this.reader_.string();
    const subscriberPriority = await this.reader_.u8();
    const groupOrder = await this.decodeGroupOrder_();
    const forward = await this.reader_.u8Bool();
    const filterType = /** @type {shaka.msf.Utils.FilterType} */(
      await this.reader_.u8());
    let startLocation;
    if (filterType === shaka.msf.Utils.FilterType.ABSOLUTE_START ||
        filterType === shaka.msf.Utils.FilterType.ABSOLUTE_RANGE) {
      startLocation = await this.location_();
    }
    let endGroup;
    if (filterType === shaka.msf.Utils.FilterType.ABSOLUTE_RANGE) {
      endGroup = await this.reader_.u62();
    }
    const params = await this.reader_.keyValuePairs();
    return {
      kind: shaka.msf.Utils.MessageType.SUBSCRIBE,
      requestId,
      namespace,
      name,
      subscriberPriority,
      groupOrder,
      forward,
      filterType: /** @type {shaka.config.MsfFilterType} */(filterType),
      startLocation,
      endGroup,
      params,
    };
  }

  /**
   * Decodes a SUBSCRIBE_OK payload using the layout for the current
   * version.
   *
   * @return {!Promise<shaka.msf.Utils.SubscribeOk>}
   * @private
   */
  subscribeOk_() {
    if (shaka.msf.Utils.isDraft16(this.version_)) {
      return this.subscribeOkDraft16_();
    }
    return this.subscribeOkDraft14_();
  }

  /**
   * SUBSCRIBE_OK layout with explicit fields.  The largest location is only
   * present when the content-exists flag is set.
   *
   * @return {!Promise<shaka.msf.Utils.SubscribeOk>}
   * @private
   */
  async subscribeOkDraft14_() {
    const requestId = await this.reader_.u62();
    const trackAlias = await this.reader_.u62();
    const expires = await this.reader_.u62();
    const groupOrder = await this.decodeGroupOrder_();
    const contentExists = await this.reader_.u8Bool();
    let largest;
    if (contentExists) {
      largest = await this.location_();
    }
    const params = await this.reader_.keyValuePairs();
    return {
      kind: shaka.msf.Utils.MessageType.SUBSCRIBE_OK,
      requestId,
      trackAlias,
      expires,
      groupOrder,
      contentExists,
      largest,
      params,
    };
  }

  /**
   * SUBSCRIBE_OK layout where expiry, group order and largest location are
   * carried as parameters instead of dedicated fields.  Missing expiry and
   * group-order parameters default to 0; content is considered to exist
   * only when a non-empty largest-object parameter is present.
   *
   * @return {!Promise<shaka.msf.Utils.SubscribeOk>}
   * @private
   */
  async subscribeOkDraft16_() {
    const requestId = await this.reader_.u62();
    const trackAlias = await this.reader_.u62();
    const params = await this.reader_.deltaKeyValuePairs();
    const PARAM_EXPIRES = BigInt(0x08);
    const PARAM_LARGEST_OBJECT = BigInt(0x09);
    const PARAM_GROUP_ORDER = BigInt(0x22);
    const expires =
        this.findParamVarInt_(params, PARAM_EXPIRES, BigInt(0));
    const groupOrderVal =
        this.findParamVarInt_(params, PARAM_GROUP_ORDER, BigInt(0));
    const groupOrder =
        this.parseGroupOrder_(Number(groupOrderVal));
    const largestBytes =
        this.findParamBytes_(params, PARAM_LARGEST_OBJECT);
    let contentExists = false;
    let largest;
    if (largestBytes && largestBytes.length > 0) {
      contentExists = true;
      largest = this.parseLocationFromBytes_(largestBytes);
    }
    // TODO: read track extensions (currently skip any remaining bytes)
    return {
      kind: shaka.msf.Utils.MessageType.SUBSCRIBE_OK,
      requestId,
      trackAlias,
      expires,
      groupOrder,
      contentExists,
      largest,
      params,
    };
  }

  /**
   * Decodes a SUBSCRIBE_ERROR payload.  The retry interval is only read for
   * draft 16 and is undefined otherwise.
   *
   * @return {!Promise<shaka.msf.Utils.SubscribeError>}
   * @private
   */
  async subscribeError_() {
    const requestId = await this.reader_.u62();
    const code = await this.reader_.u62();
    let retryInterval;
    if (shaka.msf.Utils.isDraft16(this.version_)) {
      retryInterval = await this.reader_.u62();
    }
    const reason = await this.reader_.string();
    return {
      kind: shaka.msf.Utils.MessageType.SUBSCRIBE_ERROR,
      requestId,
      code,
      retryInterval,
      reason,
    };
  }

  /**
   * @return {!Promise<shaka.msf.Utils.SubscribeUpdate>}
   * @private
   */
  async subscribeUpdate_() {
    const requestId = await this.reader_.u62();
    const subscriptionRequestId = await this.reader_.u62();
    const startLocation = await this.location_();
    const endGroup = await this.reader_.u62();
    const subscriberPriority = await this.reader_.u8();
    const forward = await this.reader_.u8Bool();
    const params = await this.reader_.keyValuePairs();
    return {
      kind: shaka.msf.Utils.MessageType.SUBSCRIBE_UPDATE,
      requestId,
      subscriptionRequestId,
      startLocation,
      endGroup,
      subscriberPriority,
      forward,
      params,
    };
  }

  /**
   * @return {!Promise<shaka.msf.Utils.Unsubscribe>}
   * @private
   */
  async unsubscribe_() {
    const requestId = await this.reader_.u62();
    return {
      kind: shaka.msf.Utils.MessageType.UNSUBSCRIBE,
      requestId,
    };
  }

  /**
   * @return {!Promise<shaka.msf.Utils.PublishDone>}
   * @private
   */
  async publishDone_() {
    const requestId = await this.reader_.u62();
    const code = await this.reader_.u62();
    const streamCount = await this.reader_.u53();
    const reason = await this.reader_.string();
    return {
      kind: shaka.msf.Utils.MessageType.PUBLISH_DONE,
      requestId,
      code,
      streamCount,
      reason,
    };
  }

  /**
   * Decodes a PUBLISH payload.  The largest location is only present when
   * the content-exists flag is set.
   *
   * @return {!Promise<shaka.msf.Utils.Publish>}
   * @private
   */
  async publish_() {
    const requestId = await this.reader_.u62();
    const namespace = await this.reader_.tuple();
    const name = await this.reader_.string();
    const trackAlias = await this.reader_.u62();
    const groupOrder = await this.decodeGroupOrder_();
    const contentExists = await this.reader_.u8Bool();
    let largestLocation;
    if (contentExists) {
      largestLocation = await this.location_();
    }
    const forward = await this.reader_.u8Bool();
    const params = await this.reader_.keyValuePairs();
    return {
      kind: shaka.msf.Utils.MessageType.PUBLISH,
      requestId,
      namespace,
      name,
      trackAlias,
      groupOrder,
      contentExists,
      largestLocation,
      forward,
      params,
    };
  }

  /**
   * Decodes a PUBLISH_OK payload.  As in SUBSCRIBE, the start location and
   * end group are only present for the absolute filter types.
   *
   * @return {!Promise<shaka.msf.Utils.PublishOk>}
   * @private
   */
  async publishOk_() {
    const requestId = await this.reader_.u62();
    const forward = await this.reader_.u8Bool();
    const subscriberPriority = await this.reader_.u8();
    const groupOrder = await this.decodeGroupOrder_();
    const filterType = /** @type {shaka.msf.Utils.FilterType} */(
      await this.reader_.u8());
    let startLocation;
    if (filterType === shaka.msf.Utils.FilterType.ABSOLUTE_START ||
        filterType === shaka.msf.Utils.FilterType.ABSOLUTE_RANGE) {
      startLocation = await this.location_();
    }
    let endGroup;
    if (filterType === shaka.msf.Utils.FilterType.ABSOLUTE_RANGE) {
      endGroup = await this.reader_.u62();
    }
    const params = await this.reader_.keyValuePairs();
    return {
      kind: shaka.msf.Utils.MessageType.PUBLISH_OK,
      requestId,
      forward,
      subscriberPriority,
      groupOrder,
      filterType,
      startLocation,
      endGroup,
      params,
    };
  }

  /**
   * Decodes a PUBLISH_ERROR payload.  The retry interval is only read for
   * draft 16 and is undefined otherwise.
   *
   * @return {!Promise<shaka.msf.Utils.PublishError>}
   * @private
   */
  async publishError_() {
    const requestId = await this.reader_.u62();
    const code = await this.reader_.u62();
    let retryInterval;
    if (shaka.msf.Utils.isDraft16(this.version_)) {
      retryInterval = await this.reader_.u62();
    }
    const reason = await this.reader_.string();
    return {
      kind: shaka.msf.Utils.MessageType.PUBLISH_ERROR,
      requestId,
      code,
      retryInterval,
      reason,
    };
  }

  /**
   * @return {!Promise<shaka.msf.Utils.FetchOk>}
   * @private
   */
  async fetchOk_() {
    const requestId = await this.reader_.u62();
    const groupOrder = await this.decodeGroupOrder_();
    const endOfTrack = await this.reader_.u8();
    const endGroup = await this.reader_.u62();
    const endObject = await this.reader_.u62();
    const params = await this.reader_.keyValuePairs();
    return {
      kind: shaka.msf.Utils.MessageType.FETCH_OK,
      requestId,
      groupOrder,
      endOfTrack,
      endGroup,
      endObject,
      params,
    };
  }

  /**
   * Decodes a FETCH_ERROR payload.  The retry interval is only read for
   * draft 16 and is undefined otherwise.
   *
   * @return {!Promise<shaka.msf.Utils.FetchError>}
   * @private
   */
  async fetchError_() {
    const requestId = await this.reader_.u62();
    const code = await this.reader_.u62();
    let retryInterval;
    if (shaka.msf.Utils.isDraft16(this.version_)) {
      retryInterval = await this.reader_.u62();
    }
    const reason = await this.reader_.string();
    return {
      kind: shaka.msf.Utils.MessageType.FETCH_ERROR,
      requestId,
      code,
      retryInterval,
      reason,
    };
  }

  /**
   * Decodes a PUBLISH_NAMESPACE payload.  Parameters are read with the
   * version-dependent decoding of readParams_().
   *
   * @return {!Promise<shaka.msf.Utils.PublishNamespace>}
   * @private
   */
  async publishNamespace_() {
    const requestId = await this.reader_.u62();
    const namespace = await this.reader_.tuple();
    const params = await this.readParams_();
    return {
      kind: shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE,
      requestId,
      namespace,
      params,
    };
  }

  /**
   * @return {!Promise<shaka.msf.Utils.PublishNamespaceOk>}
   * @private
   */
  async publishNamespaceOk_() {
    const requestId = await this.reader_.u62();
    return {
      kind: shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE_OK,
      requestId,
    };
  }

  /**
   * Decodes a PUBLISH_NAMESPACE_ERROR payload.  The retry interval is only
   * read for draft 16 and is undefined otherwise.
   *
   * @return {!Promise<shaka.msf.Utils.PublishNamespaceError>}
   * @private
   */
  async publishNamespaceError_() {
    const requestId = await this.reader_.u62();
    const code = await this.reader_.u62();
    let retryInterval;
    if (shaka.msf.Utils.isDraft16(this.version_)) {
      retryInterval = await this.reader_.u62();
    }
    const reason = await this.reader_.string();
    return {
      kind: shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE_ERROR,
      requestId,
      code,
      retryInterval,
      reason,
    };
  }

  /**
   * @return {!Promise<shaka.msf.Utils.PublishNamespaceDone>}
   * @private
   */
  async publishNamespaceDone_() {
    const namespace = await this.reader_.tuple();
    return {
      kind: shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE_DONE,
      namespace,
    };
  }

  /**
   * @return {!Promise<shaka.msf.Utils.PublishNamespaceCancel>}
   * @private
   */
  async publishNamespaceCancel_() {
    const namespace = await this.reader_.tuple();
    const code = await this.reader_.u62();
    const reason = await this.reader_.string();
    return {
      kind: shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE_CANCEL,
      namespace,
      code,
      reason,
    };
  }

  /**
   * Parse SUBSCRIBE_NAMESPACE message. The relay sends this to ask us to
   * announce any namespaces we want to publish. As a pure subscriber,
   * we have nothing to announce — the message is parsed and ignored.
   *
   * @return {!Promise<shaka.msf.Utils.SubscribeNamespace>}
   * @private
   */
  async subscribeNamespace_() {
    const requestId = await this.reader_.u62();
    const namespace = await this.reader_.tuple();
    const params = await this.reader_.keyValuePairs();
    return {
      kind: shaka.msf.Utils.MessageType.SUBSCRIBE_NAMESPACE,
      requestId,
      namespace,
      params,
    };
  }

  /**
   * @return {!Promise<shaka.msf.Utils.SubscribeNamespaceOk>}
   * @private
   */
  async subscribeNamespaceOk_() {
    const requestId = await this.reader_.u62();
    return {
      kind: shaka.msf.Utils.MessageType.SUBSCRIBE_NAMESPACE_OK,
      requestId,
    };
  }

  /**
   * Decodes a SUBSCRIBE_NAMESPACE_ERROR payload.  The retry interval is
   * only read for draft 16 and is undefined otherwise.
   *
   * @return {!Promise<shaka.msf.Utils.SubscribeNamespaceError>}
   * @private
   */
  async subscribeNamespaceError_() {
    const requestId = await this.reader_.u62();
    const code = await this.reader_.u62();
    let retryInterval;
    if (shaka.msf.Utils.isDraft16(this.version_)) {
      retryInterval = await this.reader_.u62();
    }
    const reason = await this.reader_.string();
    return {
      kind: shaka.msf.Utils.MessageType.SUBSCRIBE_NAMESPACE_ERROR,
      requestId,
      code,
      retryInterval,
      reason,
    };
  }

  /**
   * @return {!Promise<shaka.msf.Utils.UnsubscribeNamespace>}
   * @private
   */
  async unsubscribeNamespace_() {
    const namespace = await this.reader_.tuple();
    return {
      kind: shaka.msf.Utils.MessageType.UNSUBSCRIBE_NAMESPACE,
      namespace,
    };
  }
};
shaka.msf.ControlStreamEncoder = class {
  /**
   * Encodes control messages and writes them to the given writer.
   *
   * @param {!shaka.msf.Writer} writer Destination of the encoded bytes.
   * @param {!shaka.msf.Utils.Version} version Passed to every
   *   BufferControlWriter created for marshalling.
   */
  constructor(writer, version) {
    /** @private {!shaka.msf.Writer} */
    this.writer_ = writer;
    /** @private {!shaka.msf.Utils.Version} */
    this.version_ = version;
  }

  /**
   * Marshals one message into a fresh buffer and writes the resulting bytes
   * to the output writer in a single write() call.
   *
   * Nothing is written when the message kind cannot be encoded; the error
   * is thrown before the write.
   *
   * @param {shaka.msf.Utils.Message} msg
   * @return {!Promise}
   */
  async message(msg) {
    shaka.log.debug(`Encoding message of type: ${msg.kind}`);

    // Each message gets its own buffer.
    const buffer = new shaka.msf.BufferControlWriter(this.version_);
    this.marshal_(buffer, msg);

    const encoded = buffer.getBytes();
    shaka.log.v1(
        `Marshaled ${encoded.length} bytes for message type: ${msg.kind}`);

    await this.writer_.write(encoded);
  }

  /**
   * Dispatches to the marshal method matching the message kind.
   *
   * The TRACK_STATUS family has no marshaller and is rejected together with
   * any unrecognised kind.
   *
   * @param {!shaka.msf.BufferControlWriter} buffer
   * @param {shaka.msf.Utils.Message} msg
   * @private
   */
  marshal_(buffer, msg) {
    switch (msg.kind) {
      case shaka.msf.Utils.MessageType.GOAWAY:
        buffer.marshalGoaway(
            /** @type {!shaka.msf.Utils.Goaway} */ (msg));
        return;
      case shaka.msf.Utils.MessageType.MAX_REQUEST_ID:
        buffer.marshalMaxRequestId(
            /** @type {!shaka.msf.Utils.MaxRequestId} */ (msg));
        return;
      case shaka.msf.Utils.MessageType.REQUESTS_BLOCKED:
        buffer.marshalRequestsBlocked(
            /** @type {!shaka.msf.Utils.RequestsBlocked} */ (msg));
        return;
      case shaka.msf.Utils.MessageType.SUBSCRIBE:
        buffer.marshalSubscribe(
            /** @type {!shaka.msf.Utils.Subscribe} */ (msg));
        return;
      case shaka.msf.Utils.MessageType.SUBSCRIBE_OK:
        buffer.marshalSubscribeOk(
            /** @type {!shaka.msf.Utils.SubscribeOk} */ (msg));
        return;
      case shaka.msf.Utils.MessageType.SUBSCRIBE_ERROR:
        buffer.marshalSubscribeError(
            /** @type {!shaka.msf.Utils.SubscribeError} */ (msg));
        return;
      case shaka.msf.Utils.MessageType.SUBSCRIBE_UPDATE:
        buffer.marshalSubscribeUpdate(
            /** @type {!shaka.msf.Utils.SubscribeUpdate} */ (msg));
        return;
      case shaka.msf.Utils.MessageType.UNSUBSCRIBE:
        buffer.marshalUnsubscribe(
            /** @type {!shaka.msf.Utils.Unsubscribe} */ (msg));
        return;
      case shaka.msf.Utils.MessageType.PUBLISH_DONE:
        buffer.marshalPublishDone(
            /** @type {!shaka.msf.Utils.PublishDone} */ (msg));
        return;
      case shaka.msf.Utils.MessageType.PUBLISH:
        buffer.marshalPublish(
            /** @type {!shaka.msf.Utils.Publish} */ (msg));
        return;
      case shaka.msf.Utils.MessageType.PUBLISH_OK:
        buffer.marshalPublishOk(
            /** @type {!shaka.msf.Utils.PublishOk} */ (msg));
        return;
      case shaka.msf.Utils.MessageType.PUBLISH_ERROR:
        buffer.marshalPublishError(
            /** @type {!shaka.msf.Utils.PublishError} */ (msg));
        return;
      case shaka.msf.Utils.MessageType.FETCH:
        buffer.marshalFetch(
            /** @type {!shaka.msf.Utils.Fetch} */ (msg));
        return;
      case shaka.msf.Utils.MessageType.FETCH_OK:
        buffer.marshalFetchOk(
            /** @type {!shaka.msf.Utils.FetchOk} */ (msg));
        return;
      case shaka.msf.Utils.MessageType.FETCH_ERROR:
        buffer.marshalFetchError(
            /** @type {!shaka.msf.Utils.FetchError} */ (msg));
        return;
      case shaka.msf.Utils.MessageType.FETCH_CANCEL:
        buffer.marshalFetchCancel(
            /** @type {!shaka.msf.Utils.FetchCancel} */ (msg));
        return;
      case shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE:
        buffer.marshalPublishNamespace(
            /** @type {!shaka.msf.Utils.PublishNamespace} */ (msg));
        return;
      case shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE_OK:
        buffer.marshalPublishNamespaceOk(
            /** @type {!shaka.msf.Utils.PublishNamespaceOk} */ (msg));
        return;
      case shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE_ERROR:
        buffer.marshalPublishNamespaceError(
            /** @type {!shaka.msf.Utils.PublishNamespaceError} */ (msg));
        return;
      case shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE_DONE:
        buffer.marshalPublishNamespaceDone(
            /** @type {!shaka.msf.Utils.PublishNamespaceDone} */ (msg));
        return;
      case shaka.msf.Utils.MessageType.PUBLISH_NAMESPACE_CANCEL:
        buffer.marshalPublishNamespaceCancel(
            /** @type {!shaka.msf.Utils.PublishNamespaceCancel} */ (msg));
        return;
      case shaka.msf.Utils.MessageType.SUBSCRIBE_NAMESPACE:
        buffer.marshalSubscribeNamespace(
            /** @type {!shaka.msf.Utils.SubscribeNamespace} */ (msg));
        return;
      case shaka.msf.Utils.MessageType.SUBSCRIBE_NAMESPACE_OK:
        buffer.marshalSubscribeNamespaceOk(
            /** @type {!shaka.msf.Utils.SubscribeNamespaceOk} */ (msg));
        return;
      case shaka.msf.Utils.MessageType.SUBSCRIBE_NAMESPACE_ERROR:
        buffer.marshalSubscribeNamespaceError(
            /** @type {!shaka.msf.Utils.SubscribeNamespaceError} */ (msg));
        return;
      case shaka.msf.Utils.MessageType.UNSUBSCRIBE_NAMESPACE:
        buffer.marshalUnsubscribeNamespace(
            /** @type {!shaka.msf.Utils.UnsubscribeNamespace} */ (msg));
        return;
      // Known kinds without a marshaller share the error of unknown kinds.
      case shaka.msf.Utils.MessageType.TRACK_STATUS:
      case shaka.msf.Utils.MessageType.TRACK_STATUS_OK:
      case shaka.msf.Utils.MessageType.TRACK_STATUS_ERROR:
      default:
        throw new Error(`Unsupported message type for encoding: ${msg.kind}`);
    }
  }
};