Socket
Socket
Sign inDemoInstall

stream-chat

Package Overview
Dependencies
Maintainers
10
Versions
293
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

stream-chat - npm Package Compare versions

Comparing version 8.39.0 to 8.40.0

dist/types/store.d.ts

9

dist/types/channel_state.d.ts

@@ -66,9 +66,8 @@ import { Channel } from './channel';

/**
* formatMessage - Takes the message object. Parses the dates, sets __html
* and sets the status to received if missing. Returns a message object
* Takes the message object, parses the dates, sets `__html`
* and sets the status to `received` if missing; returns a new message object.
*
* @param {MessageResponse<StreamChatGenerics>} message a message object
*
* @param {MessageResponse<StreamChatGenerics>} message `MessageResponse` object
*/
formatMessage(message: MessageResponse<StreamChatGenerics>): FormatMessageResponse<StreamChatGenerics>;
formatMessage: (message: MessageResponse<StreamChatGenerics>) => FormatMessageResponse<StreamChatGenerics>;
/**

@@ -75,0 +74,0 @@ * addMessagesSorted - Add the list of messages to state and resorts the messages

@@ -329,3 +329,3 @@ /// <reference types="node" />

*/
lastMessage(): FormatMessageResponse<StreamChatGenerics>;
lastMessage(): FormatMessageResponse<StreamChatGenerics> | undefined;
/**

@@ -364,4 +364,6 @@ * markRead - Send the mark read event for this user, only works if the `read_events` setting is enabled

/**
* getReplies - List the message replies for a parent message
* getReplies - List the message replies for a parent message.
*
* The recommended way of working with threads is to use the Thread class.
*
* @param {string} parent_id The message parent id, ie the top of the thread

@@ -368,0 +370,0 @@ * @param {MessagePaginationOptions & { user?: UserResponse<StreamChatGenerics>; user_id?: string }} options Pagination params, ie {limit:10, id_lte: 10}

@@ -7,2 +7,3 @@ export * from './base64';

export * from './thread';
export * from './thread_manager';
export * from './connection';

@@ -19,2 +20,3 @@ export * from './events';

export { isOwnUser, chatCodes, logChatPromiseExecution, formatMessage } from './utils';
export * from './store';
//# sourceMappingURL=index.d.ts.map

@@ -1,34 +0,98 @@

import { StreamChat } from './client';
import { DefaultGenerics, ExtendableGenerics, MessageResponse, ThreadResponse, ChannelResponse, FormatMessageResponse, ReactionResponse, UserResponse } from './types';
declare type ThreadReadStatus<StreamChatGenerics extends ExtendableGenerics = DefaultGenerics> = Record<string, {
last_read: Date;
last_read_message_id: string;
unread_messages: number;
user: UserResponse<StreamChatGenerics>;
}>;
export declare class Thread<StreamChatGenerics extends ExtendableGenerics = DefaultGenerics> {
id: string;
latestReplies: FormatMessageResponse<StreamChatGenerics>[];
participants: ThreadResponse['thread_participants'];
message: FormatMessageResponse<StreamChatGenerics>;
channel: ChannelResponse<StreamChatGenerics>;
_channel: ReturnType<StreamChat<StreamChatGenerics>['channel']>;
replyCount: number;
_client: StreamChat<StreamChatGenerics>;
read: ThreadReadStatus<StreamChatGenerics>;
data: Record<string, any>;
constructor(client: StreamChat<StreamChatGenerics>, t: ThreadResponse<StreamChatGenerics>);
getClient(): StreamChat<StreamChatGenerics>;
import type { Channel } from './channel';
import type { StreamChat } from './client';
import { StateStore } from './store';
import type { AscDesc, DefaultGenerics, ExtendableGenerics, FormatMessageResponse, MessagePaginationOptions, MessageResponse, ThreadResponse, UserResponse } from './types';
declare type QueryRepliesOptions<SCG extends ExtendableGenerics> = {
sort?: {
created_at: AscDesc;
}[];
} & MessagePaginationOptions & {
user?: UserResponse<SCG>;
user_id?: string;
};
export declare type ThreadState<SCG extends ExtendableGenerics = DefaultGenerics> = {
/**
* addReply - Adds or updates a latestReplies to the thread
*
* @param {MessageResponse<StreamChatGenerics>} message reply message to be added.
* Determines if the thread is currently opened and on-screen. When the thread is active,
* all new messages are immediately marked as read.
*/
addReply(message: MessageResponse<StreamChatGenerics>): void;
updateReply(message: MessageResponse<StreamChatGenerics>): void;
updateMessageOrReplyIfExists(message: MessageResponse<StreamChatGenerics>): void;
addReaction(reaction: ReactionResponse<StreamChatGenerics>, message?: MessageResponse<StreamChatGenerics>, enforce_unique?: boolean): void;
removeReaction(reaction: ReactionResponse<StreamChatGenerics>, message?: MessageResponse<StreamChatGenerics>): void;
active: boolean;
channel: Channel<SCG>;
createdAt: Date;
deletedAt: Date | null;
isLoading: boolean;
isStateStale: boolean;
pagination: ThreadRepliesPagination;
/**
* Thread is identified by and has a one-to-one relation with its parent message.
* We use parent message id as a thread id.
*/
parentMessage: FormatMessageResponse<SCG>;
participants: ThreadResponse<SCG>['thread_participants'];
read: ThreadReadState;
replies: Array<FormatMessageResponse<SCG>>;
replyCount: number;
updatedAt: Date | null;
};
export declare type ThreadRepliesPagination = {
isLoadingNext: boolean;
isLoadingPrev: boolean;
nextCursor: string | null;
prevCursor: string | null;
};
export declare type ThreadUserReadState<SCG extends ExtendableGenerics = DefaultGenerics> = {
lastReadAt: Date;
unreadMessageCount: number;
user: UserResponse<SCG>;
lastReadMessageId?: string;
};
export declare type ThreadReadState<SCG extends ExtendableGenerics = DefaultGenerics> = Record<string, ThreadUserReadState<SCG> | undefined>;
export declare class Thread<SCG extends ExtendableGenerics = DefaultGenerics> {
readonly state: StateStore<ThreadState<SCG>>;
readonly id: string;
private client;
private unsubscribeFunctions;
private failedRepliesMap;
constructor({ client, threadData }: {
client: StreamChat<SCG>;
threadData: ThreadResponse<SCG>;
});
get channel(): Channel<SCG>;
get hasStaleState(): boolean;
get ownUnreadCount(): number;
activate: () => void;
deactivate: () => void;
reload: () => Promise<void>;
hydrateState: (thread: Thread<SCG>) => void;
registerSubscriptions: () => void;
private subscribeMarkActiveThreadRead;
private subscribeReloadActiveStaleThread;
private subscribeMarkThreadStale;
private subscribeNewReplies;
private subscribeRepliesRead;
private subscribeReplyDeleted;
private subscribeMessageUpdated;
unregisterSubscriptions: () => void;
deleteReplyLocally: ({ message }: {
message: MessageResponse<SCG>;
}) => void;
upsertReplyLocally: ({ message, timestampChanged, }: {
message: MessageResponse<SCG>;
timestampChanged?: boolean | undefined;
}) => void;
updateParentMessageLocally: (message: MessageResponse<SCG>) => void;
updateParentMessageOrReplyLocally: (message: MessageResponse<SCG>) => void;
markAsRead: ({ force }?: {
force?: boolean | undefined;
}) => Promise<import("./types").EventAPIResponse<SCG> | null>;
private throttledMarkAsRead;
queryReplies: ({ limit, sort, ...otherOptions }?: QueryRepliesOptions<SCG>) => Promise<import("./types").GetRepliesAPIResponse<SCG>>;
loadNextPage: ({ limit }?: {
limit?: number | undefined;
}) => Promise<void>;
loadPrevPage: ({ limit }?: {
limit?: number | undefined;
}) => Promise<void>;
private loadPage;
}
export {};
//# sourceMappingURL=thread.d.ts.map

@@ -48,10 +48,36 @@ /// <reference types="node" />

/**
* formatMessage - Takes the message object. Parses the dates, sets __html
* and sets the status to received if missing. Returns a message object
* Takes the message object, parses the dates, sets `__html`
* and sets the status to `received` if missing; returns a new message object.
*
* @param {MessageResponse<StreamChatGenerics>} message a message object
*
* @param {MessageResponse<StreamChatGenerics>} message `MessageResponse` object
*/
export declare function formatMessage<StreamChatGenerics extends ExtendableGenerics = DefaultGenerics>(message: MessageResponse<StreamChatGenerics>): FormatMessageResponse<StreamChatGenerics>;
export declare function addToMessageList<StreamChatGenerics extends ExtendableGenerics = DefaultGenerics>(messages: Array<FormatMessageResponse<StreamChatGenerics>>, message: FormatMessageResponse<StreamChatGenerics>, timestampChanged?: boolean, sortBy?: 'pinned_at' | 'created_at', addIfDoesNotExist?: boolean): FormatMessageResponse<StreamChatGenerics>[];
export declare function formatMessage<StreamChatGenerics extends ExtendableGenerics = DefaultGenerics>(message: MessageResponse<StreamChatGenerics> | FormatMessageResponse<StreamChatGenerics>): FormatMessageResponse<StreamChatGenerics>;
export declare const findIndexInSortedArray: <T, L>({ needle, sortedArray, selectValueToCompare, sortDirection, }: {
needle: T;
sortedArray: readonly T[];
/**
* In array of objects (like messages), pick a specific
* property to compare needle value to.
*
* @example
* ```ts
* selectValueToCompare: (message) => message.created_at.getTime()
* ```
*/
selectValueToCompare?: ((arrayElement: T) => T | L) | undefined;
/**
* @default ascending
* @description
* ```md
* ascending - [1,2,3,4,5...]
* descending - [...5,4,3,2,1]
* ```
*/
sortDirection?: "ascending" | "descending" | undefined;
}) => number;
export declare function addToMessageList<T extends FormatMessageResponse>(messages: readonly T[], newMessage: T, timestampChanged?: boolean, sortBy?: 'pinned_at' | 'created_at', addIfDoesNotExist?: boolean): T[];
export declare const throttle: <T extends (...args: unknown[]) => unknown>(fn: T, timeout?: number, { leading, trailing }?: {
leading?: boolean | undefined;
trailing?: boolean | undefined;
}) => (...args: Parameters<T>) => void;
declare type MessagePaginationUpdatedParams<StreamChatGenerics extends ExtendableGenerics = DefaultGenerics> = {

@@ -58,0 +84,0 @@ parentSet: MessageSet;

{
"name": "stream-chat",
"version": "8.39.0",
"version": "8.40.0",
"description": "JS SDK for the Stream Chat API",

@@ -46,3 +46,3 @@ "author": "GetStream",

"jsonwebtoken": "~9.0.0",
"ws": "^7.4.4"
"ws": "^7.5.10"
},

@@ -99,3 +99,3 @@ "devDependencies": {

"lint-staged": "^15.2.2",
"mocha": "^9.1.3",
"mocha": "^10.7.0",
"nyc": "^15.1.0",

@@ -108,3 +108,3 @@ "prettier": "^2.2.1",

"standard-version": "^9.3.2",
"typescript": "^4.2.3",
"typescript": "4.2.3",
"uuid": "^8.3.2"

@@ -124,3 +124,3 @@ },

"eslint-fix": "npx eslint --fix '**/*.{js,md,ts}' --max-warnings 0 --ignore-path ./.eslintignore",
"test-unit": "NODE_ENV=test mocha --exit --bail --timeout 20000 --require ./babel-register test/unit/*.js",
"test-unit": "NODE_ENV=test mocha --exit --bail --timeout 20000 --require ./babel-register test/unit/*.{js,test.ts}",
"test-coverage": "nyc yarn test-unit",

@@ -139,3 +139,4 @@ "test": "yarn test-unit",

"node": ">=16"
}
},
"packageManager": "yarn@1.22.21+sha1.1959a18351b811cdeedbd484a8f86c3cc3bbaf72"
}

@@ -17,3 +17,3 @@ import { Channel } from './channel';

} from './types';
import { addToMessageList } from './utils';
import { addToMessageList, formatMessage } from './utils';
import { DEFAULT_MESSAGE_SET_PAGINATION } from './constants';

@@ -63,2 +63,3 @@

messageSets: MessageSet[] = [];
constructor(channel: Channel<StreamChatGenerics>) {

@@ -139,22 +140,8 @@ this._channel = channel;

/**
* formatMessage - Takes the message object. Parses the dates, sets __html
* and sets the status to received if missing. Returns a message object
* Takes the message object, parses the dates, sets `__html`
* and sets the status to `received` if missing; returns a new message object.
*
* @param {MessageResponse<StreamChatGenerics>} message a message object
*
* @param {MessageResponse<StreamChatGenerics>} message `MessageResponse` object
*/
formatMessage(message: MessageResponse<StreamChatGenerics>): FormatMessageResponse<StreamChatGenerics> {
return {
...message,
/**
* @deprecated please use `html`
*/
__html: message.html,
// parse the date..
pinned_at: message.pinned_at ? new Date(message.pinned_at) : null,
created_at: message.created_at ? new Date(message.created_at) : new Date(),
updated_at: message.updated_at ? new Date(message.updated_at) : new Date(),
status: message.status || 'received',
};
}
formatMessage = (message: MessageResponse<StreamChatGenerics>) => formatMessage<StreamChatGenerics>(message);

@@ -672,3 +659,3 @@ /**

type: 'deleted',
deleted_at: user.deleted_at,
deleted_at: user.deleted_at ? new Date(user.deleted_at) : null,
};

@@ -675,0 +662,0 @@ }

@@ -710,3 +710,3 @@ import { ChannelState } from './channel_state';

*/
lastMessage() {
lastMessage(): FormatMessageResponse<StreamChatGenerics> | undefined {
// get last 5 messages, sort, return the latest

@@ -828,4 +828,6 @@ // get a slice of the last 5

/**
* getReplies - List the message replies for a parent message
* getReplies - List the message replies for a parent message.
*
* The recommended way of working with threads is to use the Thread class.
*
* @param {string} parent_id The message parent id, ie the top of the thread

@@ -832,0 +834,0 @@ * @param {MessagePaginationOptions & { user?: UserResponse<StreamChatGenerics>; user_id?: string }} options Pagination params, ie {limit:10, id_lte: 10}

@@ -7,2 +7,3 @@ export * from './base64';

export * from './thread';
export * from './thread_manager';
export * from './connection';

@@ -19,1 +20,2 @@ export * from './events';

export { isOwnUser, chatCodes, logChatPromiseExecution, formatMessage } from './utils';
export * from './store';

@@ -1,142 +0,505 @@

import { StreamChat } from './client';
import {
import type { Channel } from './channel';
import type { StreamChat } from './client';
import { StateStore } from './store';
import type {
AscDesc,
DefaultGenerics,
ExtendableGenerics,
FormatMessageResponse,
MessagePaginationOptions,
MessageResponse,
ReadResponse,
ThreadResponse,
ChannelResponse,
FormatMessageResponse,
ReactionResponse,
UserResponse,
} from './types';
import { addToMessageList, formatMessage } from './utils';
import { addToMessageList, findIndexInSortedArray, formatMessage, throttle } from './utils';
type ThreadReadStatus<StreamChatGenerics extends ExtendableGenerics = DefaultGenerics> = Record<
type QueryRepliesOptions<SCG extends ExtendableGenerics> = {
sort?: { created_at: AscDesc }[];
} & MessagePaginationOptions & { user?: UserResponse<SCG>; user_id?: string };
export type ThreadState<SCG extends ExtendableGenerics = DefaultGenerics> = {
/**
* Determines if the thread is currently opened and on-screen. When the thread is active,
* all new messages are immediately marked as read.
*/
active: boolean;
channel: Channel<SCG>;
createdAt: Date;
deletedAt: Date | null;
isLoading: boolean;
isStateStale: boolean;
pagination: ThreadRepliesPagination;
/**
* Thread is identified by and has a one-to-one relation with its parent message.
* We use parent message id as a thread id.
*/
parentMessage: FormatMessageResponse<SCG>;
participants: ThreadResponse<SCG>['thread_participants'];
read: ThreadReadState;
replies: Array<FormatMessageResponse<SCG>>;
replyCount: number;
updatedAt: Date | null;
};
export type ThreadRepliesPagination = {
isLoadingNext: boolean;
isLoadingPrev: boolean;
nextCursor: string | null;
prevCursor: string | null;
};
export type ThreadUserReadState<SCG extends ExtendableGenerics = DefaultGenerics> = {
lastReadAt: Date;
unreadMessageCount: number;
user: UserResponse<SCG>;
lastReadMessageId?: string;
};
export type ThreadReadState<SCG extends ExtendableGenerics = DefaultGenerics> = Record<
string,
{
last_read: Date;
last_read_message_id: string;
unread_messages: number;
user: UserResponse<StreamChatGenerics>;
}
ThreadUserReadState<SCG> | undefined
>;
export class Thread<StreamChatGenerics extends ExtendableGenerics = DefaultGenerics> {
id: string;
latestReplies: FormatMessageResponse<StreamChatGenerics>[] = [];
participants: ThreadResponse['thread_participants'] = [];
message: FormatMessageResponse<StreamChatGenerics>;
channel: ChannelResponse<StreamChatGenerics>;
_channel: ReturnType<StreamChat<StreamChatGenerics>['channel']>;
replyCount = 0;
_client: StreamChat<StreamChatGenerics>;
read: ThreadReadStatus<StreamChatGenerics> = {};
// eslint-disable-next-line @typescript-eslint/no-explicit-any
data: Record<string, any> = {};
const DEFAULT_PAGE_LIMIT = 50;
const DEFAULT_SORT: { created_at: AscDesc }[] = [{ created_at: -1 }];
const MARK_AS_READ_THROTTLE_TIMEOUT = 1000;
constructor(client: StreamChat<StreamChatGenerics>, t: ThreadResponse<StreamChatGenerics>) {
const {
parent_message_id,
parent_message,
latest_replies,
thread_participants,
reply_count,
channel,
read,
...data
} = t;
export class Thread<SCG extends ExtendableGenerics = DefaultGenerics> {
public readonly state: StateStore<ThreadState<SCG>>;
public readonly id: string;
this.id = parent_message_id;
this.message = formatMessage(parent_message);
this.latestReplies = latest_replies.map(formatMessage);
this.participants = thread_participants;
this.replyCount = reply_count;
this.channel = channel;
this._channel = client.channel(t.channel.type, t.channel.id);
this._client = client;
if (read) {
for (const r of read) {
this.read[r.user.id] = {
...r,
last_read: new Date(r.last_read),
};
}
}
this.data = data;
private client: StreamChat<SCG>;
private unsubscribeFunctions: Set<() => void> = new Set();
private failedRepliesMap: Map<string, FormatMessageResponse<SCG>> = new Map();
constructor({ client, threadData }: { client: StreamChat<SCG>; threadData: ThreadResponse<SCG> }) {
this.state = new StateStore<ThreadState<SCG>>({
active: false,
channel: client.channel(threadData.channel.type, threadData.channel.id),
createdAt: new Date(threadData.created_at),
deletedAt: threadData.deleted_at ? new Date(threadData.deleted_at) : null,
isLoading: false,
isStateStale: false,
pagination: repliesPaginationFromInitialThread(threadData),
parentMessage: formatMessage(threadData.parent_message),
participants: threadData.thread_participants,
read: formatReadState(threadData.read ?? []),
replies: threadData.latest_replies.map(formatMessage),
replyCount: threadData.reply_count ?? 0,
updatedAt: threadData.updated_at ? new Date(threadData.updated_at) : null,
});
this.id = threadData.parent_message_id;
this.client = client;
}
getClient(): StreamChat<StreamChatGenerics> {
return this._client;
get channel() {
return this.state.getLatestValue().channel;
}
/**
* addReply - Adds or updates a latestReplies to the thread
*
* @param {MessageResponse<StreamChatGenerics>} message reply message to be added.
*/
addReply(message: MessageResponse<StreamChatGenerics>) {
if (message.parent_id !== this.message.id) {
throw new Error('Message does not belong to this thread');
}
this.latestReplies = addToMessageList(this.latestReplies, formatMessage(message), true);
get hasStaleState() {
return this.state.getLatestValue().isStateStale;
}
updateReply(message: MessageResponse<StreamChatGenerics>) {
this.latestReplies = this.latestReplies.map((m) => {
if (m.id === message.id) {
return formatMessage(message);
}
return m;
});
get ownUnreadCount() {
return ownUnreadCountSelector(this.client.userID)(this.state.getLatestValue());
}
updateMessageOrReplyIfExists(message: MessageResponse<StreamChatGenerics>) {
if (!message.parent_id && message.id !== this.message.id) {
public activate = () => {
this.state.partialNext({ active: true });
};
public deactivate = () => {
this.state.partialNext({ active: false });
};
public reload = async () => {
if (this.state.getLatestValue().isLoading) {
return;
}
if (message.parent_id && message.parent_id !== this.message.id) {
return;
this.state.partialNext({ isLoading: true });
try {
const thread = await this.client.getThread(this.id, { watch: true });
this.hydrateState(thread);
} finally {
this.state.partialNext({ isLoading: false });
}
};
if (message.parent_id && message.parent_id === this.message.id) {
this.updateReply(message);
public hydrateState = (thread: Thread<SCG>) => {
if (thread === this) {
// skip if the instances are the same
return;
}
if (!message.parent_id && message.id === this.message.id) {
this.message = formatMessage(message);
if (thread.id !== this.id) {
throw new Error("Cannot hydrate thread state with using thread's state");
}
}
addReaction(
reaction: ReactionResponse<StreamChatGenerics>,
message?: MessageResponse<StreamChatGenerics>,
enforce_unique?: boolean,
) {
if (!message) return;
const {
read,
replyCount,
replies,
parentMessage,
participants,
createdAt,
deletedAt,
updatedAt,
} = thread.state.getLatestValue();
this.latestReplies = this.latestReplies.map((m) => {
if (m.id === message.id) {
return formatMessage(
this._channel.state.addReaction(reaction, message, enforce_unique) as MessageResponse<StreamChatGenerics>,
);
// Preserve pending replies and append them to the updated list of replies
const pendingReplies = Array.from(this.failedRepliesMap.values());
this.state.partialNext({
read,
replyCount,
replies: pendingReplies.length ? replies.concat(pendingReplies) : replies,
parentMessage,
participants,
createdAt,
deletedAt,
updatedAt,
isStateStale: false,
});
};
public registerSubscriptions = () => {
if (this.unsubscribeFunctions.size) {
// Thread is already listening for events and changes
return;
}
this.unsubscribeFunctions.add(this.subscribeMarkActiveThreadRead());
this.unsubscribeFunctions.add(this.subscribeReloadActiveStaleThread());
this.unsubscribeFunctions.add(this.subscribeMarkThreadStale());
this.unsubscribeFunctions.add(this.subscribeNewReplies());
this.unsubscribeFunctions.add(this.subscribeRepliesRead());
this.unsubscribeFunctions.add(this.subscribeReplyDeleted());
this.unsubscribeFunctions.add(this.subscribeMessageUpdated());
};
private subscribeMarkActiveThreadRead = () => {
return this.state.subscribeWithSelector(
(nextValue) => [nextValue.active, ownUnreadCountSelector(this.client.userID)(nextValue)],
([active, unreadMessageCount]) => {
if (!active || !unreadMessageCount) return;
this.throttledMarkAsRead();
},
);
};
private subscribeReloadActiveStaleThread = () =>
this.state.subscribeWithSelector(
(nextValue) => [nextValue.active, nextValue.isStateStale],
([active, isStateStale]) => {
if (active && isStateStale) {
this.reload();
}
},
);
private subscribeMarkThreadStale = () =>
this.client.on('user.watching.stop', (event) => {
const { channel } = this.state.getLatestValue();
if (!this.client.userID || this.client.userID !== event.user?.id || event.channel?.cid !== channel.cid) {
return;
}
return m;
this.state.partialNext({ isStateStale: true });
}).unsubscribe;
private subscribeNewReplies = () =>
this.client.on('message.new', (event) => {
if (!this.client.userID || event.message?.parent_id !== this.id) {
return;
}
const isOwnMessage = event.message.user?.id === this.client.userID;
const { active, read } = this.state.getLatestValue();
this.upsertReplyLocally({
message: event.message,
// Message from current user could have been added optimistically,
// so the actual timestamp might differ in the event
timestampChanged: isOwnMessage,
});
if (active) {
this.throttledMarkAsRead();
}
const nextRead: ThreadReadState = {};
for (const userId of Object.keys(read)) {
const userRead = read[userId];
if (userRead) {
let nextUserRead: ThreadUserReadState = userRead;
if (userId === event.user?.id) {
// The user who just sent a message to the thread has no unread messages
// in that thread
nextUserRead = {
...nextUserRead,
lastReadAt: event.created_at ? new Date(event.created_at) : new Date(),
user: event.user,
unreadMessageCount: 0,
};
} else if (active && userId === this.client.userID) {
// Do not increment unread count for the current user in an active thread
} else {
// Increment unread count for all users except the author of the new message
nextUserRead = {
...nextUserRead,
unreadMessageCount: userRead.unreadMessageCount + 1,
};
}
nextRead[userId] = nextUserRead;
}
}
this.state.partialNext({ read: nextRead });
}).unsubscribe;
private subscribeRepliesRead = () =>
this.client.on('message.read', (event) => {
if (!event.user || !event.created_at || !event.thread) return;
if (event.thread.parent_message_id !== this.id) return;
const userId = event.user.id;
const createdAt = event.created_at;
const user = event.user;
this.state.next((current) => ({
...current,
read: {
...current.read,
[userId]: {
lastReadAt: new Date(createdAt),
user,
lastReadMessageId: event.last_read_message_id,
unreadMessageCount: 0,
},
},
}));
}).unsubscribe;
private subscribeReplyDeleted = () =>
this.client.on('message.deleted', (event) => {
if (event.message?.parent_id === this.id && event.hard_delete) {
return this.deleteReplyLocally({ message: event.message });
}
}).unsubscribe;
private subscribeMessageUpdated = () => {
const unsubscribeFunctions = ['message.updated', 'reaction.new', 'reaction.deleted'].map(
(eventType) =>
this.client.on(eventType, (event) => {
if (event.message) {
this.updateParentMessageOrReplyLocally(event.message);
}
}).unsubscribe,
);
return () => unsubscribeFunctions.forEach((unsubscribe) => unsubscribe());
};
public unregisterSubscriptions = () => {
this.unsubscribeFunctions.forEach((cleanupFunction) => cleanupFunction());
this.unsubscribeFunctions.clear();
};
public deleteReplyLocally = ({ message }: { message: MessageResponse<SCG> }) => {
const { replies } = this.state.getLatestValue();
const index = findIndexInSortedArray({
needle: formatMessage(message),
sortedArray: replies,
sortDirection: 'ascending',
selectValueToCompare: (reply) => reply.created_at.getTime(),
});
}
removeReaction(reaction: ReactionResponse<StreamChatGenerics>, message?: MessageResponse<StreamChatGenerics>) {
if (!message) return;
const actualIndex =
replies[index]?.id === message.id ? index : replies[index - 1]?.id === message.id ? index - 1 : null;
this.latestReplies = this.latestReplies.map((m) => {
if (m.id === message.id) {
return formatMessage(
this._channel.state.removeReaction(reaction, message) as MessageResponse<StreamChatGenerics>,
);
if (actualIndex === null) {
return;
}
const updatedReplies = [...replies];
updatedReplies.splice(actualIndex, 1);
this.state.partialNext({
replies: updatedReplies,
});
};
public upsertReplyLocally = ({
message,
timestampChanged = false,
}: {
message: MessageResponse<SCG>;
timestampChanged?: boolean;
}) => {
if (message.parent_id !== this.id) {
throw new Error('Reply does not belong to this thread');
}
const formattedMessage = formatMessage(message);
if (message.status === 'failed') {
// store failed reply so that it's not lost when reloading or hydrating
this.failedRepliesMap.set(formattedMessage.id, formattedMessage);
} else if (this.failedRepliesMap.has(message.id)) {
this.failedRepliesMap.delete(message.id);
}
this.state.next((current) => ({
...current,
replies: addToMessageList(current.replies, formattedMessage, timestampChanged),
}));
};
public updateParentMessageLocally = (message: MessageResponse<SCG>) => {
if (message.id !== this.id) {
throw new Error('Message does not belong to this thread');
}
this.state.next((current) => {
const formattedMessage = formatMessage(message);
const newData: typeof current = {
...current,
deletedAt: formattedMessage.deleted_at,
parentMessage: formattedMessage,
replyCount: message.reply_count ?? current.replyCount,
};
// update channel on channelData change (unlikely but handled anyway)
if (message.channel) {
newData['channel'] = this.client.channel(message.channel.type, message.channel.id, message.channel);
}
return m;
return newData;
});
}
};
public updateParentMessageOrReplyLocally = (message: MessageResponse<SCG>) => {
if (message.parent_id === this.id) {
this.upsertReplyLocally({ message });
}
if (!message.parent_id && message.id === this.id) {
this.updateParentMessageLocally(message);
}
};
public markAsRead = async ({ force = false }: { force?: boolean } = {}) => {
if (this.ownUnreadCount === 0 && !force) {
return null;
}
return await this.channel.markRead({ thread_id: this.id });
};
private throttledMarkAsRead = throttle(() => this.markAsRead(), MARK_AS_READ_THROTTLE_TIMEOUT, { trailing: true });
public queryReplies = ({
limit = DEFAULT_PAGE_LIMIT,
sort = DEFAULT_SORT,
...otherOptions
}: QueryRepliesOptions<SCG> = {}) => {
return this.channel.getReplies(this.id, { limit, ...otherOptions }, sort);
};
public loadNextPage = ({ limit = DEFAULT_PAGE_LIMIT }: { limit?: number } = {}) => {
return this.loadPage(limit);
};
public loadPrevPage = ({ limit = DEFAULT_PAGE_LIMIT }: { limit?: number } = {}) => {
return this.loadPage(-limit);
};
private loadPage = async (count: number) => {
const { pagination } = this.state.getLatestValue();
const [loadingKey, cursorKey, insertionMethodKey] =
count > 0
? (['isLoadingNext', 'nextCursor', 'push'] as const)
: (['isLoadingPrev', 'prevCursor', 'unshift'] as const);
if (pagination[loadingKey] || pagination[cursorKey] === null) return;
const queryOptions = { [count > 0 ? 'id_gt' : 'id_lt']: pagination[cursorKey] };
const limit = Math.abs(count);
this.state.partialNext({ pagination: { ...pagination, [loadingKey]: true } });
try {
const data = await this.queryReplies({ ...queryOptions, limit });
const replies = data.messages.map(formatMessage);
const maybeNextCursor = replies.at(count > 0 ? -1 : 0)?.id ?? null;
this.state.next((current) => {
let nextReplies = current.replies;
// prevent re-creating array if there's nothing to add to the current one
if (replies.length > 0) {
nextReplies = [...current.replies];
nextReplies[insertionMethodKey](...replies);
}
return {
...current,
replies: nextReplies,
pagination: {
...current.pagination,
[cursorKey]: data.messages.length < limit ? null : maybeNextCursor,
[loadingKey]: false,
},
};
});
} catch (error) {
this.client.logger('error', (error as Error).message);
this.state.next((current) => ({
...current,
pagination: {
...current.pagination,
[loadingKey]: false,
},
}));
}
};
}
const formatReadState = (read: ReadResponse[]): ThreadReadState =>
read.reduce<ThreadReadState>((state, userRead) => {
state[userRead.user.id] = {
user: userRead.user,
lastReadMessageId: userRead.last_read_message_id,
unreadMessageCount: userRead.unread_messages ?? 0,
lastReadAt: new Date(userRead.last_read),
};
return state;
}, {});
const repliesPaginationFromInitialThread = (thread: ThreadResponse): ThreadRepliesPagination => {
const latestRepliesContainsAllReplies = thread.latest_replies.length === thread.reply_count;
return {
nextCursor: null,
prevCursor: latestRepliesContainsAllReplies ? null : thread.latest_replies.at(0)?.id ?? null,
isLoadingNext: false,
isLoadingPrev: false,
};
};
const ownUnreadCountSelector = (currentUserId: string | undefined) => <
SCG extends ExtendableGenerics = DefaultGenerics
>(
state: ThreadState<SCG>,
) => (currentUserId && state.read[currentUserId]?.unreadMessageCount) || 0;

@@ -283,10 +283,9 @@ import FormData from 'form-data';

/**
* formatMessage - Takes the message object. Parses the dates, sets __html
* and sets the status to received if missing. Returns a message object
* Takes the message object, parses the dates, sets `__html`
* and sets the status to `received` if missing; returns a new message object.
*
* @param {MessageResponse<StreamChatGenerics>} message a message object
*
* @param {MessageResponse<StreamChatGenerics>} message `MessageResponse` object
*/
export function formatMessage<StreamChatGenerics extends ExtendableGenerics = DefaultGenerics>(
message: MessageResponse<StreamChatGenerics>,
message: MessageResponse<StreamChatGenerics> | FormatMessageResponse<StreamChatGenerics>,
): FormatMessageResponse<StreamChatGenerics> {

@@ -299,6 +298,7 @@ return {

__html: message.html,
// parse the date..
// parse the dates
pinned_at: message.pinned_at ? new Date(message.pinned_at) : null,
created_at: message.created_at ? new Date(message.created_at) : new Date(),
updated_at: message.updated_at ? new Date(message.updated_at) : new Date(),
deleted_at: message.deleted_at ? new Date(message.deleted_at) : null,
status: message.status || 'received',

@@ -313,5 +313,64 @@ reaction_groups: maybeGetReactionGroupsFallback(

export function addToMessageList<StreamChatGenerics extends ExtendableGenerics = DefaultGenerics>(
messages: Array<FormatMessageResponse<StreamChatGenerics>>,
message: FormatMessageResponse<StreamChatGenerics>,
export const findIndexInSortedArray = <T, L>({
needle,
sortedArray,
selectValueToCompare = (e) => e,
sortDirection = 'ascending',
}: {
needle: T;
sortedArray: readonly T[];
/**
* In array of objects (like messages), pick a specific
* property to compare needle value to.
*
* @example
* ```ts
* selectValueToCompare: (message) => message.created_at.getTime()
* ```
*/
selectValueToCompare?: (arrayElement: T) => L | T;
/**
* @default ascending
* @description
* ```md
* ascending - [1,2,3,4,5...]
* descending - [...5,4,3,2,1]
* ```
*/
sortDirection?: 'ascending' | 'descending';
}) => {
if (!sortedArray.length) return 0;
let left = 0;
let right = sortedArray.length - 1;
let middle = 0;
const recalculateMiddle = () => {
middle = Math.round((left + right) / 2);
};
const actualNeedle = selectValueToCompare(needle);
recalculateMiddle();
while (left <= right) {
// if (actualNeedle === selectValueToCompare(sortedArray[middle])) return middle;
if (
(sortDirection === 'ascending' && actualNeedle < selectValueToCompare(sortedArray[middle])) ||
(sortDirection === 'descending' && actualNeedle > selectValueToCompare(sortedArray[middle]))
) {
right = middle - 1;
} else {
left = middle + 1;
}
recalculateMiddle();
}
return left;
};
export function addToMessageList<T extends FormatMessageResponse>(
messages: readonly T[],
newMessage: T,
timestampChanged = false,

@@ -322,3 +381,3 @@ sortBy: 'pinned_at' | 'created_at' = 'created_at',

const addMessageToList = addIfDoesNotExist || timestampChanged;
let messageArr = messages;
let newMessages = [...messages];

@@ -328,54 +387,48 @@ // if created_at has changed, message should be filtered and re-inserted in correct order

if (timestampChanged) {
messageArr = messageArr.filter((msg) => !(msg.id && message.id === msg.id));
newMessages = newMessages.filter((message) => !(message.id && newMessage.id === message.id));
}
// Get array length after filtering
const messageArrayLength = messageArr.length;
// for empty list just concat and return unless it's an update or deletion
if (messageArrayLength === 0 && addMessageToList) {
return messageArr.concat(message);
} else if (messageArrayLength === 0) {
return [...messageArr];
if (!newMessages.length && addMessageToList) {
return newMessages.concat(newMessage);
}
const messageTime = (message[sortBy] as Date).getTime();
const messageIsNewest = (messageArr[messageArrayLength - 1][sortBy] as Date).getTime() < messageTime;
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const messageTime = newMessage[sortBy]!.getTime();
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const messageIsNewest = newMessages.at(-1)![sortBy]!.getTime() < messageTime;
// if message is newer than last item in the list concat and return unless it's an update or deletion
if (messageIsNewest && addMessageToList) {
return messageArr.concat(message);
} else if (messageIsNewest) {
return [...messageArr];
return newMessages.concat(newMessage);
}
// find the closest index to push the new message
let left = 0;
let middle = 0;
let right = messageArrayLength - 1;
while (left <= right) {
middle = Math.floor((right + left) / 2);
if ((messageArr[middle][sortBy] as Date).getTime() <= messageTime) left = middle + 1;
else right = middle - 1;
}
const insertionIndex = findIndexInSortedArray({
needle: newMessage,
sortedArray: messages,
sortDirection: 'ascending',
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
selectValueToCompare: (m) => m[sortBy]!.getTime(),
});
// message already exists and not filtered due to timestampChanged, update and return
if (!timestampChanged && message.id) {
if (messageArr[left] && message.id === messageArr[left].id) {
messageArr[left] = message;
return [...messageArr];
// message already exists and not filtered with timestampChanged, update and return
if (!timestampChanged && newMessage.id) {
if (newMessages[insertionIndex] && newMessage.id === newMessages[insertionIndex].id) {
newMessages[insertionIndex] = newMessage;
return newMessages;
}
if (messageArr[left - 1] && message.id === messageArr[left - 1].id) {
messageArr[left - 1] = message;
return [...messageArr];
if (newMessages[insertionIndex - 1] && newMessage.id === newMessages[insertionIndex - 1].id) {
newMessages[insertionIndex - 1] = newMessage;
return newMessages;
}
}
// Do not add updated or deleted messages to the list if they do not already exist
// or have a timestamp change.
// do not add updated or deleted messages to the list if they already exist or come with a timestamp change
if (addMessageToList) {
messageArr.splice(left, 0, message);
newMessages.splice(insertionIndex, 0, newMessage);
}
return [...messageArr];
return newMessages;
}

@@ -408,2 +461,35 @@

// works exactly the same as lodash.throttle
export const throttle = <T extends (...args: unknown[]) => unknown>(
fn: T,
timeout = 200,
{ leading = true, trailing = false }: { leading?: boolean; trailing?: boolean } = {},
) => {
let runningTimeout: null | NodeJS.Timeout = null;
let storedArgs: Parameters<T> | null = null;
return (...args: Parameters<T>) => {
if (runningTimeout) {
if (trailing) storedArgs = args;
return;
}
if (leading) fn(...args);
const timeoutHandler = () => {
if (storedArgs) {
fn(...storedArgs);
storedArgs = null;
runningTimeout = setTimeout(timeoutHandler, timeout);
return;
}
runningTimeout = null;
};
runningTimeout = setTimeout(timeoutHandler, timeout);
};
};
type MessagePaginationUpdatedParams<StreamChatGenerics extends ExtendableGenerics = DefaultGenerics> = {

@@ -410,0 +496,0 @@ parentSet: MessageSet;

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc