
genkitx-graph
Firebase Genkit AI framework plugin for Graph based workflows like LangGraph.
genkitx-graph is a community plugin for creating graph-based workflows with Firebase Genkit. Built by The Fire Company. 🔥
Install the plugin in your project with your favorite package manager:
npm install genkitx-graph
yarn add genkitx-graph
pnpm add genkitx-graph
genkitx-graph is a TypeScript plugin for Firebase Genkit that enables developers to easily build graph-based workflows for AI agents. This plugin provides a powerful and flexible way to create complex, multi-step processes with branching logic and state management.
import { defineGraph } from 'genkitx-graph';
import { defineFlow, streamFlow, runFlow } from '@genkit-ai/flow';
import { z } from 'zod';
Use the defineGraph function to create a new graph:
const graph = defineGraph(
  {
    name: 'MyGraph',
    stateSchema: z.object({
      // Define your state schema here
    }),
    inputSchema: z.object({
      // Define your input schema here
    }),
    outputSchema: z.object({
      // Define your output schema here
    }),
    streamSchema: z.object({
      // Define your stream schema here (optional)
    }),
  },
  async (input) => {
    // Define your entrypoint logic here
    return {
      state: {
        /* initial state */
      },
      nextNode: 'firstNode',
    };
  }
);
defineGraph has a similar signature to defineFlow (because it builds an executor flow under the hood), with two important changes:
stateSchema: defines the schema for the state object that is passed to every node.
entrypoint: as the name suggests, the entrypoint of your graph. It must take the input and return the initial state along with the name of the next node (nextNode).
Use the addNode function to add nodes to your graph:
graph.addNode(
  defineFlow(
    {
      name: 'firstNode',
      // Define input/output schemas if needed
    },
    async (state) => {
      // Node logic here
      return {
        state: {
          /* updated state */
        },
        nextNode: 'secondNode',
      };
    }
  )
);
graph.addNode(
  defineFlow(
    {
      name: 'secondNode',
    },
    async (state) => {
      // Node logic here
      return {
        /* final output */
      };
    }
  )
);
Nodes are the core construct of genkitx-graph.
Each Node is a Flow with a specific signature. Its inputSchema must be the stateSchema defined in the graph, and it must return either an object with two properties (state, the modified state, and nextNode, the name of the next node) or an object matching the Graph's outputSchema. If you are using TypeScript, you don't need to add these schemas to each Node separately.
Each Node takes the current state of the workflow as input, uses it, modifies it, and returns either the updated state with the next node or a final output.
This approach, rather than the traditional approach of predefined nodes and edges, provides a high degree of flexibility for building complex agentic workflows. LLMs or traditional logic can decide which node runs next, and any node can return the final output.
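The control flow described here can be sketched without the plugin at all. The following is a minimal, self-contained illustration and not genkitx-graph's actual implementation: `runGraph`, `Transition`, `NodeFn`, `isTransition`, and the ticket-routing nodes are hypothetical names, and the type guard merely stands in for whatever check the plugin performs against outputSchema.

```typescript
// Hypothetical sketch of node-driven routing; not the plugin's real executor.
type Transition<S> = { state: S; nextNode: string };
type NodeFn<S, O> = (state: S) => Promise<Transition<S> | O>;

// Assumption: a result counts as a transition if it has `state` and a string `nextNode`.
function isTransition<S>(value: unknown): value is Transition<S> {
  return (
    typeof value === 'object' &&
    value !== null &&
    'state' in value &&
    typeof (value as { nextNode?: unknown }).nextNode === 'string'
  );
}

async function runGraph<S, O>(
  nodes: Record<string, NodeFn<S, O>>,
  start: Transition<S>,
  maxSteps = 100 // guard against accidental infinite loops
): Promise<O> {
  let current = start;
  for (let step = 0; step < maxSteps; step++) {
    const node = nodes[current.nextNode];
    if (!node) throw new Error(`Unknown node: ${current.nextNode}`);
    const result = await node(current.state);
    if (!isTransition<S>(result)) return result; // a final output ends the run
    current = result;
  }
  throw new Error('Step limit exceeded');
}

type TicketState = { text: string; attempts: number };

// Each node decides the next node itself, using plain logic here.
const nodes: Record<string, NodeFn<TicketState, string>> = {
  classify: async (state) => ({
    state: { ...state, attempts: state.attempts + 1 },
    nextNode: state.text.includes('refund') ? 'billing' : 'general',
  }),
  billing: async (state) => `billing handled after ${state.attempts} step(s)`,
  general: async (state) => `general handled after ${state.attempts} step(s)`,
};
```

Because routing lives inside the node bodies, swapping the `includes('refund')` check for an LLM call would change the decision logic without touching the runner.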
To execute the graph, use the runFlow function with the graph's executor flow and your input:
const result = await runFlow(graph.executor, {
  /* input data */
});
If any node returns an object conforming to the Graph's outputSchema, that value is returned as the Graph's output and execution finishes.
You can run an arbitrary function before the graph execution finishes by passing a beforeFinish callback to defineGraph:
const graph = defineGraph(
  {
    name: 'MyGraph',
    stateSchema: z.object({
      // Define your state schema here
    }),
    inputSchema: z.object({
      // Define your input schema here
    }),
    outputSchema: z.object({
      // Define your output schema here
    }),
    streamSchema: z.object({
      // Define your stream schema here (optional)
    }),
  },
  // Entrypoint: builds the initial state and picks the first node
  async (input) => {
    // Define your entrypoint logic here
    return {
      state: {
        /* initial state */
      },
      nextNode: 'firstNode',
    };
  },
  // beforeFinish: runs once, just before the graph returns its output
  async (state, output) => {
    // Do anything with graph state and output
  }
);
The most common use of beforeFinish is storing the graph state and output in a database. Together with the entrypoint callback, this enables graphs to have memory, such as a chat history.
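As a rough illustration of that memory pattern, the sketch below pairs an entrypoint-style callback with a beforeFinish-style callback. It is an assumption-laden example rather than code from the plugin: the in-memory `Map` stands in for a real database, and `Message`, `ChatState`, `ChatInput`, `store`, and the `'respond'` node name are all hypothetical.

```typescript
// Hypothetical sketch: an in-memory Map stands in for a real database.
type Message = { role: 'user' | 'model'; text: string };
type ChatState = { sessionId: string; history: Message[] };
type ChatInput = { sessionId: string; text: string };

const store = new Map<string, Message[]>();

// Entrypoint-style callback: load prior history, then append the new user message.
async function entrypoint(input: ChatInput) {
  const previous = store.get(input.sessionId) ?? [];
  const state: ChatState = {
    sessionId: input.sessionId,
    history: [...previous, { role: 'user', text: input.text }],
  };
  return { state, nextNode: 'respond' }; // 'respond' is a made-up node name
}

// beforeFinish-style callback: persist the state together with the final output.
async function beforeFinish(state: ChatState, output: string) {
  store.set(state.sessionId, [
    ...state.history,
    { role: 'model', text: output },
  ]);
}
```

Functions shaped like these would be passed as the second and third arguments to defineGraph, so each run starts from whatever the previous run saved.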
// ...configure Genkit (as shown above)...
const graph = defineGraph(
  {
    name: 'MultiStepGraph',
    inputSchema: z.object({ text: z.string(), iterations: z.number() }),
    outputSchema: z.string(),
  },
  async (input) => {
    return {
      state: { text: input.text, iterations: input.iterations, count: 0 },
      nextNode: 'processText',
    };
  }
);
graph.addNode(
  defineFlow(
    {
      name: 'processText',
    },
    async (state) => {
      state.text = state.text.toUpperCase();
      state.count++;
      return {
        state,
        nextNode:
          state.count < state.iterations ? 'processText' : 'finalizeOutput',
      };
    }
  )
);
graph.addNode(
  defineFlow(
    {
      name: 'finalizeOutput',
    },
    async (state) => {
      return `Processed ${state.count} times: ${state.text}`;
    }
  )
);
// Run the graph
const result = await runFlow(graph.executor, {
  text: 'hello world',
  iterations: 3,
});
console.log(result); // Outputs: "Processed 3 times: HELLO WORLD"
You can use the streamingCallback function to handle streaming data from nodes:
import { generate } from '@genkit-ai/ai';

const graph = defineGraph(
  {
    name: 'StreamingGraph',
    inputSchema: z.string(),
    streamSchema: z.string(),
  },
  async (input) => {
    return {
      state: input,
      nextNode: 'streamingNode',
    };
  }
);
graph.addNode(
  defineFlow(
    {
      name: 'streamingNode',
    },
    async (state, streamingCallback) => {
      // ...
      const result = await generate({
        model: anyModel, // placeholder: substitute any model reference
        prompt: `tell me a joke about ${state}`, // state holds the graph input here
        streamingCallback,
      });
      // ...
    }
  )
);
Use streaming for long-running processes or when you need to provide real-time updates.
[!NOTE] Streaming does not stop the execution of the graph.
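To make that note concrete, here is a small self-contained sketch in which a node emits chunks through a callback and the run still proceeds to the next node. It does not use the plugin or Genkit; `StreamingCallback`, `StreamingNode`, `streamingNodes`, and `runStreaming` are hypothetical names, and treating a plain string as the final output is an assumption made for this example only.

```typescript
// Hypothetical sketch: chunks are emitted mid-run while the graph keeps going.
type StreamingCallback = (chunk: string) => void;
type Step = { state: string; nextNode: string } | string;
type StreamingNode = (state: string, cb: StreamingCallback) => Promise<Step>;

const streamingNodes: Record<string, StreamingNode> = {
  draft: async (state, cb) => {
    for (const word of state.split(' ')) cb(word); // stream partial progress
    return { state: state.toUpperCase(), nextNode: 'finish' }; // run continues
  },
  finish: async (state, cb) => {
    cb('done');
    return `final: ${state}`; // a plain string is the final output here
  },
};

async function runStreaming(
  input: string,
  cb: StreamingCallback
): Promise<string> {
  let step: Step = { state: input, nextNode: 'draft' };
  // Keep following transitions until a node returns a final string.
  while (typeof step !== 'string') {
    const node = streamingNodes[step.nextNode];
    if (!node) throw new Error(`Unknown node: ${step.nextNode}`);
    step = await node(step.state, cb);
  }
  return step;
}
```

The callback is purely a side channel: emitting a chunk never ends the loop, only a final return value does.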
Want to contribute to the project? That's awesome! Head over to our Contribution Guidelines.
[!NOTE] This repository depends on Google's Firebase Genkit. For issues and questions related to Genkit, please refer to instructions available in Genkit's repository.
Reach out by opening a discussion on GitHub Discussions.
This plugin is proudly maintained by the team at The Fire Company. 🔥
This project is licensed under the Apache 2.0 License.